Study/django

django MQ 시리즈 3편 - task queue (3) : Custom Study Project

bluebamus 2023. 3. 14.

1. django MQ 시리즈 1편 - task queue (1) : Asynchronous Tasks With Django and Celery

2. django MQ 시리즈 2편 - task queue (2) : Learn Django Celery with RabbitMQ

3. django MQ 시리즈 3편 - task queue (3) : Custom Study Project

4. django MQ 시리즈 4편 - celery with redis for pub/sub

 

1. Project 설명

1.1 프로젝트 계획

- 이 프로젝트는 아래 블로그의 포스트들을 기반으로 실습되었으며 이 외, 여러 사이트에서 실재 실무에서 사용할만한

방법과 설정들을 적용하였다.

- 앞선 프로젝트에서 rabbitMQ로 작업을 했기 때문에 이번 작업은 redis로 작업을 하였다.

- 프로젝트에 적용하기 어려운 항목들은 따로 분리해 새로운 포스트로 추가 작성할 계획이다.

reference : https://heodolf.tistory.com/73

 

[Celery] 무작정 시작하기 (5) - Monitoring

2020/01/10 - [Back-end/Python] - [Celery] 무작정 시작하기 (1) - 설치 및 실행 2020/01/17 - [Back-end/Python] - [Celery] 무작정 시작하기 (2) - Task 2020/01/20 - [Back-end/Python] - [Celery] 무작정 시작하기 (3) - Chain 2020/01/20 - [

heodolf.tistory.com

* 상위 reference는 총 5개의 시리즈로 정리되어 있다.

1.2 프로젝트 목표

1. celery - delay()를 이용한 기본 실행

2. celery - apply_async(), subtask()를 이용한 고급 실행

3. celery - chain()을 이용한 순차 작업

4. celery - group(), chord()를 이용한 병렬작업과 callback을 이용한 후처리 작업

5. celery - ProgressRecorder를 이용한 celery 작업 프로그래스바 동적으로 그리기

 

* flower 사용 방법에 대한 시리즈는 이 글에서 다루지 않기로 함

* flower는 Prometheus으로 연동하여 사용할 수 있다.

2. Project 준비

2.1 프로젝트 이동

1. django-celery-redis-for-tasks

cd django-celery-redis-for-tasks

2.2 프로젝트 실행

1. celery, flower, celery-beat, shell_plus --notebook

- celery 실행

    - 윈도우 : python -m celery -A config worker -l info -P gevent

    - 스래드 방식, worker 수, pid 파일 생성, 로그 파일 생성을 직접 정하는 방법 : celery -A config worker -l INFO -P threads -c 10 --pi dfile="%n.pid" --logfile="%n.log"

 

- celery-beat 실행

   - celery -A config beat -l INFO

   - admin page에서 반복 설정을 하고자 한다면, settings.py 에 변수를 선언할 수 있고 ( 아래 settins.py에 명시됨) 

     실행 커멘드에 직접 넣어 줄 수 있다.

      - celery -A config beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler

 

- flower 실행

   - celery -A config flower --port=5555

   - 모니터링 방법 : http://localhost:5555/

# run celery
celery -A config worker -l info -P gevent

# run flower
# access http://localhost:5555/
celery -A config flower --port=5555

# run celery-beat
# The --scheduler option is not required if CELERY_BEAT_SCHEDULER is set in settings.py.
celery -A config beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler

# run jupyternotebook
python manage.py shell_plus --notebook

2.3 settings.py 추가 정보

1. INSTALLED_APPS =[]

INSTALLED_APPS = [
.
.
"django_extensions",
"django_celery_beat",
"django_celery_results",
"celery_progress",
"task1",
]

2. CELERY_***

ELERY_BROKER_URL = "redis://localhost:6379"
CELERY_RESULT_BACKEND = "redis://localhost:6379"

# 아래 설정이 없는 경우, celery 실행시 task를 못찾는 경우가 발생할 수 있다.
CELERY_IMPORTS = [
    "task1.tasks.sample",
    "task1.tasks.add",
    "task1.tasks.progress",
]

#주로 테스트를 위한 설정으로 queue를 사용하지 않고 작업을 즉시 동기화 하여 실행한다.
# for test, Do not send messages to the queue. It works synchronously.
# CELERY_ALWAYS_EAGER = True

# log 정보를 더 세밀하게 출력한다.
CELERY_RESULT_EXTENDED = True

# queue 작업 데이터를 json화 하는 설정들
CELERY_ACCEPT_CONTENT = ["application/json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"

# 타임존
CELERY_TIMEZONE = "UTC"

# true일 경우 worker가 작업을 시작하면 "stared" 상태를 보고한다.
CELERY_TASK_TRACK_STARTED = True

# celery result가 계속 쌓이면 불필요한 자원 낭비가 발생된다.
# 폐기될 시간을 정할 수 있다.
CELERY_RESULT_EXPIRES = 60 * 60 * 24 * 30  # Results expire after 1 month

# celery beat의 설정을 admin page를 이용해 설정할 수 있다.
# 아래 설정은 실행 커멘드의 옵션으로 넣을 수도 있다.
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"

# 원하는 반복 시간을 직업 입력해 둘 수 있다.
CELERY_BEAT_SCHEDULE = {
    "scheduled_task-1": {
        "task": "task1.tasks.add.add",
        "schedule": 5.0,
        "args": (10, 10),
    },
    "scheduled_task-2": {
        "task": "task1.tasks.add.add",
        "schedule": 7.0,
        "args": (20, 30),
    },
}

# 모든 task는 키 값을 가지고 있다. 이 키 값의 길이를 제한할 수 있다.
DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH = 191

3. celery.py

- django에서 celery를 사용하는 경우 프로젝트 설정 폴더 (settings.py 가 있는 폴더)에 celery 인스턴스를 만들고 기본 설정을 하는 

  메일이라 할 수 있는 파일을 하나 만든다.

- debug_test는 테스트를 위한 작업으로 다른 위치에서 debug_test.deley() 등을 이용해 사용할 수 있다.

- app을 생성하고 (app = Celery("config")) 해당 인스턴스에 적용하고자 하는 설정을 이 곳에서 할 수 있다. 만약 settings.py에 되어 있다면 

  중복해서 할 필요 없다. 이 코드에서는 result_expires가 중복하여 설정되어 있다.

import os

from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")

app = Celery("config")

app.config_from_object("django.conf:settings", namespace="CELERY")

app.conf.update(
    result_expires=3600,
)

app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print("Request: {0!r}".format(self.request))

3. Project 실습

3.1 delay()를 이용한 기본 실행

1. 파일 명 : part1 sleep, return.ipynb

- reference : https://heodolf.tistory.com/54

 

[Celery] 무작정 시작하기 (1) - 설치 및 실행

Scrapy+Selenium를 이용하여 크롤링을 하다가 오래 걸리는 한 녀석 때문에 다른 작업들이 밀려서 실시간으로 데이터를 가져오지 못하는 경우가 발생했다. 그래서 파이썬에서 비동기처리를 할 수 있

heodolf.tistory.com

- 해당 task는 task1 앱의 tasks 폴더에 sample.py의 working를 사용한다.

@shared_task()
def working(id=1):
    # 1~5초 사이의 랜덤한 Delay를 발생.
    time.sleep(random.randint(1, 3))
    return "{}번째, 일을 끝냈다.".format(id)

- 해당 작업은 랜덤한 초단위의 sleep을 수행하고 return 값을 수행한다.

- working( 1 )을 실행하면 결과는 즉시 나오나 working.delay( 3 )를 실행하면 일정 시간이 지난 후 출력이 되는 것을 확인할 수 있다.

 

3.2 apply_async(), subtask()를 이용한 고급 실행

1. 파일 명 : part2 apply_async, subtask .ipynb

- reference : https://heodolf.tistory.com/63

 

[Celery] 무작정 시작하기 (2) - Task

2020/01/10 - [Back-end/Python] - [Celery] 무작정 시작하기 (1) - 설치 및 실행 지난 포스트에서 Celery를 설치하고 간단하게 Task를 비동기로 실행해보았다. Celery가 비동기 태스크 큐이지만 동기적으로 Task를

heodolf.tistory.com

- task를 실행하는 방법으로 쉽게 delay를 이용할 수 있지만 applay_async를 통해 Arguments와 옵션들을 설정할 수 있다.

- subtask는 s 한 글자 만으로도 사용이 가능하다.

   - 사전에 Task를 미리 생성하고 필요한 경우에 applay_async 혹은 delay로 실행시킬 수 있다.

   - 특징으로 argument를 비우거나 일부분만 미리 선언하거나 옵션을 미리 선언할 수 있다.

   - 이러한 특징으로 task를 실행할 때 추가적인 정보만 입력 함으로, 작업의 재사용이 가능하다.

   - s는 subtask의 shortcut으로 delay와 비슷하다.

   - subtask의 입력 인자들의 활용 범위는 applay_async와 비슷하다.

   - 이와 관련한 내용들은 공식 사이트를 확인한다.

https://docs.celeryproject.org/en/latest/userguide/tasks.html

 

Tasks — Celery 5.3.0b2 documentation

This document describes the current stable version of Celery (5.3). For development docs, go here. Tasks Tasks are the building blocks of Celery applications. A task is a class that can be created out of any callable. It performs dual roles in that it defi

docs.celeryq.dev

- 해당 task는 task1 앱의 tasks 폴더에 add.py의 add를 사용한다.

@shared_task()
def add(num1, num2):
    time.sleep(1)

    print("{} + {} = {}".format(num1, num2, num1 + num2))

    return num1 + num2

 

 

- apply_async 테스트

# 원형 delay(*args, **kwargs)
task_1 = add.delay( 1, 2 )

# 결과 값을 db 혹은 cache로 저장하지 않는다.
task_2 = add.apply_async( args=[3, 4], ignore_result=True )

# kwargs={'kwarg1': 'x', 'kwarg2': 'y'}) 처럼 추가 입력 값을 넣을 수 있다.
task_3 = add.apply_async( args=[5, 6], kwargs={} )

# task의 현재 작업 상태를 확인한다. True이면 완료, False 이면 진행 중이거나 알 수 없음
print( "task_1 is ready? {}".format( task_1.ready() ) )
print( "task_2 is ready? {}".format( task_2.ready() ) )
print( "task_3 is ready? {}".format( task_3.ready() ) )

결과 
task_1 is ready? True
task_2 is ready? False
task_3 is ready? True

# get()은 현재 task의 실행 결과를 알려준다.
# get을 사용하면 task가 동기적으로 실행되어 결과가 나올 때까지 대기한다.
print( "task_1 is {}".format( task_1.get() ) )
print( "task_2 is {}".format( task_2.get() ) )
print( "task_3 is {}".format( task_3.get() ) )

결과
task_1 is 3
task_2 is None
task_3 is 11

- subtask 테스트

   - subtask의 사용 목적을 정의 하자면, 선언을 미리 해 두고 필요한 경우에 재활용 할 수 있다.

   - chain을 사용하면 task를 연속으로 실행할 수 있다.

   - chord를 사용하면 task를 병렬로 일괄 처리 할 수 있다

      - queue [broker]의 종류에 따라 사용 이 가능하다. 예) rabbitMQ 사용 불가, redis 사용 가능

#아래와 같이 변수에 선언을 할 수만 있을 뿐, 실행은 되지 않는다.
subtask_1 = add.s( 1, 2 )
# 옵션을 미리 설정
subtask_2 = add.subtask( args=[3, 4], ignore_result=True )
subtask_3 = add.subtask( args=[5, 6], kwargs={} )
subtask_4 = add.subtask()

# 실재로 id를 확인하면 없는 것으로 나온다
print( "subtask_1 is {}".format( subtask_1.id ) )
print( "subtask_2 is {}".format( subtask_2.id ) )
print( "subtask_3 is {}".format( subtask_3.id ) )
print( "subtask_4 is {}".format( subtask_4.id ) )

# task를 시행한다.
ask_1 = subtask_1.delay()
task_2 = subtask_2.apply_async()
task_3 = subtask_3.delay()
# 옵션을 실행과 함께 설정
task_4 = subtask_4.apply_async( args=[7, 8], ignore_result=True )
# subtask_4 변수 재활용
task_5 = subtask_4.delay( 9, 10 )

# task_4, task_5는 같은 subtask를 사용했기 때문에 ID가 같음

# 상태를 확인한다.
print( "task_1 is ready? {}".format( task_1.ready() ) )
print( "task_2 is ready? {}".format( task_2.ready() ) )
print( "task_3 is ready? {}".format( task_3.ready() ) )
print( "task_4 is ready? {}".format( task_4.ready() ) )
print( "task_5 is ready? {}".format( task_5.ready() ) )

# 실행 하자마자 결과를 확인해서 아직 종료된 상태가 없다.
task_1 is ready? False
task_2 is ready? False
task_3 is ready? False
task_4 is ready? False
task_5 is ready? False

# 동기모드로 결과를 기다린다.
print( "task_1 is {}".format( task_1.get() ) )
print( "task_2 is {}".format( task_2.get() ) )
print( "task_3 is {}".format( task_3.get() ) )
print( "task_4 is {}".format( task_4.get() ) )
print( "task_5 is {}".format( task_5.get() ) )

# 결과
task_1 is 3
task_2 is None
task_3 is 11
task_4 is None
task_5 is 19

# 상태를 확인한다.
print( "task_1 is ready? {}".format( task_1.ready() ) )
print( "task_2 is ready? {}".format( task_2.ready() ) )
print( "task_3 is ready? {}".format( task_3.ready() ) )
print( "task_4 is ready? {}".format( task_4.ready() ) )
print( "task_5 is ready? {}".format( task_5.ready() ) )

# 결과 
task_1 is ready? True
task_2 is ready? False
task_3 is ready? True
task_4 is ready? False
task_5 is ready? True

3.3 chain()을 이용한 순차 작업

1. 파일 명 : part3 chain.ipynb

- reference : https://heodolf.tistory.com/65

 

[Celery] 무작정 시작하기 (3) - Chain

2020/01/10 - [Back-end/Python] - [Celery] 무작정 시작하기 (1) - 설치 및 실행 2020/01/17 - [Back-end/Python] - [Celery] 무작정 시작하기 (2) - Task 지난 포스트에서 apply_async(delay)로 Task를 단일 실행시키는 방법에 대

heodolf.tistory.com

- celery를 이용해 작업을 하다보면 task간 결과값을 참조해야 하는 경우가 있다. 즉 처음 실행된 task 결과값을 다음 task에 사용하는 경우가 될 것이다.

- chain의 기본 기능으로 task를 순차적으로 실행하게 해준다.

- subtask(s)를 사용하면, task들을 리스트, 튜블에 담거나 비트연산자로 연결하여 사용할 수 있다.

- add(num1, num2) 함수를 사용해 순차적은 작업을 하게된다면 여러개의 task는 다음과 같은 결과를 얻게 된다.

Task / Arguments num1 = 1,
num2 = 2,
num2 = 3, num2=4 num2=5
Task-1 1 + 2 = 3
num1 + num2 = num1
     
Task-2   3 + 3 = 6
num1 + num2 = num1
   
Task-3     6 + 4 = 10
num1 + num2 = num1
 
Task-4       10 + 5 = 15

* chain의 작업을 delay()와 get()을 이용하여 직접 구현할 수 있다.

  하지만 get()을 사용하게 되면 매번 결과 값을 받아야 한다. 이는 비효율적이며 작업자 풀이 소진되면 교착 상채가 발생할 수 있다.

   - 상세한건 공식 사이트를 확인한다.

https://docs.celeryq.dev/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks

 

Tasks — Celery 5.3.0b2 documentation

This document describes the current stable version of Celery (5.3). For development docs, go here. Tasks Tasks are the building blocks of Celery applications. A task is a class that can be created out of any callable. It performs dual roles in that it defi

docs.celeryq.dev

- 나쁜 사용 사례 : delay와 get을 이용하여 순차적 실행을 구현했지만 순차적으로 결과값을 매번 확인한다.

@app.task
def update_page_info(url):
    page = fetch_page.delay(url).get()
    info = parse_page.delay(page).get()
    store_page_info.delay(url, info)

@app.task
def fetch_page(url):
    return myhttplib.get(url)

@app.task
def parse_page(page):
    return myparser.parse_document(page)

@app.task
def store_page_info(url, info):
    return PageInfo.objects.create(url, info)

 

-좋은 사용 사례 : s와 비트연산자를 이용하여 작업을 정의하고 chain을 이용하여 한번에 모든 작업을 순차적으로 실행시킨다.

def update_page_info(url):
    # fetch_page -> parse_page -> store_page
    chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
    chain()

@app.task()
def fetch_page(url):
    return myhttplib.get(url)

@app.task()
def parse_page(page):
    return myparser.parse_document(page)

@app.task(ignore_result=True)
def store_page_info(info, url):
    PageInfo.objects.create(url=url, info=info)

 

- chain 테스트

subtask_1 = add.s( 1, 2 )
subtask_2 = add.s( 3 )
subtask_3 = add.s( 4 )
subtask_4 = add.s( 5 )

# 튜플 또는 리스트를 이용한 Chain
tasks = ( subtask_1, subtask_2, subtask_3, subtask_4 )

# 비트연산자( | )를 이용한 Chain
# chain_task = ( subtask_1 | subtask_2 | subtask_3 | subtask_4 ).apply_async()

chaining = chain( tasks )   # Chain Task 생성
chain_task = chaining( )    # Chain 실행

# id 확인
print( "\n# 1. Task ID 확인" )
print( "chain_task is {}".format( chain_task.id ) )

# 현재 작업 현황 확인
print( "\n# 2. Task 상태" )
print( "chain_task is {}".format( chain_task.ready() ) )

# 작업 실행 결과 확인
print( "\n# 3. 실행결과 확인" )
print( "chain_task is {}".format( chain_task.get() ) )

결과
chain_task is 15

print( "\n# 4. Task 상태" )
print( "chain_task is {}".format( chain_task.ready() ) )

* ready()보다 더 상세하게 작업 상태를 알 수 있는 방법이 있다.

   - AsyncResult(id=str(chain_task)).state 

- 해당 함수의 실행 결과는 다음을 참고하면 된다.

celery.states.PENDING = ‘PENDING’
– Task state is unknown (assumed pending since you know the id).

celery.states.RECEIVED = ‘RECEIVED’
– Task was received by a worker.

celery.states.STARTED = ‘STARTED’
– Task was started by a worker (CELERYTRACKSTARTED).

celery.states.SUCCESS = ‘SUCCESS’
– Task succeeded

celery.states.FAILURE = ‘FAILURE’
– Task failed

celery.states.REVOKED = ‘REVOKED’
– Task was revoked.(취소)

celery.states.RETRY = ‘RETRY’
– Task is waiting for retry.

 

- chain의 link 옵션을 이용한 테스트

subtask_1 = add.s( 3 )
subtask_2 = add.s( 4 )
subtask_3 = add.s( 5 )

# 리스트, 튜플로 link를 연결하는 경우 다르게 동작함
links = ( subtask_1 | subtask_2 | subtask_3 )

# 초기 args 값을 전달하고 link 옵션으로 연결된 subtask를 전달함
link_task = add.apply_async( args=[ 1, 2 ], link=links )

print( "\n# 1. Task ID 확인" )
print( "link_task is {}".format( link_task.id ) )

# 결과
# 1. Task ID 확인
link_task is c66680fc-47e5-4ccf-bac2-a2979bfc6d9d

print( "\n# 2. Task 상태" )
print( "link_task is {}".format( link_task.ready() ) )

# 결과
# 2. Task 상태
link_task is False

print( "\n# 3. 실행결과 확인" )
print( "link_task is {}".format( link_task.get() ) )

#결과
# 3. 실행결과 확인
link_task is 3

print( "\n# 4. Task 상태" )
print( "link_task is {}".format( link_task.ready() ) )

#결과
# 4. Task 상태
link_task is True

 

- 리스트를 튜플로 연결하는 경우

links = ( subtask_1 , subtask_2 , subtask_3 )

print( "\n# 3. 실행결과 확인" )
print( "link_task is {}".format( link_task.get() ) )

#결과
# 3. 실행결과 확인
link_task is 3

- 결과는 동일하다 3번의 작업이 수행되었다.

- 하지만 celery의 로그를 확인해 보면 아래 표와 같이 다른 결과가 출력되는 것을 확인할 수 있다.

   - Task-1의 결과가 나머지 작업들의 첫번째 인자, num1 값으로 반복해서 입력되는 것을 확인할 수 있다.

Task / Arguments num1 = 1,
num2 = 2,
num2 = 3, num2=4 num2=5
Task-1 1 + 2 = 3
num1 + num2 = num1
     
Task-2   3 + 3 = 6
num1 + num2 = num1
   
Task-3     3 + 4 = 7
num1 + num2 = num1
 
Task-4       3 + 5 = 8

 

3.4 group(), chord()를 이용한 병렬작업과 callback을 이용한 후처리 작업

1. 파일 명 : part4 group, chord.ipynb

- reference : https://heodolf.tistory.com/66

 

[Celery] 무장적 시장하기 (4) - Group과 Chord

2020/01/10 - [Back-end/Python] - [Celery] 무작정 시작하기 (1) - 설치 및 실행 2020/01/17 - [Back-end/Python] - [Celery] 무작정 시작하기 (2) - Task 2020/01/20 - [Back-end/Python] - [Celery] 무작정 시작하기 (3) - Chain 지난 포스

heodolf.tistory.com

- group은 task들을 하나의 집합으로 만들어 순차가 아닌 병렬로 실행할 수 있다.

- task 그룹화 방법은 두 가지가 있다.

   - 비트연산자 : chain과 같이 순차적으로 실행된다.

   - 리스트 or  튜플 : task가 병렬로 동시에 수행되며 그 결과는 리스트로 반환된다.

- group을 이용한 병렬처리를 실행하기 위해서는 celery 실행시 추가 변경해줘야 하는 옵션들이 있다.

   - [ -P or --pool ] : solo가 아닌 다른 옵션으로 설정 

      - prefork, threads, gevent, eventlet, default: profork.

      - 윈도우에서 테스트 했을 시 기본 gevent의 그린 스레드로 동작하는 것을 확인함 (2023/03/10)

   - [ -c or -concurrency ] : 병렬처리할 수 있는 worker의 수를 2개 이상 설정

- group의 사용법에 대해 더 상세히 알고자 한다면 공식 사이트를 확인한다.

https://docs.celeryq.dev/en/latest/userguide/canvas.html#groups

 

Canvas: Designing Work-flows — Celery 5.3.0b2 documentation

This document describes the current stable version of Celery (5.3). For development docs, go here. Canvas: Designing Work-flows You just learned how to call a task using the tasks delay method in the calling guide, and this is often all you need, but somet

docs.celeryq.dev

 

- group 테스트

# 1에서 5까지 증가하는 두 인자 값을 더하기
tasks = [ add.s( i, i+1 ) for i in range(1, 5) ]

# subtask를 group 모듈로 그룹화
grouping = group( tasks )

# group화된 task를 실행
group_task = grouping() # 또는 grouping.apply_async()

print( "\n# 1. Task 확인" )
print( "tasks = {}".format( tasks ) )
print( "group_task.id = {}".format( group_task.id ) )
print( "group_task.type = {}".format( type(group_task) ) )

#결과
# 1. Task 확인
tasks = [task1.tasks.add.add(1, 2), task1.tasks.add.add(2, 3), task1.tasks.add.add(3, 4), task1.tasks.add.add(4, 5)]
group_task.id = e7312c79-6e67-47c5-923f-6f7cb6695284
group_task.type = <class 'celery.result.GroupResult'>

# 결과 값이 리스트로 반환된다. get으로 결과를 확인하면 완료된 순서대로 결과가 확인되고
# join으로 결과를 확인하면 호출한 순서대로 반환된다.
print( "\n# 3. 실행결과 확인" )
print( "몇건이 완료 되었는가? {}".format( group_task.completed_count() ) )
print( "완료된 결과를 반환. {}".format( group_task.get() ) )
print( "호출한 순서대로 반환. {}".format( group_task.join() ) )
print( "몇건이 완료 되었는가? {}".format( group_task.completed_count() ) )

#결과
# 3. 실행결과 확인
몇건이 완료 되었는가? 0
완료된 결과를 반환. [3, 5, 7, 9]
호출한 순서대로 반환. [3, 5, 7, 9]
몇건이 완료 되었는가? 4

print( "\n# 4. Task 상태 확인" )
print( "Subtask가 모두 준비되었는가? {}".format( group_task.ready() ) )
print( "모두 성공했는가? {}".format( group_task.successful() ) )
print( "실패가 있는가? {}".format( group_task.failed() ) )

#결과
# 4. Task 상태 확인
Subtask가 모두 준비되었는가? True
모두 성공했는가? True
실패가 있는가? False

- celery의 로그를 확인하면 모든 작업들이 한번에 received task 되고 이후 작업들이 수행되는 것을 확인할 수 있다.

 

- chord 테스트

- 그룹의 모든 작업이 완료된 후 실행되는 후처리 작업

- group에 다른 task를 연결하면 chord로 업그레이드 됨

   예) group(tasks) | task

- 필수 사항으로 결과를 반환받을 callback이 있어야 함

- 주의사항

   - 결과값을 무조건 받아야야 하기 때문에 ignore_result = True를 쓸 수 없다.

   - Result Backend로 RabbitMQ/rpc 지원을 하지 않음 (버전업시 지원할 수 있기에 여부 확인이 필요함)

   - 이러한 동기화 단계 작업들은 비용이 많이 들기 때문에 chord 사용은 될 수 있으면 피하는게 좋다.

- 상세한 내용은 공식 사이트 문서를 확인한다.

https://docs.celeryq.dev/en/latest/userguide/canvas.html#chords

 

Canvas: Designing Work-flows — Celery 5.3.0b2 documentation

This document describes the current stable version of Celery (5.3). For development docs, go here. Canvas: Designing Work-flows You just learned how to call a task using the tasks delay method in the calling guide, and this is often all you need, but somet

docs.celeryq.dev

- callback 함수가 필요하기 때문에 tasks/add.py 파일에 아래와 같이 함수를 정의해 준다.

# 콜백함수로 들어오는 인자값들의 결과를 더한다
@app.task
def callback(results):
    return sum(results)
tasks = [ add.s( i, i+1 ) for i in range(1, 5) ]

chording = chord( tasks )
chord_task = chording( callback.s() )

