NestJS GraphQL Subscription

GraphQL Subscription이란?

GraphQL의 Query·Mutation은 요청-응답 패턴이지만, Subscription은 서버가 클라이언트에 실시간으로 데이터를 푸시하는 패턴입니다. 채팅, 알림, 실시간 대시보드 등에서 WebSocket 기반으로 동작하며, NestJS의 Code-First 접근법과 결합하면 타입 안전한 실시간 API를 선언적으로 구현할 수 있습니다.

Query·Mutation·Subscription 비교

타입 프로토콜 흐름 용도
Query HTTP 클라이언트 → 서버 → 응답 데이터 조회
Mutation HTTP 클라이언트 → 서버 → 응답 데이터 변경
Subscription WebSocket 서버 → 클라이언트 (지속 푸시) 실시간 알림

프로젝트 설정

// 패키지 설치
npm install @nestjs/graphql @nestjs/apollo @apollo/server
npm install graphql graphql-subscriptions graphql-ws
npm install graphql-redis-subscriptions ioredis

GraphQL 모듈 설정

Subscription을 활성화하려면 WebSocket 설정이 필요합니다.

// app.module.ts
@Module({
  imports: [
    GraphQLModule.forRoot<ApolloDriverConfig>({
      driver: ApolloDriver,
      autoSchemaFile: join(process.cwd(), 'src/schema.gql'),
      sortSchema: true,

      // Subscription 활성화
      subscriptions: {
        'graphql-ws': {
          path: '/graphql',
          onConnect: (context) => {
            const { connectionParams } = context;
            // 인증 토큰 검증
            const token = connectionParams?.Authorization as string;
            if (!token) {
              throw new Error('인증 토큰이 필요합니다');
            }
            return { token };
          },
          onDisconnect: (context) => {
            console.log('클라이언트 연결 해제');
          },
        },
      },

      context: ({ req, connection }) => {
        // HTTP 요청과 WebSocket 연결 모두 처리
        if (connection) {
          return { ...connection.context };
        }
        return { req };
      },
    }),
    ChatModule,
    NotificationModule,
  ],
})
export class AppModule {}

PubSub 엔진: 메모리 vs Redis

단일 인스턴스에서는 인메모리 PubSub으로 충분하지만, 다중 인스턴스 환경에서는 Redis PubSub이 필수입니다.

// pubsub.module.ts
import { RedisPubSub } from 'graphql-redis-subscriptions';
import Redis from 'ioredis';

const PUBSUB_TOKEN = 'PUB_SUB';

@Module({
  providers: [
    {
      provide: PUBSUB_TOKEN,
      useFactory: () => {
        if (process.env.NODE_ENV === 'production') {
          // 프로덕션: Redis PubSub (다중 인스턴스 지원)
          return new RedisPubSub({
            publisher: new Redis({
              host: process.env.REDIS_HOST,
              port: 6379,
              retryStrategy: (times) => Math.min(times * 50, 2000),
            }),
            subscriber: new Redis({
              host: process.env.REDIS_HOST,
              port: 6379,
              retryStrategy: (times) => Math.min(times * 50, 2000),
            }),
          });
        }
        // 개발: 인메모리 PubSub
        return new PubSub();
      },
    },
  ],
  exports: [PUBSUB_TOKEN],
})
export class PubSubModule {}

채팅 Subscription 구현

실시간 채팅을 Subscription으로 구현합니다. @Subscription() 데코레이터와 필터링을 결합합니다.

// chat.resolver.ts
@Resolver(() => ChatMessage)
export class ChatResolver {
  constructor(
    @Inject('PUB_SUB') private readonly pubSub: RedisPubSub,
    private readonly chatService: ChatService,
  ) {}

  // 메시지 전송 (Mutation)
  @Mutation(() => ChatMessage)
  async sendMessage(
    @Args('input') input: SendMessageInput,
    @CurrentUser() user: User,
  ): Promise<ChatMessage> {
    const message = await this.chatService.create({
      roomId: input.roomId,
      content: input.content,
      senderId: user.id,
      senderName: user.name,
    });

    // 구독자에게 발행
    await this.pubSub.publish(`chat.${input.roomId}`, {
      messageAdded: message,
    });

    return message;
  }

  // 실시간 메시지 수신 (Subscription)
  @Subscription(() => ChatMessage, {
    // 특정 채팅방만 필터링
    filter: (payload, variables) => {
      return payload.messageAdded.roomId === variables.roomId;
    },
    // 반환 데이터 변환
    resolve: (payload) => payload.messageAdded,
  })
  messageAdded(
    @Args('roomId') roomId: string,
  ) {
    return this.pubSub.asyncIterator(`chat.${roomId}`);
  }
}

알림 Subscription: 사용자별 필터링

// notification.resolver.ts
@Resolver(() => Notification)
export class NotificationResolver {
  constructor(
    @Inject('PUB_SUB') private readonly pubSub: RedisPubSub,
    private readonly notificationService: NotificationService,
  ) {}

  @Mutation(() => Notification)
  async createNotification(
    @Args('input') input: CreateNotificationInput,
  ): Promise<Notification> {
    const notification = await this.notificationService.create(input);

    // 대상 사용자에게 발행
    await this.pubSub.publish(
      `notification.${input.targetUserId}`,
      { notificationReceived: notification },
    );

    return notification;
  }

  @Subscription(() => Notification, {
    filter: (payload, variables, context) => {
      // 인증된 사용자 본인의 알림만 수신
      return payload.notificationReceived.targetUserId
        === context.userId;
    },
    resolve: (payload) => payload.notificationReceived,
  })
  notificationReceived() {
    // 와일드카드 패턴으로 모든 사용자 알림 구독
    return this.pubSub.asyncIterator('notification.*');
  }
}

// ObjectType 정의
@ObjectType()
export class Notification {
  @Field(() => ID)
  id: string;

  @Field()
  title: string;

  @Field()
  body: string;

  @Field()
  targetUserId: string;

  @Field(() => NotificationType)
  type: NotificationType;

  @Field()
  createdAt: Date;

  @Field()
  isRead: boolean;
}

실시간 대시보드: 주기적 데이터 푸시

외부 이벤트가 아닌 주기적 데이터 폴링을 Subscription으로 래핑합니다.

@Resolver(() => DashboardStats)
export class DashboardResolver {
  constructor(
    @Inject('PUB_SUB') private readonly pubSub: RedisPubSub,
    private readonly statsService: StatsService,
  ) {}

  // 5초마다 대시보드 통계 발행
  @Interval(5000)
  async publishStats() {
    const stats = await this.statsService.getRealtimeStats();
    await this.pubSub.publish('dashboard.stats', {
      dashboardUpdated: stats,
    });
  }

  @Subscription(() => DashboardStats, {
    resolve: (payload) => payload.dashboardUpdated,
  })
  dashboardUpdated() {
    return this.pubSub.asyncIterator('dashboard.stats');
  }
}

WebSocket 인증 Guard

Subscription 연결 시 JWT 토큰을 검증하는 Guard를 구현합니다.

@Injectable()
export class WsAuthGuard implements CanActivate {
  constructor(private readonly jwtService: JwtService) {}

  canActivate(context: ExecutionContext): boolean {
    const gqlContext = GqlExecutionContext.create(context);
    const ctx = gqlContext.getContext();

    // WebSocket 연결의 connectionParams에서 토큰 추출
    const token = ctx.token || ctx.req?.headers?.authorization;

    if (!token) {
      throw new UnauthorizedException('인증 토큰 없음');
    }

    try {
      const cleanToken = token.replace('Bearer ', '');
      const payload = this.jwtService.verify(cleanToken);
      ctx.userId = payload.sub;
      ctx.userRole = payload.role;
      return true;
    } catch {
      throw new UnauthorizedException('유효하지 않은 토큰');
    }
  }
}

// Resolver에 적용
@Subscription(() => ChatMessage, { /* ... */ })
@UseGuards(WsAuthGuard)
messageAdded(@Args('roomId') roomId: string) {
  return this.pubSub.asyncIterator(`chat.${roomId}`);
}

연결 관리: 구독 수 제한

// connection-tracker.service.ts
@Injectable()
export class ConnectionTracker {
  private connections = new Map<string, Set<string>>();
  private readonly MAX_SUBSCRIPTIONS_PER_USER = 10;

  addSubscription(userId: string, subscriptionId: string): void {
    const userSubs = this.connections.get(userId) || new Set();

    if (userSubs.size >= this.MAX_SUBSCRIPTIONS_PER_USER) {
      throw new Error(
        `구독 제한 초과: 최대 ${this.MAX_SUBSCRIPTIONS_PER_USER}개`
      );
    }

    userSubs.add(subscriptionId);
    this.connections.set(userId, userSubs);
  }

  removeSubscription(userId: string, subscriptionId: string): void {
    const userSubs = this.connections.get(userId);
    if (userSubs) {
      userSubs.delete(subscriptionId);
      if (userSubs.size === 0) {
        this.connections.delete(userId);
      }
    }
  }

  getActiveCount(): number {
    let total = 0;
    this.connections.forEach(subs => total += subs.size);
    return total;
  }
}

클라이언트 사용 예시

// Apollo Client (React)
import { createClient } from 'graphql-ws';

const wsClient = createClient({
  url: 'ws://api.example.com/graphql',
  connectionParams: {
    Authorization: `Bearer ${getToken()}`,
  },
  retryAttempts: 5,
  shouldRetry: () => true,
});

// Subscription 구독
const subscription = wsClient.subscribe(
  {
    query: `
      subscription OnMessageAdded($roomId: String!) {
        messageAdded(roomId: $roomId) {
          id
          content
          senderName
          createdAt
        }
      }
    `,
    variables: { roomId: 'room-1' },
  },
  {
    next: (data) => console.log('새 메시지:', data),
    error: (err) => console.error('구독 에러:', err),
    complete: () => console.log('구독 종료'),
  },
);

테스트 전략

E2E 테스트에서 WebSocket 클라이언트로 Subscription을 검증합니다.

describe('Chat Subscription (e2e)', () => {
  let app: INestApplication;

  beforeAll(async () => {
    const module = await Test.createTestingModule({
      imports: [AppModule],
    }).compile();

    app = module.createNestApplication();
    await app.listen(0);
  });

  it('receives messages via subscription', (done) => {
    const port = app.getHttpServer().address().port;
    const client = createClient({
      url: `ws://localhost:${port}/graphql`,
      webSocketImpl: WebSocket,
      connectionParams: { Authorization: 'Bearer test-token' },
    });

    const messages: any[] = [];

    client.subscribe(
      {
        query: `subscription { messageAdded(roomId: "room-1") {
          id content senderName
        }}`,
      },
      {
        next: (data) => {
          messages.push(data.data.messageAdded);
          if (messages.length === 2) {
            expect(messages[0].content).toBe('Hello');
            expect(messages[1].content).toBe('World');
            done();
          }
        },
        error: done,
        complete: () => {},
      },
    );

    // Mutation으로 메시지 전송
    setTimeout(async () => {
      await request(app.getHttpServer())
        .post('/graphql')
        .send({
          query: `mutation { sendMessage(input: {
            roomId: "room-1", content: "Hello"
          }) { id } }`,
        });
      await request(app.getHttpServer())
        .post('/graphql')
        .send({
          query: `mutation { sendMessage(input: {
            roomId: "room-1", content: "World"
          }) { id } }`,
        });
    }, 500);
  });
});

운영 체크리스트

항목 확인 사항
PubSub 엔진 다중 인스턴스 → Redis PubSub 필수
WebSocket 인증 connectionParams로 JWT 검증
구독 수 제한 사용자별 max subscription 설정
filter 함수 불필요한 이벤트 클라이언트 전송 방지
재연결 전략 클라이언트 retryAttempts 설정
로드밸런서 sticky session 또는 WebSocket 지원 확인

마치며

NestJS GraphQL Subscription은 WebSocket 기반 실시간 데이터 푸시를 타입 안전하게 구현하는 강력한 패턴입니다. Code-First @Subscription() 데코레이터로 선언적 구현하고, filter로 정밀한 이벤트 필터링을 적용하며, Redis PubSub으로 다중 인스턴스 확장성을 확보합니다. 채팅, 알림, 대시보드 등 실시간 기능에서 REST 폴링을 대체하는 효율적인 선택입니다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux