django MQ 시리즈 3편 - task queue (3) : Custom Study Project
1. django MQ 시리즈 1편 - task queue (1) : Asynchronous Tasks With Django and Celery
2. django MQ 시리즈 2편 - task queue (2) : Learn Django Celery with RabbitMQ
3. django MQ 시리즈 3편 - task queue (3) : Custom Study Project
4. django MQ 시리즈 4편 - celery with redis for pub/sub
1. Project 설명
1.1 프로젝트 계획
- 이 프로젝트는 아래 블로그의 포스트들을 기반으로 실습되었으며 이 외, 여러 사이트에서 실재 실무에서 사용할만한
방법과 설정들을 적용하였다.
- 앞선 프로젝트에서 rabbitMQ로 작업을 했기 때문에 이번 작업은 redis로 작업을 하였다.
- 프로젝트에 적용하기 어려운 항목들은 따로 분리해 새로운 포스트로 추가 작성할 계획이다.
reference : https://heodolf.tistory.com/73
* 상위 reference는 총 5개의 시리즈로 정리되어 있다.
1.2 프로젝트 목표
1. celery - delay()를 이용한 기본 실행
2. celery - apply_async(), subtask()를 이용한 고급 실행
3. celery - chain()을 이용한 순차 작업
4. celery - group(), chord()를 이용한 병렬작업과 callback을 이용한 후처리 작업
5. celery - ProgressRecorder를 이용한 celery 작업 프로그래스바 동적으로 그리기
* flower 사용 방법에 대한 시리즈는 이 글에서 다루지 않기로 함
* flower는 Prometheus으로 연동하여 사용할 수 있다.
2. Project 준비
2.1 프로젝트 이동
1. django-celery-redis-for-tasks
cd django-celery-redis-for-tasks
2.2 프로젝트 실행
1. celery, flower, celery-beat, shell_plus --notebook
- celery 실행
- 윈도우 : python -m celery -A config worker -l info -P gevent
- 스래드 방식, worker 수, pid 파일 생성, 로그 파일 생성을 직접 정하는 방법 : celery -A config worker -l INFO -P threads -c 10 --pi dfile="%n.pid" --logfile="%n.log"
- celery-beat 실행
- celery -A config beat -l INFO
- admin page에서 반복 설정을 하고자 한다면, settings.py 에 변수를 선언할 수 있고 ( 아래 settins.py에 명시됨)
실행 커멘드에 직접 넣어 줄 수 있다.
- celery -A config beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
- flower 실행
- celery -A config flower --port=5555
- 모니터링 방법 : http://localhost:5555/
# run celery
celery -A config worker -l info -P gevent
# run flower
# access http://localhost:5555/
celery -A config flower --port=5555
# run celery-beat
# The --scheduler option is not required if CELERY_BEAT_SCHEDULER is set in settings.py.
celery -A config beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
# run jupyternotebook
python manage.py shell_plus --notebook
2.3 settings.py 추가 정보
1. INSTALLED_APPS =[]
INSTALLED_APPS = [
.
.
"django_extensions",
"django_celery_beat",
"django_celery_results",
"celery_progress",
"task1",
]
2. CELERY_***
ELERY_BROKER_URL = "redis://localhost:6379"
CELERY_RESULT_BACKEND = "redis://localhost:6379"
# 아래 설정이 없는 경우, celery 실행시 task를 못찾는 경우가 발생할 수 있다.
CELERY_IMPORTS = [
"task1.tasks.sample",
"task1.tasks.add",
"task1.tasks.progress",
]
#주로 테스트를 위한 설정으로 queue를 사용하지 않고 작업을 즉시 동기화 하여 실행한다.
# for test, Do not send messages to the queue. It works synchronously.
# CELERY_ALWAYS_EAGER = True
# log 정보를 더 세밀하게 출력한다.
CELERY_RESULT_EXTENDED = True
# queue 작업 데이터를 json화 하는 설정들
CELERY_ACCEPT_CONTENT = ["application/json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
# 타임존
CELERY_TIMEZONE = "UTC"
# true일 경우 worker가 작업을 시작하면 "stared" 상태를 보고한다.
CELERY_TASK_TRACK_STARTED = True
# celery result가 계속 쌓이면 불필요한 자원 낭비가 발생된다.
# 폐기될 시간을 정할 수 있다.
CELERY_RESULT_EXPIRES = 60 * 60 * 24 * 30 # Results expire after 1 month
# celery beat의 설정을 admin page를 이용해 설정할 수 있다.
# 아래 설정은 실행 커멘드의 옵션으로 넣을 수도 있다.
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
# 원하는 반복 시간을 직업 입력해 둘 수 있다.
CELERY_BEAT_SCHEDULE = {
"scheduled_task-1": {
"task": "task1.tasks.add.add",
"schedule": 5.0,
"args": (10, 10),
},
"scheduled_task-2": {
"task": "task1.tasks.add.add",
"schedule": 7.0,
"args": (20, 30),
},
}
# 모든 task는 키 값을 가지고 있다. 이 키 값의 길이를 제한할 수 있다.
DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH = 191
3. celery.py
- django에서 celery를 사용하는 경우 프로젝트 설정 폴더 (settings.py 가 있는 폴더)에 celery 인스턴스를 만들고 기본 설정을 하는
메일이라 할 수 있는 파일을 하나 만든다.
- debug_test는 테스트를 위한 작업으로 다른 위치에서 debug_test.deley() 등을 이용해 사용할 수 있다.
- app을 생성하고 (app = Celery("config")) 해당 인스턴스에 적용하고자 하는 설정을 이 곳에서 할 수 있다. 만약 settings.py에 되어 있다면
중복해서 할 필요 없다. 이 코드에서는 result_expires가 중복하여 설정되어 있다.
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
app = Celery("config")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.conf.update(
result_expires=3600,
)
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print("Request: {0!r}".format(self.request))
3. Project 실습
3.1 delay()를 이용한 기본 실행
1. 파일 명 : part1 sleep, return.ipynb
- reference : https://heodolf.tistory.com/54
- 해당 task는 task1 앱의 tasks 폴더에 sample.py의 working를 사용한다.
@shared_task()
def working(id=1):
# 1~5초 사이의 랜덤한 Delay를 발생.
time.sleep(random.randint(1, 3))
return "{}번째, 일을 끝냈다.".format(id)
- 해당 작업은 랜덤한 초단위의 sleep을 수행하고 return 값을 수행한다.
- working( 1 )을 실행하면 결과는 즉시 나오나 working.delay( 3 )를 실행하면 일정 시간이 지난 후 출력이 되는 것을 확인할 수 있다.
3.2 apply_async(), subtask()를 이용한 고급 실행
1. 파일 명 : part2 apply_async, subtask .ipynb
- reference : https://heodolf.tistory.com/63
- task를 실행하는 방법으로 쉽게 delay를 이용할 수 있지만 applay_async를 통해 Arguments와 옵션들을 설정할 수 있다.
- subtask는 s 한 글자 만으로도 사용이 가능하다.
- 사전에 Task를 미리 생성하고 필요한 경우에 applay_async 혹은 delay로 실행시킬 수 있다.
- 특징으로 argument를 비우거나 일부분만 미리 선언하거나 옵션을 미리 선언할 수 있다.
- 이러한 특징으로 task를 실행할 때 추가적인 정보만 입력 함으로, 작업의 재사용이 가능하다.
- s는 subtask의 shortcut으로 delay와 비슷하다.
- subtask의 입력 인자들의 활용 범위는 applay_async와 비슷하다.
- 이와 관련한 내용들은 공식 사이트를 확인한다.
https://docs.celeryproject.org/en/latest/userguide/tasks.html
- 해당 task는 task1 앱의 tasks 폴더에 add.py의 add를 사용한다.
@shared_task()
def add(num1, num2):
time.sleep(1)
print("{} + {} = {}".format(num1, num2, num1 + num2))
return num1 + num2
- apply_async 테스트
# 원형 delay(*args, **kwargs)
task_1 = add.delay( 1, 2 )
# 결과 값을 db 혹은 cache로 저장하지 않는다.
task_2 = add.apply_async( args=[3, 4], ignore_result=True )
# kwargs={'kwarg1': 'x', 'kwarg2': 'y'}) 처럼 추가 입력 값을 넣을 수 있다.
task_3 = add.apply_async( args=[5, 6], kwargs={} )
# task의 현재 작업 상태를 확인한다. True이면 완료, False 이면 진행 중이거나 알 수 없음
print( "task_1 is ready? {}".format( task_1.ready() ) )
print( "task_2 is ready? {}".format( task_2.ready() ) )
print( "task_3 is ready? {}".format( task_3.ready() ) )
결과
task_1 is ready? True
task_2 is ready? False
task_3 is ready? True
# get()은 현재 task의 실행 결과를 알려준다.
# get을 사용하면 task가 동기적으로 실행되어 결과가 나올 때까지 대기한다.
print( "task_1 is {}".format( task_1.get() ) )
print( "task_2 is {}".format( task_2.get() ) )
print( "task_3 is {}".format( task_3.get() ) )
결과
task_1 is 3
task_2 is None
task_3 is 11
- subtask 테스트
- subtask의 사용 목적을 정의 하자면, 선언을 미리 해 두고 필요한 경우에 재활용 할 수 있다.
- chain을 사용하면 task를 연속으로 실행할 수 있다.
- chord를 사용하면 task를 병렬로 일괄 처리 할 수 있다
- queue [broker]의 종류에 따라 사용 이 가능하다. 예) rabbitMQ 사용 불가, redis 사용 가능
#아래와 같이 변수에 선언을 할 수만 있을 뿐, 실행은 되지 않는다.
subtask_1 = add.s( 1, 2 )
# 옵션을 미리 설정
subtask_2 = add.subtask( args=[3, 4], ignore_result=True )
subtask_3 = add.subtask( args=[5, 6], kwargs={} )
subtask_4 = add.subtask()
# 실재로 id를 확인하면 없는 것으로 나온다
print( "subtask_1 is {}".format( subtask_1.id ) )
print( "subtask_2 is {}".format( subtask_2.id ) )
print( "subtask_3 is {}".format( subtask_3.id ) )
print( "subtask_4 is {}".format( subtask_4.id ) )
# task를 시행한다.
ask_1 = subtask_1.delay()
task_2 = subtask_2.apply_async()
task_3 = subtask_3.delay()
# 옵션을 실행과 함께 설정
task_4 = subtask_4.apply_async( args=[7, 8], ignore_result=True )
# subtask_4 변수 재활용
task_5 = subtask_4.delay( 9, 10 )
# task_4, task_5는 같은 subtask를 사용했기 때문에 ID가 같음
# 상태를 확인한다.
print( "task_1 is ready? {}".format( task_1.ready() ) )
print( "task_2 is ready? {}".format( task_2.ready() ) )
print( "task_3 is ready? {}".format( task_3.ready() ) )
print( "task_4 is ready? {}".format( task_4.ready() ) )
print( "task_5 is ready? {}".format( task_5.ready() ) )
# 실행 하자마자 결과를 확인해서 아직 종료된 상태가 없다.
task_1 is ready? False
task_2 is ready? False
task_3 is ready? False
task_4 is ready? False
task_5 is ready? False
# 동기모드로 결과를 기다린다.
print( "task_1 is {}".format( task_1.get() ) )
print( "task_2 is {}".format( task_2.get() ) )
print( "task_3 is {}".format( task_3.get() ) )
print( "task_4 is {}".format( task_4.get() ) )
print( "task_5 is {}".format( task_5.get() ) )
# 결과
task_1 is 3
task_2 is None
task_3 is 11
task_4 is None
task_5 is 19
# 상태를 확인한다.
print( "task_1 is ready? {}".format( task_1.ready() ) )
print( "task_2 is ready? {}".format( task_2.ready() ) )
print( "task_3 is ready? {}".format( task_3.ready() ) )
print( "task_4 is ready? {}".format( task_4.ready() ) )
print( "task_5 is ready? {}".format( task_5.ready() ) )
# 결과
task_1 is ready? True
task_2 is ready? False
task_3 is ready? True
task_4 is ready? False
task_5 is ready? True
3.3 chain()을 이용한 순차 작업
1. 파일 명 : part3 chain.ipynb
- reference : https://heodolf.tistory.com/65
- celery를 이용해 작업을 하다보면 task간 결과값을 참조해야 하는 경우가 있다. 즉 처음 실행된 task 결과값을 다음 task에 사용하는 경우가 될 것이다.
- chain의 기본 기능으로 task를 순차적으로 실행하게 해준다.
- subtask(s)를 사용하면, task들을 리스트, 튜블에 담거나 비트연산자로 연결하여 사용할 수 있다.
- add(num1, num2) 함수를 사용해 순차적은 작업을 하게된다면 여러개의 task는 다음과 같은 결과를 얻게 된다.
Task / Arguments | num1 = 1, num2 = 2, |
num2 = 3, | num2=4 | num2=5 |
Task-1 | 1 + 2 = 3 num1 + num2 = num1 |
|||
Task-2 | 3 + 3 = 6 num1 + num2 = num1 |
|||
Task-3 | 6 + 4 = 10 num1 + num2 = num1 |
|||
Task-4 | 10 + 5 = 15 |
* chain의 작업을 delay()와 get()을 이용하여 직접 구현할 수 있다.
하지만 get()을 사용하게 되면 매번 결과 값을 받아야 한다. 이는 비효율적이며 작업자 풀이 소진되면 교착 상채가 발생할 수 있다.
- 상세한건 공식 사이트를 확인한다.
https://docs.celeryq.dev/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks
- 나쁜 사용 사례 : delay와 get을 이용하여 순차적 실행을 구현했지만 순차적으로 결과값을 매번 확인한다.
@app.task
def update_page_info(url):
page = fetch_page.delay(url).get()
info = parse_page.delay(page).get()
store_page_info.delay(url, info)
@app.task
def fetch_page(url):
return myhttplib.get(url)
@app.task
def parse_page(page):
return myparser.parse_document(page)
@app.task
def store_page_info(url, info):
return PageInfo.objects.create(url, info)
-좋은 사용 사례 : s와 비트연산자를 이용하여 작업을 정의하고 chain을 이용하여 한번에 모든 작업을 순차적으로 실행시킨다.
def update_page_info(url):
# fetch_page -> parse_page -> store_page
chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
chain()
@app.task()
def fetch_page(url):
return myhttplib.get(url)
@app.task()
def parse_page(page):
return myparser.parse_document(page)
@app.task(ignore_result=True)
def store_page_info(info, url):
PageInfo.objects.create(url=url, info=info)
- chain 테스트
subtask_1 = add.s( 1, 2 )
subtask_2 = add.s( 3 )
subtask_3 = add.s( 4 )
subtask_4 = add.s( 5 )
# 튜플 또는 리스트를 이용한 Chain
tasks = ( subtask_1, subtask_2, subtask_3, subtask_4 )
# 비트연산자( | )를 이용한 Chain
# chain_task = ( subtask_1 | subtask_2 | subtask_3 | subtask_4 ).apply_async()
chaining = chain( tasks ) # Chain Task 생성
chain_task = chaining( ) # Chain 실행
# id 확인
print( "\n# 1. Task ID 확인" )
print( "chain_task is {}".format( chain_task.id ) )
# 현재 작업 현황 확인
print( "\n# 2. Task 상태" )
print( "chain_task is {}".format( chain_task.ready() ) )
# 작업 실행 결과 확인
print( "\n# 3. 실행결과 확인" )
print( "chain_task is {}".format( chain_task.get() ) )
결과
chain_task is 15
print( "\n# 4. Task 상태" )
print( "chain_task is {}".format( chain_task.ready() ) )
* ready()보다 더 상세하게 작업 상태를 알 수 있는 방법이 있다.
- AsyncResult(id=str(chain_task)).state
- 해당 함수의 실행 결과는 다음을 참고하면 된다.
celery.states.PENDING = ‘PENDING’
– Task state is unknown (assumed pending since you know the id).
celery.states.RECEIVED = ‘RECEIVED’
– Task was received by a worker.
celery.states.STARTED = ‘STARTED’
– Task was started by a worker (CELERYTRACKSTARTED).
celery.states.SUCCESS = ‘SUCCESS’
– Task succeeded
celery.states.FAILURE = ‘FAILURE’
– Task failed
celery.states.REVOKED = ‘REVOKED’
– Task was revoked.(취소)
celery.states.RETRY = ‘RETRY’
– Task is waiting for retry.
- chain의 link 옵션을 이용한 테스트
subtask_1 = add.s( 3 )
subtask_2 = add.s( 4 )
subtask_3 = add.s( 5 )
# 리스트, 튜플로 link를 연결하는 경우 다르게 동작함
links = ( subtask_1 | subtask_2 | subtask_3 )
# 초기 args 값을 전달하고 link 옵션으로 연결된 subtask를 전달함
link_task = add.apply_async( args=[ 1, 2 ], link=links )
print( "\n# 1. Task ID 확인" )
print( "link_task is {}".format( link_task.id ) )
# 결과
# 1. Task ID 확인
link_task is c66680fc-47e5-4ccf-bac2-a2979bfc6d9d
print( "\n# 2. Task 상태" )
print( "link_task is {}".format( link_task.ready() ) )
# 결과
# 2. Task 상태
link_task is False
print( "\n# 3. 실행결과 확인" )
print( "link_task is {}".format( link_task.get() ) )
#결과
# 3. 실행결과 확인
link_task is 3
print( "\n# 4. Task 상태" )
print( "link_task is {}".format( link_task.ready() ) )
#결과
# 4. Task 상태
link_task is True
- 리스트를 튜플로 연결하는 경우
links = ( subtask_1 , subtask_2 , subtask_3 )
print( "\n# 3. 실행결과 확인" )
print( "link_task is {}".format( link_task.get() ) )
#결과
# 3. 실행결과 확인
link_task is 3
- 결과는 동일하다 3번의 작업이 수행되었다.
- 하지만 celery의 로그를 확인해 보면 아래 표와 같이 다른 결과가 출력되는 것을 확인할 수 있다.
- Task-1의 결과가 나머지 작업들의 첫번째 인자, num1 값으로 반복해서 입력되는 것을 확인할 수 있다.
Task / Arguments | num1 = 1, num2 = 2, |
num2 = 3, | num2=4 | num2=5 |
Task-1 | 1 + 2 = 3 num1 + num2 = num1 |
|||
Task-2 | 3 + 3 = 6 num1 + num2 = num1 |
|||
Task-3 | 3 + 4 = 7 num1 + num2 = num1 |
|||
Task-4 | 3 + 5 = 8 |
3.4 group(), chord()를 이용한 병렬작업과 callback을 이용한 후처리 작업
1. 파일 명 : part4 group, chord.ipynb
- reference : https://heodolf.tistory.com/66
- group은 task들을 하나의 집합으로 만들어 순차가 아닌 병렬로 실행할 수 있다.
- task 그룹화 방법은 두 가지가 있다.
- 비트연산자 : chain과 같이 순차적으로 실행된다.
- 리스트 or 튜플 : task가 병렬로 동시에 수행되며 그 결과는 리스트로 반환된다.
- group을 이용한 병렬처리를 실행하기 위해서는 celery 실행시 추가 변경해줘야 하는 옵션들이 있다.
- [ -P or --pool ] : solo가 아닌 다른 옵션으로 설정
- prefork, threads, gevent, eventlet, default: profork.
- 윈도우에서 테스트 했을 시 기본 gevent의 그린 스레드로 동작하는 것을 확인함 (2023/03/10)
- [ -c or -concurrency ] : 병렬처리할 수 있는 worker의 수를 2개 이상 설정
- group의 사용법에 대해 더 상세히 알고자 한다면 공식 사이트를 확인한다.
https://docs.celeryq.dev/en/latest/userguide/canvas.html#groups
- group 테스트
# 1에서 5까지 증가하는 두 인자 값을 더하기
tasks = [ add.s( i, i+1 ) for i in range(1, 5) ]
# subtask를 group 모듈로 그룹화
grouping = group( tasks )
# group화된 task를 실행
group_task = grouping() # 또는 grouping.apply_async()
print( "\n# 1. Task 확인" )
print( "tasks = {}".format( tasks ) )
print( "group_task.id = {}".format( group_task.id ) )
print( "group_task.type = {}".format( type(group_task) ) )
#결과
# 1. Task 확인
tasks = [task1.tasks.add.add(1, 2), task1.tasks.add.add(2, 3), task1.tasks.add.add(3, 4), task1.tasks.add.add(4, 5)]
group_task.id = e7312c79-6e67-47c5-923f-6f7cb6695284
group_task.type = <class 'celery.result.GroupResult'>
# 결과 값이 리스트로 반환된다. get으로 결과를 확인하면 완료된 순서대로 결과가 확인되고
# join으로 결과를 확인하면 호출한 순서대로 반환된다.
print( "\n# 3. 실행결과 확인" )
print( "몇건이 완료 되었는가? {}".format( group_task.completed_count() ) )
print( "완료된 결과를 반환. {}".format( group_task.get() ) )
print( "호출한 순서대로 반환. {}".format( group_task.join() ) )
print( "몇건이 완료 되었는가? {}".format( group_task.completed_count() ) )
#결과
# 3. 실행결과 확인
몇건이 완료 되었는가? 0
완료된 결과를 반환. [3, 5, 7, 9]
호출한 순서대로 반환. [3, 5, 7, 9]
몇건이 완료 되었는가? 4
print( "\n# 4. Task 상태 확인" )
print( "Subtask가 모두 준비되었는가? {}".format( group_task.ready() ) )
print( "모두 성공했는가? {}".format( group_task.successful() ) )
print( "실패가 있는가? {}".format( group_task.failed() ) )
#결과
# 4. Task 상태 확인
Subtask가 모두 준비되었는가? True
모두 성공했는가? True
실패가 있는가? False
- celery의 로그를 확인하면 모든 작업들이 한번에 received task 되고 이후 작업들이 수행되는 것을 확인할 수 있다.
- chord 테스트
- 그룹의 모든 작업이 완료된 후 실행되는 후처리 작업
- group에 다른 task를 연결하면 chord로 업그레이드 됨
예) group(tasks) | task
- 필수 사항으로 결과를 반환받을 callback이 있어야 함
- 주의사항
- 결과값을 무조건 받아야야 하기 때문에 ignore_result = True를 쓸 수 없다.
- Result Backend로 RabbitMQ/rpc 지원을 하지 않음 (버전업시 지원할 수 있기에 여부 확인이 필요함)
- 이러한 동기화 단계 작업들은 비용이 많이 들기 때문에 chord 사용은 될 수 있으면 피하는게 좋다.
- 상세한 내용은 공식 사이트 문서를 확인한다.
https://docs.celeryq.dev/en/latest/userguide/canvas.html#chords
- callback 함수가 필요하기 때문에 tasks/add.py 파일에 아래와 같이 함수를 정의해 준다.
# 콜백함수로 들어오는 인자값들의 결과를 더한다
@app.task
def callback(results):
return sum(results)
tasks = [ add.s( i, i+1 ) for i in range(1, 5) ]
chording = chord( tasks )
chord_task = chording( callback.s() )
print( "\n# 3. 실행결과 확인" )
print( "완료된 결과를 반환. {}".format( chord_task.get() ) )
#결과
# 3. 실행결과 확인
완료된 결과를 반환. 24
- chord에서는 group에서 사용했던 uccessful(), failed(), join() 등의 기능 사용이 제한된다.
3.5 ProgressRecorder를 이용한 celery 작업 프로그래스바 동적으로 그리기
1. celery progressrecorder : runserver를 이요한 웹 페이지 참고
- reference : https://www.youtube.com/watch?v=BbPswIqn2VI
- task1/tasks/progress.py 파일에 ProgressRecorder의 인스턴스를 생성해 set_progress를 통해 진행 과정을 업데이트 한다.
from time import sleep
from celery import Celery, shared_task
from celery_progress.backend import ProgressRecorder
from config.celery import app
@shared_task(bind=True)
def django_progress(self, duration):
progress_recorder = ProgressRecorder(self)
for i in range(100):
sleep(duration)
progress_recorder.set_progress(i + 1, 100, f"On iteration {i}")
return "Done"
- template에서 "celery_progress/celery_progress.js" 파일을 불러와 화면에 구현할 내용을 script에 입력한다.
<h1>Done!</h1>
<div class="progress-wrapper">
<div
id="progress-bar"
class="progress-bar"
style="background-color: #68a9ef; width: 0%"
>
</div>
</div>
<div id="progress-bar-message">Waiting for progress to start...</div>
<script src="{% static 'celery_progress/celery_progress.js' %}"></script>
<script>
var progressUrl = "{% url 'celery_progress:task_status' task_id %}";
function customResult(resultElement, result) {
$(resultElement).append(
$("<p>").text("Sum of all seconds is " + result)
);
}
CeleryProgressBar.initProgressBar(progressUrl, {
onResult: customResult,
});
</script>
- 이 작업 또한 동기화 단계의 작업이다. 각 celery는 작업 진행 중 현재 진행 결과를 result backend에 지속적으로 업데이트를 하게 된다.
- 컴퓨터/서버의 사양과 celery의 worker 수에 따라 성능 제한이 발생할 수 있다.
reference
https://heodolf.tistory.com/73
https://velog.io/@qlgks1/Django-Celery-MQ-message-que
https://velog.io/@sensemint_/1
'Study > django' 카테고리의 다른 글
django-redis의 hset 사용법에 대해 chatGPT에 물어봤을때 (0) | 2023.03.20 |
---|---|
django-redis의 cache 방법을 이해하자! (0) | 2023.03.18 |
django MQ 시리즈 2편 - task queue (2) :Learn Django Celery with RabbitMQ (0) | 2023.03.08 |
django MQ 시리즈 1편 - task queue (1) : Asynchronous Tasks With Django and Celery (1) | 2023.03.04 |
django에서 redis 사용하기 1편 (캐시 사용: 데이터 / 세션) (1) | 2023.02.16 |
댓글