Redis Streams란?
Redis Streams는 Redis 5.0에서 도입된 로그 기반 데이터 구조로, Apache Kafka와 유사한 메시지 스트리밍 기능을 제공합니다. Pub/Sub과 달리 메시지가 영속 저장되고, Consumer Group으로 메시지를 분산 처리하며, ACK 기반으로 처리 보장이 가능합니다. 이 글에서는 Redis Streams의 핵심 명령어부터 Consumer Group 설계, 실패 처리, Spring/NestJS 통합까지 실무에서 바로 쓸 수 있는 심화 내용을 다룹니다.
기본 명령어: XADD, XREAD, XRANGE
# 메시지 추가 (자동 ID 생성)
XADD orders * userId 1001 amount 50000 status created
# → "1710400000000-0" (타임스탬프-시퀀스)
# 특정 ID 이후 메시지 읽기
XREAD COUNT 10 STREAMS orders 0-0
# → 처음부터 10개 읽기
# 블로킹 읽기 (새 메시지 대기)
XREAD BLOCK 5000 COUNT 1 STREAMS orders $
# → 5초간 새 메시지 대기, $는 마지막 ID 이후
# 범위 조회
XRANGE orders 1710400000000-0 + COUNT 100
# → 특정 시점부터 100개 조회
# 역순 조회 (최신 먼저)
XREVRANGE orders + - COUNT 10
# 스트림 길이
XLEN orders
# 스트림 정보
XINFO STREAM orders
Consumer Group: 분산 메시지 처리
Consumer Group은 여러 컨슈머가 메시지를 분산 처리하면서도 중복 없이 정확히 한 번씩 전달받도록 합니다:
# Consumer Group 생성 (0 = 처음부터, $ = 새 메시지부터)
XGROUP CREATE orders order-processors 0 MKSTREAM
# Consumer Group으로 읽기
# > 기호: 아직 전달되지 않은 새 메시지
XREADGROUP GROUP order-processors consumer-1
COUNT 10 BLOCK 2000 STREAMS orders >
# 처리 완료 ACK
XACK orders order-processors 1710400000000-0
# Pending 메시지 확인 (아직 ACK 안 된 것)
XPENDING orders order-processors - + 10
# 특정 컨슈머의 Pending 메시지
XPENDING orders order-processors - + 10 consumer-1
Consumer Group의 동작 흐름:
1. Producer → XADD → Stream에 메시지 저장
2. Consumer Group이 메시지를 컨슈머에게 분배
3. consumer-1 → 메시지 A 수신, consumer-2 → 메시지 B 수신
4. 처리 완료 → XACK으로 확인
5. ACK 안 된 메시지 → Pending 목록에 남음 → 재처리 가능
실패 처리: XCLAIM과 Dead Letter
컨슈머가 죽거나 처리 실패한 메시지를 다른 컨슈머가 인계받습니다:
# 30초 이상 ACK 안 된 메시지를 consumer-2가 인계
XCLAIM orders order-processors consumer-2 30000
1710400000000-0 1710400000001-0
# XAUTOCLAIM: 자동으로 오래된 Pending 메시지 인계 (Redis 6.2+)
XAUTOCLAIM orders order-processors consumer-2 30000 0-0 COUNT 10
# → 30초 이상 된 Pending 메시지를 자동으로 consumer-2에게
# Dead Letter 패턴: 재시도 초과 메시지를 별도 스트림으로
# 1. Pending 메시지 조회
XPENDING orders order-processors - + 10
# 2. 전달 횟수(delivery count)가 5 이상인 메시지
# → Dead Letter Stream으로 이동
XADD dead-letter-orders *
original-id 1710400000000-0
userId 1001 amount 50000 reason "max retries exceeded"
# 3. 원본에서 ACK (더 이상 재시도 안 함)
XACK orders order-processors 1710400000000-0
Spring Boot 통합: Lettuce + StreamListener
// 의존성: spring-boot-starter-data-redis
// Producer
@Service
@RequiredArgsConstructor
public class OrderEventProducer {
private final StringRedisTemplate redisTemplate;
public RecordId publish(OrderCreatedEvent event) {
Map<String, String> fields = Map.of(
"orderId", event.getOrderId().toString(),
"userId", event.getUserId().toString(),
"amount", event.getAmount().toString(),
"status", "created"
);
StringRecord record = StreamRecords.string(fields)
.withStreamKey("stream:orders");
return redisTemplate.opsForStream().add(record);
}
}
// Consumer: StreamMessageListenerContainer 사용
@Configuration
public class RedisStreamConfig {
@Bean
public Subscription orderStreamSubscription(
RedisConnectionFactory connectionFactory,
OrderStreamConsumer consumer) {
// Consumer Group 생성 (이미 있으면 무시)
try {
connectionFactory.getConnection().xGroupCreate(
"stream:orders".getBytes(),
"order-processors",
ReadOffset.from("0"),
true // MKSTREAM
);
} catch (RedisSystemException e) {
// BUSYGROUP: 이미 존재
}
var options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(2))
.batchSize(10)
.executor(Executors.newFixedThreadPool(4))
.build();
var container = StreamMessageListenerContainer
.create(connectionFactory, options);
var subscription = container.receive(
Consumer.from("order-processors", "consumer-1"),
StreamOffset.create("stream:orders", ReadOffset.lastConsumed()),
consumer
);
container.start();
return subscription;
}
}
@Component
@Slf4j
public class OrderStreamConsumer
implements StreamListener<String, MapRecord<String, String, String>> {
private final StringRedisTemplate redisTemplate;
@Override
public void onMessage(MapRecord<String, String, String> message) {
try {
Map<String, String> fields = message.getValue();
log.info("Processing order: {}", fields.get("orderId"));
// 비즈니스 로직 처리
processOrder(fields);
// ACK
redisTemplate.opsForStream().acknowledge(
"stream:orders", "order-processors", message.getId());
} catch (Exception e) {
log.error("Failed to process: {}", message.getId(), e);
// ACK 안 하면 Pending에 남음 → 재시도 대상
}
}
}
NestJS 통합: ioredis
// ioredis 사용
import Redis from 'ioredis';
@Injectable()
export class OrderStreamProducer {
private readonly redis = new Redis(process.env.REDIS_URL);
async publish(event: OrderCreatedEvent): Promise<string> {
return this.redis.xadd(
'stream:orders', '*',
'orderId', event.orderId,
'userId', event.userId,
'amount', String(event.amount),
);
}
}
@Injectable()
export class OrderStreamConsumer implements OnModuleInit {
private readonly redis = new Redis(process.env.REDIS_URL);
private running = true;
async onModuleInit() {
// Consumer Group 생성
try {
await this.redis.xgroup(
'CREATE', 'stream:orders', 'order-processors', '0', 'MKSTREAM');
} catch (e) {
// BUSYGROUP: already exists
}
this.consume();
}
private async consume() {
while (this.running) {
try {
const results = await this.redis.xreadgroup(
'GROUP', 'order-processors', 'consumer-1',
'COUNT', '10', 'BLOCK', '2000',
'STREAMS', 'stream:orders', '>',
);
if (!results) continue;
for (const [, messages] of results) {
for (const [id, fields] of messages) {
await this.processMessage(id, fields);
}
}
} catch (e) {
console.error('Stream consumer error:', e);
await new Promise(r => setTimeout(r, 1000));
}
}
}
private async processMessage(id: string, fields: string[]) {
const data = this.parseFields(fields);
try {
// 비즈니스 로직
await this.orderService.process(data);
// ACK
await this.redis.xack('stream:orders', 'order-processors', id);
} catch (e) {
console.error(`Failed: ${id}`, e);
}
}
private parseFields(fields: string[]): Record<string, string> {
const result: Record<string, string> = {};
for (let i = 0; i < fields.length; i += 2) {
result[fields[i]] = fields[i + 1];
}
return result;
}
}
스트림 관리: 트리밍과 모니터링
# 최대 길이 제한 (근사치, 성능 우선)
XADD orders MAXLEN ~ 100000 * userId 1001 amount 50000
# 정확한 트리밍
XTRIM orders MAXLEN 100000
# 최소 ID 기준 트리밍 (Redis 6.2+)
XTRIM orders MINID 1710400000000-0
# 모니터링
XINFO STREAM orders # 스트림 정보
XINFO GROUPS orders # Consumer Group 목록
XINFO CONSUMERS orders order-processors # 컨슈머별 상태
자동 트리밍은 Redis Cluster 샤딩·페일오버 운영에서 다룬 메모리 관리와 함께 적용해야 합니다.
Kafka vs Redis Streams
| 기준 | Redis Streams | Apache Kafka |
|---|---|---|
| 지연 시간 | 마이크로초~밀리초 | 밀리초~수십 밀리초 |
| 처리량 | 수만~수십만 TPS | 수백만 TPS |
| 영속성 | 메모리 + AOF/RDB | 디스크 기반 (무제한 보존) |
| 운영 복잡도 | 낮음 (Redis만 있으면 됨) | 높음 (ZK/KRaft + 브로커) |
| 적합한 경우 | 소~중규모, 이미 Redis 사용 중 | 대규모, 장기 보존 필요 |
Pending 자동 재처리 패턴
// 주기적으로 Pending 메시지 재처리
@Scheduled(fixedRate = 60_000) // 1분마다
public void reclaimPendingMessages() {
// 30초 이상 된 Pending 메시지 자동 인계
List<MapRecord<String, String, String>> claimed =
redisTemplate.opsForStream().autoClaim(
"stream:orders",
Consumer.from("order-processors", "reclaimer"),
Duration.ofSeconds(30),
10 // 최대 10개
);
for (var msg : claimed) {
// 전달 횟수 확인
PendingMessage pending = getPendingInfo(msg.getId());
if (pending.getTotalDeliveryCount() > 5) {
// Dead Letter로 이동
moveToDeadLetter(msg);
redisTemplate.opsForStream().acknowledge(
"stream:orders", "order-processors", msg.getId());
} else {
processMessage(msg);
}
}
}
이 재시도 패턴은 Spring Retry 재시도 전략의 개념을 스트림 레벨에서 적용한 것입니다.
운영 베스트 프랙티스
- MAXLEN 트리밍 필수:
XADD key MAXLEN ~ 100000으로 메모리 폭증 방지 - Consumer 이름 고유하게: hostname + PID 조합으로 컨슈머를 구분하세요
- ACK 즉시 처리: 처리 완료 후 바로 XACK, 지연하면 불필요한 재전달 발생
- Pending 모니터링: XPENDING 카운트를 메트릭으로 추적, 급증 시 알림
- Dead Letter 관리: 재시도 5회 초과 메시지는 별도 스트림으로 격리
- Consumer Group per 도메인: 주문, 결제, 알림 등 도메인별 별도 그룹 운영
마무리
Redis Streams는 Kafka 없이도 Consumer Group, ACK 기반 전달 보장, Pending 메시지 재처리를 구현할 수 있는 경량 메시지 스트리밍 솔루션입니다. 이미 Redis를 사용하고 있다면 별도 인프라 없이 이벤트 기반 아키텍처를 구축할 수 있으며, XCLAIM과 Dead Letter 패턴으로 메시지 유실 없는 안정적인 처리가 가능합니다.