Redis Streams 이벤트 처리

Redis Streams란?

Redis Streams는 Redis 5.0에서 도입된 로그 기반 데이터 구조로, Apache Kafka와 유사한 메시지 스트리밍 기능을 제공합니다. Pub/Sub과 달리 메시지가 영속 저장되고, Consumer Group으로 메시지를 분산 처리하며, ACK 기반으로 처리 보장이 가능합니다. 이 글에서는 Redis Streams의 핵심 명령어부터 Consumer Group 설계, 실패 처리, Spring/NestJS 통합까지 실무에서 바로 쓸 수 있는 심화 내용을 다룹니다.

기본 명령어: XADD, XREAD, XRANGE

# 메시지 추가 (자동 ID 생성)
XADD orders * userId 1001 amount 50000 status created
# → "1710400000000-0" (타임스탬프-시퀀스)

# 특정 ID 이후 메시지 읽기
XREAD COUNT 10 STREAMS orders 0-0
# → 처음부터 10개 읽기

# 블로킹 읽기 (새 메시지 대기)
XREAD BLOCK 5000 COUNT 1 STREAMS orders $
# → 5초간 새 메시지 대기, $는 마지막 ID 이후

# 범위 조회
XRANGE orders 1710400000000-0 + COUNT 100
# → 특정 시점부터 100개 조회

# 역순 조회 (최신 먼저)
XREVRANGE orders + - COUNT 10

# 스트림 길이
XLEN orders

# 스트림 정보
XINFO STREAM orders

Consumer Group: 분산 메시지 처리

Consumer Group은 여러 컨슈머가 메시지를 분산 처리하면서도 중복 없이 정확히 한 번씩 전달받도록 합니다:

# Consumer Group 생성 (0 = 처음부터, $ = 새 메시지부터)
XGROUP CREATE orders order-processors 0 MKSTREAM

# Consumer Group으로 읽기
# > 기호: 아직 전달되지 않은 새 메시지
XREADGROUP GROUP order-processors consumer-1 
  COUNT 10 BLOCK 2000 STREAMS orders >

# 처리 완료 ACK
XACK orders order-processors 1710400000000-0

# Pending 메시지 확인 (아직 ACK 안 된 것)
XPENDING orders order-processors - + 10

# 특정 컨슈머의 Pending 메시지
XPENDING orders order-processors - + 10 consumer-1

Consumer Group의 동작 흐름:

1. Producer → XADD → Stream에 메시지 저장
2. Consumer Group이 메시지를 컨슈머에게 분배
3. consumer-1 → 메시지 A 수신, consumer-2 → 메시지 B 수신
4. 처리 완료 → XACK으로 확인
5. ACK 안 된 메시지 → Pending 목록에 남음 → 재처리 가능

실패 처리: XCLAIM과 Dead Letter

컨슈머가 죽거나 처리 실패한 메시지를 다른 컨슈머가 인계받습니다:

# 30초 이상 ACK 안 된 메시지를 consumer-2가 인계
XCLAIM orders order-processors consumer-2 30000 
  1710400000000-0 1710400000001-0

# XAUTOCLAIM: 자동으로 오래된 Pending 메시지 인계 (Redis 6.2+)
XAUTOCLAIM orders order-processors consumer-2 30000 0-0 COUNT 10
# → 30초 이상 된 Pending 메시지를 자동으로 consumer-2에게
# Dead Letter 패턴: 재시도 초과 메시지를 별도 스트림으로
# 1. Pending 메시지 조회
XPENDING orders order-processors - + 10

# 2. 전달 횟수(delivery count)가 5 이상인 메시지
#    → Dead Letter Stream으로 이동
XADD dead-letter-orders * 
  original-id 1710400000000-0 
  userId 1001 amount 50000 reason "max retries exceeded"

# 3. 원본에서 ACK (더 이상 재시도 안 함)
XACK orders order-processors 1710400000000-0

