Database/redis

Redis Stream에 대한 정리

bluebamus 2025. 1. 30.

1. Redis Stream 기본 개념

   - Redis Stream은 Append-Only 구조의 데이터 스트림으로, 메시지를 순차적으로 저장하고 처리할 수 있도록 설계되었다. Kafka와 같은 메시지 브로커의 기능을 일부 포함하면서도, Redis의 빠른 성능과 간편한 인터페이스를 제공한다.

 

   1.1 Stream 기본 용어

      - Stream : 메시지가 추가되는 기본 데이터 구조 (XADD, XRANGE, XREAD 등의 명령어를 사용)
      - Entry : 개별 메시지 항목, 고유한 ID(<milliseconds>-<sequence>)를 가짐
      - Consumer Group : 여러 개의 소비자(Consumer)가 하나의 그룹을 이루어 메시지를 분배받음
      - Pending : 소비자가 처리 중이거나 장애 등으로 인해 ACK를 보내지 못한 메시지들
      - ACK (Acknowledge) : 메시지가 정상적으로 처리되었음을 Redis에 알리는 확인 메시지

 

2. Redis Stream 기본 사용법

   2.1 Stream에 메시지 추가 (XADD)

      - "mystream" 스트림에 {"user": "alice", "message": "Hello, Redis!"} 데이터를 추가한다.
      - XADD 명령어는 자동으로 고유한 ID를 생성한다.

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Stream에 메시지 추가
r.xadd("mystream", {"user": "alice", "message": "Hello, Redis!"})

 

2.2 메시지 읽기 (XRANGE, XREAD)

   - XRANGE는 스트림에서 특정 범위의 데이터를 조회하는 명령어이다.

   - XREAD는 블로킹 방식으로 데이터를 읽을 수 있다.

# 전체 메시지 조회
messages = r.xrange("mystream")
print(messages)

# 최신 메시지 5개만 조회
latest_messages = r.xrevrange("mystream", "+", "-", count=5)
print(latest_messages)

 

3. Consumer Group을 활용한 메시지 처리

   3.1 Consumer Group 생성 (XGROUP CREATE)

      - XGROUP CREATE를 사용해 "mystream"에 "mygroup" 소비자 그룹을 생성한다.

      - mkstream=True 옵션은 Stream이 없을 경우 자동 생성한다.

try:
    r.xgroup_create("mystream", "mygroup", id="0", mkstream=True)
except redis.exceptions.ResponseError:
    print("Consumer group already exists")

 

   3.2 Consumer에서 메시지 읽기 (XREADGROUP)

      - XREADGROUP을 사용하여 Consumer가 그룹 내에서 메시지를 가져온다.

      - '>'를 사용하면 아직 처리되지 않은 새 메시지만 읽는다.

      - XACK을 호출하여 메시지를 정상적으로 처리했음을 Redis에 알린다.

consumer_name = "worker-1"

# 메시지 읽기 (최대 5개)
messages = r.xreadgroup("mygroup", consumer_name, {"mystream": ">"}, count=5)

for stream, msgs in messages:
    for msg_id, data in msgs:
        print(f"Processing {msg_id}: {data}")
        r.xack("mystream", "mygroup", msg_id)  # 메시지 처리 완료

 

4. Pending 메시지 처리와 스케줄러 구현

   4.1 Pending 메시지 확인 (XPENDING)

      - XPENDING을 사용하면 아직 처리되지 않은 메시지 목록을 확인할 수 있다.

pending_info = r.xpending_range("mystream", "mygroup", "-", "+", count=10)

for msg in pending_info:
    msg_id, consumer, elapsed_time, delivery_count = msg
    print(f"Pending Message {msg_id} assigned to {consumer}, elapsed: {elapsed_time} ms, count: {delivery_count}")

 

   4.2 Pending 메시지를 재배정 (XCLAIM)

      - XCLAIM을 사용하면 일정 시간이 지난 메시지를 다른 Consumer가 가져올 수 있습니다.

# 일정 시간이 지난 메시지만 다시 가져옴
stuck_messages = r.xpending_range("mystream", "mygroup", "-", "+", count=10)

for msg_id, consumer, elapsed_time, delivery_count in stuck_messages:
    if elapsed_time > 60000:  # 60초 이상 경과한 메시지
        r.xclaim("mystream", "mygroup", "worker-1", min_idle_time=60000, message_ids=[msg_id])
        print(f"Reclaimed message {msg_id}")

 

   4.3 Pending 메시지 자동 처리 스케줄러 (실무 코드)

      - 백그라운드에서 실행되는 스레드가 주기적으로 XPENDING을 확인하고 XCLAIM을 호출하여 오래된 메시지를 재처리한다.
      - 실무 환경에서는 Celery Beat나 Kubernetes CronJob을 사용해 정기적으로 실행하는 방식도 가능하다.

import time
import threading

def pending_message_checker():
    while True:
        stuck_messages = r.xpending_range("mystream", "mygroup", "-", "+", count=10)

        for msg_id, consumer, elapsed_time, delivery_count in stuck_messages:
            if elapsed_time > 60000:  # 60초 이상 경과한 메시지 재처리
                r.xclaim("mystream", "mygroup", "worker-1", min_idle_time=60000, message_ids=[msg_id])
                print(f"Reclaimed message {msg_id}")

        time.sleep(10)  # 10초마다 확인

# 백그라운드 스레드 실행
threading.Thread(target=pending_message_checker, daemon=True).start()

 

5. Redis Stream을 활용한 실무 응용 사례

   - 실시간 로그 처리: 대량의 로그 데이터를 실시간으로 수집하고 분석하는 시스템 구축
   - 이벤트 소싱(Event Sourcing): 애플리케이션 상태 변경을 Stream 기반으로 저장
   - 비동기 작업 큐: Redis Stream을 메시지 큐처럼 활용하여 Worker들이 분산 처리
   - IoT 데이터 수집: 센서 데이터 및 이벤트를 지속적으로 기록하고 분석

 

6. 정리

   - Redis Stream은 강력한 이벤트 스트림 저장소로 활용 가능
   - Consumer Group을 활용하면 분산 메시지 처리가 가능
   - Pending 메시지와 ACK를 관리하여 신뢰성 있는 메시지 처리 가능
   - 스케줄러를 통해 PENDING 메시지를 자동 관리하는 것이 중요

   - 위 코드를 기반으로 실무에서 활용하면, 효율적인 메시지 큐 및 이벤트 스트림 처리 시스템을 구축할 수 있다.

 

- reference : 

https://dev.gmarket.com/113

 

Redis Stream 적용기

안녕하세요 Data Product 팀 박상우입니다. 이번에 제가 소개해드릴 내용은 팀 내 session Info data 적재 및 API 서비스 구축에 적용한 Redis Stream에 대한 이야기입니다. 저희 팀에서는 User의 행동 정보를

dev.gmarket.com

https://kingjakeu.github.io/page2/

 

Studio u by kingjakeu · This is hello from jakeu

Redis Stream (레디스 스트림) 기본 정리 07 Feb 2021 Redis Stream이란? Redis Stream(레디스 스트림)은 Redis 5.0부터 추가 된 자료구조로, log 파일처럼 append only로 저장되는 구조를 가지고 있다. 메시징 시스템

kingjakeu.github.io

https://techblog.lycorp.co.jp/ko/building-a-messaging-queuing-system-with-redis-streams

 

실시간 추천 서비스를 위해 메시지 큐잉 도입하기(with Redis Streams)

들어가며 현재 일본과 대만, 태국에서는 LINE 앱에서 LINE VOOM이라는 서비스를 제공하고 있습니다. LINE VOOM은 LINE 앱 사용자를 위한 SNS로 사진과 텍스트, ...

techblog.lycorp.co.jp

https://velog.io/@nwactris/MQ-%EB%B3%84-%ED%8A%B9%EC%A7%95-3.-Redis-Stream1-Redis%EB%9E%80

 

MQ 별 특징 3. - Redis Stream(1) [Redis란?]

Redis의 특징부터, Redis cluster 구성 방법까지 알아보았다.

velog.io

https://velog.io/@nwactris/MQ-%EB%B3%84-%ED%8A%B9%EC%A7%95-3.-Redis-Stream2

 

MQ 별 특징 3. - Redis Stream(2)

이벤트 브로커로서의 기능을 갖춘 Redis Stream에 대해 알아보았다.

velog.io

https://jybaek.tistory.com/935

 

[Redis] Stream 사용 방법

작은 프로젝트를 진행할 때도 메시지 브로커는 아키텍처에 따라 필요한 경우가 종종 있습니다. 이때 Apache kafka를 쓰자니 배보다 배꼽이 커지고, 클라우드의 메시지 큐(AWS SQS 등)를 사용하자니 벤

jybaek.tistory.com

http://redisgate.kr/redis/command/streams.php

 

STREAMS Introduction Redis

streams_intro STREAMS Introduction 소개 Introduction 스트림 Streams 스트림(Stream)은 로그 데이터를 처리하기 위해서 5.0에서 새로 도입된 데이터 타입입니다. 여러 산업(industry)에서는 많은 경우 데이터가 연

redisgate.kr

https://velog.io/@ddings73/Redis-Streams

 

Redis Streams

Pub/Sub Messaging 에서 잠깐 언급했던 것처럼 Redis에서 발행되는 메시지는 단 한번만 전송된다.네트워크 장애 등으로 인해 메시지가 발행된 순간에 시스템이 정상이 아니라면 해당 메시지는 소실된

velog.io

https://dlwnsdud205.tistory.com/369

 

[Redis] Redis-stream 메모리 누수와 XGROUP의 동작원리

최근 진행하고 있는 프로젝트 Gitanimals에서는 분산된 트랜잭션간의 데이터 정합성을 맞추기위해 redis-stream을 이용해 Saga를 구현하고 있습니다. 메시지 스트림 구현체로 redis-stream을 사용하면서,

dlwnsdud205.tistory.com

https://velog.io/@nwactris/%EC%BA%A1%EC%8A%A4%ED%86%A4-%EB%94%94%EC%9E%90%EC%9D%B8

 

Redis Stream 제안 & 효율적인 알림 시스템 구축을 위한 RabbitMQ, Kafka와의 성능 비교 (23-2 캡스톤디자

Redis Pub/Sub에 비해 기존 연구가 부족한 Redis Stream를 탐구하고 RabbitMQ, Kafka와의 성능 비교를 진행

velog.io

 

댓글