NestJS SSE 실시간 스트리밍

NestJS SSE란? 실시간 단방향 스트리밍

Server-Sent Events(SSE)는 서버에서 클라이언트로 실시간 데이터를 푸시하는 HTTP 기반 프로토콜이다. WebSocket과 달리 단방향(서버→클라이언트)이며, 일반 HTTP 연결 위에서 동작하므로 프록시·방화벽 호환성이 뛰어나다. NestJS는 RxJS Observable을 반환하는 것만으로 SSE를 지원한다.

이 글에서는 NestJS에서 SSE 엔드포인트 구현, 이벤트 타입 분류, 재연결 처리, Redis Pub/Sub 기반 다중 인스턴스 지원, 그리고 Streamable File 응답까지 실무 패턴을 깊이 있게 다룬다.

기본 SSE 엔드포인트

NestJS에서 SSE를 구현하려면 @Sse() 데코레이터를 사용하고 Observable<MessageEvent>를 반환한다.

import { Controller, Sse, MessageEvent } from '@nestjs/common';
import { Observable, interval, map } from 'rxjs';

@Controller('events')
export class EventsController {
  @Sse('stream')
  stream(): Observable<MessageEvent> {
    // 3초마다 서버 시간 전송
    return interval(3000).pipe(
      map((seq) => ({
        data: { timestamp: new Date().toISOString(), seq },
        id: String(seq),
        type: 'heartbeat',
        retry: 5000, // 재연결 대기 ms
      })),
    );
  }
}

클라이언트는 표준 EventSource API로 연결한다:

const source = new EventSource('/events/stream');

source.addEventListener('heartbeat', (e) => {
  const data = JSON.parse(e.data);
  console.log(`[${data.seq}] ${data.timestamp}`);
});

source.onerror = () => console.log('Reconnecting...');

MessageEvent 인터페이스의 각 필드 역할:

필드 타입 설명
data string | object 이벤트 페이로드 (자동 JSON 직렬화)
id string 이벤트 ID (재연결 시 Last-Event-ID 헤더로 전송)
type string 이벤트 타입 (클라이언트 addEventListener 매칭)
retry number 재연결 대기 시간 (ms)

EventEmitter2 기반 실시간 이벤트 브로드캐스트

실제 애플리케이션에서는 타이머가 아니라 비즈니스 이벤트를 스트리밍해야 한다. @nestjs/event-emitter와 Subject를 조합하면 서비스 레이어의 이벤트를 SSE로 전달할 수 있다.

// notification.gateway.ts
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { Subject, Observable } from 'rxjs';
import { MessageEvent } from '@nestjs/common';

@Injectable()
export class NotificationGateway {
  // 사용자별 Subject 관리
  private subjects = new Map<string, Subject<MessageEvent>>();

  getStream(userId: string): Observable<MessageEvent> {
    if (!this.subjects.has(userId)) {
      this.subjects.set(userId, new Subject<MessageEvent>());
    }
    return this.subjects.get(userId)!.asObservable();
  }

  // 연결 해제 시 정리
  removeStream(userId: string): void {
    const subject = this.subjects.get(userId);
    if (subject) {
      subject.complete();
      this.subjects.delete(userId);
    }
  }

  @OnEvent('order.created')
  handleOrderCreated(payload: { userId: string; orderId: string }) {
    this.emit(payload.userId, 'order-created', {
      orderId: payload.orderId,
      message: '주문이 생성되었습니다.',
    });
  }

  @OnEvent('payment.completed')
  handlePaymentCompleted(payload: { userId: string; amount: number }) {
    this.emit(payload.userId, 'payment-completed', {
      amount: payload.amount,
      message: '결제가 완료되었습니다.',
    });
  }

  private emit(userId: string, type: string, data: any): void {
    const subject = this.subjects.get(userId);
    if (subject) {
      subject.next({
        data,
        type,
        id: `${Date.now()}-${Math.random().toString(36).slice(2)}`,
      });
    }
  }
}
// events.controller.ts
@Controller('events')
export class EventsController {
  constructor(private readonly gateway: NotificationGateway) {}

  @Sse(':userId/notifications')
  notifications(
    @Param('userId') userId: string,
    @Req() req: Request,
  ): Observable<MessageEvent> {
    // 연결 종료 시 정리
    req.on('close', () => {
      this.gateway.removeStream(userId);
    });

    return this.gateway.getStream(userId);
  }
}

// order.service.ts — 이벤트 발행 측
@Injectable()
export class OrderService {
  constructor(private readonly eventEmitter: EventEmitter2) {}

  async createOrder(userId: string, dto: CreateOrderDto): Promise<Order> {
    const order = await this.orderRepository.save({ ...dto, userId });

    // SSE로 자동 전파됨
    this.eventEmitter.emit('order.created', {
      userId,
      orderId: order.id,
    });

    return order;
  }
}

Last-Event-ID: 재연결 시 누락 이벤트 복구

네트워크 끊김 후 재연결 시 EventSource는 마지막으로 수신한 이벤트 ID를 Last-Event-ID 헤더로 전송한다. 서버는 이를 활용해 놓친 이벤트를 재전송할 수 있다.

@Sse(':userId/notifications')
notifications(
  @Param('userId') userId: string,
  @Headers('last-event-id') lastEventId: string | undefined,
  @Req() req: Request,
): Observable<MessageEvent> {
  const stream$ = this.gateway.getStream(userId);

  if (lastEventId) {
    // 놓친 이벤트를 DB/Redis에서 조회하여 먼저 전송
    const missed$ = from(
      this.eventStore.getEventsSince(userId, lastEventId),
    ).pipe(
      mergeMap((events) => from(events)),
      map((event) => ({
        data: event.payload,
        type: event.type,
        id: event.id,
      })),
    );

    return concat(missed$, stream$);
  }

  req.on('close', () => this.gateway.removeStream(userId));
  return stream$;
}

// 이벤트 저장소 (Redis Sorted Set 활용)
@Injectable()
export class EventStore {
  constructor(@InjectRedis() private readonly redis: Redis) {}

  async saveEvent(userId: string, event: StoredEvent): Promise<void> {
    const key = `events:${userId}`;
    await this.redis.zadd(key, Date.now(), JSON.stringify(event));
    // 최근 1000개만 유지
    await this.redis.zremrangebyrank(key, 0, -1001);
  }

  async getEventsSince(
    userId: string,
    lastEventId: string,
  ): Promise<StoredEvent[]> {
    const timestamp = this.extractTimestamp(lastEventId);
    const raw = await this.redis.zrangebyscore(
      `events:${userId}`,
      timestamp + 1,
      '+inf',
    );
    return raw.map((r) => JSON.parse(r));
  }
}

Redis Pub/Sub: 다중 인스턴스 지원

서버가 여러 인스턴스로 스케일아웃되면, 특정 인스턴스에서 발생한 이벤트가 다른 인스턴스의 SSE 연결로 전달되지 않는다. Redis Pub/Sub로 인스턴스 간 이벤트를 동기화한다.

@Injectable()
export class RedisNotificationBridge implements OnModuleInit, OnModuleDestroy {
  private subscriber: Redis;

  constructor(
    @InjectRedis() private readonly publisher: Redis,
    private readonly gateway: NotificationGateway,
  ) {
    this.subscriber = publisher.duplicate();
  }

  async onModuleInit(): Promise<void> {
    await this.subscriber.subscribe('notifications');
    this.subscriber.on('message', (channel, message) => {
      const { userId, type, data, id } = JSON.parse(message);
      // 로컬 Subject로 전달
      this.gateway.emitDirect(userId, type, data, id);
    });
  }

  async onModuleDestroy(): Promise<void> {
    await this.subscriber.unsubscribe('notifications');
    await this.subscriber.quit();
  }

  // 서비스에서 이벤트 발행 시 Redis로 전파
  async publish(
    userId: string,
    type: string,
    data: any,
  ): Promise<void> {
    const id = `${Date.now()}-${Math.random().toString(36).slice(2)}`;
    await this.publisher.publish(
      'notifications',
      JSON.stringify({ userId, type, data, id }),
    );
  }
}
아키텍처 장점 단점
단일 인스턴스 + Subject 간단, 의존성 없음 스케일아웃 불가
Redis Pub/Sub 다중 인스턴스 지원, 간단한 설정 메시지 유실 가능(구독 전 발행)
Redis Streams 메시지 영속성, consumer group 설정 복잡

SSE vs WebSocket: 선택 기준

기준 SSE WebSocket
방향 서버→클라이언트 (단방향) 양방향
프로토콜 HTTP/1.1 (표준) WS (별도 프로토콜)
자동 재연결 브라우저 내장 직접 구현 필요
프록시/CDN 대부분 호환 설정 필요
적합 사례 알림, 대시보드, 실시간 피드 채팅, 게임, 양방향 제어

알림, 실시간 피드, 대시보드처럼 서버→클라이언트 단방향 푸시가 목적이라면 SSE가 훨씬 단순하고 안정적이다. 양방향 통신이 필요할 때만 WebSocket Gateway를 사용한다.

StreamableFile: 대용량 파일 스트리밍

SSE 외에도 NestJS는 StreamableFile을 통해 대용량 파일을 메모리에 전부 올리지 않고 스트리밍 전송할 수 있다.

import { StreamableFile } from '@nestjs/common';
import { createReadStream } from 'fs';

@Controller('files')
export class FilesController {
  @Get(':id/download')
  @Header('Content-Type', 'application/octet-stream')
  download(@Param('id') id: string, @Res({ passthrough: true }) res: Response) {
    const filePath = this.fileService.getPath(id);
    const stat = statSync(filePath);

    res.set({
      'Content-Disposition': `attachment; filename="${id}.pdf"`,
      'Content-Length': stat.size,
    });

    const stream = createReadStream(filePath);
    return new StreamableFile(stream);
  }

  // DB에서 CSV 스트리밍 생성
  @Get('export/users')
  @Header('Content-Type', 'text/csv')
  async exportUsers(@Res({ passthrough: true }) res: Response) {
    res.set({ 'Content-Disposition': 'attachment; filename="users.csv"' });

    const passThrough = new PassThrough();
    passThrough.write('id,name,emailn');

    // 청크 단위로 DB 조회 → 스트리밍
    const batchSize = 1000;
    let offset = 0;
    let hasMore = true;

    while (hasMore) {
      const users = await this.userService.findBatch(offset, batchSize);
      for (const user of users) {
        passThrough.write(`${user.id},${user.name},${user.email}n`);
      }
      offset += batchSize;
      hasMore = users.length === batchSize;
    }

    passThrough.end();
    return new StreamableFile(passThrough);
  }
}

Nginx/K8s SSE 프록시 설정

SSE는 장시간 유지되는 HTTP 연결이므로 프록시 설정에 주의가 필요하다.

# nginx.conf — SSE 엔드포인트 전용 설정
location /events/ {
    proxy_pass http://nestjs_upstream;
    proxy_http_version 1.1;
    proxy_set_header Connection '';  # keep-alive
    proxy_buffering off;             # 버퍼링 비활성화 필수!
    proxy_cache off;
    proxy_read_timeout 86400s;       # 24시간 (기본 60초면 끊김)
    chunked_transfer_encoding off;
}

# Kubernetes Ingress (nginx-ingress)
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  annotations:
    nginx.ingress.kubernetes.io/proxy-buffering: "off"
    nginx.ingress.kubernetes.io/proxy-read-timeout: "86400"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "86400"

인증이 필요한 SSE: 토큰 전달

EventSource API는 커스텀 헤더를 지원하지 않는다. JWT를 전달하는 방법은 두 가지다:

// 방법 1: 쿼리 파라미터 (간단하지만 로그에 토큰 노출 주의)
const source = new EventSource('/events/stream?token=eyJhbG...');

// NestJS Guard에서 쿼리 토큰 추출
@Injectable()
export class SseAuthGuard implements CanActivate {
  canActivate(context: ExecutionContext): boolean {
    const req = context.switchToHttp().getRequest();
    const token = req.query.token || req.headers.authorization?.split(' ')[1];
    if (!token) throw new UnauthorizedException();
    req.user = this.jwtService.verify(token);
    return true;
  }
}

// 방법 2: fetch + ReadableStream (헤더 전달 가능, 권장)
async function connectSSE(url: string, token: string) {
  const response = await fetch(url, {
    headers: { Authorization: `Bearer ${token}` },
  });

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    const text = decoder.decode(value);
    // SSE 프로토콜 파싱
    parseSSEChunk(text);
  }
}

운영 체크리스트

  • 커넥션 수 제한: 브라우저는 도메인당 SSE 연결을 6개로 제한한다(HTTP/1.1). HTTP/2에서는 제한이 크게 완화된다
  • 하트비트: 30초 간격으로 주석(: ping)을 전송해 프록시 타임아웃을 방지한다
  • 메모리 누수: 클라이언트 연결 종료 시 반드시 Subject를 complete하고 Map에서 제거한다
  • Graceful Shutdown: 서버 종료 시 모든 SSE Subject를 complete하여 클라이언트가 재연결하도록 유도한다
  • 모니터링: Interceptor로 활성 SSE 연결 수를 Prometheus 메트릭으로 노출한다
  • 배압 제어: 느린 클라이언트에 이벤트가 쌓이지 않도록 버퍼 크기를 제한하고, 초과 시 연결을 끊는다

NestJS SSE는 RxJS Observable과의 자연스러운 통합 덕분에 최소한의 코드로 실시간 스트리밍을 구현할 수 있다. 단방향 푸시에는 SSE, 양방향 통신에는 WebSocket — 이 기준만 지키면 불필요한 복잡성 없이 실시간 기능을 제공할 수 있다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux