Redis Pub/Sub 실시간 메시징

Redis Pub/Sub란?

Redis Pub/Sub는 발행-구독 패턴의 메시징 시스템입니다. 프로듀서가 채널에 메시지를 발행(publish)하면 해당 채널을 구독(subscribe)하는 모든 클라이언트에게 실시간으로 전달됩니다. Kafka와 달리 메시지가 영속화되지 않고, 구독 시점 이후의 메시지만 수신합니다. 실시간 알림, 캐시 무효화, WebSocket 브로커 등 즉시성이 중요하고 메시지 유실이 허용되는 시나리오에 적합합니다.

기본 Pub/Sub 명령어

# 터미널 1: 구독자
redis-cli SUBSCRIBE chat:general notifications:user:42

# 터미널 2: 발행자
redis-cli PUBLISH chat:general "안녕하세요!"
# (integer) 2  → 2명의 구독자에게 전달됨

redis-cli PUBLISH notifications:user:42 '{"type":"order","id":1234}'
# (integer) 1

# 패턴 구독: 와일드카드로 여러 채널 구독
redis-cli PSUBSCRIBE chat:* notifications:*

# 활성 채널/구독자 수 확인
redis-cli PUBSUB CHANNELS chat:*
redis-cli PUBSUB NUMSUB chat:general
redis-cli PUBSUB NUMPAT
명령어 설명
SUBSCRIBE 정확한 채널명으로 구독
PSUBSCRIBE 패턴(glob) 매칭으로 구독
PUBLISH 채널에 메시지 발행, 구독자 수 반환
UNSUBSCRIBE 구독 해제
PUBSUB 활성 채널·구독 현황 조회

Pub/Sub vs Streams vs Kafka: 언제 무엇을 쓸까

특성 Redis Pub/Sub Redis Streams Kafka
메시지 영속성 없음 (fire & forget) 있음 있음
과거 메시지 조회 불가 가능 가능
컨슈머 그룹 없음 있음 있음
지연시간 ~마이크로초 ~밀리초 ~밀리초
적합한 용도 실시간 알림, 캐시 무효화 이벤트 로그, 작업 큐 대규모 이벤트 스트리밍

Spring Boot + Redis Pub/Sub 통합

Spring Data Redis의 RedisMessageListenerContainer로 Pub/Sub를 구현합니다:

@Configuration
public class RedisPubSubConfig {

    @Bean
    public RedisMessageListenerContainer redisContainer(
            RedisConnectionFactory connectionFactory,
            MessageListenerAdapter cacheListener,
            MessageListenerAdapter notificationListener) {

        var container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        // 채널별 리스너 등록
        container.addMessageListener(cacheListener,
            new ChannelTopic("cache:invalidation"));
        container.addMessageListener(notificationListener,
            new PatternTopic("notifications:*"));

        // 스레드 풀 설정 (기본: SimpleAsyncTaskExecutor)
        container.setTaskExecutor(Executors.newFixedThreadPool(4));
        return container;
    }

    @Bean
    public MessageListenerAdapter cacheListener(
            CacheInvalidationHandler handler) {
        return new MessageListenerAdapter(handler, "handleMessage");
    }

    @Bean
    public MessageListenerAdapter notificationListener(
            NotificationHandler handler) {
        return new MessageListenerAdapter(handler, "handleMessage");
    }
}
// 캐시 무효화 핸들러
@Component
@Slf4j
public class CacheInvalidationHandler {

    private final CacheManager cacheManager;

    public CacheInvalidationHandler(CacheManager cacheManager) {
        this.cacheManager = cacheManager;
    }

    public void handleMessage(String message, String channel) {
        log.info("Cache invalidation received: channel={}, key={}", 
            channel, message);

        // JSON 파싱
        var event = objectMapper.readValue(message, CacheEvent.class);

        Cache cache = cacheManager.getCache(event.cacheName());
        if (cache != null) {
            if (event.key() != null) {
                cache.evict(event.key());
            } else {
                cache.clear();
            }
        }
    }
}

public record CacheEvent(String cacheName, String key) {}

// 발행 서비스
@Service
public class CacheEventPublisher {

    private final StringRedisTemplate redisTemplate;

    public CacheEventPublisher(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public void publishInvalidation(String cacheName, String key) {
        var event = new CacheEvent(cacheName, key);
        redisTemplate.convertAndSend(
            "cache:invalidation",
            objectMapper.writeValueAsString(event)
        );
    }

    // AOP로 캐시 업데이트 시 자동 발행
    @Around("@annotation(org.springframework.cache.annotation.CacheEvict)")
    public Object publishOnEvict(ProceedingJoinPoint pjp) throws Throwable {
        Object result = pjp.proceed();
        // 다른 인스턴스에 캐시 무효화 전파
        CacheEvict annotation = getAnnotation(pjp);
        publishInvalidation(annotation.value()[0], 
            resolveKey(annotation, pjp));
        return result;
    }
}

실전 패턴 1: 분산 캐시 무효화

여러 Spring Boot 인스턴스에서 로컬 캐시(Caffeine)를 사용할 때, Pub/Sub로 캐시 일관성을 유지합니다:

@Component
public class DistributedCacheSync {

