Redis Stream에 대한 정리
1. Redis Stream 기본 개념
- Redis Stream은 Append-Only 구조의 데이터 스트림으로, 메시지를 순차적으로 저장하고 처리할 수 있도록 설계되었다. Kafka와 같은 메시지 브로커의 기능을 일부 포함하면서도, Redis의 빠른 성능과 간편한 인터페이스를 제공한다.
1.1 Stream 기본 용어
- Stream : 메시지가 추가되는 기본 데이터 구조 (XADD, XRANGE, XREAD 등의 명령어를 사용)
- Entry : 개별 메시지 항목, 고유한 ID(<milliseconds>-<sequence>)를 가짐
- Consumer Group : 여러 개의 소비자(Consumer)가 하나의 그룹을 이루어 메시지를 분배받음
- Pending : 소비자가 처리 중이거나 장애 등으로 인해 ACK를 보내지 못한 메시지들
- ACK (Acknowledge) : 메시지가 정상적으로 처리되었음을 Redis에 알리는 확인 메시지
2. Redis Stream 기본 사용법
2.1 Stream에 메시지 추가 (XADD)
- "mystream" 스트림에 {"user": "alice", "message": "Hello, Redis!"} 데이터를 추가한다.
- XADD 명령어는 자동으로 고유한 ID를 생성한다.
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# Stream에 메시지 추가
r.xadd("mystream", {"user": "alice", "message": "Hello, Redis!"})
2.2 메시지 읽기 (XRANGE, XREAD)
- XRANGE는 스트림에서 특정 범위의 데이터를 조회하는 명령어이다.
- XREAD는 블로킹 방식으로 데이터를 읽을 수 있다.
# 전체 메시지 조회
messages = r.xrange("mystream")
print(messages)
# 최신 메시지 5개만 조회
latest_messages = r.xrevrange("mystream", "+", "-", count=5)
print(latest_messages)
3. Consumer Group을 활용한 메시지 처리
3.1 Consumer Group 생성 (XGROUP CREATE)
- XGROUP CREATE를 사용해 "mystream"에 "mygroup" 소비자 그룹을 생성한다.
- mkstream=True 옵션은 Stream이 없을 경우 자동 생성한다.
try:
r.xgroup_create("mystream", "mygroup", id="0", mkstream=True)
except redis.exceptions.ResponseError:
print("Consumer group already exists")
3.2 Consumer에서 메시지 읽기 (XREADGROUP)
- XREADGROUP을 사용하여 Consumer가 그룹 내에서 메시지를 가져온다.
- '>'를 사용하면 아직 처리되지 않은 새 메시지만 읽는다.
- XACK을 호출하여 메시지를 정상적으로 처리했음을 Redis에 알린다.
consumer_name = "worker-1"
# 메시지 읽기 (최대 5개)
messages = r.xreadgroup("mygroup", consumer_name, {"mystream": ">"}, count=5)
for stream, msgs in messages:
for msg_id, data in msgs:
print(f"Processing {msg_id}: {data}")
r.xack("mystream", "mygroup", msg_id) # 메시지 처리 완료
4. Pending 메시지 처리와 스케줄러 구현
4.1 Pending 메시지 확인 (XPENDING)
- XPENDING을 사용하면 아직 처리되지 않은 메시지 목록을 확인할 수 있다.
pending_info = r.xpending_range("mystream", "mygroup", "-", "+", count=10)
for msg in pending_info:
msg_id, consumer, elapsed_time, delivery_count = msg
print(f"Pending Message {msg_id} assigned to {consumer}, elapsed: {elapsed_time} ms, count: {delivery_count}")
4.2 Pending 메시지를 재배정 (XCLAIM)
- XCLAIM을 사용하면 일정 시간이 지난 메시지를 다른 Consumer가 가져올 수 있습니다.
# 일정 시간이 지난 메시지만 다시 가져옴
stuck_messages = r.xpending_range("mystream", "mygroup", "-", "+", count=10)
for msg_id, consumer, elapsed_time, delivery_count in stuck_messages:
if elapsed_time > 60000: # 60초 이상 경과한 메시지
r.xclaim("mystream", "mygroup", "worker-1", min_idle_time=60000, message_ids=[msg_id])
print(f"Reclaimed message {msg_id}")
4.3 Pending 메시지 자동 처리 스케줄러 (실무 코드)
- 백그라운드에서 실행되는 스레드가 주기적으로 XPENDING을 확인하고 XCLAIM을 호출하여 오래된 메시지를 재처리한다.
- 실무 환경에서는 Celery Beat나 Kubernetes CronJob을 사용해 정기적으로 실행하는 방식도 가능하다.
import time
import threading
def pending_message_checker():
while True:
stuck_messages = r.xpending_range("mystream", "mygroup", "-", "+", count=10)
for msg_id, consumer, elapsed_time, delivery_count in stuck_messages:
if elapsed_time > 60000: # 60초 이상 경과한 메시지 재처리
r.xclaim("mystream", "mygroup", "worker-1", min_idle_time=60000, message_ids=[msg_id])
print(f"Reclaimed message {msg_id}")
time.sleep(10) # 10초마다 확인
# 백그라운드 스레드 실행
threading.Thread(target=pending_message_checker, daemon=True).start()
5. Redis Stream을 활용한 실무 응용 사례
- 실시간 로그 처리: 대량의 로그 데이터를 실시간으로 수집하고 분석하는 시스템 구축
- 이벤트 소싱(Event Sourcing): 애플리케이션 상태 변경을 Stream 기반으로 저장
- 비동기 작업 큐: Redis Stream을 메시지 큐처럼 활용하여 Worker들이 분산 처리
- IoT 데이터 수집: 센서 데이터 및 이벤트를 지속적으로 기록하고 분석
6. 정리
- Redis Stream은 강력한 이벤트 스트림 저장소로 활용 가능
- Consumer Group을 활용하면 분산 메시지 처리가 가능
- Pending 메시지와 ACK를 관리하여 신뢰성 있는 메시지 처리 가능
- 스케줄러를 통해 PENDING 메시지를 자동 관리하는 것이 중요
- 위 코드를 기반으로 실무에서 활용하면, 효율적인 메시지 큐 및 이벤트 스트림 처리 시스템을 구축할 수 있다.
- reference :
Redis Stream 적용기
안녕하세요 Data Product 팀 박상우입니다. 이번에 제가 소개해드릴 내용은 팀 내 session Info data 적재 및 API 서비스 구축에 적용한 Redis Stream에 대한 이야기입니다. 저희 팀에서는 User의 행동 정보를
dev.gmarket.com
https://kingjakeu.github.io/page2/
Studio u by kingjakeu · This is hello from jakeu
Redis Stream (레디스 스트림) 기본 정리 07 Feb 2021 Redis Stream이란? Redis Stream(레디스 스트림)은 Redis 5.0부터 추가 된 자료구조로, log 파일처럼 append only로 저장되는 구조를 가지고 있다. 메시징 시스템
kingjakeu.github.io
https://techblog.lycorp.co.jp/ko/building-a-messaging-queuing-system-with-redis-streams
실시간 추천 서비스를 위해 메시지 큐잉 도입하기(with Redis Streams)
들어가며 현재 일본과 대만, 태국에서는 LINE 앱에서 LINE VOOM이라는 서비스를 제공하고 있습니다. LINE VOOM은 LINE 앱 사용자를 위한 SNS로 사진과 텍스트, ...
techblog.lycorp.co.jp
https://velog.io/@nwactris/MQ-%EB%B3%84-%ED%8A%B9%EC%A7%95-3.-Redis-Stream1-Redis%EB%9E%80
MQ 별 특징 3. - Redis Stream(1) [Redis란?]
Redis의 특징부터, Redis cluster 구성 방법까지 알아보았다.
velog.io
https://velog.io/@nwactris/MQ-%EB%B3%84-%ED%8A%B9%EC%A7%95-3.-Redis-Stream2
MQ 별 특징 3. - Redis Stream(2)
이벤트 브로커로서의 기능을 갖춘 Redis Stream에 대해 알아보았다.
velog.io
https://jybaek.tistory.com/935
[Redis] Stream 사용 방법
작은 프로젝트를 진행할 때도 메시지 브로커는 아키텍처에 따라 필요한 경우가 종종 있습니다. 이때 Apache kafka를 쓰자니 배보다 배꼽이 커지고, 클라우드의 메시지 큐(AWS SQS 등)를 사용하자니 벤
jybaek.tistory.com
http://redisgate.kr/redis/command/streams.php
STREAMS Introduction Redis
streams_intro STREAMS Introduction 소개 Introduction 스트림 Streams 스트림(Stream)은 로그 데이터를 처리하기 위해서 5.0에서 새로 도입된 데이터 타입입니다. 여러 산업(industry)에서는 많은 경우 데이터가 연
redisgate.kr
https://velog.io/@ddings73/Redis-Streams
Redis Streams
Pub/Sub Messaging 에서 잠깐 언급했던 것처럼 Redis에서 발행되는 메시지는 단 한번만 전송된다.네트워크 장애 등으로 인해 메시지가 발행된 순간에 시스템이 정상이 아니라면 해당 메시지는 소실된
velog.io
https://dlwnsdud205.tistory.com/369
[Redis] Redis-stream 메모리 누수와 XGROUP의 동작원리
최근 진행하고 있는 프로젝트 Gitanimals에서는 분산된 트랜잭션간의 데이터 정합성을 맞추기위해 redis-stream을 이용해 Saga를 구현하고 있습니다. 메시지 스트림 구현체로 redis-stream을 사용하면서,
dlwnsdud205.tistory.com
https://velog.io/@nwactris/%EC%BA%A1%EC%8A%A4%ED%86%A4-%EB%94%94%EC%9E%90%EC%9D%B8
Redis Stream 제안 & 효율적인 알림 시스템 구축을 위한 RabbitMQ, Kafka와의 성능 비교 (23-2 캡스톤디자
Redis Pub/Sub에 비해 기존 연구가 부족한 Redis Stream를 탐구하고 RabbitMQ, Kafka와의 성능 비교를 진행
velog.io
'Database > redis' 카테고리의 다른 글
Redis-OM 객체 매핑을 지원하는 redis 라이브러리 (0) | 2025.01.30 |
---|---|
django MQ 시리즈 4편 - celery with redis for pub/sub (1) | 2023.10.09 |
실무에서 redis를 사용한 시스템 구축 사례 정리 (0) | 2023.10.08 |
redis pub/sub 이해하기 (0) | 2023.04.29 |
우분투 22.04 redis 관리 툴 설치하기 redis-desktop-manager (0) | 2023.03.16 |
댓글