[Udemy] Learn How to use RabbitMQ for establishing your messaging framework
강좌 정보 : https://www.udemy.com/course/rabbitmq-ss/
1. Introduction
1.1. rabbitmq 관리 플러그인 설치 - rabbitmq_management
- rabbitmq command prompt 실행
- 설치 명령어 입력
rabbitmq-plugins enable rabbitmq_management
- 관리 웹서버 접속
localhost:15672
1.2. AMQP –Advanced Message Queuing Protocol
1.2.1. AMQP의 주요 특징
항목 | 설명 |
표준화된 프로토콜 | ISO/IEC 19464 및 OASIS 공식 표준TCP 기반의 바이너리 프로토콜 |
메시지 지향 구조 | 메시지 단위로 통신함 (파일/스트림 전송 아님) |
라우팅 기능 내장 | Exchange와 Binding 개념을 이용한 유연한 메시지 라우팅 |
신뢰성 보장 | 메시지 영속성(persistent), 트랜잭션, ACK/NACK 지원으로 메시지 손실 방지 |
트랜잭션 지원 | 여러 메시지 처리의 원자성을 보장 (commit/rollback) |
플로우 컨트롤 | 소비자의 처리 능력에 따라 메시지 흐름 제어 가능 (QoS) |
보안 | TLS 암호화 및 인증 지원 가능 |
언어/플랫폼 독립성 | 다양한 언어에서 AMQP 클라이언트 라이브러리 사용 가능 (Java, Python, C#, Go 등) |
비동기 통신 | 발신자와 수신자의 독립적 처리 가능 → 느슨한 결합 구조 구성에 유리 |
1.2.2. AMQP 메시징 모델의 핵심 구성요소
구성요소 | 설명 |
Producer | 메시지를 생성하고 발행 |
Exchange | 메시지를 적절한 큐로 라우팅 (direct, fanout, topic, headers 등) |
Queue | 메시지가 저장되고 소비자를 기다리는 공간 |
Consumer | 메시지를 큐에서 받아 처리 |
Binding | Exchange와 Queue 사이의 연결 관계 (Routing Key 기반) |
Channel | 하나의 연결 안에서 다중 논리 스트림 (경량화된 메시지 통신 경로) |
Connection | TCP/IP를 통한 클라이언트와 브로커 간 물리적 연결 |
1.2.3. AMQP vs 기타 메시징 프로토콜
항목 | AMQP | MQTT | Kafka (비교 참고용) |
표준화 | ✔️ ISO/OASIS 표준 | ✔️ OASIS 표준 | ❌ 자체 프로토콜 |
보장 수준 | Exactly-once, At-least-once 등 | At-most-once, At-least-once | At-least-once, Exactly-once (옵션) |
사용 대상 | 기업 메시징 시스템 | IoT 기기, 센서 | 대규모 데이터 스트림 |
메시지 모델 | 큐 기반, 교환기 + 큐 구조 | Pub/Sub | 로그 기반 분산 큐 |
트랜잭션 | ✔️ 지원 | ❌ 미지원 | ✔️ 일부 지원 |
라우팅 | ✔️ 강력함 (Topic, Header, Pattern 등) | 간단한 Topic 기반 | 없음 (Partition 기반) |
1.3. RabbitMQ 정리
1.3.1. 메시지 브로커 시스템의 구조
Publisher → Exchange → Queue → Subscriber
- Publisher: 메시지를 발행
- Exchange: 메시지를 큐로 라우팅
- Queue: 메시지를 저장하고 대기
- Subscriber: 메시지를 소비
1.3.2. 주요 장점 요약
1.3.2.1. 발행자(Publisher)는 수신자(Subscriber)의 물리적 주소를 알 필요가 없다
- 단지 구독자의 논리적 식별자 (예: 이름)만 있으면 됨
- 시스템 간 느슨한 결합(loose coupling) 가능
1.3.2.2. 구독자 교체 가능
- 하나의 구독자가 사라져도 다른 구독자가 동일 큐를 이어받아 처리 가능
1.3.2.3. 구독자 수를 유연하게 조절 가능
- 부하가 증가하면 큐에 연결된 구독자 수를 늘려 처리 분산 가능
1.3.2.4. 브로커 재시작에도 메시지 보존 가능
- 메시지를 영속성(persistent) 설정하면, 브로커가 재시작되더라도 메시지가 손실되지 않음
1.3.3. rabbitmq에서 connection과 channel의 차이
1.3.3.1. Connection
- RabbitMQ 브로커와 클라이언트 간의 실제 물리적 TCP 연결
- 특징:
- TCP/IP 사용
- 인증(Authentication)을 요구 (username/password, 인증서 등)
- TLS를 이용한 암호화 가능 (보안)
- 1개의 Connection은 다수의 Channel을 포함 가능
- 비싸고 무겁기 때문에 남발을 피해야 함
1.3.3.2. Channel
- Connection 위에 생성되는 가볍고 논리적인 통신 단위
- 특징:
- TCP 연결을 공유하는 가벼운 스트림
- 1개의 Connection 위에 수십~수백 개의 Channel 생성 가능
- 예: 멀티스레드 앱의 각 스레드는 자신의 Channel을 가질 수 있음
- Connection이 닫히면 하위 Channel도 모두 닫힘
- Channel은 반드시 Connection 위에서만 존재 가능
- 하나의 Channel은 큐 선언, 바인딩, 메시지 발행/소비 등을 독립적으로 수행
1.3.3.3. 정리: 차이점 비교 표
항목 | Connection | Channel |
정의 | 클라이언트 ↔ 브로커 간의 물리적 연결 | Connection 내의 가벼운 논리적 통신 경로 |
프로토콜 | TCP, TLS | AMQP (Application level) |
자원 소모 | 무거움 (많이 만들면 브로커에 부담) | 가벼움 (실제 사용 권장 단위) |
수명 관계 | 닫히면 모든 Channel도 닫힘 | Connection 위에 종속됨 |
개수 제한 | 일반적으로 소수 (~수십 개 적정) | 수백 개 이상도 가능 |
사용 목적 | 전체 연결 세션 유지 | 메시지 발행/소비, 큐/익스체인지 관리 |
실무 팁 | 최소화, 재사용 권장 | 멀티스레드 분리 등 유연한 활용 가능 |
1.4. 주요 RabbitMQ 관련 함수 및 옵션
1.4.1. exchange_declare(...) 주요 옵션
옵션 | 설명 | 기본값 |
exchange | 익스체인지 이름 | 필수 |
exchange_type | 'direct', 'fanout', 'topic', 'headers' 중 하나 | 필수 |
durable | True 시 서버 재시작 후에도 유지됨 | FALSE |
auto_delete | 마지막 바인딩이 해제되면 삭제 | FALSE |
passive | True면 존재 확인만 수행, 없으면 에러 발생 | FALSE |
internal | 서버 내부 메시징용 (직접 publish 불가) | FALSE |
arguments | 고급 설정 (alternate-exchange, DLX 등) | None |
1.4.2. basic_publish(...) 주요 옵션
옵션 | 설명 |
exchange | 발행할 대상 익스체인지 |
routing_key | 메시지 라우팅 키 (fanout에선 무시됨) |
body | 메시지 본문 |
properties=BasicProperties(...) | 메시지 속성 (delivery_mode, headers 등) 설정 |
1.4.3. exchange_delete(...) 주요 옵션
옵션 | 설명 | 기본값 |
exchange | 삭제할 익스체인지 이름 | 필수 |
if_unused | True면 바인딩된 큐가 없을 경우에만 삭제 | FALSE |
1.4.4. queue_declare(...) 주요 옵션
파라미터 | 설명 |
queue='' | 빈 문자열이면 서버가 임의의 고유 이름 생성 |
exclusive=True | 연결 전용 큐, 연결 종료 시 삭제됨 |
durable=False | False면 서버 재시작 시 삭제 |
auto_delete=False | 마지막 소비자가 끊겨도 삭제 안 함 |
1.4.5. basic_consume(...) 주요 옵션
파라미터 | 설명 |
queue | 수신할 큐 이름 |
on_message_callback | 메시지 수신 시 호출될 함수 |
auto_ack=True | 수신 즉시 ACK 자동 전송 (False로 설정 시 수동 ack 필요) |
1.4.6. pika.BasicProperties 상세 설명
- pika.BasicProperties는 메시지를 RabbitMQ에 발행할 때 사용할 수 있는 메타데이터들을 정의하는 클래스이다.
pika.BasicProperties(
content_type='text/plain',
content_encoding='utf-8',
headers={'key': 'value'},
delivery_mode=2,
priority=0,
correlation_id='abc123',
reply_to='response_queue',
expiration='60000',
message_id='msg-001',
timestamp=int(time.time()),
type='my_type',
user_id='guest',
app_id='my_app'
)
- 주요 옵션 설명:
옵션 | 설명 |
content_type | MIME 타입 (예: 'text/plain', 'application/json') |
content_encoding | 인코딩 방식 (예: 'utf-8') |
headers | 사용자 정의 헤더 딕셔너리 (예: {'key': 'value'}) |
delivery_mode | 메시지의 영속성 (1: non-persistent, 2: persistent) |
priority | 메시지 우선순위 (0~9, 큐에 따라 지원 여부 다름) |
correlation_id | 요청-응답(RPC) 메시지 추적용 고유 ID |
reply_to | 응답을 받을 큐 이름 (RPC 패턴에서 사용) |
expiration | 메시지 만료 시간 (ms 단위 문자열) |
message_id | 메시지 고유 ID |
timestamp | 메시지 발송 시간 (Unix timestamp, int) |
type | 메시지의 타입 또는 목적 설명용 문자열 |
user_id | 메시지를 보낸 사용자 식별자 |
app_id | 메시지를 보낸 애플리케이션 식별자 |
1.4.7. queue_declare(...) 주요 옵션
옵션 | 설명 |
queue | 큐 이름 |
durable | True면 서버 재시작 시에도 큐 유지 |
exclusive | True면 채널이 닫히면 큐 삭제됨 (1개 연결만 허용) |
auto_delete | True면 사용 후 자동 삭제됨 (큐 사용자가 없어질 때 삭제) |
1.4.8. queue_bind(...)
주요 옵션
- 큐를 교환기와 라우팅 키로 연결
- Direct/Topic exchange에서는 라우팅 키가 중요
- 큐를 특정 Exchange에 바인딩하여 메시지를 수신할 수 있도록 연결한다.
파라미터 | 필수 | 설명 |
queue | ✅ Yes | 바인딩할 큐의 이름 |
exchange | ✅ Yes | 메시지를 수신할 교환기 이름 |
routing_key | ❌ No (기본: '') | 해당 큐로 전달될 메시지를 식별하는 키 |
arguments | ❌ No | 헤더 익스체인지에 사용하는 추가 필터 정보 (딕셔너리) |
channel.queue_bind(exchange='logs_exchange', queue='task_queue', routing_key='Error')
1.4.9. basic_qos(...) 주요 옵션
- 메시지 소비 속도를 조절하여 동시 처리량을 제한할 수 있다 (Load Control 목적).
파라미터 | 필수 | 설명 |
prefetch_size | ❌ No (기본: 0) | 바이트 기준 한 번에 가져올 메시지 크기 (0은 무제한) |
prefetch_count | ❌ No (기본: 0) | 한 번에 처리 가능한 메시지 개수. 일반적으로 1 사용 |
global | ❌ No (기본: False) | True이면 채널 전체, False면 현재 소비자에만 적용 |
- prefetch_count=1을 설정하면 consumer가 ack 하기 전까지는 다음 메시지를 전달하지 않음 → 부하 분산 효과.
channel.basic_qos(prefetch_count=1)
1.4.10. basic_ack(...) 주요 옵션
- 수동 ack 모드에서 처리 완료된 메시지를 RabbitMQ에 명시적으로 알림으로써 재전송 방지.
파라미터 | 필수 | 설명 |
delivery_tag | ✅ Yes | 수신한 메시지의 고유 ID (method.delivery_tag에서 얻음) |
multiple | ❌ No (기본: False) | True면 delivery_tag 이하의 모든 메시지에 대해 ack 처리 (batch ack) |
- multiple=True는 batch ack를 위해 사용되며, True 설정 시 delivery_tag 이하 모든 메시지가 일괄 확인됨.
ch.basic_ack(delivery_tag=method.delivery_tag)
2. Basic Communication - Default Exchange
2.1. RabbitMQ 익스체인지 타입 및 사용 정리
- Direct Exchange, Fanout Exchange, Topic Exchange, Headers Exchange 에 대해 정리한다.
2.2. Direct Exchange (직접 익스체인지)
2.2.1. 개념
- 메시지의 라우팅 키와 큐의 바인딩 키가 정확히 일치할 경우 메시지를 전달한다.
2.2.2. 사용 사례
- 로그 레벨별 큐 분리 (Error, Info, Warning)
2.2.3. 예제 코드 (Python pika)
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.queue_declare(queue='error_queue')
channel.queue_bind(exchange='direct_logs', queue='error_queue', routing_key='error')
channel.basic_publish(exchange='direct_logs', routing_key='error', body='Error Message')
2.2.4. 실무 예시 코드
2.2.4.1. 결제 상태별 큐 처리
statuses = ['success', 'failed', 'pending']
channel.exchange_declare(exchange='payment_status', exchange_type='direct')
for status in statuses:
queue_name = f'payment_{status}_queue'
channel.queue_declare(queue=queue_name, durable=True)
channel.queue_bind(exchange='payment_status', queue=queue_name, routing_key=status)
channel.basic_publish(exchange='payment_status', routing_key='success', body='Order #123 Paid')
2.2.4.2. 유저 권한 분류 큐
roles = ['admin', 'staff', 'customer']
channel.exchange_declare(exchange='user_roles', exchange_type='direct')
for role in roles:
qname = f'{role}_queue'
channel.queue_declare(queue=qname, durable=True)
channel.queue_bind(exchange='user_roles', queue=qname, routing_key=role)
channel.basic_publish(exchange='user_roles', routing_key='admin', body='System Admin Login')
2.3. Fanout Exchange (팬아웃 익스체인지)
2.3.1. 개념
- 라우팅 키를 무시하고 모든 바인딩된 큐로 메시지를 브로드캐스트한다.
2.3.2. 사용 사례
- 전체 시스템 공지, 실시간 로그 브로드캐스트
2.3.3. 예제 코드
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
channel.basic_publish(exchange='logs', routing_key='', body='Broadcast Message')
2.3.4. 실무 예시 코드
2.3.4.1. 뉴스 피드 전송
channel.exchange_declare(exchange='news_feed', exchange_type='fanout')
subscribers = ['user_1_queue', 'user_2_queue', 'user_3_queue']
for q in subscribers:
channel.queue_declare(queue=q, durable=True)
channel.queue_bind(exchange='news_feed', queue=q)
channel.basic_publish(exchange='news_feed', routing_key='', body='Breaking News: ...')
2.3.4.2. 캐시 무효화 알림
channel.exchange_declare(exchange='cache_invalidation', exchange_type='fanout')
servers = ['cache_us', 'cache_kr']
for s in servers:
channel.queue_declare(queue=s, durable=True)
channel.queue_bind(exchange='cache_invalidation', queue=s)
channel.basic_publish(exchange='cache_invalidation', routing_key='', body='Invalidate /user/123')
2.4. Topic Exchange (주제 기반 익스체인지)
2.4.1. 개념
- 라우팅 키의 패턴 매칭을 기반으로 메시지를 라우팅한다.
- *: 하나의 단어 매칭
- #: 0개 이상 단어 매칭
2.4.2. 사용 사례
- 구조화된 로그/이벤트 메시지 처리
2.4.3. 예제 코드
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
channel.queue_declare(queue='kernel_queue')
channel.queue_bind(exchange='topic_logs', queue='kernel_queue', routing_key='kern.*')
channel.basic_publish(exchange='topic_logs', routing_key='kern.critical', body='Critical Kernel Error')
2.4.4. 실무 예시 코드
2.4.4.1. IoT 센서 데이터 처리
channel.exchange_declare(exchange='iot_data', exchange_type='topic')
channel.queue_declare(queue='temp_kitchen')
channel.queue_bind(exchange='iot_data', queue='temp_kitchen', routing_key='sensor.temp.kitchen')
channel.basic_publish(exchange='iot_data', routing_key='sensor.temp.kitchen', body='23.5°C')
2.4.4.2. 마이크로서비스 이벤트 처리
channel.exchange_declare(exchange='order_events', exchange_type='topic')
channel.queue_declare(queue='order_created_queue')
channel.queue_bind(exchange='order_events', queue='order_created_queue', routing_key='order.created')
channel.basic_publish(exchange='order_events', routing_key='order.created', body='Order #321 Created')
2.5. Headers Exchange (헤더 기반 익스체인지)
2.5.1. 개념
- 라우팅 키가 아닌 메시지의 헤더 필드 값을 기준으로 라우팅한다.
- x-match: all 또는 x-match: any 지정 가능
2.5.2. 사용 사례
- 복합 조건 라우팅이 필요한 경우 (ex. region + format)
2.5.3. 예제 코드
channel.exchange_declare(exchange='header_logs', exchange_type='headers')
channel.queue_declare(queue='logs_queue')
channel.queue_bind(
exchange='header_logs', queue='logs_queue',
arguments={'x-match': 'all', 'type': 'audit', 'format': 'json'}
)
channel.basic_publish(
exchange='header_logs', routing_key='',
body='JSON Audit Event',
properties=pika.BasicProperties(headers={'type': 'audit', 'format': 'json'})
)
2.5.4. 실무 예시 코드
2.5.4.1. 데이터 파이프라인에서 조건 라우팅
channel.exchange_declare(exchange='data_pipeline', exchange_type='headers')
channel.queue_declare(queue='us_csv_queue')
channel.queue_bind(exchange='data_pipeline', queue='us_csv_queue',
arguments={'x-match': 'all', 'region': 'us', 'format': 'csv'})
channel.basic_publish(
exchange='data_pipeline', routing_key='',
body='data.csv',
properties=pika.BasicProperties(headers={'region': 'us', 'format': 'csv'})
)
2.5.4.2. 보안 감사 로그 분기
channel.exchange_declare(exchange='security_logs', exchange_type='headers')
channel.queue_declare(queue='auth_login_queue')
channel.queue_bind(exchange='security_logs', queue='auth_login_queue',
arguments={'x-match': 'all', 'component': 'auth', 'event_type': 'login'})
channel.basic_publish(
exchange='security_logs', routing_key='',
body='User Login Attempt',
properties=pika.BasicProperties(headers={'component': 'auth', 'event_type': 'login'})
)
2.6. 강의 프로젝트
2.6.1. producer
import pika # RabbitMQ와 통신하기 위한 pika 라이브러리 임포트
# RabbitMQ 서버(localhost)에 연결을 생성 (기본 포트 5672)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 연결에서 채널(channel) 생성 - 메시지를 송수신하는 논리적인 통로
channel = connection.channel()
# 큐(queue) 선언 - 이름이 'hello'인 큐가 없으면 새로 생성됨
# 각 옵션 설명:
# durable=False : 서버 재시작 후 큐가 유지되지 않음 (True로 설정 시 영속적)
# exclusive=False : 이 큐는 현재 연결에서만 접근 가능하지 않음 (True 시 연결 종료 시 자동 삭제됨)
# auto_delete=False : 큐를 사용하는 소비자가 모두 연결 해제되더라도 큐를 삭제하지 않음 (True 시 자동 삭제)
channel.queue_declare(queue="hello", durable=False, exclusive=False, auto_delete=False)
# 메시지 발행
# exchange="" : 기본 익스체인지 (Default Exchange)를 사용하여 큐 이름을 그대로 routing key로 사용
# routing_key="hello" : 메시지를 보낼 대상 큐 이름
# body : 메시지 본문
channel.basic_publish(
exchange="",
routing_key="hello",
body="hello_world 2"
)
print("[x] Sent Hello World") # 메시지 발송 확인 메시지 출력
# 연결 종료 (채널도 함께 닫힘)
connection.close()
2.6.2. consumer
import pika # RabbitMQ와 통신하기 위한 pika 라이브러리
import sys, os
# 메인 실행 함수 정의
def main():
# RabbitMQ 서버(localhost)에 연결 생성
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
# 채널 생성 (메시지 송수신 통로)
channel = connection.channel()
# 큐 선언: 큐가 이미 존재하면 설정을 변경하지 않음 (멱등성)
# 아래 3개의 파라미터는 큐의 '생명 주기'와 관련된 중요한 설정
channel.queue_declare(
queue="hello",
durable=False, # False: 서버가 재시작되면 큐가 삭제됨 (True일 경우 영구 저장)
exclusive=False, # False: 여러 연결이 큐에 접근 가능 (True일 경우 현재 연결에서만 사용 가능하고 연결이 끊기면 큐 삭제됨)
auto_delete=False # False: 소비자가 모두 연결 해제되어도 큐가 유지됨 (True일 경우 마지막 소비자가 끊기면 큐 삭제됨)
)
# 콜백 함수: 메시지를 수신할 때 호출됨
def callback(ch, method, properties, body):
print("[x] received %r" % body)
# 메시지 소비자 등록
channel.basic_consume(
queue="hello", # 소비할 큐 이름
on_message_callback=callback, # 수신 메시지 처리 함수
auto_ack=True # True: 메시지 수신 즉시 자동 ack (주의: 소비자 장애 시 메시지 손실 위험)
)
print(" [*] waiting for the messages. To exit press Ctrl-C")
# 메시지 소비 시작 (무한 대기 루프)
channel.start_consuming()
# Ctrl+C 시 graceful 종료 처리
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("Interrupted")
try:
sys.exit(0)
except SystemExit:
os._exit(0)
3. Broadcasting - Fanout Exchange
3.1. Fanout Exchange 요약 정리
- 발행자(Publisher)는 수신자(Subscriber)가 누구인지 알지 못한다.
- 심지어 수신자가 존재하는지조차 알지 못한다.
- 팬아웃 익스체인지에 큐가 바인딩되어 있지 않으면 메시지는 폐기된다.
- 라우팅 키나 바인딩 규칙은 필요 없다.
- 큐는 반드시 익스체인지에 바인딩되어야 메시지를 수신할 수 있다.
항목 | 설명 |
📤 Routing 방식 | 라우팅 키 무시. 바인딩된 모든 큐에 동일한 메시지 복사본 전달 |
📥 큐 조건 | 메시지를 받으려면 큐가 익스체인지에 바인딩되어 있어야 함 |
❌ 미바인딩 큐 | 큐가 없거나 바인딩되지 않은 경우, 메시지는 버려짐 (discarded) |
💬 용도 | 시스템 브로드캐스트, 전체 알림, 캐시 무효화, 로그 브로드캐스트 등 |
⚠ Publisher와 Subscriber 관계 | Publisher는 구독자 존재 여부, 수, 상태를 모름 (완전 비동기 브로드캐스트 모델) |
3.2. 실무 예시
3.2.1. 실시간 시스템 로그 브로드캐스트
channel.exchange_declare(exchange='sys_logs', exchange_type='fanout')
channel.queue_declare(queue='log_server1')
channel.queue_declare(queue='log_server2')
channel.queue_bind(exchange='sys_logs', queue='log_server1')
channel.queue_bind(exchange='sys_logs', queue='log_server2')
channel.basic_publish(exchange='sys_logs', routing_key='', body='System rebooted')
3.2.2. 캐시 무효화 메시지 전파
channel.exchange_declare(exchange='cache_bust', exchange_type='fanout')
channel.queue_declare(queue='redis_node1')
channel.queue_declare(queue='redis_node2')
channel.queue_bind(exchange='cache_bust', queue='redis_node1')
channel.queue_bind(exchange='cache_bust', queue='redis_node2')
channel.basic_publish(exchange='cache_bust', routing_key='', body='CLEAR /products')
3.3. Temporary Queues and Exclusivity
3.3.1. Temporary Queues (임시 큐)
- Temporary Queue는 단기 사용 목적으로 생성되며, 다음과 같은 조건 중 하나 이상을 만족하면 자동으로 삭제된다.
- exclusive=True: 큐가 특정 연결에만 독점적으로 바인딩됨
- auto_delete=True: 해당 큐에 연결된 마지막 소비자가 종료되면 큐 자동 삭제
- queue='' (빈 문자열): 서버가 랜덤 이름으로 생성하는 이름 없는 큐
💡 일반적으로 "임시 큐"는 위 세 가지 옵션의 조합을 말한다.
- 각 구독자는 자신의 큐를 가짐
- Fanout Exchange 구조에서 주로 사용됨
- 소비자 종료와 함께 큐도 제거됨
- 큐는 exclusive=True 또는 auto_delete=True를 사용하여 임시 성격 부여
항목 | 내용 |
정의 | 단기 사용을 전제로 생성된 소비자 전용 큐 |
삭제 조건 | 소비자 또는 채널 종료 시 (exclusive), 소비자 없을 시 (auto_delete) |
용도 | 이벤트 브로드캐스트, 일회성 소비자 등록 |
특징 | 서버가 큐 이름을 자동 생성하거나 소비자가 지정 가능 |
3.3.2. Exclusive Queue (독점 큐)
- Exclusive Queue는 다음 조건을 가진다.
- 큐는 현재 연결(Connection)에서만 접근 가능
- 연결이 종료되면 자동으로 큐 삭제
- 다른 연결에서 동일한 이름으로 접근 시 에러 발생
- 하나의 클라이언트(Consumer) 전용 큐
- 메시지를 공유하지 않음 (다른 소비자 접근 불가)
항목 | 내용 |
정의 | 하나의 연결(Channel)에만 사용 가능한 큐 |
삭제 조건 | 연결 종료 시 자동 삭제 |
용도 | 단일 소비자, 고립된 응답 큐 필요 시 (예: RPC 응답 큐) |
특징 | 보안 및 스코프 제어용. 큐 이름이 중복될 수 없음 |
3.3.3. 비교 정리: Exclusive Queue vs Temporary Queue
항목 | Exclusive Queue | Temporary Queue |
주 용도 | 연결 독점 (RPC 응답 등) | 일회성 소비자 메시지 수신 |
접근 제한 | 하나의 연결만 접근 가능 | 여러 임시 소비자 가능 |
자동 삭제 조건 | 연결 종료 시 삭제 | 소비자 종료 or 바운드 없음 시 삭제 |
사용 방식 | exclusive=True | exclusive=True 또는 auto_delete=True, queue='' 사용 가능 |
Fanout Exchange와 조합 | 일반적이지 않음 | 매우 일반적 (subscriber마다 개별 큐 자동 생성 및 바인딩) |
큐 이름 지정 | 일반적으로 명시적 이름 | 주로 빈 문자열로 랜덤 큐 이름 생성 |
3.3.4. 기술적 속성 요약 (pika 기준)
속성 | 설명 | 기본값 |
exclusive | 해당 연결에서만 접근 가능 | FALSE |
auto_delete | 마지막 소비자가 종료되면 삭제 | FALSE |
queue='' | 이름 없는 임시 큐, 서버가 랜덤 이름 생성 | (사용자 지정 필수 아님) |
durable | 서버 재시작 후에도 큐 유지 여부 | FALSE |
3.3.5. 실전 예제 3가지: 각기 다른 방식의 임시 큐 구현
3.3.5.1. 이름 없는 임시 큐 (서버 자동 생성)
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 이름 없는 임시 큐 생성 (exclusive + auto_delete 포함)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print("[*] Created temporary queue:", queue_name)
# 큐에 메시지 바인딩 및 소비자 등록
channel.queue_bind(exchange='logs', queue=queue_name)
channel.basic_consume(queue=queue_name, auto_ack=True,
on_message_callback=lambda ch, m, p, body: print("[x]", body))
channel.start_consuming()
3.3.5.2. 이름 있는 독점 큐 (exclusive=True)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 고정된 이름으로 Exclusive 큐 선언
channel.queue_declare(queue='exclusive_queue', exclusive=True)
channel.basic_publish(exchange='', routing_key='exclusive_queue', body='Hello Exclusive')
channel.basic_consume(queue='exclusive_queue', auto_ack=True,
on_message_callback=lambda ch, m, p, body: print("[x]", body))
channel.start_consuming()
3.3.5.3. auto_delete 큐 (소비자 종료 시 자동 삭제)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# auto_delete 큐 선언
channel.queue_declare(queue='ephemeral_queue', auto_delete=True)
channel.basic_consume(queue='ephemeral_queue', auto_ack=True,
on_message_callback=lambda ch, m, p, body: print("[x]", body))
print("[*] Waiting on ephemeral queue...")
channel.start_consuming()
3.4. 강의 프로젝트
3.4.1. publisher
import pika
import sys
# 1️⃣ RabbitMQ 서버(localhost)에 연결 생성
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost') # 기본 포트 5672, 가상 호스트 '/', guest/guest 계정 사용
)
# 2️⃣ 채널 생성 (메시지를 송수신하는 논리적 단위)
channel = connection.channel()
# 3️⃣ Fanout Exchange 생성
channel.exchange_declare(
exchange='br_exchange', # 익스체인지 이름
exchange_type='fanout', # 교환기 타입: fanout / direct / topic / headers
durable=False, # 서버 재시작 시 유지 여부 (기본: False)
auto_delete=False, # 바인딩된 큐가 모두 해제되면 삭제 (기본: False)
passive=False, # True면 존재 유무만 검사, 생성은 안 함
internal=False, # True면 메시지를 직접 publish 불가 (서버 내부용)
arguments=None # 기타 설정 (예: alternate-exchange, DLX 등)
)
# 4️⃣ 메시지 발행 루프
for i in range(4):
message = "Hello" + str(i)
# 메시지 발행
channel.basic_publish(
exchange='br_exchange', # fanout은 routing_key를 무시하고 모든 큐로 전송
routing_key='', # 필수지만 fanout에서는 사용되지 않음
body=message, # 전송할 메시지 본문
properties=pika.BasicProperties(
delivery_mode=1 # 1: non-persistent, 2: persistent (큐가 durable이어야 저장됨)
)
)
print("[x] sent %r" % message)
# 5️⃣ 익스체인지 삭제
# if_unused=True : 바인딩된 큐가 없을 경우에만 삭제 가능 (에러 방지)
channel.exchange_delete(
exchange='br_exchange',
if_unused=False # False: 강제 삭제 (큐와 바인딩된 상태여도 삭제됨)
)
# 6️⃣ 연결 종료 (자동으로 채널도 함께 종료됨)
connection.close()
3.4.2. subscriber
import pika # RabbitMQ 통신을 위한 Python 라이브러리
import sys
# 1️⃣ RabbitMQ 서버에 연결(Connection 생성)
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost') # 호스트: 'localhost', 포트: 기본 5672
# 다른 설정들:
# - port=5672
# - virtual_host='/'
# - credentials=pika.PlainCredentials('user', 'pass')
)
# 2️⃣ 채널(Channel) 생성: 하나의 연결(Connection) 안에서 독립적인 통신 단위
channel = connection.channel()
# 3️⃣ 익스체인지(Exchange) 선언
channel.exchange_declare(
exchange='br_exchange', # 익스체인지 이름
exchange_type='fanout', # fanout: 라우팅 키 무시하고 모든 큐에 브로드캐스트
durable=False, # True: 서버 재시작 후에도 익스체인지 유지 (영속성)
auto_delete=False, # True: 바인딩된 큐가 모두 없어지면 자동 삭제
internal=False, # True: 서버 내부 사용 (publish 불가)
passive=False, # True: 존재 여부 확인만 (미존재 시 예외 발생)
arguments=None # 추가 옵션 (alternate-exchange, TTL 등)
)
# 🔍 exchange_declare() 주요 파라미터 정리
# - exchange_type: 'direct', 'fanout', 'topic', 'headers'
# - durable: 메시지 영속성을 위해 queue도 durable이어야 함
# - auto_delete: 큐 바인딩이 모두 끊기면 익스체인지 삭제
# - passive: True 설정 시 익스체인지가 없으면 ChannelClosed 예외 발생
# - internal: true이면 producer가 메시지 발행 불가, broker 내부에서만 사용됨
# - arguments: 예: {'alternate-exchange': 'ae_name'}
# 4️⃣ 메시지 전송 (fanout이라 routing_key는 무시됨)
for i in range(4):
message = "Hello" + str(i)
channel.basic_publish(
exchange='br_exchange', # 메시지를 보낼 익스체인지
routing_key='', # fanout은 무시되므로 빈 문자열 사용
body=message, # 메시지 본문 (bytes 또는 string)
properties=pika.BasicProperties(
delivery_mode=1 # 1: non-persistent, 2: persistent (디스크 저장)
# 기타 예시:
# content_type='text/plain',
# headers={'key': 'value'},
# expiration='60000', # 메시지 TTL
# priority=0,
# correlation_id='uuid',
# reply_to='callback_queue'
)
)
print("[x] sent %r" % message)
# 🔍 basic_publish() 주요 속성
# - delivery_mode=2: 메시지를 영속적으로 저장하려면 큐도 durable이어야 함
# - headers: Headers exchange에 사용되는 key-value 쌍
# - expiration: 메시지 만료 시간 (ms)
# - reply_to: RPC 구조에서 응답용 큐 지정
# - correlation_id: 요청-응답 연계 추적에 사용
# 5️⃣ 익스체인지 삭제
channel.exchange_delete(
exchange='br_exchange',
if_unused=False # False: 바인딩 여부와 상관없이 삭제
# True: 바인딩된 큐가 있으면 삭제 실패 → 예외 발생
)
# 🔍 exchange_delete() 옵션
# - exchange: 삭제할 익스체인지 이름
# - if_unused=True: 큐와 바인딩되어 있으면 삭제되지 않음 (안전 모드)
# 6️⃣ 연결 종료 (자동으로 채널도 닫힘)
connection.close()
4. Selective Routing - Direct Exchange
4.1. 기본 익스체인지(Default Exchange)의 제한사항
- 기본 익스체인지는 이름이 없는 익스체인지로, 라우팅 키와 동일한 이름의 큐로 직접 메시지를 전달한다.
4.1.1. 문제점
- Publisher는 구독자의 큐 이름을 명확히 알아야 함 (강한 결합)
- 하나의 메시지 유형(라우팅 키)은 단 하나의 큐에만 전송 가능 (멀티 구독 불가)
4.1.2. 예시 동작
- 큐 이름이 image_processor일 경우에만 메시지 도달함
channel.basic_publish(exchange='', routing_key='image_processor', body='Resize Image')
4.2. Fanout Exchange의 한계
- Fanout Exchange는 연결된 모든 큐에 메시지를 브로드캐스트한다.
4.2.1. 문제점
- 라우팅 키 무시 → 메시지 필터링 불가능
- 모든 구독자가 모든 메시지를 받아야 하므로 불필요한 메시지 처리 오버헤드 발생
- 수신 자격이 없는 구독자도 메시지를 수신할 수 있음 → 보안 이슈 발생 가능
4.2.2. 예시 동작
channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.basic_publish(exchange='logs', routing_key='', body='System Down')
4.3. Selective Routing 구조 (Direct Exchange 기반)
- Direct Exchange는 라우팅 키와 큐의 바인딩 키가 정확히 일치할 때만 메시지를 전달한다. 이 구조를 통해 정교한 구독자 필터링이 가능하며, 하나의 메시지를 여러 큐에 동시에 보낼 수 있다.
4.3.1. 개념 요약
항목 | 설명 |
Exchange Type | direct |
Routing Key | 명확하게 일치해야 라우팅됨 |
다중 바인딩 | 하나의 키에 여러 큐 바인딩 가능 (N:1 / 1:N 모두 가능) |
메시지 분기 | 바인딩 룰에 따라 정확히 분배됨 |
4.3.2. Selective Routing 예시 구조
[Process A/B/C] --> [Direct Exchange] --> [AQueue / FQueue / SQueue]
| | | |
| Error Warn Info
(Routing Key)
4.3.3. 바인딩 규칙 예시
Routing Key | 전달 대상 큐 |
Error | AQueue |
Warn | FQueue |
Info | SQueue |
4.4. 구현 절차 및 코드 예제
4.4.1. Publisher
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='log_exchange', exchange_type='direct')
logs = [('Error', 'Disk full'), ('Warn', 'Memory usage high'), ('Info', 'Startup complete')]
for rk, msg in logs:
channel.basic_publish(exchange='log_exchange', routing_key=rk, body=msg)
print(f"[x] Sent {rk}: {msg}")
connection.close()
4.4.1. Subscriber (예: Error 전용)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='log_exchange', exchange_type='direct')
channel.queue_declare(queue='AQueue', durable=True)
channel.queue_bind(exchange='log_exchange', queue='AQueue', routing_key='Error')
def callback(ch, method, properties, body):
print(f"[Error] {body.decode()}")
channel.basic_consume(queue='AQueue', on_message_callback=callback, auto_ack=True)
channel.start_consuming()
4.5. 강의 프로젝트
4.5.1. publisher
import pika # RabbitMQ와 통신하기 위한 Python 라이브러리
import sys
import random
# 1️⃣ RabbitMQ 서버에 연결 생성 (기본 localhost:5672, vhost='/', guest 계정)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 2️⃣ 채널 생성 (하나의 연결에서 여러 채널 생성 가능. 실제 메시지 송수신 단위)
channel = connection.channel()
# 3️⃣ direct exchange 생성 (교환기 이름: 'logs_exchange')
channel.exchange_declare(
exchange='logs_exchange', # 익스체인지 이름
exchange_type='direct', # 타입: 'direct' (라우팅 키 일치 시에만 큐로 전달)
durable=False, # 서버 재시작 후에도 유지할지 여부 (False면 삭제됨)
auto_delete=False, # 바인딩된 큐가 모두 끊기면 자동 삭제 여부
internal=False, # True면 메시지 발행 불가, 내부 교환기로만 사용됨
passive=False, # True일 경우 존재 여부만 확인하고, 없으면 오류 발생
arguments=None # 고급 설정 (alternate-exchange, TTL 등)
)
# 4️⃣ 메시지 전송 데이터 정의
severity = ["Error", "Warning", "Info", "Other"] # 라우팅 키들
messages = ["EMsg", "WMsg", "IMsg", "OMsg"] # 각각의 메시지 본문
# 5️⃣ 랜덤 메시지 10개 발행
for i in range(10):
randomNum = random.randint(0, len(severity) - 1) # 0 ~ 3 중 하나 선택
print(randomNum)
message = messages[randomNum] # 전송할 메시지
rk = severity[randomNum] # 해당 메시지의 라우팅 키
# 메시지 전송
channel.basic_publish(
exchange='logs_exchange', # direct 익스체인지로 전송
routing_key=rk, # 큐에 바인딩된 키와 일치해야 전달됨
body=message # 메시지 본문
# 추가로 설정 가능한 properties:
# properties=pika.BasicProperties(
# delivery_mode=2, # 2: persistent (디스크 저장)
# content_type='text/plain',
# correlation_id='1234',
# reply_to='response_queue'
# )
)
print("[x] sent %r" % message)
# 6️⃣ 익스체인지 삭제
channel.exchange_delete(
exchange='logs_exchange',
if_unused=False # True: 바인딩된 큐가 없을 때만 삭제 (안전 삭제)
# False: 강제 삭제 (큐가 바인딩되어 있어도 삭제됨)
)
# 7️⃣ 연결 종료 (채널도 자동으로 종료됨)
connection.close()
4.5.2. consumer - alarmraiser
import pika # RabbitMQ와 통신하기 위한 라이브러리
# 1️⃣ RabbitMQ 서버에 연결 생성
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
# 2️⃣ 채널(Channel) 생성: 메시지를 송수신할 논리적 경로
channel = connection.channel()
# 3️⃣ 'logs_exchange'라는 이름의 direct 익스체인지 선언
channel.exchange_declare(
exchange='logs_exchange', # 익스체인지 이름
exchange_type='direct', # 메시지를 정확히 일치하는 라우팅 키로 큐에 전달
durable=False, # 서버 재시작 시 삭제됨 (기본: False)
auto_delete=False, # 바인딩 해제되더라도 삭제 안 됨
internal=False, # False일 경우 publish 허용 (기본값)
passive=False, # True 시 존재 확인만 하고 생성은 하지 않음
arguments=None # 고급 설정 (TTL, alternate-exchange 등)
)
# 4️⃣ 임시 큐 선언 (큐 이름 없이 생성, RabbitMQ가 자동으로 랜덤 이름 할당)
result = channel.queue_declare(
queue='', # 빈 문자열이면 서버가 자동으로 이름 생성
exclusive=True # 현재 연결에서만 접근 가능, 연결 종료 시 자동 삭제
# durable=False # 기본값: False (서버 재시작 시 삭제)
# auto_delete=False # 기본값: False (소비자 모두 끊겨도 삭제 안 됨)
)
queue_name = result.method.queue # 자동으로 생성된 큐 이름 저장
# 5️⃣ 큐를 익스체인지에 바인딩 (지정된 라우팅 키에 대해 큐로 메시지 전달)
channel.queue_bind(
exchange='logs_exchange', # 바인딩할 익스체인지
queue=queue_name, # 현재 연결된 임시 큐
routing_key="Error" # 이 라우팅 키로 발행된 메시지를 이 큐로 전달
)
channel.queue_bind(
exchange='logs_exchange',
queue=queue_name,
routing_key="Warning"
)
# 📌 queue_bind(...) 주요 파라미터:
# - exchange: 메시지를 받을 익스체인지 이름
# - queue: 바인딩할 큐 이름
# - routing_key: 큐가 수신할 메시지의 라우팅 키
# - arguments: 선택적 필터 (headers 사용 시)
print('[*] waiting for the messages') # 소비자 준비 상태 출력
# 6️⃣ 메시지 수신 시 처리할 콜백 함수 정의
def callback(ch, method, properties, body):
print('[x] Alarm:::: %r' % body)
# 7️⃣ 소비자 등록
channel.basic_consume(
queue=queue_name, # 메시지를 수신할 큐
on_message_callback=callback, # 메시지를 수신했을 때 호출할 함수
auto_ack=True # True면 메시지 수신 즉시 ack (확인 응답) 전송
# False일 경우 ch.basic_ack()를 직접 호출해야 함
)
# 📌 basic_consume(...) 주요 파라미터
# - queue: 수신할 대상 큐
# - on_message_callback: 메시지 처리 함수 (필수)
# - auto_ack: True 시 자동 ack, False 시 수동 ack 필요
# - exclusive: True 시 해당 consumer가 큐에 대해 독점 소비자
# - consumer_tag: 수동으로 식별 태그 지정 가능
# - arguments: AMQP arguments (예: 우선순위)
# 8️⃣ 소비 시작 (무한 대기 루프)
channel.start_consuming()
# 📌 start_consuming():
# - Blocking 방식으로 메시지를 기다림
# - Ctrl+C 또는 channel.stop_consuming()으로 종료 가능
4.5.3. consumer - fiewriter
import pika # RabbitMQ와 연결을 위한 Python 라이브러리
# 1️⃣ RabbitMQ 서버에 연결(Connection 객체 생성)
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost') # 기본 포트: 5672, 기본 vhost: '/', 기본 사용자: guest
)
# 2️⃣ 채널 생성 (논리적 메시지 송수신 경로)
channel = connection.channel()
# 3️⃣ Direct Exchange 생성 (존재하지 않으면 생성됨, 있으면 무시)
channel.exchange_declare(
exchange='logs_exchange', # 익스체인지 이름
exchange_type='direct', # 라우팅 키를 기반으로 메시지 라우팅
durable=False, # False일 경우 서버 재시작 시 사라짐
auto_delete=False, # 바인딩된 큐가 모두 해제되어도 삭제 안 됨
internal=False, # 외부 publish 허용 (True면 내부용으로만 사용)
passive=False, # True면 존재 유무 확인만 하고 생성은 하지 않음
arguments=None # 고급 옵션 (예: alternate-exchange, TTL)
)
# 4️⃣ 임시 큐 생성 (이름 없이 자동 생성, 독점 연결)
result = channel.queue_declare(
queue='', # 빈 문자열을 지정하면 서버가 임의의 이름을 생성해줌
exclusive=True # 이 연결에서만 접근 가능, 연결 종료 시 큐도 자동 삭제됨
)
queue_name = result.method.queue # 자동으로 생성된 큐 이름 저장
# 5️⃣ 큐를 exchange에 바인딩 (라우팅 키: "Warning")
channel.queue_bind(
exchange='logs_exchange', # 바인딩할 익스체인지 이름
queue=queue_name, # 바인딩 대상 큐
routing_key="Warning" # 라우팅 키가 'Warning'인 메시지만 수신
)
print('[*] waiting for the messages') # 메시지 수신 대기 상태 출력
# 6️⃣ 수신 메시지 처리 콜백 함수
def callback(ch, method, properties, body):
print('[x] Writing to File:::: %r' % body)
# 7️⃣ 소비자 등록
channel.basic_consume(
queue=queue_name, # 수신할 큐 이름
on_message_callback=callback, # 수신 시 실행할 콜백 함수
auto_ack=True # 메시지를 수신하면 즉시 ACK 자동 전송
# auto_ack=False로 설정할 경우, 메시지 처리 후 ch.basic_ack(...)을 명시적으로 호출해야 함
)
# 8️⃣ 메시지 소비 시작 (blocking 방식, 무한 루프)
channel.start_consuming()
# Ctrl+C 또는 channel.stop_consuming() 호출 시 중단 가능
4.5.4. consumer - screenprinter
import pika # RabbitMQ와 통신하기 위한 Python 라이브러리
# 1️⃣ RabbitMQ 서버에 연결
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost') # 기본 포트: 5672
)
# 2️⃣ 메시지를 송수신할 채널 생성
channel = connection.channel()
# 3️⃣ direct exchange 생성 또는 선언 (없으면 새로 생성됨)
channel.exchange_declare(
exchange='logs_exchange', # 익스체인지 이름
exchange_type='direct', # 정확히 일치하는 라우팅 키만 해당 큐로 전달됨
durable=False, # 서버 재시작 시 익스체인지 유지 여부 (False: 비영속)
auto_delete=False, # 모든 큐가 바인딩 해제되면 자동 삭제 여부
internal=False, # 외부에서 메시지 발행 허용 여부
passive=False, # True면 존재만 확인하고 없으면 오류 발생
arguments=None # 고급 설정 (alternate-exchange 등)
)
# 4️⃣ 이름 없는 임시 큐 생성 → RabbitMQ가 자동으로 이름 부여
result = channel.queue_declare(
queue='', # 빈 문자열: 자동 생성된 이름
exclusive=True # 이 연결에서만 사용할 수 있고, 연결 종료 시 자동 삭제
)
queue_name = result.method.queue # 생성된 임시 큐 이름 추출
# 5️⃣ 큐를 익스체인지에 3가지 라우팅 키로 바인딩
severity = ["Error", "Warning", "Info"]
for rk in severity:
channel.queue_bind(
exchange='logs_exchange', # 바인딩할 익스체인지
queue=queue_name, # 대상 큐
routing_key=rk # 큐가 수신할 메시지의 라우팅 키
)
# 🔔 라우팅 키 설명:
# - Error: 심각한 문제
# - Warning: 경고 메시지
# - Info: 일반 정보 로그
print('[*] waiting for the messages') # 준비 완료 메시지
# 6️⃣ 메시지 수신 시 호출될 콜백 함수 정의
def callback(ch, method, properties, body):
print('[x] Alarm:::: %r' % body)
# 7️⃣ 큐에서 메시지 소비 시작 (콜백 등록)
channel.basic_consume(
queue=queue_name, # 수신할 큐 이름
on_message_callback=callback, # 메시지를 처리할 함수
auto_ack=True # 수신 즉시 메시지 확인(ack) 전송 (실패 시 재전송 안됨)
)
# 📌 auto_ack 설명:
# - True: 수신 즉시 ACK (메시지 유실 가능성 ↑)
# - False: 메시지 처리 후 `ch.basic_ack()`으로 명시적으로 확인 필요 (안정성 ↑)
# 8️⃣ 메시지 수신 시작 (Blocking 방식)
channel.start_consuming()
# 📌 종료하려면: Ctrl+C 또는 `channel.stop_consuming()` 호출 필요
5. Topic Exchange - Pattern Based Routing
5.1. 메시지 구조 및 라우팅 키 구성
5.1.1. 메시지 속성 (message metadata)
속성 카테고리 | 가능한 값 |
Severity | Error (E), Warning (W), Info (I) |
Priority | High (H), Medium (M), Low (L) |
Action | Action1 (A1), Action2 (A2), Action3 (A3) |
Component | C1, C2, C3 |
5.1.2. 라우팅 키 구성 방식
<Severity>.<Priority>.<Action>.<Component>
예시: E.H.A1.C1
5.1.3. 라우팅 키 예시
- 이 구조는 4계층 라우팅 키 구조로, 패턴 기반의 필터링에 매우 적합하다.
E.H.A1.C1
E.H.A1.C2
E.M.A1.C1
W.L.A3.C3
I.L.A3.C3
5.2. Subscriber 바인딩 전략 및 확장성 문제
구독 목적 | 바인딩 키 |
C1에서 발생한 High Priority Error Action A1 | E.H.A1.C1 |
모든 컴포넌트에서의 E.H.A1 | E.H.A1.C1, E.H.A1.C2, E.H.A1.C3 |
5.2.1. 문제점
- 컴포넌트가 추가되면 구독자는 새로운 바인딩 키를 수동으로 추가해야 함 → 유지보수 부담
- 조건이 늘어날수록 바인딩 키 수가 급증함 → 관리 복잡성 증가
5.3. 패턴 기반 바인딩 룰(Solution)
- 참고: 와일드카드 *는 하나의 단어, #는 0개 이상 단어를 의미한다.
Requirement | Binding Rule | 설명 |
모든 Error 메시지 | E.# | Severity가 'Error'로 시작하는 모든 메시지 수신 |
모든 High Priority 메시지 | *.H.# | Priority가 'High'인 모든 메시지 수신 |
모든 A3 액션 메시지 | *.*.A3.* 또는 #.*.A3.* | Action이 A3인 메시지 전체 수신 |
Component C2에서 발생한 모든 메시지 | #.#.#.C2 또는 #.*.C2 | Component가 C2인 메시지 수신 |
High Priority Error 메시지 전체 | E.H.# | Severity=Error, Priority=High 인 메시지 수신 |
Component C3에서 발생한 Warning 메시지 | W.#.C3 | Severity=Warning, Component=C3인 메시지 |
Component C2에서 발생한 High Priority Error | E.H.*.C2 | E, H, C2 조합에 해당하는 모든 메시지 수신 |
Action2가 필요한 Medium Priority Warning | W.M.A2.* | Severity=Warning, Priority=Medium, Action=A2 |
C3에서 발생한 High Priority Info 메시지 (Action2 수행) | I.H.A2.C3 | 모든 조건이 정확히 일치하는 메시지 수신 |
5.4. Topic Exchange의 확장성 요점
- 바인딩 키를 와일드카드 패턴으로 설계하면 컴포넌트 추가 시 유지보수가 필요 없음
- * : 정확히 하나의 토큰 일치
- # : 0개 이상의 토큰 일치
5.5. 예시 비교
방식 | 바인딩 키 | 설명 |
정적 | E.H.A1.C1, E.H.A1.C2 등 | 컴포넌트 추가 시 바인딩 수 증가 |
패턴 | E.H.A1.* | 컴포넌트가 몇 개든 자동 적용됨 |
5.6. Topic Exchange 바인딩 설계 시 유의 사항 (Additionally 보완 정리)
항목 | 설명 |
다중 바인딩 | 하나의 큐에 여러 라우팅 키 바인딩 가능 |
메시지 중복 | 동일한 메시지가 여러 바인딩 규칙에 일치해도 큐에는 1회만 도달 |
공유 큐 | 하나의 큐에 다수 구독자 등록 가능 (exclusive=False) |
라우팅 실패 | 어떠한 바인딩 규칙에도 일치하지 않으면 메시지는 drop 됨 |
5.7. 강의 프로젝트
5.7.1. publisher
import random
import sys
import pika
# RabbitMQ 서버에 연결
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 채널 생성
channel = connection.channel()
# Topic Exchange 선언
# exchange_type='topic' → 패턴 기반 라우팅 가능
# durable=True → 서버 재시작 후에도 exchange가 유지됨
channel.exchange_declare(
exchange='system_exchange',
exchange_type='topic',
durable=True
)
# 각 메시지의 속성 요소 정의
# Severity (심각도), Priority (우선순위), Action (조치), Component (컴포넌트)
severity = ['E', 'W', 'I'] # Error, Warning, Info
priority = ['H', 'M', 'L'] # High, Medium, Low
action = ['A1', 'A2', 'A3'] # Action1, Action2, Action3
component = ['C1', 'C2', 'C3'] # Component1, Component2, Component3
# 10개의 무작위 메시지 생성 및 발행
for i in range(10):
# 각각의 항목에서 무작위 선택
randomSeverity = random.choice(severity)
randomPriority = random.choice(priority)
randomAction = random.choice(action)
randomComponent = random.choice(component)
# 라우팅 키 형식: <Severity>.<Priority>.<Action>.<Component>
rk = f"{randomSeverity}.{randomPriority}.{randomAction}.{randomComponent}"
# 메시지 본문 구성
message = rk + " ::::: <Message>"
# 메시지 발행
# routing_key는 위에서 생성한 rk (ex. E.H.A1.C2)
channel.basic_publish(
exchange='system_exchange',
routing_key=rk,
body=message
)
print("[x] sent %r" % message)
# 교환기를 삭제할 경우 (사용 중이지 않으면)
# channel.exchange_delete(exchange='system_exchange', if_unused=False)
# 연결 종료
connection.close()
5.7.2. subscriber - A3actiontaker
import pika
# RabbitMQ 서버에 연결
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
# 연결을 통해 채널 생성
channel = connection.channel()
# Topic Exchange 선언
# durable=True → 서버 재시작 이후에도 exchange가 유지됨
channel.exchange_declare(
exchange='system_exchange',
exchange_type='topic',
durable=True
)
# 임시 큐 선언
# queue='' → 이름 자동 생성
# exclusive=True → 해당 연결(connection)이 닫히면 큐도 삭제됨
result = channel.queue_declare(
queue='', # 비워두면 임시 이름 자동 생성
exclusive=True # 연결 종료 시 큐 자동 삭제
)
# 자동 생성된 큐 이름 저장
queue_name = result.method.queue
# (디버깅용 출력)
# print("Subscriber queue_name =", queue_name)
# 큐를 exchange에 바인딩
# routing_key="#.A3.#" → A3가 포함된 모든 라우팅 키에 대해 메시지 수신
# 예: E.H.A3.C1, W.L.A3.C3 등
channel.queue_bind(
exchange='system_exchange',
queue=queue_name,
routing_key="#.A3.#"
)
print('[*] waiting for the messages')
# 메시지를 수신하면 호출될 콜백 함수 정의
def callback(ch, method, properties, body):
print('[x] :::: %r' % body)
# 소비자 등록
# auto_ack=True → 메시지 수신 즉시 확인(ack) 전송 (신뢰성 낮음, 손실 위험 있음)
channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=True
)
# 메시지 수신 대기 (무한 대기)
channel.start_consuming()
5.7.3. subscriber - allwarningsfromC2
import pika
# RabbitMQ 서버에 연결
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
# 연결된 채널 생성
channel = connection.channel()
# Topic Exchange 선언
# exchange_type='topic' → 메시지 라우팅 키의 패턴 기반 필터링
# durable=True → 서버 재시작 후에도 exchange 유지
channel.exchange_declare(
exchange='system_exchange',
exchange_type='topic',
durable=True
)
# 임시 큐 선언
# queue='' → 서버가 자동으로 임의 이름 생성
# exclusive=True → 해당 연결이 끊기면 큐도 자동 삭제됨 (개인용 구독자에 적합)
result = channel.queue_declare(
queue='',
exclusive=True
)
# 자동 생성된 큐 이름 획득
queue_name = result.method.queue
# 디버깅용 큐 이름 출력 (필요시 주석 해제)
# print("Subscriber queue_name =", queue_name)
# 큐를 Exchange에 바인딩 (연결)
# routing_key="W.#.C2" → 라우팅 키가 Warning(W)으로 시작하고 중간에 0개 이상 토큰이 있으며 마지막이 C2인 경우 수신
# 예: W.H.A1.C2, W.M.C2, W..C2 등 (유효한 형식이어야 하며 정확한 점(.) 수는 라우팅 키에 따라 달라짐)
channel.queue_bind(
exchange='system_exchange',
queue=queue_name,
routing_key="W.#.C2"
)
print('[*] waiting for the messages')
# 메시지를 수신했을 때 호출될 콜백 함수 정의
def callback(ch, method, properties, body):
print('[x] :::: %r' % body)
# 메시지 소비자 등록
# auto_ack=True → 메시지 수신 즉시 자동으로 처리 완료(ack)됨 → 장애 발생 시 메시지 손실 위험 있음
channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=True
)
# 무한 루프 형태로 메시지를 계속 수신
channel.start_consuming()
5.7.4. subscriber - errorhandlingsub
import pika
# RabbitMQ 서버에 연결
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
# 연결된 채널 생성
channel = connection.channel()
# Topic Exchange 생성 (없으면 새로 생성, 있으면 설정 유지)
# exchange_type='topic' → 라우팅 키 패턴 기반 메시지 필터링
# durable=True → 서버 재시작 시에도 exchange 유지
channel.exchange_declare(
exchange='system_exchange',
exchange_type='topic',
durable=True
)
# 임시 큐 선언
# queue='' → RabbitMQ가 이름 자동 생성
# exclusive=True → 연결 종료 시 큐도 자동 삭제됨
result = channel.queue_declare(queue='', exclusive=True)
# 생성된 큐 이름 추출
queue_name = result.method.queue
# 디버깅용 출력 (필요시 사용)
# print("Subscriber queue_name =", queue_name)
# 큐를 Exchange에 바인딩
# routing_key="E.#" → 'E'로 시작하는 모든 라우팅 키 매칭
# 예: "E.H.A1.C1", "E.M.C3", "E" 등
channel.queue_bind(
exchange='system_exchange',
queue=queue_name,
routing_key="E.#"
)
print('[*] waiting for the messages')
# 메시지 수신 시 실행할 콜백 함수 정의
def callback(ch, method, properties, body):
print('[x] :::: %r' % body)
# 메시지 소비 등록
# auto_ack=True → 수신 즉시 ack 전송 (장애 발생 시 메시지 유실 가능)
channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=True
)
# 메시지 수신 시작 (무한 루프)
channel.start_consuming()
# 🔴 중복된 호출 — 필요 없음 (start_consuming()은 blocking 함수로 여기서 멈춤)
# channel.start_consuming()
6. Reliable communication
6.1. 메시지 흐름과 실패 지점 (Failure Points)
Publisher → [Exchange] → [Queue] → Subscriber
6.2. 실패 가능 지점 요약:
구성 요소 | 실패 가능성 | 설명 |
Publisher → Exchange | 연결 또는 채널 단절 | 메시지 자체가 브로커에 도달하지 않음 |
Exchange → Queue | Exchange/Queue 다운 또는 재시작 | 메시지가 올바르게 라우팅되지 않음 |
Queue 자체 | 휘발성 큐는 메시지를 유실할 수 있음 | |
Queue → Subscriber | 구독자 다운 또는 과부하 | 처리되지 않은 메시지는 유실 위험 있음 |
6.3. RabbitMQ Provision 요약
6.3.1. Provision #1: Publisher Confirmation
- 기능: 브로커가 메시지를 수신했는지 확인
- 이점:
- 메시지 손실 방지
- 실패 원인 파악 가능 (채널 종료, 연결 종료 등)
6.3.2. Provision #2: Durable Exchange
- 기능: Exchange를 영속성 있게 구성
- 이점:
- 브로커 재시작 시에도 Exchange는 유지됨
6.3.3. Provision #3: Durable Queue
- 기능: Queue를 Durable로 설정
- 이점:
- 미전달 메시지, 미확인 메시지 디스크 저장
- 브로커 재시작에도 유지
- 주의점:
- 성능 저하
- 여전히 메시지 유실 가능성 존재
6.3.4. Provision #4: Manual Acknowledgment (수동 ACK)
- 이점:
- 메시지 처리 완료 후 ACK를 전송 → 메시지 유실 방지
- 필요 조건:
- Subscriber가 비정상 종료되거나 과부하되어도 다시 전달 가능
- Channel과 Queue는 비독점적(exclusive=false)이어야 함
- QoS 설정: subscriber에 전달될 메시지 수를 제한 가능 (prefetch 설정)
6.4. 수동 ACK의 중요성과 이유
- 자동 ACK는 메시지가 도착하자마자 확인되므로 Subscriber가 다운되면 메시지는 사라짐 = 유실 가능성 존재
- 수동 ACK는 처리 완료 후 명시적으로 확인 → 신뢰성 높은 시스템 구현에 필수
- basic_consume의 auto_ack=True 설정은 callback의 시작 전에 ack가 전송될 수 있다.
- ack가 rabbitMQ에게 전송되면, rabbitMQ는 다음 메시지를 송신하며 컨슈머의 기본적인 수신 가능한 크기는 무제한이다.
- 수신된 메시지는 메모리에 저장되며, callback에 의해 꺼내어 처리된다.
- 안전한 사용 방법으로 ack를 통해 수동으로 메시지를 하나씩 꺼내와 callback의 처리 후 ack를 다시 송신하는 방법을 이용하면 된다.
- 이때, 발행자의 basic_publish의 BasicProperties는 delivery_mode=2로 설정한다.
- delivery_mode=1(Transient): 메시지가 메모리에만 저장되며, RabbitMQ 서버가 다운될 경우 메시지가 손실될 수 있다.
- delivery_mode=2(Persistent): 메시지가 디스크에 저장됩니다. RabbitMQ 서버가 재시작되어도 메시지가 유지된다.
- 수신자의 exchange_declare와 queue_declare에 durable=True 설정을 하여 삭제가 되지 않도록 설정한다.
- basic_qos(prefetch_count=1)를 이용해 받을 메시지의 수를 제한한다.
- basic_consume에 auto_ack=True 설정은 하면 안된다.
- callback의 종료전, basic_ack(delivery_tag=method.delivery_tag)를 사용해 수동으로 ack를 전달한다.
이유 | 설명 |
안정성 향상 | Consumer가 메시지를 정상적으로 처리했는지 확인 후 ACK |
메시지 유실 방지 | Consumer가 다운되면 메시지는 다시 큐에 남고 재전송 가능 |
성능 제어 | QoS(presetch) 설정으로 subscriber 과부하 방지 |
실시간 오류 처리 | NACK 또는 reject로 오류 상황을 브로커에 알릴 수 있음 |
6.5. RabbitMQ Reliable Communication 요약 체크리스트
항목 | 설정 필요 여부 | 설명 |
✅ Durable Exchange | 예 | 브로커 재시작에도 유지 |
✅ Durable Queue | 예 | 메시지 디스크 저장 |
✅ Persistent Message | 예 | 메시지를 디스크에 저장 |
✅ Manual ACK | 필수 | 처리 완료 후 ACK |
✅ QoS (Prefetch) | 권장 | 과부하 방지 |
✅ DLX (Dead Letter Exchange) | 권장 | 실패 메시지 저장 및 재처리 |
6.6. 강의 프로젝트
6.6.1. publisher
import random
import sys
import pika
# RabbitMQ 서버에 연결 생성
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 채널 생성
channel = connection.channel()
# Publisher confirm 모드 설정
# 이 설정은 메시지를 발행했을 때 RabbitMQ 브로커가 성공적으로 수신했는지를 확인할 수 있게 해준다.
channel.confirm_delivery()
# exchange 선언
# exchange 이름: 'logs_exchange'
# exchange 타입: 'direct'
# durable=True → 서버 재시작 후에도 exchange가 유지됨
channel.exchange_declare(exchange='logs_exchange', exchange_type='direct', durable=True)
# 메시지의 라우팅 키 (severity 수준)과 메시지 내용 (내용 자체)
severity = ["Error", "Warning", "Info", "Other"]
messages = ["EMsg", "WMsg", "IMsg", "OMsg"]
# 10개의 메시지를 무작위 라우팅 키와 함께 전송
for i in range(10):
randomNum = random.randint(0, len(severity)-1)
message = f"{messages[randomNum]} : {i}" # 메시지 내용 구성
rk = severity[randomNum] # 라우팅 키 선택
try:
# 메시지 발행
channel.basic_publish(
exchange='logs_exchange',
routing_key=rk,
body=message,
properties=pika.BasicProperties(
delivery_mode=2 # 메시지의 영속성 설정 (1=non-persistent, 2=persistent)
)
)
print("[x] sent %r" % message)
# 예외 처리: 채널 또는 연결이 닫힌 경우
except pika.exceptions.ChannelClosed:
print("Channel Closed")
except pika.exceptions.ConnectionClosed:
print("Connection Closed")
# exchange 삭제 (if_unused=False → 사용 중이라도 삭제 시도)
channel.exchange_delete(exchange='logs_exchange', if_unused=False)
# 연결 종료
connection.close()
6.6.2. subscriber
import random
import time
import pika
# 각 Subscriber마다 랜덤한 ID를 부여
subId = random.randint(1, 100)
print("Subscriber Id = ", subId)
# RabbitMQ 서버 연결
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
# 채널 생성
channel = connection.channel()
# durable=True → 서버가 재시작되더라도 exchange가 유지됨
channel.exchange_declare(exchange='logs_exchange', exchange_type='direct', durable=True)
# 큐 이름 지정 (task_queue)
# durable=True → 큐가 영속적으로 유지됨 (서버 재시작에도 사라지지 않음)
# exclusive=True → 해당 채널에만 독점적으로 바인딩 (닫히면 자동 삭제됨, 현재 주석 처리)
result = channel.queue_declare(
queue='task_queue',
durable=True
# exclusive=True
)
queue_name = "task_queue"
# 여러 Routing Key를 동일 큐에 바인딩 (Direct Exchange에선 각각 명시)
severity = ["Error", "Warning", "Info", "Other"]
for s in severity:
channel.queue_bind(exchange='logs_exchange', queue=queue_name, routing_key=s)
print('[*] waiting for the messages')
# 메시지 수신 콜백 함수 정의
def callback(ch, method, properties, body):
print('[x] Received message:::: %r' % body)
# 메시지를 처리하는 데 걸리는 시간 (여기선 고정 5초로 시뮬레이션)
randomSleep = 5
print("Working for ", randomSleep, "seconds")
while randomSleep > 0:
print(".", end="") # 진행 상태 시각화
time.sleep(1)
randomSleep -= 1
print("!")
# 메시지 처리 완료 후 ack 전송 (자동이 아닌 수동 승인 방식)
ch.basic_ack(delivery_tag=method.delivery_tag)
# QoS (Quality of Service) 설정
# prefetch_count=1 → 한 번에 하나의 메시지만 처리하도록 제한 (consumer가 ack하기 전까진 다음 메시지 전송 안됨)
channel.basic_qos(prefetch_count=1)
# 메시지 소비 시작
# auto_ack=False (기본값) → 수동 ack 모드
# auto_ack=True로 설정하면 콜백 실행 전에도 메시지가 즉시 ack 되어 메시지 손실 가능성 존재
channel.basic_consume(
queue=queue_name,
on_message_callback=callback
# auto_ack=True
)
# 큐에서 메시지를 계속 소비 (blocking loop)
channel.start_consuming()
7. RPC - Remote Procedure Call
7.1. 강의 프로젝트
7.1.1. Server
import pika
# ▶️ 팩토리얼 함수 정의
def fact(n):
if n <= 1:
return 1
return n * fact(n - 1)
# ▶️ 메시지 수신 콜백 함수 (RPC 요청 처리)
def on_request(ch, method, props, body):
# 클라이언트가 응답받기를 원하는 큐 이름 (reply_to 속성)
reply_queue_name = props.reply_to
# 응답 메시지와 요청 메시지를 매칭하기 위한 ID
corr_id = props.correlation_id
# 요청 받은 숫자 추출 및 출력
n = int(body)
print("called fact(", n, ")")
# 팩토리얼 계산
response = fact(n)
# 응답 메시지를 reply_to 큐에 전송
ch.basic_publish(
exchange='', # default exchange 사용
routing_key=reply_queue_name, # 응답 큐로 전송
properties=pika.BasicProperties(
correlation_id=corr_id # 클라이언트가 보낸 ID 유지
),
body=str(response)
)
# 메시지 처리가 끝났으므로 ACK 전송
ch.basic_ack(delivery_tag=method.delivery_tag)
########## Main Program #################
# ▶️ RabbitMQ 서버에 연결
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# ▶️ 서버가 받을 RPC 요청 큐 생성 (durable로 유지됨)
queue_name = "rpc_server_queue"
result = channel.queue_declare(
queue=queue_name,
# exclusive=True, # 연결 종료 시 큐 삭제 (비활성화)
durable=True # 서버 재시작 후에도 유지
)
# ▶️ 동시에 하나의 요청만 처리하도록 제한 (처리 중 메시지 누적 방지)
channel.basic_qos(prefetch_count=1)
# ▶️ 큐에서 메시지를 꺼내면 on_request 함수 호출
channel.basic_consume(
queue=queue_name,
on_message_callback=on_request
# auto_ack=False → 수동 ACK 방식
)
# ▶️ 메시지 수신 루프 시작 (RPC 서버 시작)
print("Awaiting RPC requests")
channel.start_consuming()
7.1.2. Client
import pika
import uuid
class FactRPCClient:
def __init__(self):
self.connection = \
pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel = self.connection.channel()
self.queue_name = 'rpc_client_queue'
self.server_queue_name = 'rpc_server_queue'
self.channel.queue_declare(queue=self.queue_name, exclusive=True)
self.channel.basic_consume(queue=self.queue_name,
on_message_callback= self.on_response,
auto_ack= True)
def on_response(self, ch, method, props, body):
if(self.correlation_id == props.correlation_id):
self.response = body
def call(self, n):
self.response = None
self.correlation_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key=self.server_queue_name,
properties=pika.BasicProperties(
reply_to=self.queue_name,
correlation_id= self.correlation_id,
),
body=str(n),
)
while(self.response is None):
self.connection.process_data_events()
return(int(self.response))
fact_rpc = FactRPCClient()
n = 5
print("Requesting Fact(", n, ")")
response = fact_rpc.call(n)
print("Got the response ", response)
댓글