    private final StringRedisTemplate redis;
    private final CacheManager localCacheManager;
    private final String instanceId = UUID.randomUUID().toString();

    // 캐시 변경 시 다른 인스턴스에 알림
    public void notifyCacheChange(String cacheName, String key, String action) {
        var payload = Map.of(
            "cacheName", cacheName,
            "key", key,
            "action", action,        // "evict" or "clear"
            "source", instanceId     // 자기 자신 제외용
        );
        redis.convertAndSend("cache:sync",
            objectMapper.writeValueAsString(payload));
    }

    // 다른 인스턴스의 캐시 변경 수신
    @RedisListener(channel = "cache:sync")
    public void onCacheSync(String message) {
        var event = objectMapper.readValue(message, Map.class);

        // 자기 자신이 보낸 메시지는 무시
        if (instanceId.equals(event.get("source"))) {
            return;
        }

        Cache cache = localCacheManager.getCache((String) event.get("cacheName"));
        if (cache == null) return;

        if ("clear".equals(event.get("action"))) {
            cache.clear();
        } else {
            cache.evict(event.get("key"));
        }
    }
}

실전 패턴 2: WebSocket 브로드캐스트

다중 서버 환경에서 WebSocket 메시지를 모든 인스턴스의 클라이언트에게 전달합니다:

@Configuration
public class WebSocketRedisConfig {

    @Bean
    public RedisMessageListenerContainer wsContainer(
            RedisConnectionFactory factory,
            WebSocketBroadcastHandler handler) {
        var container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        container.addMessageListener(
            new MessageListenerAdapter(handler),
            new PatternTopic("ws:broadcast:*")
        );
        return container;
    }
}

@Component
public class WebSocketBroadcastHandler {

    private final SimpMessagingTemplate messagingTemplate;

    public WebSocketBroadcastHandler(SimpMessagingTemplate template) {
        this.messagingTemplate = template;
    }

    // Redis에서 수신 → 로컬 WebSocket 클라이언트에 전달
    public void handleMessage(String payload, String channel) {
        // channel: "ws:broadcast:chat-room-1"
        String destination = channel.replace("ws:broadcast:", "/topic/");
        messagingTemplate.convertAndSend(destination, payload);
    }
}

// 메시지 발행: 어떤 인스턴스에서든 모든 클라이언트에 도달
@Service
public class ChatService {

    private final StringRedisTemplate redis;

    public void sendMessage(String room, ChatMessage message) {
        redis.convertAndSend(
            "ws:broadcast:" + room,
            objectMapper.writeValueAsString(message)
        );
    }
}

NestJS + Redis Pub/Sub 통합

// Redis Pub/Sub 모듈
import Redis from 'ioredis';

@Module({
  providers: [
    {
      provide: 'REDIS_SUBSCRIBER',
      useFactory: () => {
        const sub = new Redis({ host: 'localhost', port: 6379 });
        return sub;
      },
    },
    {
      provide: 'REDIS_PUBLISHER',
      useFactory: () => {
        const pub = new Redis({ host: 'localhost', port: 6379 });
        return pub;
      },
    },
    RedisPubSubService,
  ],
  exports: [RedisPubSubService],
})
export class RedisPubSubModule {}

@Injectable()
export class RedisPubSubService implements OnModuleInit, OnModuleDestroy {
  private handlers = new Map<string, ((message: string, channel: string) => void)[]>();

  constructor(
    @Inject('REDIS_SUBSCRIBER') private subscriber: Redis,
    @Inject('REDIS_PUBLISHER') private publisher: Redis,
  ) {}

  onModuleInit() {
    // 메시지 수신 핸들러
    this.subscriber.on('message', (channel, message) => {
      const handlers = this.handlers.get(channel) || [];
      handlers.forEach((handler) => handler(message, channel));
    });

    this.subscriber.on('pmessage', (pattern, channel, message) => {
      const handlers = this.handlers.get(pattern) || [];
      handlers.forEach((handler) => handler(message, channel));
    });
  }

  // 채널 구독
  subscribe(channel: string, handler: (msg: string, ch: string) => void) {
    if (!this.handlers.has(channel)) {
      this.handlers.set(channel, []);
      if (channel.includes('*')) {
        this.subscriber.psubscribe(channel);
      } else {
        this.subscriber.subscribe(channel);
      }
    }
    this.handlers.get(channel)!.push(handler);
  }