Spring Boot 통합: Lettuce + StreamListener

// 의존성: spring-boot-starter-data-redis

// Producer
@Service
@RequiredArgsConstructor
public class OrderEventProducer {

    private final StringRedisTemplate redisTemplate;

    public RecordId publish(OrderCreatedEvent event) {
        Map<String, String> fields = Map.of(
            "orderId", event.getOrderId().toString(),
            "userId", event.getUserId().toString(),
            "amount", event.getAmount().toString(),
            "status", "created"
        );

        StringRecord record = StreamRecords.string(fields)
                .withStreamKey("stream:orders");

        return redisTemplate.opsForStream().add(record);
    }
}
// Consumer: StreamMessageListenerContainer 사용
@Configuration
public class RedisStreamConfig {

    @Bean
    public Subscription orderStreamSubscription(
            RedisConnectionFactory connectionFactory,
            OrderStreamConsumer consumer) {

        // Consumer Group 생성 (이미 있으면 무시)
        try {
            connectionFactory.getConnection().xGroupCreate(
                "stream:orders".getBytes(),
                "order-processors",
                ReadOffset.from("0"),
                true  // MKSTREAM
            );
        } catch (RedisSystemException e) {
            // BUSYGROUP: 이미 존재
        }

        var options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(2))
                .batchSize(10)
                .executor(Executors.newFixedThreadPool(4))
                .build();

        var container = StreamMessageListenerContainer
                .create(connectionFactory, options);

        var subscription = container.receive(
                Consumer.from("order-processors", "consumer-1"),
                StreamOffset.create("stream:orders", ReadOffset.lastConsumed()),
                consumer
        );

        container.start();
        return subscription;
    }
}

@Component
@Slf4j
public class OrderStreamConsumer
        implements StreamListener<String, MapRecord<String, String, String>> {

    private final StringRedisTemplate redisTemplate;

    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        try {
            Map<String, String> fields = message.getValue();
            log.info("Processing order: {}", fields.get("orderId"));

            // 비즈니스 로직 처리
            processOrder(fields);

            // ACK
            redisTemplate.opsForStream().acknowledge(
                "stream:orders", "order-processors", message.getId());

        } catch (Exception e) {
            log.error("Failed to process: {}", message.getId(), e);
            // ACK 안 하면 Pending에 남음 → 재시도 대상
        }
    }
}

NestJS 통합: ioredis

// ioredis 사용
import Redis from 'ioredis';

@Injectable()
export class OrderStreamProducer {
  private readonly redis = new Redis(process.env.REDIS_URL);

  async publish(event: OrderCreatedEvent): Promise<string> {
    return this.redis.xadd(
      'stream:orders', '*',
      'orderId', event.orderId,
      'userId', event.userId,
      'amount', String(event.amount),
    );
  }
}

@Injectable()
export class OrderStreamConsumer implements OnModuleInit {
  private readonly redis = new Redis(process.env.REDIS_URL);
  private running = true;

  async onModuleInit() {
    // Consumer Group 생성
    try {
      await this.redis.xgroup(
        'CREATE', 'stream:orders', 'order-processors', '0', 'MKSTREAM');
    } catch (e) {
      // BUSYGROUP: already exists
    }
    this.consume();
  }

  private async consume() {
    while (this.running) {
      try {
        const results = await this.redis.xreadgroup(
          'GROUP', 'order-processors', 'consumer-1',
          'COUNT', '10', 'BLOCK', '2000',
          'STREAMS', 'stream:orders', '>',
        );

        if (!results) continue;

        for (const [, messages] of results) {
          for (const [id, fields] of messages) {
            await this.processMessage(id, fields);
          }
        }
      } catch (e) {
        console.error('Stream consumer error:', e);
        await new Promise(r => setTimeout(r, 1000));
      }
    }
  }

  private async processMessage(id: string, fields: string[]) {
    const data = this.parseFields(fields);
    try {
      // 비즈니스 로직
      await this.orderService.process(data);
      // ACK
      await this.redis.xack('stream:orders', 'order-processors', id);
    } catch (e) {
      console.error(`Failed: ${id}`, e);
    }
  }

  private parseFields(fields: string[]): Record<string, string> {
    const result: Record<string, string> = {};
    for (let i = 0; i < fields.length; i += 2) {
      result[fields[i]] = fields[i + 1];
    }
    return result;
  }
}

스트림 관리: 트리밍과 모니터링

# 최대 길이 제한 (근사치, 성능 우선)
XADD orders MAXLEN ~ 100000 * userId 1001 amount 50000

# 정확한 트리밍
XTRIM orders MAXLEN 100000

# 최소 ID 기준 트리밍 (Redis 6.2+)
XTRIM orders MINID 1710400000000-0

# 모니터링
XINFO STREAM orders           # 스트림 정보
XINFO GROUPS orders           # Consumer Group 목록
XINFO CONSUMERS orders order-processors  # 컨슈머별 상태

자동 트리밍은 Redis Cluster 샤딩·페일오버 운영에서 다룬 메모리 관리와 함께 적용해야 합니다.

Kafka vs Redis Streams

기준 Redis Streams Apache Kafka
지연 시간 마이크로초~밀리초 밀리초~수십 밀리초
처리량 수만~수십만 TPS 수백만 TPS
영속성 메모리 + AOF/RDB 디스크 기반 (무제한 보존)
운영 복잡도 낮음 (Redis만 있으면 됨) 높음 (ZK/KRaft + 브로커)
적합한 경우 소~중규모, 이미 Redis 사용 중 대규모, 장기 보존 필요

Pending 자동 재처리 패턴

// 주기적으로 Pending 메시지 재처리
@Scheduled(fixedRate = 60_000)  // 1분마다
public void reclaimPendingMessages() {
    // 30초 이상 된 Pending 메시지 자동 인계
    List<MapRecord<String, String, String>> claimed =
        redisTemplate.opsForStream().autoClaim(
            "stream:orders",
            Consumer.from("order-processors", "reclaimer"),
            Duration.ofSeconds(30),
            10  // 최대 10개
        );

    for (var msg : claimed) {
        // 전달 횟수 확인
        PendingMessage pending = getPendingInfo(msg.getId());
        if (pending.getTotalDeliveryCount() > 5) {
            // Dead Letter로 이동
            moveToDeadLetter(msg);
            redisTemplate.opsForStream().acknowledge(
                "stream:orders", "order-processors", msg.getId());
        } else {
            processMessage(msg);
        }
    }
}

이 재시도 패턴은 Spring Retry 재시도 전략의 개념을 스트림 레벨에서 적용한 것입니다.

운영 베스트 프랙티스

  • MAXLEN 트리밍 필수: XADD key MAXLEN ~ 100000으로 메모리 폭증 방지
  • Consumer 이름 고유하게: hostname + PID 조합으로 컨슈머를 구분하세요
  • ACK 즉시 처리: 처리 완료 후 바로 XACK, 지연하면 불필요한 재전달 발생
  • Pending 모니터링: XPENDING 카운트를 메트릭으로 추적, 급증 시 알림
  • Dead Letter 관리: 재시도 5회 초과 메시지는 별도 스트림으로 격리
  • Consumer Group per 도메인: 주문, 결제, 알림 등 도메인별 별도 그룹 운영

마무리

Redis Streams는 Kafka 없이도 Consumer Group, ACK 기반 전달 보장, Pending 메시지 재처리를 구현할 수 있는 경량 메시지 스트리밍 솔루션입니다. 이미 Redis를 사용하고 있다면 별도 인프라 없이 이벤트 기반 아키텍처를 구축할 수 있으며, XCLAIM과 Dead Letter 패턴으로 메시지 유실 없는 안정적인 처리가 가능합니다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux