NestJS SSE란? 실시간 단방향 스트리밍
Server-Sent Events(SSE)는 서버에서 클라이언트로 실시간 데이터를 푸시하는 HTTP 기반 프로토콜이다. WebSocket과 달리 단방향(서버→클라이언트)이며, 일반 HTTP 연결 위에서 동작하므로 프록시·방화벽 호환성이 뛰어나다. NestJS는 RxJS Observable을 반환하는 것만으로 SSE를 지원한다.
이 글에서는 NestJS에서 SSE 엔드포인트 구현, 이벤트 타입 분류, 재연결 처리, Redis Pub/Sub 기반 다중 인스턴스 지원, 그리고 Streamable File 응답까지 실무 패턴을 깊이 있게 다룬다.
기본 SSE 엔드포인트
NestJS에서 SSE를 구현하려면 @Sse() 데코레이터를 사용하고 Observable<MessageEvent>를 반환한다.
import { Controller, Sse, MessageEvent } from '@nestjs/common';
import { Observable, interval, map } from 'rxjs';
@Controller('events')
export class EventsController {
@Sse('stream')
stream(): Observable<MessageEvent> {
// 3초마다 서버 시간 전송
return interval(3000).pipe(
map((seq) => ({
data: { timestamp: new Date().toISOString(), seq },
id: String(seq),
type: 'heartbeat',
retry: 5000, // 재연결 대기 ms
})),
);
}
}
클라이언트는 표준 EventSource API로 연결한다:
const source = new EventSource('/events/stream');
source.addEventListener('heartbeat', (e) => {
const data = JSON.parse(e.data);
console.log(`[${data.seq}] ${data.timestamp}`);
});
source.onerror = () => console.log('Reconnecting...');
MessageEvent 인터페이스의 각 필드 역할:
| 필드 | 타입 | 설명 |
|---|---|---|
data |
string | object | 이벤트 페이로드 (자동 JSON 직렬화) |
id |
string | 이벤트 ID (재연결 시 Last-Event-ID 헤더로 전송) |
type |
string | 이벤트 타입 (클라이언트 addEventListener 매칭) |
retry |
number | 재연결 대기 시간 (ms) |
EventEmitter2 기반 실시간 이벤트 브로드캐스트
실제 애플리케이션에서는 타이머가 아니라 비즈니스 이벤트를 스트리밍해야 한다. @nestjs/event-emitter와 Subject를 조합하면 서비스 레이어의 이벤트를 SSE로 전달할 수 있다.
// notification.gateway.ts
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { Subject, Observable } from 'rxjs';
import { MessageEvent } from '@nestjs/common';
@Injectable()
export class NotificationGateway {
// 사용자별 Subject 관리
private subjects = new Map<string, Subject<MessageEvent>>();
getStream(userId: string): Observable<MessageEvent> {
if (!this.subjects.has(userId)) {
this.subjects.set(userId, new Subject<MessageEvent>());
}
return this.subjects.get(userId)!.asObservable();
}
// 연결 해제 시 정리
removeStream(userId: string): void {
const subject = this.subjects.get(userId);
if (subject) {
subject.complete();
this.subjects.delete(userId);
}
}
@OnEvent('order.created')
handleOrderCreated(payload: { userId: string; orderId: string }) {
this.emit(payload.userId, 'order-created', {
orderId: payload.orderId,
message: '주문이 생성되었습니다.',
});
}
@OnEvent('payment.completed')
handlePaymentCompleted(payload: { userId: string; amount: number }) {
this.emit(payload.userId, 'payment-completed', {
amount: payload.amount,
message: '결제가 완료되었습니다.',
});
}
private emit(userId: string, type: string, data: any): void {
const subject = this.subjects.get(userId);
if (subject) {
subject.next({
data,
type,
id: `${Date.now()}-${Math.random().toString(36).slice(2)}`,
});
}
}
}
// events.controller.ts
@Controller('events')
export class EventsController {
constructor(private readonly gateway: NotificationGateway) {}
@Sse(':userId/notifications')
notifications(
@Param('userId') userId: string,
@Req() req: Request,
): Observable<MessageEvent> {
// 연결 종료 시 정리
req.on('close', () => {
this.gateway.removeStream(userId);
});
return this.gateway.getStream(userId);
}
}
// order.service.ts — 이벤트 발행 측
@Injectable()
export class OrderService {
constructor(private readonly eventEmitter: EventEmitter2) {}
async createOrder(userId: string, dto: CreateOrderDto): Promise<Order> {
const order = await this.orderRepository.save({ ...dto, userId });
// SSE로 자동 전파됨
this.eventEmitter.emit('order.created', {
userId,
orderId: order.id,
});
return order;
}
}
Last-Event-ID: 재연결 시 누락 이벤트 복구
네트워크 끊김 후 재연결 시 EventSource는 마지막으로 수신한 이벤트 ID를 Last-Event-ID 헤더로 전송한다. 서버는 이를 활용해 놓친 이벤트를 재전송할 수 있다.
@Sse(':userId/notifications')
notifications(
@Param('userId') userId: string,
@Headers('last-event-id') lastEventId: string | undefined,
@Req() req: Request,
): Observable<MessageEvent> {
const stream$ = this.gateway.getStream(userId);
if (lastEventId) {
// 놓친 이벤트를 DB/Redis에서 조회하여 먼저 전송
const missed$ = from(
this.eventStore.getEventsSince(userId, lastEventId),
).pipe(
mergeMap((events) => from(events)),
map((event) => ({
data: event.payload,
type: event.type,
id: event.id,
})),
);
return concat(missed$, stream$);
}
req.on('close', () => this.gateway.removeStream(userId));
return stream$;
}
// 이벤트 저장소 (Redis Sorted Set 활용)
@Injectable()
export class EventStore {
constructor(@InjectRedis() private readonly redis: Redis) {}
async saveEvent(userId: string, event: StoredEvent): Promise<void> {
const key = `events:${userId}`;
await this.redis.zadd(key, Date.now(), JSON.stringify(event));
// 최근 1000개만 유지
await this.redis.zremrangebyrank(key, 0, -1001);
}
async getEventsSince(
userId: string,
lastEventId: string,
): Promise<StoredEvent[]> {
const timestamp = this.extractTimestamp(lastEventId);
const raw = await this.redis.zrangebyscore(
`events:${userId}`,
timestamp + 1,
'+inf',
);
return raw.map((r) => JSON.parse(r));
}
}
Redis Pub/Sub: 다중 인스턴스 지원
서버가 여러 인스턴스로 스케일아웃되면, 특정 인스턴스에서 발생한 이벤트가 다른 인스턴스의 SSE 연결로 전달되지 않는다. Redis Pub/Sub로 인스턴스 간 이벤트를 동기화한다.
@Injectable()
export class RedisNotificationBridge implements OnModuleInit, OnModuleDestroy {
private subscriber: Redis;
constructor(
@InjectRedis() private readonly publisher: Redis,
private readonly gateway: NotificationGateway,
) {
this.subscriber = publisher.duplicate();
}
async onModuleInit(): Promise<void> {
await this.subscriber.subscribe('notifications');
this.subscriber.on('message', (channel, message) => {
const { userId, type, data, id } = JSON.parse(message);
// 로컬 Subject로 전달
this.gateway.emitDirect(userId, type, data, id);
});
}
async onModuleDestroy(): Promise<void> {
await this.subscriber.unsubscribe('notifications');
await this.subscriber.quit();
}
// 서비스에서 이벤트 발행 시 Redis로 전파
async publish(
userId: string,
type: string,
data: any,
): Promise<void> {
const id = `${Date.now()}-${Math.random().toString(36).slice(2)}`;
await this.publisher.publish(
'notifications',
JSON.stringify({ userId, type, data, id }),
);
}
}
| 아키텍처 | 장점 | 단점 |
|---|---|---|
| 단일 인스턴스 + Subject | 간단, 의존성 없음 | 스케일아웃 불가 |
| Redis Pub/Sub | 다중 인스턴스 지원, 간단한 설정 | 메시지 유실 가능(구독 전 발행) |
| Redis Streams | 메시지 영속성, consumer group | 설정 복잡 |
SSE vs WebSocket: 선택 기준
| 기준 | SSE | WebSocket |
|---|---|---|
| 방향 | 서버→클라이언트 (단방향) | 양방향 |
| 프로토콜 | HTTP/1.1 (표준) | WS (별도 프로토콜) |
| 자동 재연결 | 브라우저 내장 | 직접 구현 필요 |
| 프록시/CDN | 대부분 호환 | 설정 필요 |
| 적합 사례 | 알림, 대시보드, 실시간 피드 | 채팅, 게임, 양방향 제어 |
알림, 실시간 피드, 대시보드처럼 서버→클라이언트 단방향 푸시가 목적이라면 SSE가 훨씬 단순하고 안정적이다. 양방향 통신이 필요할 때만 WebSocket Gateway를 사용한다.
StreamableFile: 대용량 파일 스트리밍
SSE 외에도 NestJS는 StreamableFile을 통해 대용량 파일을 메모리에 전부 올리지 않고 스트리밍 전송할 수 있다.
import { StreamableFile } from '@nestjs/common';
import { createReadStream } from 'fs';
@Controller('files')
export class FilesController {
@Get(':id/download')
@Header('Content-Type', 'application/octet-stream')
download(@Param('id') id: string, @Res({ passthrough: true }) res: Response) {
const filePath = this.fileService.getPath(id);
const stat = statSync(filePath);
res.set({
'Content-Disposition': `attachment; filename="${id}.pdf"`,
'Content-Length': stat.size,
});
const stream = createReadStream(filePath);
return new StreamableFile(stream);
}
// DB에서 CSV 스트리밍 생성
@Get('export/users')
@Header('Content-Type', 'text/csv')
async exportUsers(@Res({ passthrough: true }) res: Response) {
res.set({ 'Content-Disposition': 'attachment; filename="users.csv"' });
const passThrough = new PassThrough();
passThrough.write('id,name,emailn');
// 청크 단위로 DB 조회 → 스트리밍
const batchSize = 1000;
let offset = 0;
let hasMore = true;
while (hasMore) {
const users = await this.userService.findBatch(offset, batchSize);
for (const user of users) {
passThrough.write(`${user.id},${user.name},${user.email}n`);
}
offset += batchSize;
hasMore = users.length === batchSize;
}
passThrough.end();
return new StreamableFile(passThrough);
}
}
Nginx/K8s SSE 프록시 설정
SSE는 장시간 유지되는 HTTP 연결이므로 프록시 설정에 주의가 필요하다.
# nginx.conf — SSE 엔드포인트 전용 설정
location /events/ {
proxy_pass http://nestjs_upstream;
proxy_http_version 1.1;
proxy_set_header Connection ''; # keep-alive
proxy_buffering off; # 버퍼링 비활성화 필수!
proxy_cache off;
proxy_read_timeout 86400s; # 24시간 (기본 60초면 끊김)
chunked_transfer_encoding off;
}
# Kubernetes Ingress (nginx-ingress)
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
annotations:
nginx.ingress.kubernetes.io/proxy-buffering: "off"
nginx.ingress.kubernetes.io/proxy-read-timeout: "86400"
nginx.ingress.kubernetes.io/proxy-send-timeout: "86400"
인증이 필요한 SSE: 토큰 전달
EventSource API는 커스텀 헤더를 지원하지 않는다. JWT를 전달하는 방법은 두 가지다:
// 방법 1: 쿼리 파라미터 (간단하지만 로그에 토큰 노출 주의)
const source = new EventSource('/events/stream?token=eyJhbG...');
// NestJS Guard에서 쿼리 토큰 추출
@Injectable()
export class SseAuthGuard implements CanActivate {
canActivate(context: ExecutionContext): boolean {
const req = context.switchToHttp().getRequest();
const token = req.query.token || req.headers.authorization?.split(' ')[1];
if (!token) throw new UnauthorizedException();
req.user = this.jwtService.verify(token);
return true;
}
}
// 방법 2: fetch + ReadableStream (헤더 전달 가능, 권장)
async function connectSSE(url: string, token: string) {
const response = await fetch(url, {
headers: { Authorization: `Bearer ${token}` },
});
const reader = response.body!.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value);
// SSE 프로토콜 파싱
parseSSEChunk(text);
}
}
운영 체크리스트
- 커넥션 수 제한: 브라우저는 도메인당 SSE 연결을 6개로 제한한다(HTTP/1.1). HTTP/2에서는 제한이 크게 완화된다
- 하트비트: 30초 간격으로 주석(
: ping)을 전송해 프록시 타임아웃을 방지한다 - 메모리 누수: 클라이언트 연결 종료 시 반드시 Subject를 complete하고 Map에서 제거한다
- Graceful Shutdown: 서버 종료 시 모든 SSE Subject를 complete하여 클라이언트가 재연결하도록 유도한다
- 모니터링: Interceptor로 활성 SSE 연결 수를 Prometheus 메트릭으로 노출한다
- 배압 제어: 느린 클라이언트에 이벤트가 쌓이지 않도록 버퍼 크기를 제한하고, 초과 시 연결을 끊는다
NestJS SSE는 RxJS Observable과의 자연스러운 통합 덕분에 최소한의 코드로 실시간 스트리밍을 구현할 수 있다. 단방향 푸시에는 SSE, 양방향 통신에는 WebSocket — 이 기준만 지키면 불필요한 복잡성 없이 실시간 기능을 제공할 수 있다.