GraphQL Subscription이란?
GraphQL의 Query·Mutation은 요청-응답 패턴이지만, Subscription은 서버가 클라이언트에 실시간으로 데이터를 푸시하는 패턴입니다. 채팅, 알림, 실시간 대시보드 등에서 WebSocket 기반으로 동작하며, NestJS의 Code-First 접근법과 결합하면 타입 안전한 실시간 API를 선언적으로 구현할 수 있습니다.
Query·Mutation·Subscription 비교
| 타입 | 프로토콜 | 흐름 | 용도 |
|---|---|---|---|
| Query | HTTP | 클라이언트 → 서버 → 응답 | 데이터 조회 |
| Mutation | HTTP | 클라이언트 → 서버 → 응답 | 데이터 변경 |
| Subscription | WebSocket | 서버 → 클라이언트 (지속 푸시) | 실시간 알림 |
프로젝트 설정
// 패키지 설치
npm install @nestjs/graphql @nestjs/apollo @apollo/server
npm install graphql graphql-subscriptions graphql-ws
npm install graphql-redis-subscriptions ioredis
GraphQL 모듈 설정
Subscription을 활성화하려면 WebSocket 설정이 필요합니다.
// app.module.ts
@Module({
imports: [
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
autoSchemaFile: join(process.cwd(), 'src/schema.gql'),
sortSchema: true,
// Subscription 활성화
subscriptions: {
'graphql-ws': {
path: '/graphql',
onConnect: (context) => {
const { connectionParams } = context;
// 인증 토큰 검증
const token = connectionParams?.Authorization as string;
if (!token) {
throw new Error('인증 토큰이 필요합니다');
}
return { token };
},
onDisconnect: (context) => {
console.log('클라이언트 연결 해제');
},
},
},
context: ({ req, connection }) => {
// HTTP 요청과 WebSocket 연결 모두 처리
if (connection) {
return { ...connection.context };
}
return { req };
},
}),
ChatModule,
NotificationModule,
],
})
export class AppModule {}
PubSub 엔진: 메모리 vs Redis
단일 인스턴스에서는 인메모리 PubSub으로 충분하지만, 다중 인스턴스 환경에서는 Redis PubSub이 필수입니다.
// pubsub.module.ts
import { RedisPubSub } from 'graphql-redis-subscriptions';
import Redis from 'ioredis';
const PUBSUB_TOKEN = 'PUB_SUB';
@Module({
providers: [
{
provide: PUBSUB_TOKEN,
useFactory: () => {
if (process.env.NODE_ENV === 'production') {
// 프로덕션: Redis PubSub (다중 인스턴스 지원)
return new RedisPubSub({
publisher: new Redis({
host: process.env.REDIS_HOST,
port: 6379,
retryStrategy: (times) => Math.min(times * 50, 2000),
}),
subscriber: new Redis({
host: process.env.REDIS_HOST,
port: 6379,
retryStrategy: (times) => Math.min(times * 50, 2000),
}),
});
}
// 개발: 인메모리 PubSub
return new PubSub();
},
},
],
exports: [PUBSUB_TOKEN],
})
export class PubSubModule {}
채팅 Subscription 구현
실시간 채팅을 Subscription으로 구현합니다. @Subscription() 데코레이터와 필터링을 결합합니다.
// chat.resolver.ts
@Resolver(() => ChatMessage)
export class ChatResolver {
constructor(
@Inject('PUB_SUB') private readonly pubSub: RedisPubSub,
private readonly chatService: ChatService,
) {}
// 메시지 전송 (Mutation)
@Mutation(() => ChatMessage)
async sendMessage(
@Args('input') input: SendMessageInput,
@CurrentUser() user: User,
): Promise<ChatMessage> {
const message = await this.chatService.create({
roomId: input.roomId,
content: input.content,
senderId: user.id,
senderName: user.name,
});
// 구독자에게 발행
await this.pubSub.publish(`chat.${input.roomId}`, {
messageAdded: message,
});
return message;
}
// 실시간 메시지 수신 (Subscription)
@Subscription(() => ChatMessage, {
// 특정 채팅방만 필터링
filter: (payload, variables) => {
return payload.messageAdded.roomId === variables.roomId;
},
// 반환 데이터 변환
resolve: (payload) => payload.messageAdded,
})
messageAdded(
@Args('roomId') roomId: string,
) {
return this.pubSub.asyncIterator(`chat.${roomId}`);
}
}
알림 Subscription: 사용자별 필터링
// notification.resolver.ts
@Resolver(() => Notification)
export class NotificationResolver {
constructor(
@Inject('PUB_SUB') private readonly pubSub: RedisPubSub,
private readonly notificationService: NotificationService,
) {}
@Mutation(() => Notification)
async createNotification(
@Args('input') input: CreateNotificationInput,
): Promise<Notification> {
const notification = await this.notificationService.create(input);
// 대상 사용자에게 발행
await this.pubSub.publish(
`notification.${input.targetUserId}`,
{ notificationReceived: notification },
);
return notification;
}
@Subscription(() => Notification, {
filter: (payload, variables, context) => {
// 인증된 사용자 본인의 알림만 수신
return payload.notificationReceived.targetUserId
=== context.userId;
},
resolve: (payload) => payload.notificationReceived,
})
notificationReceived() {
// 와일드카드 패턴으로 모든 사용자 알림 구독
return this.pubSub.asyncIterator('notification.*');
}
}
// ObjectType 정의
@ObjectType()
export class Notification {
@Field(() => ID)
id: string;
@Field()
title: string;
@Field()
body: string;
@Field()
targetUserId: string;
@Field(() => NotificationType)
type: NotificationType;
@Field()
createdAt: Date;
@Field()
isRead: boolean;
}
실시간 대시보드: 주기적 데이터 푸시
외부 이벤트가 아닌 주기적 데이터 폴링을 Subscription으로 래핑합니다.
@Resolver(() => DashboardStats)
export class DashboardResolver {
constructor(
@Inject('PUB_SUB') private readonly pubSub: RedisPubSub,
private readonly statsService: StatsService,
) {}
// 5초마다 대시보드 통계 발행
@Interval(5000)
async publishStats() {
const stats = await this.statsService.getRealtimeStats();
await this.pubSub.publish('dashboard.stats', {
dashboardUpdated: stats,
});
}
@Subscription(() => DashboardStats, {
resolve: (payload) => payload.dashboardUpdated,
})
dashboardUpdated() {
return this.pubSub.asyncIterator('dashboard.stats');
}
}
WebSocket 인증 Guard
Subscription 연결 시 JWT 토큰을 검증하는 Guard를 구현합니다.
@Injectable()
export class WsAuthGuard implements CanActivate {
constructor(private readonly jwtService: JwtService) {}
canActivate(context: ExecutionContext): boolean {
const gqlContext = GqlExecutionContext.create(context);
const ctx = gqlContext.getContext();
// WebSocket 연결의 connectionParams에서 토큰 추출
const token = ctx.token || ctx.req?.headers?.authorization;
if (!token) {
throw new UnauthorizedException('인증 토큰 없음');
}
try {
const cleanToken = token.replace('Bearer ', '');
const payload = this.jwtService.verify(cleanToken);
ctx.userId = payload.sub;
ctx.userRole = payload.role;
return true;
} catch {
throw new UnauthorizedException('유효하지 않은 토큰');
}
}
}
// Resolver에 적용
@Subscription(() => ChatMessage, { /* ... */ })
@UseGuards(WsAuthGuard)
messageAdded(@Args('roomId') roomId: string) {
return this.pubSub.asyncIterator(`chat.${roomId}`);
}
연결 관리: 구독 수 제한
// connection-tracker.service.ts
@Injectable()
export class ConnectionTracker {
private connections = new Map<string, Set<string>>();
private readonly MAX_SUBSCRIPTIONS_PER_USER = 10;
addSubscription(userId: string, subscriptionId: string): void {
const userSubs = this.connections.get(userId) || new Set();
if (userSubs.size >= this.MAX_SUBSCRIPTIONS_PER_USER) {
throw new Error(
`구독 제한 초과: 최대 ${this.MAX_SUBSCRIPTIONS_PER_USER}개`
);
}
userSubs.add(subscriptionId);
this.connections.set(userId, userSubs);
}
removeSubscription(userId: string, subscriptionId: string): void {
const userSubs = this.connections.get(userId);
if (userSubs) {
userSubs.delete(subscriptionId);
if (userSubs.size === 0) {
this.connections.delete(userId);
}
}
}
getActiveCount(): number {
let total = 0;
this.connections.forEach(subs => total += subs.size);
return total;
}
}
클라이언트 사용 예시
// Apollo Client (React)
import { createClient } from 'graphql-ws';
const wsClient = createClient({
url: 'ws://api.example.com/graphql',
connectionParams: {
Authorization: `Bearer ${getToken()}`,
},
retryAttempts: 5,
shouldRetry: () => true,
});
// Subscription 구독
const subscription = wsClient.subscribe(
{
query: `
subscription OnMessageAdded($roomId: String!) {
messageAdded(roomId: $roomId) {
id
content
senderName
createdAt
}
}
`,
variables: { roomId: 'room-1' },
},
{
next: (data) => console.log('새 메시지:', data),
error: (err) => console.error('구독 에러:', err),
complete: () => console.log('구독 종료'),
},
);
테스트 전략
E2E 테스트에서 WebSocket 클라이언트로 Subscription을 검증합니다.
describe('Chat Subscription (e2e)', () => {
let app: INestApplication;
beforeAll(async () => {
const module = await Test.createTestingModule({
imports: [AppModule],
}).compile();
app = module.createNestApplication();
await app.listen(0);
});
it('receives messages via subscription', (done) => {
const port = app.getHttpServer().address().port;
const client = createClient({
url: `ws://localhost:${port}/graphql`,
webSocketImpl: WebSocket,
connectionParams: { Authorization: 'Bearer test-token' },
});
const messages: any[] = [];
client.subscribe(
{
query: `subscription { messageAdded(roomId: "room-1") {
id content senderName
}}`,
},
{
next: (data) => {
messages.push(data.data.messageAdded);
if (messages.length === 2) {
expect(messages[0].content).toBe('Hello');
expect(messages[1].content).toBe('World');
done();
}
},
error: done,
complete: () => {},
},
);
// Mutation으로 메시지 전송
setTimeout(async () => {
await request(app.getHttpServer())
.post('/graphql')
.send({
query: `mutation { sendMessage(input: {
roomId: "room-1", content: "Hello"
}) { id } }`,
});
await request(app.getHttpServer())
.post('/graphql')
.send({
query: `mutation { sendMessage(input: {
roomId: "room-1", content: "World"
}) { id } }`,
});
}, 500);
});
});
운영 체크리스트
| 항목 | 확인 사항 |
|---|---|
| PubSub 엔진 | 다중 인스턴스 → Redis PubSub 필수 |
| WebSocket 인증 | connectionParams로 JWT 검증 |
| 구독 수 제한 | 사용자별 max subscription 설정 |
| filter 함수 | 불필요한 이벤트 클라이언트 전송 방지 |
| 재연결 전략 | 클라이언트 retryAttempts 설정 |
| 로드밸런서 | sticky session 또는 WebSocket 지원 확인 |
마치며
NestJS GraphQL Subscription은 WebSocket 기반 실시간 데이터 푸시를 타입 안전하게 구현하는 강력한 패턴입니다. Code-First @Subscription() 데코레이터로 선언적 구현하고, filter로 정밀한 이벤트 필터링을 적용하며, Redis PubSub으로 다중 인스턴스 확장성을 확보합니다. 채팅, 알림, 대시보드 등 실시간 기능에서 REST 폴링을 대체하는 효율적인 선택입니다.