Study/django

django channels의 WebsocketConsumer/JsonWebsocketConsumer 클래스의 self 변수 정리

bluebamus 2025. 3. 31.

1. 기본 변수

   1.1. self.scope

      1.1.1. 역할:

         - 웹소켓 연결에 대한 모든 정보를 포함하는 딕셔너리입니다. 요청 정보, 사용자 정보, URL 경로 등이 포함됩니다.


      1.1.2. 사용 시기: 

         - 연결에 대한 메타데이터가 필요할 때 사용합니다. 인증, URL 파라미터 파싱, 클라이언트 식별 등에 유용합니다.

 

      1.1.1.  사용 방법:

from channels.generic.websocket import WebsocketConsumer

class ChatConsumer(WebsocketConsumer):
    def connect(self):
        # scope에서 사용자 정보 가져오기
        self.user = self.scope["user"]
        
        # URL 라우트 파라미터 가져오기
        self.room_name = self.scope["url_route"]["kwargs"]["room_name"]
        
        # 클라이언트 IP 주소 가져오기
        self.client_ip = self.scope["client"][0]
        
        # HTTP 헤더 접근
        self.headers = dict(self.scope["headers"])
        
        # query_string 파싱
        import urllib.parse
        query_string = self.scope["query_string"].decode()
        self.query_params = dict(urllib.parse.parse_qsl(query_string))
        
        self.accept()

 

   1.2. self.channel_name

      1.2.1. 역할:

         - 현재 WebSocket 연결의 고유 채널 이름입니다.

 

      1.2.1.  사용 시기: 

         -  특정 연결에 직접 메시지를 보내거나, 그룹에 특정 채널을 추가/제거할 때 사용합니다.

 

      1.2.1.  사용 방법:

from channels.generic.websocket import WebsocketConsumer
from asgiref.sync import async_to_sync

class ChatConsumer(WebsocketConsumer):
    def connect(self):
        self.room_name = self.scope["url_route"]["kwargs"]["room_name"]
        self.room_group_name = f"chat_{self.room_name}"
        
        # 그룹에 채널 추가
        async_to_sync(self.channel_layer.group_add)(
            self.room_group_name,
            self.channel_name
        )
        
        self.accept()
    
    def disconnect(self, close_code):
        # 그룹에서 채널 제거
        async_to_sync(self.channel_layer.group_discard)(
            self.room_group_name,
            self.channel_name
        )

 

   1.3. self.channel_layer

      1.3.1. 역할:

         - 채널 레이어 인스턴스로, 다른 채널이나 그룹과 통신할 수 있게 해줍니다.

 

      1.3.1. 사용 시기:

         -  다른 Consumer 인스턴스로 메시지를 보내거나, 그룹 작업을 수행할 때 사용합니다.

 

      1.3.1. 사용 방법:

from channels.generic.websocket import WebsocketConsumer
from asgiref.sync import async_to_sync

class NotificationConsumer(WebsocketConsumer):
    def connect(self):
        self.user_id = self.scope["user"].id
        self.user_group_name = f"user_{self.user_id}"
        
        async_to_sync(self.channel_layer.group_add)(
            self.user_group_name,
            self.channel_name
        )
        
        self.accept()
    
    def send_notification(self, event):
        # 그룹으로부터 받은 메시지 처리
        message = event["message"]
        
        # 웹소켓으로 메시지 전송
        self.send(text_data=json.dumps({
            "type": "notification",
            "message": message
        }))
    
    # 다른 위치(예: 뷰)에서 아래와 같이 사용 가능
    # from channels.layers import get_channel_layer
    # channel_layer = get_channel_layer()
    # async_to_sync(channel_layer.group_send)(
    #     f"user_{user_id}",
    #     {
    #         "type": "send_notification",
    #         "message": "새 알림이 있습니다!"
    #     }
    # )

 

2. 연결 관련 변수

   2.1. self.accept

      2.1.1. 역할:

         -  웹소켓 연결을 수락하는 메서드입니다.

 

      2.1.2. 사용 시기:

         -   connect 메서드에서 연결을 수락할 때 사용합니다.

 

      2.1.3. 사용 방법:

from channels.generic.websocket import WebsocketConsumer

