솔루션/RabbitMQ

[Udemy] Learn How to use RabbitMQ for establishing your messaging framework

bluebamus 2025. 6. 6.

강좌 정보 : https://www.udemy.com/course/rabbitmq-ss/

 

1. Introduction

   1.1. rabbitmq 관리 플러그인 설치 - rabbitmq_management

      - rabbitmq command prompt 실행

      - 설치 명령어 입력

rabbitmq-plugins enable rabbitmq_management

 

      - 관리 웹서버 접속

localhost:15672

 

   1.2.  AMQP –Advanced Message Queuing Protocol

      1.2.1. AMQP의 주요 특징 

항목 설명
표준화된 프로토콜 ISO/IEC 19464 및 OASIS 공식 표준TCP 기반의 바이너리 프로토콜
메시지 지향 구조 메시지 단위로 통신함 (파일/스트림 전송 아님)
라우팅 기능 내장 Exchange와 Binding 개념을 이용한 유연한 메시지 라우팅
신뢰성 보장 메시지 영속성(persistent), 트랜잭션, ACK/NACK 지원으로 메시지 손실 방지
트랜잭션 지원 여러 메시지 처리의 원자성을 보장 (commit/rollback)
플로우 컨트롤 소비자의 처리 능력에 따라 메시지 흐름 제어 가능 (QoS)
보안 TLS 암호화 및 인증 지원 가능
언어/플랫폼 독립성 다양한 언어에서 AMQP 클라이언트 라이브러리 사용 가능 (Java, Python, C#, Go 등)
비동기 통신 발신자와 수신자의 독립적 처리 가능 → 느슨한 결합 구조 구성에 유리

 

      1.2.2. AMQP 메시징 모델의 핵심 구성요소 

구성요소 설명
Producer 메시지를 생성하고 발행
Exchange 메시지를 적절한 큐로 라우팅 (direct, fanout, topic, headers 등)
Queue 메시지가 저장되고 소비자를 기다리는 공간
Consumer 메시지를 큐에서 받아 처리
Binding Exchange와 Queue 사이의 연결 관계 (Routing Key 기반)
Channel 하나의 연결 안에서 다중 논리 스트림 (경량화된 메시지 통신 경로)
Connection TCP/IP를 통한 클라이언트와 브로커 간 물리적 연결

 

      1.2.3. AMQP vs 기타 메시징 프로토콜

항목 AMQP MQTT Kafka (비교 참고용)
표준화 ✔️ ISO/OASIS 표준 ✔️ OASIS 표준 ❌ 자체 프로토콜
보장 수준 Exactly-once, At-least-once 등 At-most-once, At-least-once At-least-once, Exactly-once (옵션)
사용 대상 기업 메시징 시스템 IoT 기기, 센서 대규모 데이터 스트림
메시지 모델 큐 기반, 교환기 + 큐 구조 Pub/Sub 로그 기반 분산 큐
트랜잭션 ✔️ 지원 ❌ 미지원 ✔️ 일부 지원
라우팅 ✔️ 강력함 (Topic, Header, Pattern 등) 간단한 Topic 기반 없음 (Partition 기반)

 

   1.3. RabbitMQ 정리

      1.3.1. 메시지 브로커 시스템의 구조

Publisher → Exchange → Queue → Subscriber

 

         - Publisher: 메시지를 발행
         - Exchange: 메시지를 큐로 라우팅
         - Queue: 메시지를 저장하고 대기
         - Subscriber: 메시지를 소비

 

      1.3.2. 주요 장점 요약

         1.3.2.1. 발행자(Publisher)는 수신자(Subscriber)의 물리적 주소를 알 필요가 없다
            - 단지 구독자의 논리적 식별자 (예: 이름)만 있으면 됨

            - 시스템 간 느슨한 결합(loose coupling) 가능

 

         1.3.2.2. 구독자 교체 가능

            - 하나의 구독자가 사라져도 다른 구독자가 동일 큐를 이어받아 처리 가능

 

         1.3.2.3. 구독자 수를 유연하게 조절 가능

            - 부하가 증가하면 큐에 연결된 구독자 수를 늘려 처리 분산 가능

 

         1.3.2.4. 브로커 재시작에도 메시지 보존 가능

            - 메시지를 영속성(persistent) 설정하면, 브로커가 재시작되더라도 메시지가 손실되지 않음

 

      1.3.3. rabbitmq에서 connection과 channel의 차이

         1.3.3.1. Connection

            - RabbitMQ 브로커와 클라이언트 간의 실제 물리적 TCP 연결
            - 특징:

               - TCP/IP 사용
               - 인증(Authentication)을 요구 (username/password, 인증서 등)
               - TLS를 이용한 암호화 가능 (보안)
            - 1개의 Connection은 다수의 Channel을 포함 가능
            - 비싸고 무겁기 때문에 남발을 피해야 함

 

         1.3.3.2. Channel

            - Connection 위에 생성되는 가볍고 논리적인 통신 단위
            - 특징:

               - TCP 연결을 공유하는 가벼운 스트림
               - 1개의 Connection 위에 수십~수백 개의 Channel 생성 가능
               - 예: 멀티스레드 앱의 각 스레드는 자신의 Channel을 가질 수 있음
               - Connection이 닫히면 하위 Channel도 모두 닫힘
               - Channel은 반드시 Connection 위에서만 존재 가능
               - 하나의 Channel은 큐 선언, 바인딩, 메시지 발행/소비 등을 독립적으로 수행

 

         1.3.3.3. 정리: 차이점 비교 표

항목 Connection Channel
정의 클라이언트 ↔ 브로커 간의 물리적 연결 Connection 내의 가벼운 논리적 통신 경로
프로토콜 TCP, TLS AMQP (Application level)
자원 소모 무거움 (많이 만들면 브로커에 부담) 가벼움 (실제 사용 권장 단위)
수명 관계 닫히면 모든 Channel도 닫힘 Connection 위에 종속됨
개수 제한 일반적으로 소수 (~수십 개 적정) 수백 개 이상도 가능
사용 목적 전체 연결 세션 유지 메시지 발행/소비, 큐/익스체인지 관리
실무 팁 최소화, 재사용 권장 멀티스레드 분리 등 유연한 활용 가능

 

   1.4. 주요 RabbitMQ 관련 함수 및 옵션

      1.4.1. exchange_declare(...) 주요 옵션

옵션 설명 기본값
exchange 익스체인지 이름 필수
exchange_type 'direct', 'fanout', 'topic', 'headers' 중 하나 필수
durable True 시 서버 재시작 후에도 유지됨 FALSE
auto_delete 마지막 바인딩이 해제되면 삭제 FALSE
passive True면 존재 확인만 수행, 없으면 에러 발생 FALSE
internal 서버 내부 메시징용 (직접 publish 불가) FALSE
arguments 고급 설정 (alternate-exchange, DLX 등) None

 

      1.4.2. basic_publish(...) 주요 옵션

옵션 설명
exchange 발행할 대상 익스체인지
routing_key 메시지 라우팅 키 (fanout에선 무시됨)
body 메시지 본문
properties=BasicProperties(...) 메시지 속성 (delivery_mode, headers 등) 설정

 

      1.4.3. exchange_delete(...) 주요 옵션

옵션 설명 기본값
exchange 삭제할 익스체인지 이름 필수
if_unused True면 바인딩된 큐가 없을 경우에만 삭제 FALSE

 

      1.4.4. queue_declare(...) 주요 옵션

파라미터 설명
queue='' 빈 문자열이면 서버가 임의의 고유 이름 생성
exclusive=True 연결 전용 큐, 연결 종료 시 삭제됨
durable=False False면 서버 재시작 시 삭제
auto_delete=False 마지막 소비자가 끊겨도 삭제 안 함

 

      1.4.5. basic_consume(...) 주요 옵션

파라미터 설명
queue 수신할 큐 이름
on_message_callback 메시지 수신 시 호출될 함수
auto_ack=True 수신 즉시 ACK 자동 전송 (False로 설정 시 수동 ack 필요)

 

      1.4.6. pika.BasicProperties 상세 설명

         - pika.BasicProperties는 메시지를 RabbitMQ에 발행할 때 사용할 수 있는 메타데이터들을 정의하는 클래스이다.

pika.BasicProperties(
    content_type='text/plain',
    content_encoding='utf-8',
    headers={'key': 'value'},
    delivery_mode=2,
    priority=0,
    correlation_id='abc123',
    reply_to='response_queue',
    expiration='60000',
    message_id='msg-001',
    timestamp=int(time.time()),
    type='my_type',
    user_id='guest',
    app_id='my_app'
)

 

         - 주요 옵션 설명:

옵션 설명
content_type MIME 타입 (예: 'text/plain', 'application/json')
content_encoding 인코딩 방식 (예: 'utf-8')
headers 사용자 정의 헤더 딕셔너리 (예: {'key': 'value'})
delivery_mode 메시지의 영속성 (1: non-persistent, 2: persistent)
priority 메시지 우선순위 (0~9, 큐에 따라 지원 여부 다름)
correlation_id 요청-응답(RPC) 메시지 추적용 고유 ID
reply_to 응답을 받을 큐 이름 (RPC 패턴에서 사용)
expiration 메시지 만료 시간 (ms 단위 문자열)
message_id 메시지 고유 ID
timestamp 메시지 발송 시간 (Unix timestamp, int)
type 메시지의 타입 또는 목적 설명용 문자열
user_id 메시지를 보낸 사용자 식별자
app_id 메시지를 보낸 애플리케이션 식별자

 

      1.4.7. queue_declare(...) 주요 옵션

옵션 설명
queue 큐 이름
durable True면 서버 재시작 시에도 큐 유지
exclusive True면 채널이 닫히면 큐 삭제됨 (1개 연결만 허용)
auto_delete True면 사용 후 자동 삭제됨 (큐 사용자가 없어질 때 삭제)

 

      1.4.8. queue_bind(...)

주요 옵션

         - 큐를 교환기와 라우팅 키로 연결
         - Direct/Topic exchange에서는 라우팅 키가 중요

         - 큐를 특정 Exchange에 바인딩하여 메시지를 수신할 수 있도록 연결한다.

파라미터 필수 설명
queue ✅ Yes 바인딩할 큐의 이름
exchange ✅ Yes 메시지를 수신할 교환기 이름
routing_key ❌ No (기본: '') 해당 큐로 전달될 메시지를 식별하는 키
arguments ❌ No 헤더 익스체인지에 사용하는 추가 필터 정보 (딕셔너리)
channel.queue_bind(exchange='logs_exchange', queue='task_queue', routing_key='Error')

 

      1.4.9. basic_qos(...) 주요 옵션

         - 메시지 소비 속도를 조절하여 동시 처리량을 제한할 수 있다 (Load Control 목적).

파라미터 필수 설명
prefetch_size ❌ No (기본: 0) 바이트 기준 한 번에 가져올 메시지 크기 (0은 무제한)
prefetch_count ❌ No (기본: 0) 한 번에 처리 가능한 메시지 개수. 일반적으로 1 사용
global ❌ No (기본: False) True이면 채널 전체, False면 현재 소비자에만 적용

 

         - prefetch_count=1을 설정하면 consumer가 ack 하기 전까지는 다음 메시지를 전달하지 않음 → 부하 분산 효과.

channel.basic_qos(prefetch_count=1)

 

      1.4.10. basic_ack(...) 주요 옵션

         - 수동 ack 모드에서 처리 완료된 메시지를 RabbitMQ에 명시적으로 알림으로써 재전송 방지.

파라미터 필수 설명
delivery_tag ✅ Yes 수신한 메시지의 고유 ID (method.delivery_tag에서 얻음)
multiple ❌ No (기본: False) True면 delivery_tag 이하의 모든 메시지에 대해 ack 처리 (batch ack)

 

         -  multiple=True는 batch ack를 위해 사용되며, True 설정 시 delivery_tag 이하 모든 메시지가 일괄 확인됨.

ch.basic_ack(delivery_tag=method.delivery_tag)

 

2. Basic Communication - Default Exchange

   2.1. RabbitMQ 익스체인지 타입 및 사용 정리

      - Direct Exchange, Fanout Exchange, Topic Exchange, Headers Exchange 에 대해 정리한다.

 

   2.2. Direct Exchange (직접 익스체인지)

      2.2.1. 개념

         - 메시지의 라우팅 키와 큐의 바인딩 키가 정확히 일치할 경우 메시지를 전달한다.

 

      2.2.2. 사용 사례

         - 로그 레벨별 큐 분리 (Error, Info, Warning)

 

      2.2.3. 예제 코드 (Python pika)

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.queue_declare(queue='error_queue')
channel.queue_bind(exchange='direct_logs', queue='error_queue', routing_key='error')

channel.basic_publish(exchange='direct_logs', routing_key='error', body='Error Message')

 

      2.2.4. 실무 예시 코드

         2.2.4.1. 결제 상태별 큐 처리

statuses = ['success', 'failed', 'pending']
channel.exchange_declare(exchange='payment_status', exchange_type='direct')
for status in statuses:
    queue_name = f'payment_{status}_queue'
    channel.queue_declare(queue=queue_name, durable=True)
    channel.queue_bind(exchange='payment_status', queue=queue_name, routing_key=status)

channel.basic_publish(exchange='payment_status', routing_key='success', body='Order #123 Paid')

 

         2.2.4.2. 유저 권한 분류 큐

roles = ['admin', 'staff', 'customer']
channel.exchange_declare(exchange='user_roles', exchange_type='direct')
for role in roles:
    qname = f'{role}_queue'
    channel.queue_declare(queue=qname, durable=True)
    channel.queue_bind(exchange='user_roles', queue=qname, routing_key=role)

channel.basic_publish(exchange='user_roles', routing_key='admin', body='System Admin Login')

 

   2.3. Fanout Exchange (팬아웃 익스체인지)

      2.3.1. 개념

         - 라우팅 키를 무시하고 모든 바인딩된 큐로 메시지를 브로드캐스트한다.

 

      2.3.2. 사용 사례

         - 전체 시스템 공지, 실시간 로그 브로드캐스트

 

      2.3.3. 예제 코드

channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)