print( "\n# 3. 실행결과 확인" )
print( "완료된 결과를 반환. {}".format( chord_task.get() ) )

#결과
# 3. 실행결과 확인
완료된 결과를 반환. 24

- chord에서는 group에서 사용했던 uccessful(), failed(), join() 등의 기능 사용이 제한된다.

 

3.5 ProgressRecorder를 이용한 celery 작업 프로그래스바 동적으로 그리기

1. celery progressrecorder : runserver를 이요한 웹 페이지 참고

- reference : https://www.youtube.com/watch?v=BbPswIqn2VI 

- github : https://github.com/PrettyPrinted/youtube_video_code/tree/master/2020/06/24/How%20to%20Create%20a%20Celery%20Task%20Progress%20Bar%20in%20Django

 

GitHub - PrettyPrinted/youtube_video_code: The code for all the YouTube videos I publish on YouTube.

The code for all the YouTube videos I publish on YouTube. - GitHub - PrettyPrinted/youtube_video_code: The code for all the YouTube videos I publish on YouTube.

github.com

- task1/tasks/progress.py 파일에 ProgressRecorder의 인스턴스를 생성해 set_progress를 통해 진행 과정을 업데이트 한다.

from time import sleep

from celery import Celery, shared_task
from celery_progress.backend import ProgressRecorder
from config.celery import app


@shared_task(bind=True)
def django_progress(self, duration):
    progress_recorder = ProgressRecorder(self)
    for i in range(100):
        sleep(duration)
        progress_recorder.set_progress(i + 1, 100, f"On iteration {i}")
    return "Done"

- template에서 "celery_progress/celery_progress.js" 파일을 불러와 화면에 구현할 내용을 script에 입력한다.

<h1>Done!</h1>
<div class="progress-wrapper">
  <div
    id="progress-bar"
    class="progress-bar"
    style="background-color: #68a9ef; width: 0%"
  >
    &nbsp;
  </div>
</div>
<div id="progress-bar-message">Waiting for progress to start...</div>
<script src="{% static 'celery_progress/celery_progress.js' %}"></script>
<script>
  var progressUrl = "{% url 'celery_progress:task_status' task_id %}";

  function customResult(resultElement, result) {
    $(resultElement).append(
      $("<p>").text("Sum of all seconds is " + result)
    );
  }

  CeleryProgressBar.initProgressBar(progressUrl, {
    onResult: customResult,
  });
</script>

- 이 작업 또한 동기화 단계의 작업이다. 각 celery는 작업 진행 중 현재 진행 결과를 result backend에 지속적으로 업데이트를 하게 된다.

- 컴퓨터/서버의 사양과 celery의 worker 수에 따라 성능 제한이 발생할 수 있다.

 

reference

https://heodolf.tistory.com/73

 

[Celery] 무작정 시작하기 (5) - Monitoring

2020/01/10 - [Back-end/Python] - [Celery] 무작정 시작하기 (1) - 설치 및 실행 2020/01/17 - [Back-end/Python] - [Celery] 무작정 시작하기 (2) - Task 2020/01/20 - [Back-end/Python] - [Celery] 무작정 시작하기 (3) - Chain 2020/01/20 - [

heodolf.tistory.com

https://velog.io/@qlgks1/Django-Celery-%EB%8B%A8%EC%A0%90-Task-subTask-Signature-%EB%B9%84%EB%8F%99%EA%B8%B0-%EC%9E%91%EC%97%85-%EB%8B%A4%EB%A3%A8%EA%B8%B0-with-network-IO

 

Django Celery - 단점, Task & subTask & Signature 비동기 작업 다루기 with network I/O

Celery: Distributed processing worker의 task & subtask(signature) 활용과 실습, 그에 따른 celery의 단점과 해결법.

velog.io

https://velog.io/@qlgks1/Django-Celery-Task-%EA%B7%B8%EB%A3%B9-%EC%9E%91%EC%97%85-%EC%84%A0%ED%9B%84%ED%96%89-Chain-%EC%9D%BC%EA%B4%84-%EC%B2%98%EB%A6%AC-%EB%B0%B0%EC%B9%98-Group-Chord

 

Django Celery - Task 그룹 작업, 선후행 Chain, 일괄 처리 (배치) Group & Chord

task 자체와 실행에 초점을 살펴본 앞 글에 이어, task의 선&후행 실행(chaining)과 grouping하여 chord와 같이 task를 묶고 `for-loop` 없이 한 꺼번에 비동기 작업을 수행하는 것에 대해 알아보자.

velog.io

https://velog.io/@qlgks1/Django-Celery-%ED%9A%A8%EA%B3%BC%EC%A0%81%EC%9D%B8-%EB%94%94%EB%B2%84%EA%B9%85-%EB%AA%A8%EB%8B%88%ED%84%B0%EB%A7%81-Logging-Flower-Prometheus-Grafanawith-Loki-Promtail

 

Django Celery - 효과적인 디버깅 & 모니터링: Logging + Flower + Prometheus + Grafana(with Loki & Promtail)

celery는 퍼포먼스 체크나 디버깅이 쉽지않다. celery를 전체적으로 최적화 및 depth있는 분석을 위해 기본적인 모니터링 환경을 구성하고, 더 나아가 전체 web stack [Prometheus + Grafana + Loki + Promtail] 구

velog.io

https://velog.io/@qlgks1/Django-Celery-MQ-message-que

 

Django Celery - async worker celery & redis (message que) basic

API Server <==> Redis(M.Q) <==> Celery stack이해하기. 핵심은 "한정 된 자원"을 잘 활용하기 위해, 여러가지 요청을 "비동기 적으로" 모두 처리하기 위해.

velog.io

https://velog.io/@sensemint_/1

 

Model Service를 위한 Celery 구성 및 모니터링(Flower + Prometheus + Grafana)

추론 시간이 오래 걸리는 모델 서비스를 위한 Celery 구성을 기록한 글입니다.

velog.io

 

댓글