Database/redis

django MQ 시리즈 4편 - celery with redis for pub/sub

bluebamus 2023. 10. 9.

1. django MQ 시리즈 1편 - task queue (1) : Asynchronous Tasks With Django and Celery

2. django MQ 시리즈 2편 - task queue (2) : Learn Django Celery with RabbitMQ

3. django MQ 시리즈 3편 - task queue (3) : Custom Study Project

4. django MQ 시리즈 4편 - celery with redis for pub/sub

 

오랜시간 지체되었던 redis의 channel을 이용한 pub/sub 기능을 구현해보고자 한다.

사실 django로 pub/sub을 구현하기 위해 관련한 예제나 포스팅을 국내외로 검색을 해봤지만, 

내 개인 기준으로 직관적으로 발행과 구독 일련의 과정을 django 코드로 구현한 코드를 확인할 수 없었다. 

 

또한 django에도 channel이라는 web socker 라이브러리가 있고 여기에서 redis의 pub/sub이 사용되고 있기에 참고를 하려 했으나 불필요한 코드가 많고 내부적으로 pub/sub 형태가 아닌 mq 형태로 사용하는 경우가 많았다.

 

이에 다음과 같은 목적으로 mini project를 만들고 이해하는 과정을 거쳤다.

 

프로젝트 요구 사항 :

1. celery를 사용하여 redis에서 mq 동작을 수행한다.

2. mq로 동작하는 함수를 이용하여 3개의 구독자를 생성한다.

3. for을 사용하여 10회 json으로 된 데이터를 쉬지 않고 발행한다.

 

프로젝트 수행 목적 :

1. celery를 사용하여 pub/sub 수행을 할 수 있는 기능 구현

2. 만약, 특정 함수의 실행 시간이 길어질 경우, 그때 발행된 데이터는 구독자에게 전달이 되는지 확인

   - redis의 channel을 이용한 pub/sub 기능은 데이터 유실에 대한 보장을 못한다고 정의되어 있다. 하지만 이 범위가 상세하게 설명되어 있지 않고 있기에 구독을 한 상태에서 지연이 발생한 경우에 대해 테스트를 해보고자 한다.

 

프로젝트 위치 :

https://github.com/bluebamus/django-mq-series

 

GitHub - bluebamus/django-mq-series: This repository covers mq and pus/sub learning projects for redis, rabbitmq, kafka, nats, e

This repository covers mq and pus/sub learning projects for redis, rabbitmq, kafka, nats, etc. - GitHub - bluebamus/django-mq-series: This repository covers mq and pus/sub learning projects for red...

github.com

 - 해당 저장소의 django-celery-redis-pubsub 프로젝트를 참고한다.

   - 구현된 프로젝트는 django-celery-redis-for-tasks 프로젝트를 기반으로 업데이트 및 수정 되었다.

 

서비스 동작 설명

 1. runserver 구동

 2. celery 구동

 3. http://127.0.0.1:8000/subscribe/ 도메인을 통해 3개의 구독자 생성

   - 각 구독작는 3개의 celery 함수로 구현되어 list를 이용해 계속 수신대기를 한다.

 4. http://127.0.0.1:8000/publish/ 도메인을 통해 10개의 json 데이터 발행

   - 각 수신된 데이터의 차이점을 식별하기 위해 1부터 10까지 증가하는 cnt를 json에 추가함

 

주요 코드 설명

 - task1/tasks/pub_sub.py

@shared_task()
def pub_sub3():
    r = redis.StrictRedis(host="127.0.0.1", port=6379, db=3)
    p = r.pubsub()
    p.psubscribe("notification*")
    # p.subscribe("notification.msg")
    print("start subscribe 3")
    for message in p.listen():
        print("message : ", message)
        # if message:
        #     print(message.get("type", ""))
        #     print(message.get("pattern", ""))
        #     print(message.get("channel", ""))
        #     print(message.get("data", ""))
        if message["type"] == "message":
            result = json.loads(message["data"])
            # print("result : ", result)
            with open("sub3.txt", "a", encoding="utf8") as file:
                file.writelines(result)

        if message["type"] == "subscribe":
            print("A new user is connected in pub_sub3")
        time.sleep(2)
    # while 1:
    #     message = p.get_message()
    #     if message:
    #         # Get data from message
    #         print("message raw : ", message)
    #         print("message : ", message["data"])
    #         if message["type"] == "message":
    #             result = json.loads(message["data"])
    #             print("result : ", result)

   - listen을 사용할 수 있고 get_message를 사용할 수도 있다. 

      - 테스트 결과 listen은 데이터가 수신된 상태에서 이후 코드를 수행하는데 get_message는 반복해서 데이터 확인을 하며 전체 코드를 계속 수행한다.

      - 윈도우에서 open으로 파일 생성이 안되서 로그로만 테스트했다

 

 - task1/urls.py

urlpatterns = [
    path("subscribe/", views.subscribe, name="subscribe"),
    path("publish/", views.publish, name="publish"),
]

 

 - task1/views.py

from django.shortcuts import render
from django.http import HttpResponse
import json
import redis
from .tasks.pub_sub import pub_sub1, pub_sub2, pub_sub3

redis_client = redis.StrictRedis(host="127.0.0.1", port=6379, db=3)


def subscribe(request):
    pub_sub1.delay()
    pub_sub2.delay()
    pub_sub3.delay()
    return HttpResponse("call subscribe users")



def publish_data_on_redis(json_data, channel_name):
    redis_client.publish(channel_name, message=json.dumps(json_data))


def publish(request):
    # pub_sub.delay()
    num = 0
    for i in range(1, 10):
        num = num + 1
        json_data = {
            "message": "Hello to all connected clients",
            "date": "2019-02-02",
            "title": "welcome",
            "command": "end",
            "cnt": str(num + 1),
        }
        publish_data_on_redis(json_data, "notification.msg")

    return HttpResponse("call publish")

 

* 이번 프로젝트를 수행하면서 다음과 같은 주요한 사항을 발견할 수 있었다.

   - 채널이 등록되면 psubscribe에 대한 메시지가 각 수신자에게 전송되며 아래와 같다.

   - 현재 구독 상태가 psubscribe이기 때문에 type에 psubscribe가 명시되어 있다. 이외 채널 정보가 있으며 data는 무조건 1로 전송된다. 이 메시지를 이용해 채널에 제대로 접속하여 구독된 상태인지 확인할 수 있다.

   - 이러한 정보를 검색을 통해 얻을 수 없었기에 첫 데이터가 제대로된 데이터가 수신이 왜 안되는지 헤맸다.

 

 - 데이터를 발행하게 되면 다음과 같은 결과를 확인할 수 있다.

   - sleep을 통해 데이터 전송에 대해 각각 정상, 1초 딜레이, 2초 딜레이를 수행시켰다.

   - 모든 구독자들은 딜레이 이후 10개에 대한 데이터를 누락 없이 전부 수신하였다.

   - 이 프로젝트의 결과로 구독상태에서 자체적으로 수신되는 데이터에 대해 버퍼링을 수행한다는 것을 확인할 수 있었다. redis pub/sub의 데이터 누락은 네트워크의 문제 등의 구독자의 지연과는 다른 이유를 통해 수신을 못하는 경우를 의미하는 것으로 생각된다.

   - kafka나 다른 솔루션의 경우 전송된 메시지의 수신보장을 위한 재전송 및 확인 기능을 가지고 있거나 전송 이후 메시지를 삭제하지 않고 전송 결과를 확인할 수 있는 방법을 제시하는 등의 기능이 있다. 하지만 redis channel은 이러한 기능을 제공하지 않는다. 이 차이를 두고 있다고 보면 될것 같다.

 

* 추가 내용

 - celery의 동작은 worker의 수에 제한되어 동작한다. 이에 실제 서비스를 만들기 위해서는 계속 listen 해야 하는 구독자의 경우 thread나 process의 데몬으로 만들어 구동하는것이 좋을것 같다. 

 - listen을 수행하는 데몬은 구독 상태를 폐기해야 하는 경우 종료되어야 한다. 이를 위해 command 항목을 json에 넣거나 특정 string 구문을 인지하게 만들어 데몬을 종료하게 만들 수 있다.

 

-  reference : 
http://redisgate.kr/redis/command/pubsub_intro.php

 

 

 

 

 

댓글