channel.basic_publish(exchange='logs', routing_key='', body='Broadcast Message')

 

      2.3.4. 실무 예시 코드

         2.3.4.1. 뉴스 피드 전송

channel.exchange_declare(exchange='news_feed', exchange_type='fanout')
subscribers = ['user_1_queue', 'user_2_queue', 'user_3_queue']
for q in subscribers:
    channel.queue_declare(queue=q, durable=True)
    channel.queue_bind(exchange='news_feed', queue=q)

channel.basic_publish(exchange='news_feed', routing_key='', body='Breaking News: ...')

 

         2.3.4.2. 캐시 무효화 알림

channel.exchange_declare(exchange='cache_invalidation', exchange_type='fanout')
servers = ['cache_us', 'cache_kr']
for s in servers:
    channel.queue_declare(queue=s, durable=True)
    channel.queue_bind(exchange='cache_invalidation', queue=s)

channel.basic_publish(exchange='cache_invalidation', routing_key='', body='Invalidate /user/123')

 

   2.4. Topic Exchange (주제 기반 익스체인지)

      2.4.1. 개념

         - 라우팅 키의 패턴 매칭을 기반으로 메시지를 라우팅한다.
            - *: 하나의 단어 매칭
            - #: 0개 이상 단어 매칭

 

      2.4.2. 사용 사례

         - 구조화된 로그/이벤트 메시지 처리

 

      2.4.3. 예제 코드

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
channel.queue_declare(queue='kernel_queue')
channel.queue_bind(exchange='topic_logs', queue='kernel_queue', routing_key='kern.*')

channel.basic_publish(exchange='topic_logs', routing_key='kern.critical', body='Critical Kernel Error')

 

      2.4.4. 실무 예시 코드

         2.4.4.1. IoT 센서 데이터 처리

channel.exchange_declare(exchange='iot_data', exchange_type='topic')
channel.queue_declare(queue='temp_kitchen')
channel.queue_bind(exchange='iot_data', queue='temp_kitchen', routing_key='sensor.temp.kitchen')

channel.basic_publish(exchange='iot_data', routing_key='sensor.temp.kitchen', body='23.5°C')

 

         2.4.4.2. 마이크로서비스 이벤트 처리

channel.exchange_declare(exchange='order_events', exchange_type='topic')
channel.queue_declare(queue='order_created_queue')
channel.queue_bind(exchange='order_events', queue='order_created_queue', routing_key='order.created')

channel.basic_publish(exchange='order_events', routing_key='order.created', body='Order #321 Created')

 

   2.5. Headers Exchange (헤더 기반 익스체인지)

      2.5.1. 개념

         - 라우팅 키가 아닌 메시지의 헤더 필드 값을 기준으로 라우팅한다.
            - x-match: all 또는 x-match: any 지정 가능

 

      2.5.2. 사용 사례

         - 복합 조건 라우팅이 필요한 경우 (ex. region + format)

 

      2.5.3. 예제 코드

channel.exchange_declare(exchange='header_logs', exchange_type='headers')
channel.queue_declare(queue='logs_queue')
channel.queue_bind(
    exchange='header_logs', queue='logs_queue',
    arguments={'x-match': 'all', 'type': 'audit', 'format': 'json'}
)

channel.basic_publish(
    exchange='header_logs', routing_key='',
    body='JSON Audit Event',
    properties=pika.BasicProperties(headers={'type': 'audit', 'format': 'json'})
)

 

      2.5.4. 실무 예시 코드

         2.5.4.1. 데이터 파이프라인에서 조건 라우팅

channel.exchange_declare(exchange='data_pipeline', exchange_type='headers')
channel.queue_declare(queue='us_csv_queue')
channel.queue_bind(exchange='data_pipeline', queue='us_csv_queue',
    arguments={'x-match': 'all', 'region': 'us', 'format': 'csv'})

channel.basic_publish(
    exchange='data_pipeline', routing_key='',
    body='data.csv',
    properties=pika.BasicProperties(headers={'region': 'us', 'format': 'csv'})
)

 

         2.5.4.2. 보안 감사 로그 분기

channel.exchange_declare(exchange='security_logs', exchange_type='headers')
channel.queue_declare(queue='auth_login_queue')
channel.queue_bind(exchange='security_logs', queue='auth_login_queue',
    arguments={'x-match': 'all', 'component': 'auth', 'event_type': 'login'})

channel.basic_publish(
    exchange='security_logs', routing_key='',
    body='User Login Attempt',
    properties=pika.BasicProperties(headers={'component': 'auth', 'event_type': 'login'})
)

 

 

   2.6. 강의 프로젝트

      2.6.1. producer

import pika  # RabbitMQ와 통신하기 위한 pika 라이브러리 임포트

# RabbitMQ 서버(localhost)에 연결을 생성 (기본 포트 5672)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

# 연결에서 채널(channel) 생성 - 메시지를 송수신하는 논리적인 통로
channel = connection.channel()

# 큐(queue) 선언 - 이름이 'hello'인 큐가 없으면 새로 생성됨
# 각 옵션 설명:
# durable=False : 서버 재시작 후 큐가 유지되지 않음 (True로 설정 시 영속적)
# exclusive=False : 이 큐는 현재 연결에서만 접근 가능하지 않음 (True 시 연결 종료 시 자동 삭제됨)
# auto_delete=False : 큐를 사용하는 소비자가 모두 연결 해제되더라도 큐를 삭제하지 않음 (True 시 자동 삭제)
channel.queue_declare(queue="hello", durable=False, exclusive=False, auto_delete=False)

# 메시지 발행
# exchange="" : 기본 익스체인지 (Default Exchange)를 사용하여 큐 이름을 그대로 routing key로 사용
# routing_key="hello" : 메시지를 보낼 대상 큐 이름
# body : 메시지 본문
channel.basic_publish(
    exchange="",
    routing_key="hello",
    body="hello_world 2"
)

print("[x] Sent Hello World")  # 메시지 발송 확인 메시지 출력

# 연결 종료 (채널도 함께 닫힘)
connection.close()

 

      2.6.2. consumer

import pika  # RabbitMQ와 통신하기 위한 pika 라이브러리
import sys, os

# 메인 실행 함수 정의
def main():
    # RabbitMQ 서버(localhost)에 연결 생성
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))

    # 채널 생성 (메시지 송수신 통로)
    channel = connection.channel()

    # 큐 선언: 큐가 이미 존재하면 설정을 변경하지 않음 (멱등성)
    # 아래 3개의 파라미터는 큐의 '생명 주기'와 관련된 중요한 설정
    channel.queue_declare(
        queue="hello",
        durable=False,        # False: 서버가 재시작되면 큐가 삭제됨 (True일 경우 영구 저장)
        exclusive=False,      # False: 여러 연결이 큐에 접근 가능 (True일 경우 현재 연결에서만 사용 가능하고 연결이 끊기면 큐 삭제됨)
        auto_delete=False     # False: 소비자가 모두 연결 해제되어도 큐가 유지됨 (True일 경우 마지막 소비자가 끊기면 큐 삭제됨)
    )

    # 콜백 함수: 메시지를 수신할 때 호출됨
    def callback(ch, method, properties, body):
        print("[x] received %r" % body)

    # 메시지 소비자 등록
    channel.basic_consume(
        queue="hello",                # 소비할 큐 이름
        on_message_callback=callback, # 수신 메시지 처리 함수
        auto_ack=True                 # True: 메시지 수신 즉시 자동 ack (주의: 소비자 장애 시 메시지 손실 위험)
    )

    print(" [*] waiting for the messages. To exit press Ctrl-C")

    # 메시지 소비 시작 (무한 대기 루프)
    channel.start_consuming()

# Ctrl+C 시 graceful 종료 처리
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("Interrupted")
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

 

3. Broadcasting - Fanout Exchange

   3.1. Fanout Exchange 요약 정리

      - 발행자(Publisher)는 수신자(Subscriber)가 누구인지 알지 못한다.
      - 심지어 수신자가 존재하는지조차 알지 못한다.
      - 팬아웃 익스체인지에 큐가 바인딩되어 있지 않으면 메시지는 폐기된다.
      - 라우팅 키나 바인딩 규칙은 필요 없다.
      - 큐는 반드시 익스체인지에 바인딩되어야 메시지를 수신할 수 있다.

항목 설명
📤 Routing 방식 라우팅 키 무시. 바인딩된 모든 큐에 동일한 메시지 복사본 전달
📥 큐 조건 메시지를 받으려면 큐가 익스체인지에 바인딩되어 있어야 함
❌ 미바인딩 큐 큐가 없거나 바인딩되지 않은 경우, 메시지는 버려짐 (discarded)
💬 용도 시스템 브로드캐스트, 전체 알림, 캐시 무효화, 로그 브로드캐스트 등
⚠ Publisher와 Subscriber 관계 Publisher는 구독자 존재 여부, 수, 상태를 모름 (완전 비동기 브로드캐스트 모델)

 

   3.2. 실무 예시

      3.2.1. 실시간 시스템 로그 브로드캐스트

channel.exchange_declare(exchange='sys_logs', exchange_type='fanout')
channel.queue_declare(queue='log_server1')
channel.queue_declare(queue='log_server2')

channel.queue_bind(exchange='sys_logs', queue='log_server1')
channel.queue_bind(exchange='sys_logs', queue='log_server2')

channel.basic_publish(exchange='sys_logs', routing_key='', body='System rebooted')

 

      3.2.2. 캐시 무효화 메시지 전파

channel.exchange_declare(exchange='cache_bust', exchange_type='fanout')
channel.queue_declare(queue='redis_node1')
channel.queue_declare(queue='redis_node2')

channel.queue_bind(exchange='cache_bust', queue='redis_node1')
channel.queue_bind(exchange='cache_bust', queue='redis_node2')

channel.basic_publish(exchange='cache_bust', routing_key='', body='CLEAR /products')

 

   3.3. Temporary Queues and Exclusivity

      3.3.1. Temporary Queues (임시 큐)

         - Temporary Queue는 단기 사용 목적으로 생성되며, 다음과 같은 조건 중 하나 이상을 만족하면 자동으로 삭제된다.
            - exclusive=True: 큐가 특정 연결에만 독점적으로 바인딩됨
            - auto_delete=True: 해당 큐에 연결된 마지막 소비자가 종료되면 큐 자동 삭제
            - queue='' (빈 문자열): 서버가 랜덤 이름으로 생성하는 이름 없는 큐

 

         💡 일반적으로 "임시 큐"는 위 세 가지 옵션의 조합을 말한다.

 

         - 각 구독자는 자신의 큐를 가짐
         - Fanout Exchange 구조에서 주로 사용됨
         - 소비자 종료와 함께 큐도 제거됨
         - 큐는 exclusive=True 또는 auto_delete=True를 사용하여 임시 성격 부여

항목 내용
정의 단기 사용을 전제로 생성된 소비자 전용 큐
삭제 조건 소비자 또는 채널 종료 시 (exclusive), 소비자 없을 시 (auto_delete)
용도 이벤트 브로드캐스트, 일회성 소비자 등록
특징 서버가 큐 이름을 자동 생성하거나 소비자가 지정 가능

 

      3.3.2. Exclusive Queue (독점 큐)

         - Exclusive Queue는 다음 조건을 가진다.

            - 큐는 현재 연결(Connection)에서만 접근 가능

            - 연결이 종료되면 자동으로 큐 삭제
            - 다른 연결에서 동일한 이름으로 접근 시 에러 발생

            - 하나의 클라이언트(Consumer) 전용 큐

            - 메시지를 공유하지 않음 (다른 소비자 접근 불가)

항목 내용
정의 하나의 연결(Channel)에만 사용 가능한 큐
삭제 조건 연결 종료 시 자동 삭제
용도 단일 소비자, 고립된 응답 큐 필요 시 (예: RPC 응답 큐)
특징 보안 및 스코프 제어용. 큐 이름이 중복될 수 없음

 

      3.3.3. 비교 정리: Exclusive Queue vs Temporary Queue

항목 Exclusive Queue Temporary Queue
주 용도 연결 독점 (RPC 응답 등) 일회성 소비자 메시지 수신
접근 제한 하나의 연결만 접근 가능 여러 임시 소비자 가능
자동 삭제 조건 연결 종료 시 삭제 소비자 종료 or 바운드 없음 시 삭제
사용 방식 exclusive=True exclusive=True 또는 auto_delete=True, queue='' 사용 가능
Fanout Exchange와 조합 일반적이지 않음 매우 일반적 (subscriber마다 개별 큐 자동 생성 및 바인딩)
큐 이름 지정 일반적으로 명시적 이름 주로 빈 문자열로 랜덤 큐 이름 생성

 

      3.3.4. 기술적 속성 요약 (pika 기준)

속성 설명 기본값
exclusive 해당 연결에서만 접근 가능 FALSE
auto_delete 마지막 소비자가 종료되면 삭제 FALSE
queue='' 이름 없는 임시 큐, 서버가 랜덤 이름 생성 (사용자 지정 필수 아님)
durable 서버 재시작 후에도 큐 유지 여부 FALSE

 

      3.3.5. 실전 예제 3가지: 각기 다른 방식의 임시 큐 구현

         3.3.5.1. 이름 없는 임시 큐 (서버 자동 생성)

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 이름 없는 임시 큐 생성 (exclusive + auto_delete 포함)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

print("[*] Created temporary queue:", queue_name)

# 큐에 메시지 바인딩 및 소비자 등록
channel.queue_bind(exchange='logs', queue=queue_name)

channel.basic_consume(queue=queue_name, auto_ack=True,
                      on_message_callback=lambda ch, m, p, body: print("[x]", body))
channel.start_consuming()

 

         3.3.5.2. 이름 있는 독점 큐 (exclusive=True)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 고정된 이름으로 Exclusive 큐 선언
channel.queue_declare(queue='exclusive_queue', exclusive=True)

channel.basic_publish(exchange='', routing_key='exclusive_queue', body='Hello Exclusive')

channel.basic_consume(queue='exclusive_queue', auto_ack=True,
                      on_message_callback=lambda ch, m, p, body: print("[x]", body))
channel.start_consuming()

 

         3.3.5.3. auto_delete 큐 (소비자 종료 시 자동 삭제)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# auto_delete 큐 선언
channel.queue_declare(queue='ephemeral_queue', auto_delete=True)

channel.basic_consume(queue='ephemeral_queue', auto_ack=True,
                      on_message_callback=lambda ch, m, p, body: print("[x]", body))

print("[*] Waiting on ephemeral queue...")
channel.start_consuming()

 

   3.4. 강의 프로젝트

      3.4.1. publisher

import pika
import sys

# 1️⃣ RabbitMQ 서버(localhost)에 연결 생성
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')  # 기본 포트 5672, 가상 호스트 '/', guest/guest 계정 사용
)

# 2️⃣ 채널 생성 (메시지를 송수신하는 논리적 단위)
channel = connection.channel()

# 3️⃣ Fanout Exchange 생성
channel.exchange_declare(
    exchange='br_exchange',         # 익스체인지 이름
    exchange_type='fanout',         # 교환기 타입: fanout / direct / topic / headers
    durable=False,                  # 서버 재시작 시 유지 여부 (기본: False)
    auto_delete=False,              # 바인딩된 큐가 모두 해제되면 삭제 (기본: False)
    passive=False,                  # True면 존재 유무만 검사, 생성은 안 함
    internal=False,                 # True면 메시지를 직접 publish 불가 (서버 내부용)
    arguments=None                  # 기타 설정 (예: alternate-exchange, DLX 등)
)

# 4️⃣ 메시지 발행 루프
for i in range(4):
    message = "Hello" + str(i)

    # 메시지 발행
    channel.basic_publish(
        exchange='br_exchange',  # fanout은 routing_key를 무시하고 모든 큐로 전송
        routing_key='',          # 필수지만 fanout에서는 사용되지 않음
        body=message,            # 전송할 메시지 본문
        properties=pika.BasicProperties(
            delivery_mode=1      # 1: non-persistent, 2: persistent (큐가 durable이어야 저장됨)
        )
    )
    print("[x] sent %r" % message)

# 5️⃣ 익스체인지 삭제
# if_unused=True : 바인딩된 큐가 없을 경우에만 삭제 가능 (에러 방지)
channel.exchange_delete(
    exchange='br_exchange',
    if_unused=False   # False: 강제 삭제 (큐와 바인딩된 상태여도 삭제됨)
)

# 6️⃣ 연결 종료 (자동으로 채널도 함께 종료됨)
connection.close()

 

      3.4.2. subscriber

import pika  # RabbitMQ 통신을 위한 Python 라이브러리
import sys

# 1️⃣ RabbitMQ 서버에 연결(Connection 생성)
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')  # 호스트: 'localhost', 포트: 기본 5672
    # 다른 설정들:
    # - port=5672
    # - virtual_host='/'
    # - credentials=pika.PlainCredentials('user', 'pass')
)

# 2️⃣ 채널(Channel) 생성: 하나의 연결(Connection) 안에서 독립적인 통신 단위
channel = connection.channel()

# 3️⃣ 익스체인지(Exchange) 선언
channel.exchange_declare(
    exchange='br_exchange',        # 익스체인지 이름
    exchange_type='fanout',        # fanout: 라우팅 키 무시하고 모든 큐에 브로드캐스트
    durable=False,                 # True: 서버 재시작 후에도 익스체인지 유지 (영속성)
    auto_delete=False,             # True: 바인딩된 큐가 모두 없어지면 자동 삭제
    internal=False,                # True: 서버 내부 사용 (publish 불가)
    passive=False,                 # True: 존재 여부 확인만 (미존재 시 예외 발생)
    arguments=None                 # 추가 옵션 (alternate-exchange, TTL 등)
)

# 🔍 exchange_declare() 주요 파라미터 정리
# - exchange_type: 'direct', 'fanout', 'topic', 'headers'
# - durable: 메시지 영속성을 위해 queue도 durable이어야 함
# - auto_delete: 큐 바인딩이 모두 끊기면 익스체인지 삭제
# - passive: True 설정 시 익스체인지가 없으면 ChannelClosed 예외 발생
# - internal: true이면 producer가 메시지 발행 불가, broker 내부에서만 사용됨
# - arguments: 예: {'alternate-exchange': 'ae_name'}

# 4️⃣ 메시지 전송 (fanout이라 routing_key는 무시됨)
for i in range(4):
    message = "Hello" + str(i)

    channel.basic_publish(
        exchange='br_exchange',   # 메시지를 보낼 익스체인지
        routing_key='',           # fanout은 무시되므로 빈 문자열 사용
        body=message,             # 메시지 본문 (bytes 또는 string)
        properties=pika.BasicProperties(
            delivery_mode=1       # 1: non-persistent, 2: persistent (디스크 저장)
            # 기타 예시: 
            # content_type='text/plain',
            # headers={'key': 'value'},
            # expiration='60000',  # 메시지 TTL
            # priority=0,
            # correlation_id='uuid',
            # reply_to='callback_queue'
        )
    )
    print("[x] sent %r" % message)

# 🔍 basic_publish() 주요 속성
# - delivery_mode=2: 메시지를 영속적으로 저장하려면 큐도 durable이어야 함
# - headers: Headers exchange에 사용되는 key-value 쌍
# - expiration: 메시지 만료 시간 (ms)
# - reply_to: RPC 구조에서 응답용 큐 지정
# - correlation_id: 요청-응답 연계 추적에 사용

# 5️⃣ 익스체인지 삭제
channel.exchange_delete(
    exchange='br_exchange',
    if_unused=False  # False: 바인딩 여부와 상관없이 삭제
                     # True: 바인딩된 큐가 있으면 삭제 실패 → 예외 발생
)

# 🔍 exchange_delete() 옵션
# - exchange: 삭제할 익스체인지 이름
# - if_unused=True: 큐와 바인딩되어 있으면 삭제되지 않음 (안전 모드)

# 6️⃣ 연결 종료 (자동으로 채널도 닫힘)
connection.close()

 

4. Selective Routing - Direct Exchange

   4.1. 기본 익스체인지(Default Exchange)의 제한사항

      - 기본 익스체인지는 이름이 없는 익스체인지로, 라우팅 키와 동일한 이름의 큐로 직접 메시지를 전달한다.

 

      4.1.1. 문제점

         - Publisher는 구독자의 큐 이름을 명확히 알아야 함 (강한 결합)
         - 하나의 메시지 유형(라우팅 키)은 단 하나의 큐에만 전송 가능 (멀티 구독 불가)

 

      4.1.2. 예시 동작

         - 큐 이름이 image_processor일 경우에만 메시지 도달함

channel.basic_publish(exchange='', routing_key='image_processor', body='Resize Image')

 

   4.2. Fanout Exchange의 한계

      - Fanout Exchange는 연결된 모든 큐에 메시지를 브로드캐스트한다.

 

      4.2.1. 문제점

         - 라우팅 키 무시 → 메시지 필터링 불가능
         - 모든 구독자가 모든 메시지를 받아야 하므로 불필요한 메시지 처리 오버헤드 발생
         - 수신 자격이 없는 구독자도 메시지를 수신할 수 있음 → 보안 이슈 발생 가능

 

      4.2.2. 예시 동작

channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.basic_publish(exchange='logs', routing_key='', body='System Down')

 

   4.3. Selective Routing 구조 (Direct Exchange 기반)

      - Direct Exchange는 라우팅 키와 큐의 바인딩 키가 정확히 일치할 때만 메시지를 전달한다. 이 구조를 통해 정교한 구독자 필터링이 가능하며, 하나의 메시지를 여러 큐에 동시에 보낼 수 있다.

 

      4.3.1. 개념 요약

항목 설명
Exchange Type direct
Routing Key 명확하게 일치해야 라우팅됨
다중 바인딩 하나의 키에 여러 큐 바인딩 가능 (N:1 / 1:N 모두 가능)
메시지 분기 바인딩 룰에 따라 정확히 분배됨

 

      4.3.2. Selective Routing 예시 구조

[Process A/B/C]  -->  [Direct Exchange]  -->  [AQueue / FQueue / SQueue]
                                 |                  |         |         |
                                 |           Error  Warn     Info
                             (Routing Key)

 

      4.3.3. 바인딩 규칙 예시

Routing Key 전달 대상 큐
Error AQueue
Warn FQueue
Info SQueue

 

   4.4. 구현 절차 및 코드 예제

      4.4.1. Publisher

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='log_exchange', exchange_type='direct')

logs = [('Error', 'Disk full'), ('Warn', 'Memory usage high'), ('Info', 'Startup complete')]
for rk, msg in logs:
    channel.basic_publish(exchange='log_exchange', routing_key=rk, body=msg)
    print(f"[x] Sent {rk}: {msg}")

connection.close()

 

      4.4.1. Subscriber (예: Error 전용)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='log_exchange', exchange_type='direct')
channel.queue_declare(queue='AQueue', durable=True)
channel.queue_bind(exchange='log_exchange', queue='AQueue', routing_key='Error')

def callback(ch, method, properties, body):
    print(f"[Error] {body.decode()}")

channel.basic_consume(queue='AQueue', on_message_callback=callback, auto_ack=True)
channel.start_consuming()

 

   4.5. 강의 프로젝트

      4.5.1. publisher

import pika       # RabbitMQ와 통신하기 위한 Python 라이브러리
import sys
import random

# 1️⃣ RabbitMQ 서버에 연결 생성 (기본 localhost:5672, vhost='/', guest 계정)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

# 2️⃣ 채널 생성 (하나의 연결에서 여러 채널 생성 가능. 실제 메시지 송수신 단위)
channel = connection.channel()

# 3️⃣ direct exchange 생성 (교환기 이름: 'logs_exchange')
channel.exchange_declare(
    exchange='logs_exchange',      # 익스체인지 이름
    exchange_type='direct',        # 타입: 'direct' (라우팅 키 일치 시에만 큐로 전달)
    durable=False,                 # 서버 재시작 후에도 유지할지 여부 (False면 삭제됨)
    auto_delete=False,             # 바인딩된 큐가 모두 끊기면 자동 삭제 여부
    internal=False,                # True면 메시지 발행 불가, 내부 교환기로만 사용됨
    passive=False,                 # True일 경우 존재 여부만 확인하고, 없으면 오류 발생
    arguments=None                 # 고급 설정 (alternate-exchange, TTL 등)
)

# 4️⃣ 메시지 전송 데이터 정의
severity = ["Error", "Warning", "Info", "Other"]  # 라우팅 키들
messages = ["EMsg", "WMsg", "IMsg", "OMsg"]       # 각각의 메시지 본문

# 5️⃣ 랜덤 메시지 10개 발행
for i in range(10):
    randomNum = random.randint(0, len(severity) - 1)  # 0 ~ 3 중 하나 선택
    print(randomNum)

    message = messages[randomNum]      # 전송할 메시지
    rk = severity[randomNum]           # 해당 메시지의 라우팅 키

    # 메시지 전송
    channel.basic_publish(
        exchange='logs_exchange',      # direct 익스체인지로 전송
        routing_key=rk,                # 큐에 바인딩된 키와 일치해야 전달됨
        body=message                   # 메시지 본문
        # 추가로 설정 가능한 properties:
        # properties=pika.BasicProperties(
        #     delivery_mode=2,               # 2: persistent (디스크 저장)
        #     content_type='text/plain',
        #     correlation_id='1234',
        #     reply_to='response_queue'
        # )
    )
    print("[x] sent %r" % message)

# 6️⃣ 익스체인지 삭제
channel.exchange_delete(
    exchange='logs_exchange',
    if_unused=False   # True: 바인딩된 큐가 없을 때만 삭제 (안전 삭제)
                      # False: 강제 삭제 (큐가 바인딩되어 있어도 삭제됨)
)

# 7️⃣ 연결 종료 (채널도 자동으로 종료됨)
connection.close()

 

      4.5.2. consumer - alarmraiser

import pika  # RabbitMQ와 통신하기 위한 라이브러리

# 1️⃣ RabbitMQ 서버에 연결 생성
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

# 2️⃣ 채널(Channel) 생성: 메시지를 송수신할 논리적 경로
channel = connection.channel()

# 3️⃣ 'logs_exchange'라는 이름의 direct 익스체인지 선언
channel.exchange_declare(
    exchange='logs_exchange',        # 익스체인지 이름
    exchange_type='direct',          # 메시지를 정확히 일치하는 라우팅 키로 큐에 전달
    durable=False,                   # 서버 재시작 시 삭제됨 (기본: False)
    auto_delete=False,               # 바인딩 해제되더라도 삭제 안 됨
    internal=False,                  # False일 경우 publish 허용 (기본값)
    passive=False,                   # True 시 존재 확인만 하고 생성은 하지 않음
    arguments=None                   # 고급 설정 (TTL, alternate-exchange 등)
)

# 4️⃣ 임시 큐 선언 (큐 이름 없이 생성, RabbitMQ가 자동으로 랜덤 이름 할당)
result = channel.queue_declare(
    queue='',              # 빈 문자열이면 서버가 자동으로 이름 생성
    exclusive=True         # 현재 연결에서만 접근 가능, 연결 종료 시 자동 삭제
    # durable=False        # 기본값: False (서버 재시작 시 삭제)
    # auto_delete=False    # 기본값: False (소비자 모두 끊겨도 삭제 안 됨)
)

queue_name = result.method.queue  # 자동으로 생성된 큐 이름 저장

# 5️⃣ 큐를 익스체인지에 바인딩 (지정된 라우팅 키에 대해 큐로 메시지 전달)
channel.queue_bind(
    exchange='logs_exchange',    # 바인딩할 익스체인지
    queue=queue_name,            # 현재 연결된 임시 큐
    routing_key="Error"          # 이 라우팅 키로 발행된 메시지를 이 큐로 전달
)

channel.queue_bind(
    exchange='logs_exchange',
    queue=queue_name,
    routing_key="Warning"
)

# 📌 queue_bind(...) 주요 파라미터:
# - exchange: 메시지를 받을 익스체인지 이름
# - queue: 바인딩할 큐 이름
# - routing_key: 큐가 수신할 메시지의 라우팅 키
# - arguments: 선택적 필터 (headers 사용 시)

print('[*] waiting for the messages')  # 소비자 준비 상태 출력

# 6️⃣ 메시지 수신 시 처리할 콜백 함수 정의
def callback(ch, method, properties, body):
    print('[x] Alarm:::: %r' % body)

# 7️⃣ 소비자 등록
channel.basic_consume(
    queue=queue_name,             # 메시지를 수신할 큐
    on_message_callback=callback, # 메시지를 수신했을 때 호출할 함수
    auto_ack=True                 # True면 메시지 수신 즉시 ack (확인 응답) 전송
                                 # False일 경우 ch.basic_ack()를 직접 호출해야 함
)

# 📌 basic_consume(...) 주요 파라미터
# - queue: 수신할 대상 큐
# - on_message_callback: 메시지 처리 함수 (필수)
# - auto_ack: True 시 자동 ack, False 시 수동 ack 필요
# - exclusive: True 시 해당 consumer가 큐에 대해 독점 소비자
# - consumer_tag: 수동으로 식별 태그 지정 가능
# - arguments: AMQP arguments (예: 우선순위)

# 8️⃣ 소비 시작 (무한 대기 루프)
channel.start_consuming()

# 📌 start_consuming():
# - Blocking 방식으로 메시지를 기다림
# - Ctrl+C 또는 channel.stop_consuming()으로 종료 가능

 

      4.5.3. consumer - fiewriter

import pika  # RabbitMQ와 연결을 위한 Python 라이브러리

# 1️⃣ RabbitMQ 서버에 연결(Connection 객체 생성)
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')  # 기본 포트: 5672, 기본 vhost: '/', 기본 사용자: guest
)

# 2️⃣ 채널 생성 (논리적 메시지 송수신 경로)
channel = connection.channel()

# 3️⃣ Direct Exchange 생성 (존재하지 않으면 생성됨, 있으면 무시)
channel.exchange_declare(
    exchange='logs_exchange',        # 익스체인지 이름
    exchange_type='direct',          # 라우팅 키를 기반으로 메시지 라우팅
    durable=False,                   # False일 경우 서버 재시작 시 사라짐
    auto_delete=False,               # 바인딩된 큐가 모두 해제되어도 삭제 안 됨
    internal=False,                  # 외부 publish 허용 (True면 내부용으로만 사용)
    passive=False,                   # True면 존재 유무 확인만 하고 생성은 하지 않음
    arguments=None                   # 고급 옵션 (예: alternate-exchange, TTL)
)

# 4️⃣ 임시 큐 생성 (이름 없이 자동 생성, 독점 연결)
result = channel.queue_declare(
    queue='',            # 빈 문자열을 지정하면 서버가 임의의 이름을 생성해줌
    exclusive=True       # 이 연결에서만 접근 가능, 연결 종료 시 큐도 자동 삭제됨
)