  // 메시지 발행
  async publish(channel: string, message: string | object): Promise<number> {
    const payload = typeof message === 'string'
      ? message
      : JSON.stringify(message);
    return this.publisher.publish(channel, payload);
  }

  onModuleDestroy() {
    this.subscriber.disconnect();
    this.publisher.disconnect();
  }
}

// 사용 예: 실시간 알림 서비스
@Injectable()
export class NotificationService implements OnModuleInit {
  constructor(
    private pubsub: RedisPubSubService,
    private gateway: NotificationGateway,
  ) {}

  onModuleInit() {
    this.pubsub.subscribe('notifications:*', (message, channel) => {
      const userId = channel.split(':')[1];
      const payload = JSON.parse(message);
      this.gateway.sendToUser(userId, payload);
    });
  }

  async notify(userId: string, notification: NotificationDto) {
    await this.pubsub.publish(
      `notifications:${userId}`,
      notification,
    );
  }
}

Keyspace Notifications: 키 변경 이벤트

Redis의 Keyspace Notifications는 키의 변경(SET, DEL, EXPIRE 등)을 Pub/Sub로 자동 발행합니다:

# Keyspace Notification 활성화
redis-cli CONFIG SET notify-keyspace-events KEA
# K: Keyspace 이벤트 (키 이름 기반)
# E: Keyevent 이벤트 (명령어 기반)
# A: 모든 이벤트

# 세션 만료 감지
redis-cli SUBSCRIBE __keyevent@0__:expired

# 다른 터미널에서:
redis-cli SET session:user:42 "data" EX 5
# 5초 후 → "__keyevent@0__:expired" 채널에 "session:user:42" 수신
// Spring: 세션 만료 감지로 자동 로그아웃
@Component
public class SessionExpirationListener {

    @Bean
    public RedisMessageListenerContainer keyspaceContainer(
            RedisConnectionFactory factory) {
        var container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        container.addMessageListener(
            (message, pattern) -> {
                String expiredKey = new String(message.getBody());
                if (expiredKey.startsWith("session:user:")) {
                    String userId = expiredKey.split(":")[2];
                    handleSessionExpired(userId);
                }
            },
            new PatternTopic("__keyevent@*__:expired")
        );
        return container;
    }

    private void handleSessionExpired(String userId) {
        log.info("Session expired for user: {}", userId);
        // WebSocket 연결 종료, 감사 로그 기록 등
    }
}

운영 주의사항과 모니터링

# 구독자 연결 상태 모니터링
redis-cli CLIENT LIST | grep "sub="
# flags=S sub=3 → 3개 채널 구독 중인 클라이언트

# 느린 구독자 감지: output-buffer-limit
# redis.conf
client-output-buffer-limit pubsub 32mb 8mb 60
# 32MB 즉시 차단 / 8MB 60초 지속 시 차단
# → 느린 구독자가 메모리를 과도하게 소비하는 것 방지

# 초당 발행 메시지 수 모니터링
redis-cli INFO stats | grep pubsub
# pubsub_channels:5
# pubsub_patterns:2

# Pub/Sub 디버깅
redis-cli MONITOR | grep PUBLISH

Sharded Pub/Sub (Redis 7.0+)

Redis Cluster에서 기존 Pub/Sub는 모든 노드에 메시지를 브로드캐스트하여 비효율적이었습니다. Redis 7.0의 Sharded Pub/Sub은 채널을 해시 슬롯에 매핑하여 해당 노드에서만 처리합니다:

# Sharded Pub/Sub (Redis 7.0+)
redis-cli SSUBSCRIBE orders:{tenant-1}
redis-cli SPUBLISH orders:{tenant-1} '{"orderId":123}'
# → 해당 슬롯의 노드에서만 처리, Cluster 전체 브로드캐스트 없음

# 기존 Pub/Sub: O(N) — N은 Cluster 노드 수
# Sharded Pub/Sub: O(1) — 슬롯 소유 노드만 처리

핵심 정리

패턴 적합한 시나리오
분산 캐시 무효화 멀티 인스턴스 로컬 캐시 동기화
WebSocket 브로드캐스트 다중 서버 실시간 메시지 전달
Keyspace Notification 세션 만료 감지, TTL 이벤트
Sharded Pub/Sub Cluster 환경 고성능 메시징

Redis Pub/Sub는 인프라 추가 없이 마이크로초 수준의 실시간 메시징을 제공합니다. 메시지 유실이 허용되지 않는 경우 Redis Streams 이벤트 처리를, Cluster 환경에서는 Sharded Pub/Sub를 사용하세요. Redis Cluster 샤딩·페일오버 운영도 함께 참고하세요.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux