django MQ 시리즈 4편 - celery with redis for pub/sub
1. django MQ 시리즈 1편 - task queue (1) : Asynchronous Tasks With Django and Celery
2. django MQ 시리즈 2편 - task queue (2) : Learn Django Celery with RabbitMQ
3. django MQ 시리즈 3편 - task queue (3) : Custom Study Project
4. django MQ 시리즈 4편 - celery with redis for pub/sub
오랜시간 지체되었던 redis의 channel을 이용한 pub/sub 기능을 구현해보고자 한다.
사실 django로 pub/sub을 구현하기 위해 관련한 예제나 포스팅을 국내외로 검색을 해봤지만,
내 개인 기준으로 직관적으로 발행과 구독 일련의 과정을 django 코드로 구현한 코드를 확인할 수 없었다.
또한 django에도 channel이라는 web socker 라이브러리가 있고 여기에서 redis의 pub/sub이 사용되고 있기에 참고를 하려 했으나 불필요한 코드가 많고 내부적으로 pub/sub 형태가 아닌 mq 형태로 사용하는 경우가 많았다.
이에 다음과 같은 목적으로 mini project를 만들고 이해하는 과정을 거쳤다.
프로젝트 요구 사항 :
1. celery를 사용하여 redis에서 mq 동작을 수행한다.
2. mq로 동작하는 함수를 이용하여 3개의 구독자를 생성한다.
3. for을 사용하여 10회 json으로 된 데이터를 쉬지 않고 발행한다.
프로젝트 수행 목적 :
1. celery를 사용하여 pub/sub 수행을 할 수 있는 기능 구현
2. 만약, 특정 함수의 실행 시간이 길어질 경우, 그때 발행된 데이터는 구독자에게 전달이 되는지 확인
- redis의 channel을 이용한 pub/sub 기능은 데이터 유실에 대한 보장을 못한다고 정의되어 있다. 하지만 이 범위가 상세하게 설명되어 있지 않고 있기에 구독을 한 상태에서 지연이 발생한 경우에 대해 테스트를 해보고자 한다.
프로젝트 위치 :
https://github.com/bluebamus/django-mq-series
- 해당 저장소의 django-celery-redis-pubsub 프로젝트를 참고한다.
- 구현된 프로젝트는 django-celery-redis-for-tasks 프로젝트를 기반으로 업데이트 및 수정 되었다.
서비스 동작 설명
1. runserver 구동
2. celery 구동
3. http://127.0.0.1:8000/subscribe/ 도메인을 통해 3개의 구독자 생성
- 각 구독작는 3개의 celery 함수로 구현되어 list를 이용해 계속 수신대기를 한다.
4. http://127.0.0.1:8000/publish/ 도메인을 통해 10개의 json 데이터 발행
- 각 수신된 데이터의 차이점을 식별하기 위해 1부터 10까지 증가하는 cnt를 json에 추가함
주요 코드 설명
- task1/tasks/pub_sub.py
@shared_task()
def pub_sub3():
r = redis.StrictRedis(host="127.0.0.1", port=6379, db=3)
p = r.pubsub()
p.psubscribe("notification*")
# p.subscribe("notification.msg")
print("start subscribe 3")
for message in p.listen():
print("message : ", message)
# if message:
# print(message.get("type", ""))
# print(message.get("pattern", ""))
# print(message.get("channel", ""))
# print(message.get("data", ""))
if message["type"] == "message":
result = json.loads(message["data"])
# print("result : ", result)
with open("sub3.txt", "a", encoding="utf8") as file:
file.writelines(result)
if message["type"] == "subscribe":
print("A new user is connected in pub_sub3")
time.sleep(2)
# while 1:
# message = p.get_message()
# if message:
# # Get data from message
# print("message raw : ", message)
# print("message : ", message["data"])
# if message["type"] == "message":
# result = json.loads(message["data"])
# print("result : ", result)
- listen을 사용할 수 있고 get_message를 사용할 수도 있다.
- 테스트 결과 listen은 데이터가 수신된 상태에서 이후 코드를 수행하는데 get_message는 반복해서 데이터 확인을 하며 전체 코드를 계속 수행한다.
- 윈도우에서 open으로 파일 생성이 안되서 로그로만 테스트했다
- task1/urls.py
urlpatterns = [
path("subscribe/", views.subscribe, name="subscribe"),
path("publish/", views.publish, name="publish"),
]
- task1/views.py
from django.shortcuts import render
from django.http import HttpResponse
import json
import redis
from .tasks.pub_sub import pub_sub1, pub_sub2, pub_sub3
redis_client = redis.StrictRedis(host="127.0.0.1", port=6379, db=3)
def subscribe(request):
pub_sub1.delay()
pub_sub2.delay()
pub_sub3.delay()
return HttpResponse("call subscribe users")
def publish_data_on_redis(json_data, channel_name):
redis_client.publish(channel_name, message=json.dumps(json_data))
def publish(request):
# pub_sub.delay()
num = 0
for i in range(1, 10):
num = num + 1
json_data = {
"message": "Hello to all connected clients",
"date": "2019-02-02",
"title": "welcome",
"command": "end",
"cnt": str(num + 1),
}
publish_data_on_redis(json_data, "notification.msg")
return HttpResponse("call publish")
* 이번 프로젝트를 수행하면서 다음과 같은 주요한 사항을 발견할 수 있었다.
- 채널이 등록되면 psubscribe에 대한 메시지가 각 수신자에게 전송되며 아래와 같다.
- 현재 구독 상태가 psubscribe이기 때문에 type에 psubscribe가 명시되어 있다. 이외 채널 정보가 있으며 data는 무조건 1로 전송된다. 이 메시지를 이용해 채널에 제대로 접속하여 구독된 상태인지 확인할 수 있다.
- 이러한 정보를 검색을 통해 얻을 수 없었기에 첫 데이터가 제대로된 데이터가 수신이 왜 안되는지 헤맸다.
- 데이터를 발행하게 되면 다음과 같은 결과를 확인할 수 있다.
- sleep을 통해 데이터 전송에 대해 각각 정상, 1초 딜레이, 2초 딜레이를 수행시켰다.
- 모든 구독자들은 딜레이 이후 10개에 대한 데이터를 누락 없이 전부 수신하였다.
- 이 프로젝트의 결과로 구독상태에서 자체적으로 수신되는 데이터에 대해 버퍼링을 수행한다는 것을 확인할 수 있었다. redis pub/sub의 데이터 누락은 네트워크의 문제 등의 구독자의 지연과는 다른 이유를 통해 수신을 못하는 경우를 의미하는 것으로 생각된다.
- kafka나 다른 솔루션의 경우 전송된 메시지의 수신보장을 위한 재전송 및 확인 기능을 가지고 있거나 전송 이후 메시지를 삭제하지 않고 전송 결과를 확인할 수 있는 방법을 제시하는 등의 기능이 있다. 하지만 redis channel은 이러한 기능을 제공하지 않는다. 이 차이를 두고 있다고 보면 될것 같다.
* 추가 내용
- celery의 동작은 worker의 수에 제한되어 동작한다. 이에 실제 서비스를 만들기 위해서는 계속 listen 해야 하는 구독자의 경우 thread나 process의 데몬으로 만들어 구동하는것이 좋을것 같다.
- listen을 수행하는 데몬은 구독 상태를 폐기해야 하는 경우 종료되어야 한다. 이를 위해 command 항목을 json에 넣거나 특정 string 구문을 인지하게 만들어 데몬을 종료하게 만들 수 있다.
- reference :
http://redisgate.kr/redis/command/pubsub_intro.php
'Database > redis' 카테고리의 다른 글
실무에서 redis를 사용한 시스템 구축 사례 정리 (0) | 2023.10.08 |
---|---|
redis pub/sub 이해하기 (0) | 2023.04.29 |
우분투 22.04 redis 관리 툴 설치하기 redis-desktop-manager (0) | 2023.03.16 |
댓글