queue_name = result.method.queue  # 자동으로 생성된 큐 이름 저장

# 5️⃣ 큐를 exchange에 바인딩 (라우팅 키: "Warning")
channel.queue_bind(
    exchange='logs_exchange',     # 바인딩할 익스체인지 이름
    queue=queue_name,             # 바인딩 대상 큐
    routing_key="Warning"         # 라우팅 키가 'Warning'인 메시지만 수신
)

print('[*] waiting for the messages')  # 메시지 수신 대기 상태 출력

# 6️⃣ 수신 메시지 처리 콜백 함수
def callback(ch, method, properties, body):
    print('[x] Writing to File:::: %r' % body)

# 7️⃣ 소비자 등록
channel.basic_consume(
    queue=queue_name,             # 수신할 큐 이름
    on_message_callback=callback, # 수신 시 실행할 콜백 함수
    auto_ack=True                 # 메시지를 수신하면 즉시 ACK 자동 전송
    # auto_ack=False로 설정할 경우, 메시지 처리 후 ch.basic_ack(...)을 명시적으로 호출해야 함
)

# 8️⃣ 메시지 소비 시작 (blocking 방식, 무한 루프)
channel.start_consuming()

# Ctrl+C 또는 channel.stop_consuming() 호출 시 중단 가능

 

      4.5.4. consumer - screenprinter

import pika  # RabbitMQ와 통신하기 위한 Python 라이브러리

# 1️⃣ RabbitMQ 서버에 연결
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')  # 기본 포트: 5672
)

# 2️⃣ 메시지를 송수신할 채널 생성
channel = connection.channel()

# 3️⃣ direct exchange 생성 또는 선언 (없으면 새로 생성됨)
channel.exchange_declare(
    exchange='logs_exchange',        # 익스체인지 이름
    exchange_type='direct',          # 정확히 일치하는 라우팅 키만 해당 큐로 전달됨
    durable=False,                   # 서버 재시작 시 익스체인지 유지 여부 (False: 비영속)
    auto_delete=False,               # 모든 큐가 바인딩 해제되면 자동 삭제 여부
    internal=False,                  # 외부에서 메시지 발행 허용 여부
    passive=False,                   # True면 존재만 확인하고 없으면 오류 발생
    arguments=None                   # 고급 설정 (alternate-exchange 등)
)

# 4️⃣ 이름 없는 임시 큐 생성 → RabbitMQ가 자동으로 이름 부여
result = channel.queue_declare(
    queue='',              # 빈 문자열: 자동 생성된 이름
    exclusive=True         # 이 연결에서만 사용할 수 있고, 연결 종료 시 자동 삭제
)
queue_name = result.method.queue  # 생성된 임시 큐 이름 추출

# 5️⃣ 큐를 익스체인지에 3가지 라우팅 키로 바인딩
severity = ["Error", "Warning", "Info"]

for rk in severity:
    channel.queue_bind(
        exchange='logs_exchange',   # 바인딩할 익스체인지
        queue=queue_name,           # 대상 큐
        routing_key=rk              # 큐가 수신할 메시지의 라우팅 키
    )

# 🔔 라우팅 키 설명:
# - Error: 심각한 문제
# - Warning: 경고 메시지
# - Info: 일반 정보 로그

print('[*] waiting for the messages')  # 준비 완료 메시지

# 6️⃣ 메시지 수신 시 호출될 콜백 함수 정의
def callback(ch, method, properties, body):
    print('[x] Alarm:::: %r' % body)

# 7️⃣ 큐에서 메시지 소비 시작 (콜백 등록)
channel.basic_consume(
    queue=queue_name,              # 수신할 큐 이름
    on_message_callback=callback,  # 메시지를 처리할 함수
    auto_ack=True                  # 수신 즉시 메시지 확인(ack) 전송 (실패 시 재전송 안됨)
)

# 📌 auto_ack 설명:
# - True: 수신 즉시 ACK (메시지 유실 가능성 ↑)
# - False: 메시지 처리 후 `ch.basic_ack()`으로 명시적으로 확인 필요 (안정성 ↑)

# 8️⃣ 메시지 수신 시작 (Blocking 방식)
channel.start_consuming()

# 📌 종료하려면: Ctrl+C 또는 `channel.stop_consuming()` 호출 필요

 

5. Topic Exchange - Pattern Based Routing

   5.1. 메시지 구조 및 라우팅 키 구성

      5.1.1. 메시지 속성 (message metadata)

속성 카테고리 가능한 값
Severity Error (E), Warning (W), Info (I)
Priority High (H), Medium (M), Low (L)
Action Action1 (A1), Action2 (A2), Action3 (A3)
Component C1, C2, C3

 

      5.1.2. 라우팅 키 구성 방식

<Severity>.<Priority>.<Action>.<Component>
예시: E.H.A1.C1

 

      5.1.3. 라우팅 키 예시

         - 이 구조는 4계층 라우팅 키 구조로, 패턴 기반의 필터링에 매우 적합하다.

E.H.A1.C1
E.H.A1.C2
E.M.A1.C1
W.L.A3.C3
I.L.A3.C3

 

   5.2. Subscriber 바인딩 전략 및 확장성 문제

구독 목적 바인딩 키
C1에서 발생한 High Priority Error Action A1 E.H.A1.C1
모든 컴포넌트에서의 E.H.A1 E.H.A1.C1, E.H.A1.C2, E.H.A1.C3

 

      5.2.1. 문제점

            - 컴포넌트가 추가되면 구독자는 새로운 바인딩 키를 수동으로 추가해야 함 → 유지보수 부담
            - 조건이 늘어날수록 바인딩 키 수가 급증함 → 관리 복잡성 증가

 

   5.3. 패턴 기반 바인딩 룰(Solution)

      - 참고: 와일드카드 *는 하나의 단어, #는 0개 이상 단어를 의미한다.

Requirement Binding Rule 설명
모든 Error 메시지 E.# Severity가 'Error'로 시작하는 모든 메시지 수신
모든 High Priority 메시지 *.H.# Priority가 'High'인 모든 메시지 수신
모든 A3 액션 메시지 *.*.A3.* 또는 #.*.A3.* Action이 A3인 메시지 전체 수신
Component C2에서 발생한 모든 메시지 #.#.#.C2 또는 #.*.C2 Component가 C2인 메시지 수신
High Priority Error 메시지 전체 E.H.# Severity=Error, Priority=High 인 메시지 수신
Component C3에서 발생한 Warning 메시지 W.#.C3 Severity=Warning, Component=C3인 메시지
Component C2에서 발생한 High Priority Error E.H.*.C2 E, H, C2 조합에 해당하는 모든 메시지 수신
Action2가 필요한 Medium Priority Warning W.M.A2.* Severity=Warning, Priority=Medium, Action=A2
C3에서 발생한 High Priority Info 메시지 (Action2 수행) I.H.A2.C3 모든 조건이 정확히 일치하는 메시지 수신

 

   5.4. Topic Exchange의 확장성 요점

      - 바인딩 키를 와일드카드 패턴으로 설계하면 컴포넌트 추가 시 유지보수가 필요 없음
      - * : 정확히 하나의 토큰 일치
      - # : 0개 이상의 토큰 일치

 

   5.5.  예시 비교

방식 바인딩 키 설명
정적 E.H.A1.C1, E.H.A1.C2 등 컴포넌트 추가 시 바인딩 수 증가
패턴 E.H.A1.* 컴포넌트가 몇 개든 자동 적용됨

 

   5.6. Topic Exchange 바인딩 설계 시 유의 사항 (Additionally 보완 정리)

항목 설명
다중 바인딩 하나의 큐에 여러 라우팅 키 바인딩 가능
메시지 중복 동일한 메시지가 여러 바인딩 규칙에 일치해도 큐에는 1회만 도달
공유 큐 하나의 큐에 다수 구독자 등록 가능 (exclusive=False)
라우팅 실패 어떠한 바인딩 규칙에도 일치하지 않으면 메시지는 drop 됨

 

   5.7. 강의 프로젝트

      5.7.1. publisher

import random
import sys
import pika

# RabbitMQ 서버에 연결
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

# 채널 생성
channel = connection.channel()

# Topic Exchange 선언
# exchange_type='topic' → 패턴 기반 라우팅 가능
# durable=True → 서버 재시작 후에도 exchange가 유지됨
channel.exchange_declare(
    exchange='system_exchange',
    exchange_type='topic',
    durable=True
)

# 각 메시지의 속성 요소 정의
# Severity (심각도), Priority (우선순위), Action (조치), Component (컴포넌트)
severity = ['E', 'W', 'I']       # Error, Warning, Info
priority = ['H', 'M', 'L']       # High, Medium, Low
action = ['A1', 'A2', 'A3']      # Action1, Action2, Action3
component = ['C1', 'C2', 'C3']   # Component1, Component2, Component3

