Redis Pub/Sub란?
Redis Pub/Sub는 발행-구독 패턴의 메시징 시스템입니다. 프로듀서가 채널에 메시지를 발행(publish)하면 해당 채널을 구독(subscribe)하는 모든 클라이언트에게 실시간으로 전달됩니다. Kafka와 달리 메시지가 영속화되지 않고, 구독 시점 이후의 메시지만 수신합니다. 실시간 알림, 캐시 무효화, WebSocket 브로커 등 즉시성이 중요하고 메시지 유실이 허용되는 시나리오에 적합합니다.
기본 Pub/Sub 명령어
# 터미널 1: 구독자
redis-cli SUBSCRIBE chat:general notifications:user:42
# 터미널 2: 발행자
redis-cli PUBLISH chat:general "안녕하세요!"
# (integer) 2 → 2명의 구독자에게 전달됨
redis-cli PUBLISH notifications:user:42 '{"type":"order","id":1234}'
# (integer) 1
# 패턴 구독: 와일드카드로 여러 채널 구독
redis-cli PSUBSCRIBE chat:* notifications:*
# 활성 채널/구독자 수 확인
redis-cli PUBSUB CHANNELS chat:*
redis-cli PUBSUB NUMSUB chat:general
redis-cli PUBSUB NUMPAT
| 명령어 | 설명 |
|---|---|
SUBSCRIBE |
정확한 채널명으로 구독 |
PSUBSCRIBE |
패턴(glob) 매칭으로 구독 |
PUBLISH |
채널에 메시지 발행, 구독자 수 반환 |
UNSUBSCRIBE |
구독 해제 |
PUBSUB |
활성 채널·구독 현황 조회 |
Pub/Sub vs Streams vs Kafka: 언제 무엇을 쓸까
| 특성 | Redis Pub/Sub | Redis Streams | Kafka |
|---|---|---|---|
| 메시지 영속성 | 없음 (fire & forget) | 있음 | 있음 |
| 과거 메시지 조회 | 불가 | 가능 | 가능 |
| 컨슈머 그룹 | 없음 | 있음 | 있음 |
| 지연시간 | ~마이크로초 | ~밀리초 | ~밀리초 |
| 적합한 용도 | 실시간 알림, 캐시 무효화 | 이벤트 로그, 작업 큐 | 대규모 이벤트 스트리밍 |
Spring Boot + Redis Pub/Sub 통합
Spring Data Redis의 RedisMessageListenerContainer로 Pub/Sub를 구현합니다:
@Configuration
public class RedisPubSubConfig {
@Bean
public RedisMessageListenerContainer redisContainer(
RedisConnectionFactory connectionFactory,
MessageListenerAdapter cacheListener,
MessageListenerAdapter notificationListener) {
var container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 채널별 리스너 등록
container.addMessageListener(cacheListener,
new ChannelTopic("cache:invalidation"));
container.addMessageListener(notificationListener,
new PatternTopic("notifications:*"));
// 스레드 풀 설정 (기본: SimpleAsyncTaskExecutor)
container.setTaskExecutor(Executors.newFixedThreadPool(4));
return container;
}
@Bean
public MessageListenerAdapter cacheListener(
CacheInvalidationHandler handler) {
return new MessageListenerAdapter(handler, "handleMessage");
}
@Bean
public MessageListenerAdapter notificationListener(
NotificationHandler handler) {
return new MessageListenerAdapter(handler, "handleMessage");
}
}
// 캐시 무효화 핸들러
@Component
@Slf4j
public class CacheInvalidationHandler {
private final CacheManager cacheManager;
public CacheInvalidationHandler(CacheManager cacheManager) {
this.cacheManager = cacheManager;
}
public void handleMessage(String message, String channel) {
log.info("Cache invalidation received: channel={}, key={}",
channel, message);
// JSON 파싱
var event = objectMapper.readValue(message, CacheEvent.class);
Cache cache = cacheManager.getCache(event.cacheName());
if (cache != null) {
if (event.key() != null) {
cache.evict(event.key());
} else {
cache.clear();
}
}
}
}
public record CacheEvent(String cacheName, String key) {}
// 발행 서비스
@Service
public class CacheEventPublisher {
private final StringRedisTemplate redisTemplate;
public CacheEventPublisher(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
public void publishInvalidation(String cacheName, String key) {
var event = new CacheEvent(cacheName, key);
redisTemplate.convertAndSend(
"cache:invalidation",
objectMapper.writeValueAsString(event)
);
}
// AOP로 캐시 업데이트 시 자동 발행
@Around("@annotation(org.springframework.cache.annotation.CacheEvict)")
public Object publishOnEvict(ProceedingJoinPoint pjp) throws Throwable {
Object result = pjp.proceed();
// 다른 인스턴스에 캐시 무효화 전파
CacheEvict annotation = getAnnotation(pjp);
publishInvalidation(annotation.value()[0],
resolveKey(annotation, pjp));
return result;
}
}
실전 패턴 1: 분산 캐시 무효화
여러 Spring Boot 인스턴스에서 로컬 캐시(Caffeine)를 사용할 때, Pub/Sub로 캐시 일관성을 유지합니다:
@Component
public class DistributedCacheSync {
private final StringRedisTemplate redis;
private final CacheManager localCacheManager;
private final String instanceId = UUID.randomUUID().toString();
// 캐시 변경 시 다른 인스턴스에 알림
public void notifyCacheChange(String cacheName, String key, String action) {
var payload = Map.of(
"cacheName", cacheName,
"key", key,
"action", action, // "evict" or "clear"
"source", instanceId // 자기 자신 제외용
);
redis.convertAndSend("cache:sync",
objectMapper.writeValueAsString(payload));
}
// 다른 인스턴스의 캐시 변경 수신
@RedisListener(channel = "cache:sync")
public void onCacheSync(String message) {
var event = objectMapper.readValue(message, Map.class);
// 자기 자신이 보낸 메시지는 무시
if (instanceId.equals(event.get("source"))) {
return;
}
Cache cache = localCacheManager.getCache((String) event.get("cacheName"));
if (cache == null) return;
if ("clear".equals(event.get("action"))) {
cache.clear();
} else {
cache.evict(event.get("key"));
}
}
}
실전 패턴 2: WebSocket 브로드캐스트
다중 서버 환경에서 WebSocket 메시지를 모든 인스턴스의 클라이언트에게 전달합니다:
@Configuration
public class WebSocketRedisConfig {
@Bean
public RedisMessageListenerContainer wsContainer(
RedisConnectionFactory factory,
WebSocketBroadcastHandler handler) {
var container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
container.addMessageListener(
new MessageListenerAdapter(handler),
new PatternTopic("ws:broadcast:*")
);
return container;
}
}
@Component
public class WebSocketBroadcastHandler {
private final SimpMessagingTemplate messagingTemplate;
public WebSocketBroadcastHandler(SimpMessagingTemplate template) {
this.messagingTemplate = template;
}
// Redis에서 수신 → 로컬 WebSocket 클라이언트에 전달
public void handleMessage(String payload, String channel) {
// channel: "ws:broadcast:chat-room-1"
String destination = channel.replace("ws:broadcast:", "/topic/");
messagingTemplate.convertAndSend(destination, payload);
}
}
// 메시지 발행: 어떤 인스턴스에서든 모든 클라이언트에 도달
@Service
public class ChatService {
private final StringRedisTemplate redis;
public void sendMessage(String room, ChatMessage message) {
redis.convertAndSend(
"ws:broadcast:" + room,
objectMapper.writeValueAsString(message)
);
}
}
NestJS + Redis Pub/Sub 통합
// Redis Pub/Sub 모듈
import Redis from 'ioredis';
@Module({
providers: [
{
provide: 'REDIS_SUBSCRIBER',
useFactory: () => {
const sub = new Redis({ host: 'localhost', port: 6379 });
return sub;
},
},
{
provide: 'REDIS_PUBLISHER',
useFactory: () => {
const pub = new Redis({ host: 'localhost', port: 6379 });
return pub;
},
},
RedisPubSubService,
],
exports: [RedisPubSubService],
})
export class RedisPubSubModule {}
@Injectable()
export class RedisPubSubService implements OnModuleInit, OnModuleDestroy {
private handlers = new Map<string, ((message: string, channel: string) => void)[]>();
constructor(
@Inject('REDIS_SUBSCRIBER') private subscriber: Redis,
@Inject('REDIS_PUBLISHER') private publisher: Redis,
) {}
onModuleInit() {
// 메시지 수신 핸들러
this.subscriber.on('message', (channel, message) => {
const handlers = this.handlers.get(channel) || [];
handlers.forEach((handler) => handler(message, channel));
});
this.subscriber.on('pmessage', (pattern, channel, message) => {
const handlers = this.handlers.get(pattern) || [];
handlers.forEach((handler) => handler(message, channel));
});
}
// 채널 구독
subscribe(channel: string, handler: (msg: string, ch: string) => void) {
if (!this.handlers.has(channel)) {
this.handlers.set(channel, []);
if (channel.includes('*')) {
this.subscriber.psubscribe(channel);
} else {
this.subscriber.subscribe(channel);
}
}
this.handlers.get(channel)!.push(handler);
}
// 메시지 발행
async publish(channel: string, message: string | object): Promise<number> {
const payload = typeof message === 'string'
? message
: JSON.stringify(message);
return this.publisher.publish(channel, payload);
}
onModuleDestroy() {
this.subscriber.disconnect();
this.publisher.disconnect();
}
}
// 사용 예: 실시간 알림 서비스
@Injectable()
export class NotificationService implements OnModuleInit {
constructor(
private pubsub: RedisPubSubService,
private gateway: NotificationGateway,
) {}
onModuleInit() {
this.pubsub.subscribe('notifications:*', (message, channel) => {
const userId = channel.split(':')[1];
const payload = JSON.parse(message);
this.gateway.sendToUser(userId, payload);
});
}
async notify(userId: string, notification: NotificationDto) {
await this.pubsub.publish(
`notifications:${userId}`,
notification,
);
}
}
Keyspace Notifications: 키 변경 이벤트
Redis의 Keyspace Notifications는 키의 변경(SET, DEL, EXPIRE 등)을 Pub/Sub로 자동 발행합니다:
# Keyspace Notification 활성화
redis-cli CONFIG SET notify-keyspace-events KEA
# K: Keyspace 이벤트 (키 이름 기반)
# E: Keyevent 이벤트 (명령어 기반)
# A: 모든 이벤트
# 세션 만료 감지
redis-cli SUBSCRIBE __keyevent@0__:expired
# 다른 터미널에서:
redis-cli SET session:user:42 "data" EX 5
# 5초 후 → "__keyevent@0__:expired" 채널에 "session:user:42" 수신
// Spring: 세션 만료 감지로 자동 로그아웃
@Component
public class SessionExpirationListener {
@Bean
public RedisMessageListenerContainer keyspaceContainer(
RedisConnectionFactory factory) {
var container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
container.addMessageListener(
(message, pattern) -> {
String expiredKey = new String(message.getBody());
if (expiredKey.startsWith("session:user:")) {
String userId = expiredKey.split(":")[2];
handleSessionExpired(userId);
}
},
new PatternTopic("__keyevent@*__:expired")
);
return container;
}
private void handleSessionExpired(String userId) {
log.info("Session expired for user: {}", userId);
// WebSocket 연결 종료, 감사 로그 기록 등
}
}
운영 주의사항과 모니터링
# 구독자 연결 상태 모니터링
redis-cli CLIENT LIST | grep "sub="
# flags=S sub=3 → 3개 채널 구독 중인 클라이언트
# 느린 구독자 감지: output-buffer-limit
# redis.conf
client-output-buffer-limit pubsub 32mb 8mb 60
# 32MB 즉시 차단 / 8MB 60초 지속 시 차단
# → 느린 구독자가 메모리를 과도하게 소비하는 것 방지
# 초당 발행 메시지 수 모니터링
redis-cli INFO stats | grep pubsub
# pubsub_channels:5
# pubsub_patterns:2
# Pub/Sub 디버깅
redis-cli MONITOR | grep PUBLISH
Sharded Pub/Sub (Redis 7.0+)
Redis Cluster에서 기존 Pub/Sub는 모든 노드에 메시지를 브로드캐스트하여 비효율적이었습니다. Redis 7.0의 Sharded Pub/Sub은 채널을 해시 슬롯에 매핑하여 해당 노드에서만 처리합니다:
# Sharded Pub/Sub (Redis 7.0+)
redis-cli SSUBSCRIBE orders:{tenant-1}
redis-cli SPUBLISH orders:{tenant-1} '{"orderId":123}'
# → 해당 슬롯의 노드에서만 처리, Cluster 전체 브로드캐스트 없음
# 기존 Pub/Sub: O(N) — N은 Cluster 노드 수
# Sharded Pub/Sub: O(1) — 슬롯 소유 노드만 처리
핵심 정리
| 패턴 | 적합한 시나리오 |
|---|---|
| 분산 캐시 무효화 | 멀티 인스턴스 로컬 캐시 동기화 |
| WebSocket 브로드캐스트 | 다중 서버 실시간 메시지 전달 |
| Keyspace Notification | 세션 만료 감지, TTL 이벤트 |
| Sharded Pub/Sub | Cluster 환경 고성능 메시징 |
Redis Pub/Sub는 인프라 추가 없이 마이크로초 수준의 실시간 메시징을 제공합니다. 메시지 유실이 허용되지 않는 경우 Redis Streams 이벤트 처리를, Cluster 환경에서는 Sharded Pub/Sub를 사용하세요. Redis Cluster 샤딩·페일오버 운영도 함께 참고하세요.