class EchoConsumer(WebsocketConsumer):
    def connect(self):
        # 기본 수락
        self.accept()
        
        # 서브프로토콜 지정하여 수락 (선택적)
        # self.accept(subprotocol="custom-protocol")
        
        # 연결 후 초기 메시지 전송
        self.send(text_data="연결되었습니다!")

 

   2.2. self.close

      2.2.1. 역할:

         - 웹소켓 연결을 닫는 메서드입니다.

 

      2.2.2. 사용 시기:

         - 명시적으로 연결을 종료해야 할 때 사용합니다. 인증 실패, 비정상적인 동작 감지, 정상적인 종료 시나리오 등에 유용합니다.

 

      2.2.1. 사용 방법:

from channels.generic.websocket import WebsocketConsumer

class SecureConsumer(WebsocketConsumer):
    def connect(self):
        # 인증 확인
        if not self.scope["user"].is_authenticated:
            # 인증되지 않은 사용자는 연결을 닫음
            self.close(code=4003)
            return
        
        # 인증된 사용자는 연결 수락
        self.accept()
    
    def receive(self, text_data):
        data = json.loads(text_data)
        
        # 잘못된 요청 감지
        if "malicious_action" in data:
            self.close(code=4004)
            return

 

   2.3. self.send

      2.3.1. 역할:

         - 웹소켓을 통해 클라이언트에게 메시지를 보내는 메서드입니다.

 

      2.3.2. 사용 시기:

         - 클라이언트에게 텍스트 또는 바이너리 데이터를 보낼 때 사용합니다.

         - connect에서 사용하면 방금 연결된 client에게 전송합니다.

         - receive에서 사용하면, 방금 메시지를 보낸 client에게 전송합니다.

 

      2.3.3. 사용 방법:

from channels.generic.websocket import WebsocketConsumer
import json

class ChatConsumer(WebsocketConsumer):
    def connect(self):
        self.accept()
    
    def receive(self, text_data):
        # 텍스트 데이터 수신 및 처리
        data = json.loads(text_data)
        message = data["message"]
        
        # 텍스트 데이터 보내기
        self.send(text_data=json.dumps({
            "type": "chat.message",
            "message": message
        }))
        
        # 바이너리 데이터 보내기 (예: 파일)
        # self.send(bytes_data=binary_content)

 

3. 그룹 및 채널 관련 변수

   3.1. self.groups

      3.1.1. 역할:

         - Consumer가 자동으로 가입할 그룹 목록입니다. 클래스 변수로 정의하면 Consumer 인스턴스가 생성될 때 자동으로 그룹에 가입됩니다.

 

      3.1.2. 사용 시기:

         -  고정된 그룹에 자동으로 가입해야 할 때 사용합니다.

 

      3.1.3. 사용 방법:

from channels.generic.websocket import WebsocketConsumer
from asgiref.sync import async_to_sync

class BroadcastConsumer(WebsocketConsumer):
    # 자동 그룹 가입 (고정 그룹)
    groups = ["broadcast"]
    
    def connect(self):
        self.accept()
    
    # broadcast 그룹으로부터 메시지 수신
    def broadcast_message(self, event):
        message = event["message"]
        
        # 클라이언트에게 메시지 전달
        self.send(text_data=json.dumps({
            "message": message
        }))

 

   3.2. self.base_send

      3.2.1. 역할:

         - 내부적으로 사용되는 저수준 메시지 전송 메서드입니다.


      3.2.2. 사용 시기: 

         - 일반적으로 직접 사용하지 않으며, self.send를 사용하는 것이 권장됩니다. 커스텀 프로토콜 구현 시 오버라이드할 수 있습니다.


      3.2.3. 사용 방법:

from channels.generic.websocket import WebsocketConsumer

class CustomProtocolConsumer(WebsocketConsumer):
    def base_send(self, message):
        # 기본 송신 메서드를 오버라이드
        # 메시지를 수정하거나 로깅
        print(f"Sending: {message}")
        super().base_send(message)

 

4. 메시지 처리 관련 변수

   4.1.  self.receive

      4.1.1. 역할:

         - 클라이언트로부터 메시지를 수신하는 메서드입니다. WebsocketConsumer에서는 이 메서드를 구현하여 메시지를 처리합니다.

 

      4.1.2. 사용 시기:

         - 클라이언트로부터 메시지를 수신하고 처리해야 할 때 사용합니다.

 

      4.1.3. 사용 방법:

from channels.generic.websocket import WebsocketConsumer
import json

class CommandConsumer(WebsocketConsumer):
    def connect(self):
        self.accept()
    
    def receive(self, text_data=None, bytes_data=None):
        # 텍스트 메시지 처리
        if text_data:
            data = json.loads(text_data)
            command = data.get("command")
            
            if command == "ping":
                self.send(text_data=json.dumps({"response": "pong"}))
            elif command == "echo":
                self.send(text_data=text_data)
            else:
                self.send(text_data=json.dumps({"error": "Unknown command"}))
        
        # 바이너리 메시지 처리
        if bytes_data:
            # 바이너리 데이터 처리 (예: 파일 업로드)
            self.send(bytes_data=bytes_data)  # 에코 예시

 

   4.2. self.encode_json 및 self.decode_json (JsonWebsocketConsumer 전용)

      4.2.1. 역할:

         - JSON 데이터를 인코딩하고 디코딩하는 메서드입니다. JsonWebsocketConsumer에서만 사용 가능합니다.

 

      4.2.2. 사용 시기:

         - 기본 JSON 인코딩/디코딩 동작을 커스터마이즈하려는 경우에 사용합니다.

 

      4.2.3. 사용 방법:

from channels.generic.websocket import JsonWebsocketConsumer
import json

class CustomJsonConsumer(JsonWebsocketConsumer):
    def encode_json(self, content):
        # JSON 인코딩 방식을 커스터마이즈
        # 예: 특정 포맷으로 변환
        if "timestamp" not in content:
            content["timestamp"] = datetime.now().isoformat()
        
        return json.dumps(content)
    
    def decode_json(self, text_data):
        # JSON 디코딩 방식을 커스터마이즈
        data = json.loads(text_data)
        
        # 수신된 데이터 검증
        if "action" not in data:
            data["action"] = "unknown"
            
        return data
    
    def receive_json(self, content):
        # JsonWebsocketConsumer는 receive 대신 receive_json 사용
        action = content["action"]
        
        if action == "message":
            self.send_json({
                "type": "response",
                "message": content.get("message", "")
            })

 

   4.3. self.receive_json (JsonWebsocketConsumer 전용)

      4.3.1. 역할:

         - 클라이언트로부터 JSON 메시지를 수신하는 메서드입니다. JsonWebsocketConsumer에서 사용합니다.

 

      4.3.2. 사용 시기:

         - JsonWebsocketConsumer를 사용할 때 메시지 처리 로직을 구현하기 위해 사용합니다.

 

      4.3.3. 사용 방법:

from channels.generic.websocket import JsonWebsocketConsumer

class ChatJsonConsumer(JsonWebsocketConsumer):
    def connect(self):
        self.room_name = self.scope["url_route"]["kwargs"]["room_name"]
        self.room_group_name = f"chat_{self.room_name}"
        
        # 그룹에 가입
        async_to_sync(self.channel_layer.group_add)(
            self.room_group_name,
            self.channel_name
        )
        
        self.accept()
    
    def disconnect(self, close_code):
        # 그룹에서 탈퇴
        async_to_sync(self.channel_layer.group_discard)(
            self.room_group_name,
            self.channel_name
        )
    
    def receive_json(self, content):
        # JSON 데이터 수신 및 처리
        message = content["message"]
        user = self.scope["user"].username
        
        # 그룹에 메시지 전송
        async_to_sync(self.channel_layer.group_send)(
            self.room_group_name,
            {
                "type": "chat.message",
                "message": message,
                "user": user
            }
        )
    
    def chat_message(self, event):
        # 그룹으로부터 메시지 수신
        message = event["message"]
        user = event["user"]
        
        # 클라이언트에게 JSON 메시지 전송
        self.send_json({
            "message": message,
            "user": user
        })

 

 

   4.4. self.send_json (JsonWebsocketConsumer 전용)

      4.4.1. 역할:

         - 클라이언트에게 JSON 메시지를 전송하는 메서드입니다. JsonWebsocketConsumer에서 사용합니다.

 

      4.4.2. 사용 시기:

         - JsonWebsocketConsumer에서 클라이언트에게 구조화된 데이터를 보낼 때 사용합니다.

 

      4.4.3. 사용 방법:

from channels.generic.websocket import JsonWebsocketConsumer

class StockTickerConsumer(JsonWebsocketConsumer):
    def connect(self):
        self.accept()
        
        # 연결 후 초기 데이터 전송
        self.send_json({
            "type": "connection_established",
            "message": "실시간 주식 정보에 연결되었습니다."
        })
    
    def receive_json(self, content):
        # 요청된 종목 코드
        stock_code = content.get("stock_code")
        
        if stock_code:
            # 주식 정보 조회 (예시)
            stock_info = self._get_stock_info(stock_code)
            
            # JSON 응답 전송
            self.send_json({
                "type": "stock_update",
                "stock_code": stock_code,
                "price": stock_info["price"],
                "change": stock_info["change"],
                "timestamp": stock_info["timestamp"]
            })
    
    def _get_stock_info(self, code):
        # 실제로는 외부 API 호출 등으로 구현
        return {
            "price": 50000,
            "change": 1.5,
            "timestamp": "2024-03-30T14:30:00"
        }

 

5. 메타데이터 및 상태 관련 변수

   5.1. self.application_state

      5.1.1. 역할:

         - 채널 레이어의 애플리케이션 상태에 접근하는 프로퍼티입니다.

 

      5.1.2. 사용 시기:

         - 여러 Consumer 인스턴스 간에 공유해야 하는 전역 상태가 필요할 때 사용합니다.

 

      5.1.3. 사용 방법:

from channels.generic.websocket import WebsocketConsumer
from asgiref.sync import async_to_sync

class GlobalStateConsumer(WebsocketConsumer):
    def connect(self):
        # 전역 상태에 접근
        if "connected_users" not in self.application_state:
            self.application_state["connected_users"] = set()
        
        # 사용자 ID
        user_id = self.scope["user"].id
        self.application_state["connected_users"].add(user_id)
        
        # 접속자 수 계산
        user_count = len(self.application_state["connected_users"])
        
        self.accept()
        
        # 접속자 수 전송
        self.send(text_data=json.dumps({
            "type": "stats",
            "connected_users": user_count
        }))
    
    def disconnect(self, close_code):
        # 연결 종료 시 상태 업데이트
        user_id = self.scope["user"].id
        if "connected_users" in self.application_state:
            self.application_state["connected_users"].discard(user_id)

 

   5.2. self.url_route

      5.2.1. 역할:

         - 웹소켓 URL 라우팅 정보를 담고 있는 변수입니다. self.scope["url_route"]의 단축 버전입니다.


      5.2.2. 사용 시기: 

         - URL 경로 매개변수에 접근해야 할 때 사용합니다.


      5.2.3. 사용 방법:

from channels.generic.websocket import WebsocketConsumer

class GameConsumer(WebsocketConsumer):
    def connect(self):
        # URL 라우트에서 매개변수 가져오기
        self.game_id = self.url_route["kwargs"]["game_id"]
        self.player_id = self.url_route["kwargs"]["player_id"]
        
        self.game_group_name = f"game_{self.game_id}"
        
        # 그룹에 가입
        async_to_sync(self.channel_layer.group_add)(
            self.game_group_name,
            self.channel_name
        )
        
        self.accept()
        
        # 플레이어 입장 알림
        async_to_sync(self.channel_layer.group_send)(
            self.game_group_name,
            {
                "type": "player.joined",
                "player_id": self.player_id
            }
        )

 

   5.3. self.consumer_ready

      5.3.1. 역할:

         - 소비자 인스턴스가 연결 준비 완료 여부를 나타내는 내부 플래그입니다.

 

      5.3.2. 사용 시기:

         - 일반적으로 직접 사용하지 않으며, 내부적으로 연결 상태 추적에 사용됩니다.

 

      5.3.3. 사용 방법:

from channels.generic.websocket import WebsocketConsumer

class CustomConsumer(WebsocketConsumer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # 커스텀 초기화 코드
        self.initialized = False
    
    def connect(self):
        self.accept()
        
        # Consumer가 준비 완료 상태인지 확인
        if self.consumer_ready:
            # 초기화 수행
            self._initialize()
    
    def _initialize(self):
        if not self.initialized:
            self.initialized = True
            self.send(text_data="초기화 완료")

 

   5.4. self.encoding (WebsocketConsumer)

      5.4.1. 역할:

         - WebsocketConsumer에서 사용할 기본 텍스트 인코딩을 지정합니다.

 

      5.4.2. 사용 시기:

         - 기본값인 'utf-8'이 아닌 다른 인코딩을 사용해야 할 때 오버라이드합니다.

 

      5.4.3. 사용 방법:

from channels.generic.websocket import WebsocketConsumer

class CustomEncodingConsumer(WebsocketConsumer):
    # 클래스 변수로 인코딩 지정
    encoding = 'latin1'
    
    def connect(self):
        self.accept()
        
        # 인코딩 사용 예시
        special_chars = "ñáéíóú"
        self.send(text_data=special_chars)  # latin1 인코딩으로 전송됨

 

6. 추가 사용 패턴 예시

   6.1. 비동기 Consumer 사용하기

from channels.generic.websocket import AsyncWebsocketConsumer
import json

class AsyncChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.room_name = self.scope["url_route"]["kwargs"]["room_name"]
        self.room_group_name = f"chat_{self.room_name}"
        
        # 그룹에 가입
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )
        
        await self.accept()
    
    async def disconnect(self, close_code):
        # 그룹에서 탈퇴
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )
    
    async def receive(self, text_data):
        data = json.loads(text_data)
        message = data["message"]
        user = self.scope["user"].username
        
        # 그룹에 메시지 전송
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                "type": "chat_message",
                "message": message,
                "user": user
            }
        )
    
    async def chat_message(self, event):
        message = event["message"]
        user = event["user"]
        
        # 클라이언트에게 메시지 전송
        await self.send(text_data=json.dumps({
            "message": message,
            "user": user
        }))

 

   6.2. 인증 및 권한 확인 패턴

from channels.generic.websocket import WebsocketConsumer
from asgiref.sync import async_to_sync

class SecureResourceConsumer(WebsocketConsumer):
    def connect(self):
        # 사용자 인증 확인
        if not self.scope["user"].is_authenticated:
            self.close(code=4001)
            return
        
        # 리소스 접근 권한 확인
        self.resource_id = self.scope["url_route"]["kwargs"]["resource_id"]
        if not self._has_access_to_resource(self.resource_id):
            self.close(code=4003)
            return
        
        # 리소스 그룹 가입
        self.resource_group = f"resource_{self.resource_id}"
        async_to_sync(self.channel_layer.group_add)(
            self.resource_group,
            self.channel_name
        )
        
        self.accept()
    
    def _has_access_to_resource(self, resource_id):
        # 실제 구현에서는 사용자의 권한을 확인
        from myapp.models import Resource
        try:
            resource = Resource.objects.get(id=resource_id)
            return resource.has_access(self.scope["user"])
        except Resource.DoesNotExist:
            return False

 

   6.3. 핑/퐁 유지 메커니즘

from channels.generic.websocket import JsonWebsocketConsumer
import asyncio
import time

class PingPongConsumer(JsonWebsocketConsumer):
    def connect(self):
        self.last_ping = time.time()
        self.ping_timeout = 30  # 30초
        self.accept()
        
        # 핑 체크 태스크 시작
        self.start_ping_check()
    
    def receive_json(self, content):
        if content.get("type") == "ping":
            # 핑 수신 시 시간 업데이트
            self.last_ping = time.time()
            # 퐁 응답
            self.send_json({"type": "pong"})
        else:
            # 다른 메시지 처리
            self.handle_message(content)
    
    def start_ping_check(self):
        # 백그라운드 태스크로 실행
        from threading import Thread
        self.ping_thread = Thread(target=self._ping_check_loop)
        self.ping_thread.daemon = True
        self.ping_thread.start()
    
    def _ping_check_loop(self):
        while True:
            time.sleep(5)  # 5초마다 확인
            
            # 연결 종료 조건
            if not self.consumer_ready:
                break
                
            # 타임아웃 확인
            if time.time() - self.last_ping > self.ping_timeout:
                print(f"연결 {self.channel_name} 타임아웃")
                # 메인 스레드에서 close를 호출해야 함
                from asgiref.sync import async_to_sync
                async_to_sync(self._close_from_thread)()
                break
    
    async def _close_from_thread(self):
        # 다른 스레드에서 호출할 때 사용하는 비동기 메서드
        await self.close(code=4008)  # 타임아웃 코드
    
    def handle_message(self, content):
        # 일반 메시지 처리 로직
        pass

 

- reference : 

https://testdriven.io/blog/django-channels/

 

Introduction to Django Channels

This tutorial shows how to use Django Channels to create a real-time application.

testdriven.io

 

 

댓글