# 10개의 무작위 메시지 생성 및 발행
for i in range(10):
    # 각각의 항목에서 무작위 선택
    randomSeverity = random.choice(severity)
    randomPriority = random.choice(priority)
    randomAction = random.choice(action)
    randomComponent = random.choice(component)

    # 라우팅 키 형식: <Severity>.<Priority>.<Action>.<Component>
    rk = f"{randomSeverity}.{randomPriority}.{randomAction}.{randomComponent}"

    # 메시지 본문 구성
    message = rk + " ::::: <Message>"

    # 메시지 발행
    # routing_key는 위에서 생성한 rk (ex. E.H.A1.C2)
    channel.basic_publish(
        exchange='system_exchange',
        routing_key=rk,
        body=message
    )

    print("[x] sent %r" % message)

# 교환기를 삭제할 경우 (사용 중이지 않으면)
# channel.exchange_delete(exchange='system_exchange', if_unused=False)

# 연결 종료
connection.close()

 


      5.7.2. subscriber - A3actiontaker

import pika

# RabbitMQ 서버에 연결
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

# 연결을 통해 채널 생성
channel = connection.channel()

# Topic Exchange 선언
# durable=True → 서버 재시작 이후에도 exchange가 유지됨
channel.exchange_declare(
    exchange='system_exchange',
    exchange_type='topic',
    durable=True
)

# 임시 큐 선언
# queue='' → 이름 자동 생성
# exclusive=True → 해당 연결(connection)이 닫히면 큐도 삭제됨
result = channel.queue_declare(
    queue='',          # 비워두면 임시 이름 자동 생성
    exclusive=True     # 연결 종료 시 큐 자동 삭제
)

# 자동 생성된 큐 이름 저장
queue_name = result.method.queue

# (디버깅용 출력)
# print("Subscriber queue_name =", queue_name)

# 큐를 exchange에 바인딩
# routing_key="#.A3.#" → A3가 포함된 모든 라우팅 키에 대해 메시지 수신
# 예: E.H.A3.C1, W.L.A3.C3 등
channel.queue_bind(
    exchange='system_exchange',
    queue=queue_name,
    routing_key="#.A3.#"
)

print('[*] waiting for the messages')

# 메시지를 수신하면 호출될 콜백 함수 정의
def callback(ch, method, properties, body):
    print('[x] :::: %r' % body)

# 소비자 등록
# auto_ack=True → 메시지 수신 즉시 확인(ack) 전송 (신뢰성 낮음, 손실 위험 있음)
channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

# 메시지 수신 대기 (무한 대기)
channel.start_consuming()

 

      5.7.3. subscriber - allwarningsfromC2

import pika

# RabbitMQ 서버에 연결
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

# 연결된 채널 생성
channel = connection.channel()

# Topic Exchange 선언
# exchange_type='topic' → 메시지 라우팅 키의 패턴 기반 필터링
# durable=True → 서버 재시작 후에도 exchange 유지
channel.exchange_declare(
    exchange='system_exchange',
    exchange_type='topic',
    durable=True
)

# 임시 큐 선언
# queue='' → 서버가 자동으로 임의 이름 생성
# exclusive=True → 해당 연결이 끊기면 큐도 자동 삭제됨 (개인용 구독자에 적합)
result = channel.queue_declare(
    queue='',
    exclusive=True
)

# 자동 생성된 큐 이름 획득
queue_name = result.method.queue

# 디버깅용 큐 이름 출력 (필요시 주석 해제)
# print("Subscriber queue_name =", queue_name)

# 큐를 Exchange에 바인딩 (연결)
# routing_key="W.#.C2" → 라우팅 키가 Warning(W)으로 시작하고 중간에 0개 이상 토큰이 있으며 마지막이 C2인 경우 수신
# 예: W.H.A1.C2, W.M.C2, W..C2 등 (유효한 형식이어야 하며 정확한 점(.) 수는 라우팅 키에 따라 달라짐)
channel.queue_bind(
    exchange='system_exchange',
    queue=queue_name,
    routing_key="W.#.C2"
)

print('[*] waiting for the messages')

# 메시지를 수신했을 때 호출될 콜백 함수 정의
def callback(ch, method, properties, body):
    print('[x] :::: %r' % body)

# 메시지 소비자 등록
# auto_ack=True → 메시지 수신 즉시 자동으로 처리 완료(ack)됨 → 장애 발생 시 메시지 손실 위험 있음
channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

# 무한 루프 형태로 메시지를 계속 수신
channel.start_consuming()

 

      5.7.4. subscriber - errorhandlingsub

import pika

# RabbitMQ 서버에 연결
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

# 연결된 채널 생성
channel = connection.channel()

# Topic Exchange 생성 (없으면 새로 생성, 있으면 설정 유지)
# exchange_type='topic' → 라우팅 키 패턴 기반 메시지 필터링
# durable=True → 서버 재시작 시에도 exchange 유지
channel.exchange_declare(
    exchange='system_exchange',
    exchange_type='topic',
    durable=True
)

# 임시 큐 선언
# queue='' → RabbitMQ가 이름 자동 생성
# exclusive=True → 연결 종료 시 큐도 자동 삭제됨
result = channel.queue_declare(queue='', exclusive=True)

# 생성된 큐 이름 추출
queue_name = result.method.queue

# 디버깅용 출력 (필요시 사용)
# print("Subscriber queue_name =", queue_name)

# 큐를 Exchange에 바인딩
# routing_key="E.#" → 'E'로 시작하는 모든 라우팅 키 매칭
# 예: "E.H.A1.C1", "E.M.C3", "E" 등
channel.queue_bind(
    exchange='system_exchange',
    queue=queue_name,
    routing_key="E.#"
)

print('[*] waiting for the messages')

# 메시지 수신 시 실행할 콜백 함수 정의
def callback(ch, method, properties, body):
    print('[x] :::: %r' % body)

# 메시지 소비 등록
# auto_ack=True → 수신 즉시 ack 전송 (장애 발생 시 메시지 유실 가능)
channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

# 메시지 수신 시작 (무한 루프)
channel.start_consuming()

# 🔴 중복된 호출 — 필요 없음 (start_consuming()은 blocking 함수로 여기서 멈춤)
# channel.start_consuming()

 

6. Reliable communication

   6.1. 메시지 흐름과 실패 지점 (Failure Points)

Publisher → [Exchange] → [Queue] → Subscriber

 

   6.2. 실패 가능 지점 요약:

구성 요소 실패 가능성 설명
Publisher → Exchange 연결 또는 채널 단절 메시지 자체가 브로커에 도달하지 않음
Exchange → Queue Exchange/Queue 다운 또는 재시작 메시지가 올바르게 라우팅되지 않음
Queue 자체 휘발성 큐는 메시지를 유실할 수 있음  
Queue → Subscriber 구독자 다운 또는 과부하 처리되지 않은 메시지는 유실 위험 있음

 

   6.3. RabbitMQ Provision 요약

      6.3.1. Provision #1: Publisher Confirmation

         - 기능: 브로커가 메시지를 수신했는지 확인
         - 이점:

            - 메시지 손실 방지
            - 실패 원인 파악 가능 (채널 종료, 연결 종료 등)

 

      6.3.2. Provision #2: Durable Exchange

         - 기능: Exchange를 영속성 있게 구성
         - 이점:
            - 브로커 재시작 시에도 Exchange는 유지됨

 

      6.3.3. Provision #3: Durable Queue

         - 기능: Queue를 Durable로 설정
         - 이점:
            - 미전달 메시지, 미확인 메시지 디스크 저장
            - 브로커 재시작에도 유지
         - 주의점:
            - 성능 저하
            - 여전히 메시지 유실 가능성 존재

 

      6.3.4. Provision #4: Manual Acknowledgment (수동 ACK)

         - 이점:
            - 메시지 처리 완료 후 ACK를 전송 → 메시지 유실 방지
         - 필요 조건:
            - Subscriber가 비정상 종료되거나 과부하되어도 다시 전달 가능
            - Channel과 Queue는 비독점적(exclusive=false)이어야 함
            - QoS 설정: subscriber에 전달될 메시지 수를 제한 가능 (prefetch 설정)

 

   6.4. 수동 ACK의 중요성과 이유

      - 자동 ACK는 메시지가 도착하자마자 확인되므로 Subscriber가 다운되면 메시지는 사라짐 = 유실 가능성 존재
      - 수동 ACK는 처리 완료 후 명시적으로 확인 → 신뢰성 높은 시스템 구현에 필수

      - basic_consume의 auto_ack=True 설정은 callback의 시작 전에 ack가 전송될 수 있다. 

      - ack가 rabbitMQ에게 전송되면, rabbitMQ는 다음 메시지를 송신하며 컨슈머의 기본적인 수신 가능한 크기는 무제한이다.

         - 수신된 메시지는 메모리에 저장되며, callback에 의해 꺼내어 처리된다.

         - 안전한 사용 방법으로 ack를 통해 수동으로 메시지를 하나씩 꺼내와 callback의 처리 후 ack를 다시 송신하는 방법을 이용하면 된다.

         - 이때, 발행자의 basic_publish의 BasicProperties는 delivery_mode=2로 설정한다. 

            - delivery_mode=1(Transient): 메시지가 메모리에만 저장되며, RabbitMQ 서버가 다운될 경우 메시지가 손실될 수 있다.

            - delivery_mode=2(Persistent): 메시지가 디스크에 저장됩니다. RabbitMQ 서버가 재시작되어도 메시지가 유지된다.

         - 수신자의 exchange_declare와 queue_declare에 durable=True 설정을 하여 삭제가 되지 않도록 설정한다.

         - basic_qos(prefetch_count=1)를 이용해 받을 메시지의 수를 제한한다.

         - basic_consume에 auto_ack=True 설정은 하면 안된다.

         - callback의 종료전, basic_ack(delivery_tag=method.delivery_tag)를 사용해 수동으로 ack를 전달한다.

 

이유 설명
안정성 향상 Consumer가 메시지를 정상적으로 처리했는지 확인 후 ACK
메시지 유실 방지 Consumer가 다운되면 메시지는 다시 큐에 남고 재전송 가능
성능 제어 QoS(presetch) 설정으로 subscriber 과부하 방지
실시간 오류 처리 NACK 또는 reject로 오류 상황을 브로커에 알릴 수 있음

 

   6.5. RabbitMQ Reliable Communication 요약 체크리스트

항목 설정 필요 여부 설명
✅ Durable Exchange 브로커 재시작에도 유지
✅ Durable Queue 메시지 디스크 저장
✅ Persistent Message 메시지를 디스크에 저장
✅ Manual ACK 필수 처리 완료 후 ACK
✅ QoS (Prefetch) 권장 과부하 방지
✅ DLX (Dead Letter Exchange) 권장 실패 메시지 저장 및 재처리

 

   6.6. 강의 프로젝트

      6.6.1. publisher

import random
import sys
import pika

# RabbitMQ 서버에 연결 생성
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

# 채널 생성
channel = connection.channel()

# Publisher confirm 모드 설정
# 이 설정은 메시지를 발행했을 때 RabbitMQ 브로커가 성공적으로 수신했는지를 확인할 수 있게 해준다.
channel.confirm_delivery()

# exchange 선언
# exchange 이름: 'logs_exchange'
# exchange 타입: 'direct'
# durable=True → 서버 재시작 후에도 exchange가 유지됨
channel.exchange_declare(exchange='logs_exchange', exchange_type='direct', durable=True)

# 메시지의 라우팅 키 (severity 수준)과 메시지 내용 (내용 자체)
severity = ["Error", "Warning", "Info", "Other"]
messages = ["EMsg", "WMsg", "IMsg", "OMsg"]

# 10개의 메시지를 무작위 라우팅 키와 함께 전송
for i in range(10):
    randomNum = random.randint(0, len(severity)-1)
    message = f"{messages[randomNum]} : {i}"  # 메시지 내용 구성
    rk = severity[randomNum]  # 라우팅 키 선택

    try:
        # 메시지 발행
        channel.basic_publish(
            exchange='logs_exchange',
            routing_key=rk,
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2  # 메시지의 영속성 설정 (1=non-persistent, 2=persistent)
            )
        )
        print("[x] sent %r" % message)

    # 예외 처리: 채널 또는 연결이 닫힌 경우
    except pika.exceptions.ChannelClosed:
        print("Channel Closed")
    except pika.exceptions.ConnectionClosed:
        print("Connection Closed")

# exchange 삭제 (if_unused=False → 사용 중이라도 삭제 시도)
channel.exchange_delete(exchange='logs_exchange', if_unused=False)

# 연결 종료
connection.close()

 

      6.6.2. subscriber

import random
import time
import pika

# 각 Subscriber마다 랜덤한 ID를 부여
subId = random.randint(1, 100)
print("Subscriber Id = ", subId)

# RabbitMQ 서버 연결
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

# 채널 생성
channel = connection.channel()

# durable=True → 서버가 재시작되더라도 exchange가 유지됨
channel.exchange_declare(exchange='logs_exchange', exchange_type='direct', durable=True)

# 큐 이름 지정 (task_queue)
# durable=True → 큐가 영속적으로 유지됨 (서버 재시작에도 사라지지 않음)
# exclusive=True → 해당 채널에만 독점적으로 바인딩 (닫히면 자동 삭제됨, 현재 주석 처리)
result = channel.queue_declare(
    queue='task_queue',
    durable=True
    # exclusive=True
)

queue_name = "task_queue"

# 여러 Routing Key를 동일 큐에 바인딩 (Direct Exchange에선 각각 명시)
severity = ["Error", "Warning", "Info", "Other"]
for s in severity:
    channel.queue_bind(exchange='logs_exchange', queue=queue_name, routing_key=s)

print('[*] waiting for the messages')

# 메시지 수신 콜백 함수 정의
def callback(ch, method, properties, body):
    print('[x] Received message:::: %r' % body)

    # 메시지를 처리하는 데 걸리는 시간 (여기선 고정 5초로 시뮬레이션)
    randomSleep = 5
    print("Working for ", randomSleep, "seconds")

    while randomSleep > 0:
        print(".", end="")  # 진행 상태 시각화
        time.sleep(1)
        randomSleep -= 1
    print("!")

    # 메시지 처리 완료 후 ack 전송 (자동이 아닌 수동 승인 방식)
    ch.basic_ack(delivery_tag=method.delivery_tag)

# QoS (Quality of Service) 설정
# prefetch_count=1 → 한 번에 하나의 메시지만 처리하도록 제한 (consumer가 ack하기 전까진 다음 메시지 전송 안됨)
channel.basic_qos(prefetch_count=1)

# 메시지 소비 시작
# auto_ack=False (기본값) → 수동 ack 모드
# auto_ack=True로 설정하면 콜백 실행 전에도 메시지가 즉시 ack 되어 메시지 손실 가능성 존재
channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback
    # auto_ack=True
)

# 큐에서 메시지를 계속 소비 (blocking loop)
channel.start_consuming()

 

7. RPC - Remote Procedure Call

   7.1. 강의 프로젝트

      7.1.1. Server

import pika


# ▶️ 팩토리얼 함수 정의
def fact(n):
    if n <= 1:
        return 1
    return n * fact(n - 1)

# ▶️ 메시지 수신 콜백 함수 (RPC 요청 처리)
def on_request(ch, method, props, body):
    # 클라이언트가 응답받기를 원하는 큐 이름 (reply_to 속성)
    reply_queue_name = props.reply_to

    # 응답 메시지와 요청 메시지를 매칭하기 위한 ID
    corr_id = props.correlation_id

    # 요청 받은 숫자 추출 및 출력
    n = int(body)
    print("called fact(", n, ")")

    # 팩토리얼 계산
    response = fact(n)

    # 응답 메시지를 reply_to 큐에 전송
    ch.basic_publish(
        exchange='',  # default exchange 사용
        routing_key=reply_queue_name,  # 응답 큐로 전송
        properties=pika.BasicProperties(
            correlation_id=corr_id  # 클라이언트가 보낸 ID 유지
        ),
        body=str(response)
    )

    # 메시지 처리가 끝났으므로 ACK 전송
    ch.basic_ack(delivery_tag=method.delivery_tag)

########## Main Program #################

# ▶️ RabbitMQ 서버에 연결
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# ▶️ 서버가 받을 RPC 요청 큐 생성 (durable로 유지됨)
queue_name = "rpc_server_queue"
result = channel.queue_declare(
    queue=queue_name,
    # exclusive=True,  # 연결 종료 시 큐 삭제 (비활성화)
    durable=True       # 서버 재시작 후에도 유지
)

# ▶️ 동시에 하나의 요청만 처리하도록 제한 (처리 중 메시지 누적 방지)
channel.basic_qos(prefetch_count=1)

# ▶️ 큐에서 메시지를 꺼내면 on_request 함수 호출
channel.basic_consume(
    queue=queue_name,
    on_message_callback=on_request
    # auto_ack=False → 수동 ACK 방식
)

# ▶️ 메시지 수신 루프 시작 (RPC 서버 시작)
print("Awaiting RPC requests")
channel.start_consuming()

 

      7.1.2. Client

import pika
import uuid

class FactRPCClient:
    def __init__(self):
        self.connection = \
            pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()

        self.queue_name = 'rpc_client_queue'
        self.server_queue_name = 'rpc_server_queue'
        self.channel.queue_declare(queue=self.queue_name, exclusive=True)


        self.channel.basic_consume(queue=self.queue_name,
                                   on_message_callback= self.on_response,
                                   auto_ack= True)

    def on_response(self, ch, method, props, body):
        if(self.correlation_id == props.correlation_id):
            self.response = body

    def call(self, n):
        self.response = None
        self.correlation_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key=self.server_queue_name,
                                   properties=pika.BasicProperties(
                                       reply_to=self.queue_name,
                                       correlation_id= self.correlation_id,
                                   ),
                                   body=str(n),
                                   )
        while(self.response is None):
            self.connection.process_data_events()
        return(int(self.response))


fact_rpc = FactRPCClient()
n = 5
print("Requesting Fact(", n, ")")
response = fact_rpc.call(n)
print("Got the response ", response)

 

댓글