SSE(Server-Sent Events)란?
실시간 데이터 전달에 WebSocket만 있는 것은 아니다. SSE(Server-Sent Events)는 서버에서 클라이언트로의 단방향 실시간 스트리밍을 HTTP 기반으로 구현하는 표준이다. 알림 피드, 실시간 대시보드, AI 응답 스트리밍, 주가 업데이트처럼 서버→클라이언트 단방향이면 SSE가 WebSocket보다 간단하고 효율적이다.
| 구분 | SSE | WebSocket |
|---|---|---|
| 방향 | 서버 → 클라이언트 (단방향) | 양방향 |
| 프로토콜 | HTTP/1.1, HTTP/2 | WS 프로토콜 (HTTP 업그레이드) |
| 자동 재연결 | 브라우저 내장 | 직접 구현 필요 |
| 데이터 형식 | 텍스트 (UTF-8) | 텍스트 + 바이너리 |
| 인프라 호환 | 프록시·CDN 친화적 | 별도 설정 필요 |
NestJS SSE 기본 구현
NestJS는 @Sse() 데코레이터로 SSE 엔드포인트를 간단하게 만들 수 있다. 핵심은 Observable<MessageEvent>를 반환하는 것이다.
import { Controller, Sse, MessageEvent } from '@nestjs/common';
import { Observable, interval, map } from 'rxjs';
@Controller('events')
export class EventController {
// 기본 SSE: 매 초 서버 시간 전송
@Sse('time')
streamTime(): Observable<MessageEvent> {
return interval(1000).pipe(
map((seq) => ({
data: {
timestamp: new Date().toISOString(),
seq,
},
})),
);
}
}
// 클라이언트 (브라우저):
// const source = new EventSource('/events/time');
// source.onmessage = (event) => {
// const data = JSON.parse(event.data);
// console.log(data.timestamp);
// };
Subject 기반 실시간 이벤트 브로드캐스트
실전에서는 interval이 아니라, 비즈니스 이벤트가 발생할 때 연결된 모든 클라이언트에게 푸시해야 한다. RxJS Subject가 이벤트 허브 역할을 한다.
// notification-sse.service.ts
import { Injectable } from '@nestjs/common';
import { Subject, Observable, filter, map } from 'rxjs';
interface NotificationEvent {
userId: string;
type: 'order' | 'message' | 'alert' | 'system';
title: string;
body: string;
timestamp: Date;
}
@Injectable()
export class NotificationSseService {
private readonly events$ = new Subject<NotificationEvent>();
// 이벤트 발행 (서비스 레이어에서 호출)
emit(event: NotificationEvent): void {
this.events$.next(event);
}
// 특정 사용자의 이벤트 스트림 반환
getStream(userId: string): Observable<MessageEvent> {
return this.events$.pipe(
filter((event) => event.userId === userId || event.type === 'system'),
map((event) => ({
data: {
type: event.type,
title: event.title,
body: event.body,
timestamp: event.timestamp.toISOString(),
},
type: event.type, // SSE event type (클라이언트에서 addEventListener로 분기)
id: `${Date.now()}-${Math.random().toString(36).slice(2)}`,
})),
);
}
// 연결 수 확인 (모니터링용)
get observerCount(): number {
return this.events$.observers.length;
}
}
컨트롤러: 인증된 SSE 스트림
@Controller('events')
export class EventController {
constructor(
private readonly notificationSse: NotificationSseService,
) {}
@Sse('notifications')
@UseGuards(JwtAuthGuard)
streamNotifications(@Req() req: AuthRequest): Observable<MessageEvent> {
const userId = req.user.id;
// 연결 시 초기 이벤트 전송
const initial$ = new Observable<MessageEvent>((subscriber) => {
subscriber.next({
data: { type: 'connected', message: 'SSE 연결 성공' },
type: 'system',
});
});
// 실시간 이벤트 스트림
const events$ = this.notificationSse.getStream(userId);
// 30초마다 하트비트 (연결 유지)
const heartbeat$ = interval(30_000).pipe(
map(() => ({
data: {},
type: 'heartbeat',
id: undefined,
})),
);
return merge(initial$, events$, heartbeat$);
}
}
// 클라이언트:
// const source = new EventSource('/events/notifications', {
// headers: { 'Authorization': 'Bearer ' + token } // 주의: EventSource는 커스텀 헤더 미지원
// });
//
// → 해결책: 쿼리 파라미터로 토큰 전달 또는 eventsource-polyfill 사용
SSE 인증 문제 해결
브라우저의 EventSource API는 커스텀 헤더를 지원하지 않는다. JWT 토큰을 전달하려면 쿼리 파라미터, 쿠키, 또는 폴리필을 사용해야 한다.
// 방법 1: 쿼리 파라미터 (간단하지만 로그에 토큰 노출 위험)
// GET /events/notifications?token=eyJhbG...
@Sse('notifications')
streamNotifications(@Query('token') token: string): Observable<MessageEvent> {
const user = this.jwtService.verify(token);
return this.notificationSse.getStream(user.id);
}
// 방법 2: 일회용 티켓 발급 (권장)
@Post('events/ticket')
@UseGuards(JwtAuthGuard)
async createTicket(@Req() req: AuthRequest) {
const ticket = crypto.randomUUID();
await this.cache.set(`sse:ticket:${ticket}`, req.user.id, 30_000); // 30초 유효
return { ticket };
}
@Sse('notifications')
async streamNotifications(@Query('ticket') ticket: string): Promise<Observable<MessageEvent>> {
const userId = await this.cache.get<string>(`sse:ticket:${ticket}`);
if (!userId) throw new UnauthorizedException('유효하지 않은 티켓');
await this.cache.del(`sse:ticket:${ticket}`); // 일회용 삭제
return this.notificationSse.getStream(userId);
}
// 방법 3: fetch 기반 SSE (모던 브라우저)
// const response = await fetch('/events/notifications', {
// headers: { 'Authorization': 'Bearer ' + token }
// });
// const reader = response.body.getReader();
// const decoder = new TextDecoder();
이벤트 연동: 비즈니스 이벤트 → SSE 푸시
import { OnEvent } from '@nestjs/event-emitter';
@Injectable()
export class NotificationEventHandler {
constructor(private readonly sseService: NotificationSseService) {}
@OnEvent('order.completed')
handleOrderCompleted(event: OrderCompletedEvent) {
this.sseService.emit({
userId: event.userId,
type: 'order',
title: '주문 완료',
body: `주문 #${event.orderId}이 완료되었습니다.`,
timestamp: new Date(),
});
}
@OnEvent('message.received')
handleNewMessage(event: MessageReceivedEvent) {
this.sseService.emit({
userId: event.recipientId,
type: 'message',
title: `${event.senderName}님의 메시지`,
body: event.preview,
timestamp: new Date(),
});
}
@OnEvent('system.maintenance')
handleMaintenance(event: MaintenanceEvent) {
this.sseService.emit({
userId: '*', // 전체 사용자
type: 'system',
title: '시스템 점검 안내',
body: event.message,
timestamp: new Date(),
});
}
}
NestJS EventEmitter2 이벤트 설계에서 다룬 도메인 이벤트 패턴이 SSE 푸시의 트리거로 자연스럽게 연결된다.
AI 응답 스트리밍: ChatGPT 스타일
LLM 응답을 토큰 단위로 스트리밍하는 것은 SSE의 대표적인 활용 사례다.
@Controller('ai')
export class AiController {
constructor(private readonly aiService: AiService) {}
@Sse('chat/stream')
@UseGuards(JwtAuthGuard)
streamChat(
@Query('prompt') prompt: string,
@Req() req: AuthRequest,
): Observable<MessageEvent> {
return new Observable((subscriber) => {
const abortController = new AbortController();
this.aiService
.streamCompletion(prompt, {
signal: abortController.signal,
onToken: (token: string) => {
subscriber.next({
data: { token, done: false },
type: 'token',
});
},
onDone: (fullResponse: string) => {
subscriber.next({
data: { token: '', done: true, full: fullResponse },
type: 'done',
});
subscriber.complete();
},
onError: (error: Error) => {
subscriber.next({
data: { error: error.message },
type: 'error',
});
subscriber.complete();
},
})
.catch((err) => subscriber.error(err));
// 클라이언트 연결 해제 시 스트리밍 중단
return () => abortController.abort();
});
}
}
연결 관리와 정리
SSE 연결이 많아지면 메모리와 파일 디스크립터 관리가 중요하다. 연결 풀 관리와 타임아웃 설정이 필수다.
@Injectable()
export class SseConnectionManager {
private connections = new Map<string, Set<string>>(); // userId → sessionIds
register(userId: string, sessionId: string): void {
if (!this.connections.has(userId)) {
this.connections.set(userId, new Set());
}
this.connections.get(userId)!.add(sessionId);
}
unregister(userId: string, sessionId: string): void {
this.connections.get(userId)?.delete(sessionId);
if (this.connections.get(userId)?.size === 0) {
this.connections.delete(userId);
}
}
getStats() {
return {
totalUsers: this.connections.size,
totalConnections: Array.from(this.connections.values())
.reduce((sum, set) => sum + set.size, 0),
};
}
}
// Nginx SSE 설정 (버퍼링 비활성화 필수)
// location /events/ {
// proxy_pass http://upstream;
// proxy_set_header Connection '';
// proxy_http_version 1.1;
// proxy_buffering off; ← 핵심!
// proxy_cache off;
// proxy_read_timeout 86400s; ← 긴 타임아웃
// chunked_transfer_encoding off;
// }
NestJS Cache Manager 심화에서 다룬 Redis와 SSE를 결합하면, 멀티 인스턴스 환경에서도 Redis Pub/Sub를 통해 모든 서버의 SSE 클라이언트에게 이벤트를 전달할 수 있다.
마무리
SSE는 서버→클라이언트 단방향 실시간 통신의 가장 간단하고 효율적인 선택이다. NestJS의 @Sse()와 RxJS Observable 조합으로 알림 피드, AI 스트리밍, 실시간 대시보드를 쉽게 구현할 수 있다. 인증은 일회용 티켓 패턴으로, 확장은 Redis Pub/Sub로 해결하는 것이 실전 표준이다.