NestJS SSE 실시간 스트리밍

SSE(Server-Sent Events)란?

실시간 데이터 전달에 WebSocket만 있는 것은 아니다. SSE(Server-Sent Events)는 서버에서 클라이언트로의 단방향 실시간 스트리밍을 HTTP 기반으로 구현하는 표준이다. 알림 피드, 실시간 대시보드, AI 응답 스트리밍, 주가 업데이트처럼 서버→클라이언트 단방향이면 SSE가 WebSocket보다 간단하고 효율적이다.

구분 SSE WebSocket
방향 서버 → 클라이언트 (단방향) 양방향
프로토콜 HTTP/1.1, HTTP/2 WS 프로토콜 (HTTP 업그레이드)
자동 재연결 브라우저 내장 직접 구현 필요
데이터 형식 텍스트 (UTF-8) 텍스트 + 바이너리
인프라 호환 프록시·CDN 친화적 별도 설정 필요

NestJS SSE 기본 구현

NestJS는 @Sse() 데코레이터로 SSE 엔드포인트를 간단하게 만들 수 있다. 핵심은 Observable<MessageEvent>를 반환하는 것이다.

import { Controller, Sse, MessageEvent } from '@nestjs/common';
import { Observable, interval, map } from 'rxjs';

@Controller('events')
export class EventController {

  // 기본 SSE: 매 초 서버 시간 전송
  @Sse('time')
  streamTime(): Observable<MessageEvent> {
    return interval(1000).pipe(
      map((seq) => ({
        data: {
          timestamp: new Date().toISOString(),
          seq,
        },
      })),
    );
  }
}

// 클라이언트 (브라우저):
// const source = new EventSource('/events/time');
// source.onmessage = (event) => {
//   const data = JSON.parse(event.data);
//   console.log(data.timestamp);
// };

Subject 기반 실시간 이벤트 브로드캐스트

실전에서는 interval이 아니라, 비즈니스 이벤트가 발생할 때 연결된 모든 클라이언트에게 푸시해야 한다. RxJS Subject가 이벤트 허브 역할을 한다.

// notification-sse.service.ts
import { Injectable } from '@nestjs/common';
import { Subject, Observable, filter, map } from 'rxjs';

interface NotificationEvent {
  userId: string;
  type: 'order' | 'message' | 'alert' | 'system';
  title: string;
  body: string;
  timestamp: Date;
}

@Injectable()
export class NotificationSseService {
  private readonly events$ = new Subject<NotificationEvent>();

  // 이벤트 발행 (서비스 레이어에서 호출)
  emit(event: NotificationEvent): void {
    this.events$.next(event);
  }

  // 특정 사용자의 이벤트 스트림 반환
  getStream(userId: string): Observable<MessageEvent> {
    return this.events$.pipe(
      filter((event) => event.userId === userId || event.type === 'system'),
      map((event) => ({
        data: {
          type: event.type,
          title: event.title,
          body: event.body,
          timestamp: event.timestamp.toISOString(),
        },
        type: event.type,  // SSE event type (클라이언트에서 addEventListener로 분기)
        id: `${Date.now()}-${Math.random().toString(36).slice(2)}`,
      })),
    );
  }

  // 연결 수 확인 (모니터링용)
  get observerCount(): number {
    return this.events$.observers.length;
  }
}

컨트롤러: 인증된 SSE 스트림

@Controller('events')
export class EventController {
  constructor(
    private readonly notificationSse: NotificationSseService,
  ) {}

  @Sse('notifications')
  @UseGuards(JwtAuthGuard)
  streamNotifications(@Req() req: AuthRequest): Observable<MessageEvent> {
    const userId = req.user.id;

    // 연결 시 초기 이벤트 전송
    const initial$ = new Observable<MessageEvent>((subscriber) => {
      subscriber.next({
        data: { type: 'connected', message: 'SSE 연결 성공' },
        type: 'system',
      });
    });

    // 실시간 이벤트 스트림
    const events$ = this.notificationSse.getStream(userId);

    // 30초마다 하트비트 (연결 유지)
    const heartbeat$ = interval(30_000).pipe(
      map(() => ({
        data: {},
        type: 'heartbeat',
        id: undefined,
      })),
    );

    return merge(initial$, events$, heartbeat$);
  }
}

// 클라이언트:
// const source = new EventSource('/events/notifications', {
//   headers: { 'Authorization': 'Bearer ' + token }  // 주의: EventSource는 커스텀 헤더 미지원
// });
//
// → 해결책: 쿼리 파라미터로 토큰 전달 또는 eventsource-polyfill 사용

SSE 인증 문제 해결

브라우저의 EventSource API는 커스텀 헤더를 지원하지 않는다. JWT 토큰을 전달하려면 쿼리 파라미터, 쿠키, 또는 폴리필을 사용해야 한다.

// 방법 1: 쿼리 파라미터 (간단하지만 로그에 토큰 노출 위험)
// GET /events/notifications?token=eyJhbG...

@Sse('notifications')
streamNotifications(@Query('token') token: string): Observable<MessageEvent> {
  const user = this.jwtService.verify(token);
  return this.notificationSse.getStream(user.id);
}

// 방법 2: 일회용 티켓 발급 (권장)
@Post('events/ticket')
@UseGuards(JwtAuthGuard)
async createTicket(@Req() req: AuthRequest) {
  const ticket = crypto.randomUUID();
  await this.cache.set(`sse:ticket:${ticket}`, req.user.id, 30_000); // 30초 유효
  return { ticket };
}

@Sse('notifications')
async streamNotifications(@Query('ticket') ticket: string): Promise<Observable<MessageEvent>> {
  const userId = await this.cache.get<string>(`sse:ticket:${ticket}`);
  if (!userId) throw new UnauthorizedException('유효하지 않은 티켓');
  await this.cache.del(`sse:ticket:${ticket}`); // 일회용 삭제
  return this.notificationSse.getStream(userId);
}

// 방법 3: fetch 기반 SSE (모던 브라우저)
// const response = await fetch('/events/notifications', {
//   headers: { 'Authorization': 'Bearer ' + token }
// });
// const reader = response.body.getReader();
// const decoder = new TextDecoder();

이벤트 연동: 비즈니스 이벤트 → SSE 푸시

import { OnEvent } from '@nestjs/event-emitter';

@Injectable()
export class NotificationEventHandler {
  constructor(private readonly sseService: NotificationSseService) {}

  @OnEvent('order.completed')
  handleOrderCompleted(event: OrderCompletedEvent) {
    this.sseService.emit({
      userId: event.userId,
      type: 'order',
      title: '주문 완료',
      body: `주문 #${event.orderId}이 완료되었습니다.`,
      timestamp: new Date(),
    });
  }

  @OnEvent('message.received')
  handleNewMessage(event: MessageReceivedEvent) {
    this.sseService.emit({
      userId: event.recipientId,
      type: 'message',
      title: `${event.senderName}님의 메시지`,
      body: event.preview,
      timestamp: new Date(),
    });
  }

  @OnEvent('system.maintenance')
  handleMaintenance(event: MaintenanceEvent) {
    this.sseService.emit({
      userId: '*',  // 전체 사용자
      type: 'system',
      title: '시스템 점검 안내',
      body: event.message,
      timestamp: new Date(),
    });
  }
}

NestJS EventEmitter2 이벤트 설계에서 다룬 도메인 이벤트 패턴이 SSE 푸시의 트리거로 자연스럽게 연결된다.

AI 응답 스트리밍: ChatGPT 스타일

LLM 응답을 토큰 단위로 스트리밍하는 것은 SSE의 대표적인 활용 사례다.

@Controller('ai')
export class AiController {
  constructor(private readonly aiService: AiService) {}

  @Sse('chat/stream')
  @UseGuards(JwtAuthGuard)
  streamChat(
    @Query('prompt') prompt: string,
    @Req() req: AuthRequest,
  ): Observable<MessageEvent> {
    return new Observable((subscriber) => {
      const abortController = new AbortController();

      this.aiService
        .streamCompletion(prompt, {
          signal: abortController.signal,
          onToken: (token: string) => {
            subscriber.next({
              data: { token, done: false },
              type: 'token',
            });
          },
          onDone: (fullResponse: string) => {
            subscriber.next({
              data: { token: '', done: true, full: fullResponse },
              type: 'done',
            });
            subscriber.complete();
          },
          onError: (error: Error) => {
            subscriber.next({
              data: { error: error.message },
              type: 'error',
            });
            subscriber.complete();
          },
        })
        .catch((err) => subscriber.error(err));

      // 클라이언트 연결 해제 시 스트리밍 중단
      return () => abortController.abort();
    });
  }
}

연결 관리와 정리

SSE 연결이 많아지면 메모리와 파일 디스크립터 관리가 중요하다. 연결 풀 관리와 타임아웃 설정이 필수다.

@Injectable()
export class SseConnectionManager {
  private connections = new Map<string, Set<string>>(); // userId → sessionIds

  register(userId: string, sessionId: string): void {
    if (!this.connections.has(userId)) {
      this.connections.set(userId, new Set());
    }
    this.connections.get(userId)!.add(sessionId);
  }

  unregister(userId: string, sessionId: string): void {
    this.connections.get(userId)?.delete(sessionId);
    if (this.connections.get(userId)?.size === 0) {
      this.connections.delete(userId);
    }
  }

  getStats() {
    return {
      totalUsers: this.connections.size,
      totalConnections: Array.from(this.connections.values())
        .reduce((sum, set) => sum + set.size, 0),
    };
  }
}

// Nginx SSE 설정 (버퍼링 비활성화 필수)
// location /events/ {
//   proxy_pass http://upstream;
//   proxy_set_header Connection '';
//   proxy_http_version 1.1;
//   proxy_buffering off;          ← 핵심!
//   proxy_cache off;
//   proxy_read_timeout 86400s;    ← 긴 타임아웃
//   chunked_transfer_encoding off;
// }

NestJS Cache Manager 심화에서 다룬 Redis와 SSE를 결합하면, 멀티 인스턴스 환경에서도 Redis Pub/Sub를 통해 모든 서버의 SSE 클라이언트에게 이벤트를 전달할 수 있다.

마무리

SSE는 서버→클라이언트 단방향 실시간 통신의 가장 간단하고 효율적인 선택이다. NestJS의 @Sse()와 RxJS Observable 조합으로 알림 피드, AI 스트리밍, 실시간 대시보드를 쉽게 구현할 수 있다. 인증은 일회용 티켓 패턴으로, 확장은 Redis Pub/Sub로 해결하는 것이 실전 표준이다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux