NestJS RabbitMQ 메시지 큐 심화

NestJS + RabbitMQ 통합

NestJS는 @nestjs/microservices에 내장된 RabbitMQ 트랜스포트와 커뮤니티 패키지 @golevelup/nestjs-rabbitmq 두 가지 방식으로 RabbitMQ를 지원한다. 내장 트랜스포트는 단순한 요청-응답/이벤트 패턴에 적합하고, @golevelup 패키지는 Exchange, Routing Key, Dead Letter Queue 같은 AMQP 고급 기능을 활용할 때 적합하다.

내장 트랜스포트: 기본 설정

// main.ts — Microservice 모드
import { MicroserviceOptions, Transport } from '@nestjs/microservices';

const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  AppModule,
  {
    transport: Transport.RMQ,
    options: {
      urls: ['amqp://user:password@rabbitmq:5672'],
      queue: 'orders_queue',
      queueOptions: {
        durable: true,       // 서버 재시작 시 큐 유지
      },
      prefetchCount: 10,     // 동시 처리 메시지 수 제한
      noAck: false,          // 수동 ACK 모드
    },
  },
);
await app.listen();

하이브리드 모드: HTTP + RabbitMQ 동시 운영

// HTTP + Microservice 동시 실행
const app = await NestFactory.create(AppModule);

app.connectMicroservice<MicroserviceOptions>({
  transport: Transport.RMQ,
  options: {
    urls: ['amqp://rabbitmq:5672'],
    queue: 'orders_queue',
    queueOptions: { durable: true },
  },
});

await app.startAllMicroservices();
await app.listen(3000);  // HTTP도 동시에

메시지 패턴: Request-Response vs Event

// --- Consumer (수신측) ---
@Controller()
export class OrdersController {
  // Request-Response: 응답을 기다리는 패턴
  @MessagePattern('order.create')
  async createOrder(@Payload() data: CreateOrderDto, @Ctx() context: RmqContext) {
    const channel = context.getChannelRef();
    const message = context.getMessage();

    try {
      const order = await this.orderService.create(data);
      channel.ack(message);  // 수동 ACK
      return order;           // 응답 반환
    } catch (error) {
      channel.nack(message, false, false);  // reject, requeue=false
      throw error;
    }
  }

  // Event-Based: 응답 없이 발행만
  @EventPattern('order.shipped')
  async handleShipped(@Payload() data: OrderShippedEvent, @Ctx() context: RmqContext) {
    const channel = context.getChannelRef();
    const message = context.getMessage();

    await this.notificationService.sendShippingEmail(data);
    channel.ack(message);
  }
}

// --- Producer (발신측) ---
@Injectable()
export class OrderProducer {
  constructor(@Inject('ORDER_SERVICE') private client: ClientProxy) {}

  // Request-Response
  createOrder(dto: CreateOrderDto): Observable<Order> {
    return this.client.send('order.create', dto);
  }

  // Event (fire-and-forget)
  emitShipped(event: OrderShippedEvent) {
    this.client.emit('order.shipped', event);
  }
}

@golevelup/nestjs-rabbitmq: AMQP 심화

실무에서는 Exchange 라우팅, Dead Letter Queue, 메시지 TTL 등 AMQP 고급 기능이 필요하다. @golevelup/nestjs-rabbitmq는 이를 데코레이터 기반으로 깔끔하게 지원한다.

// module 설정
@Module({
  imports: [
    RabbitMQModule.forRoot(RabbitMQModule, {
      exchanges: [
        { name: 'orders', type: 'topic' },        // Topic Exchange
        { name: 'orders.dlx', type: 'fanout' },   // Dead Letter Exchange
      ],
      uri: 'amqp://user:password@rabbitmq:5672',
      connectionInitOptions: { wait: true, timeout: 10000 },
      channels: {
        default: { prefetchCount: 10, default: true },
        bulk: { prefetchCount: 1 },  // 무거운 작업용 채널
      },
    }),
  ],
})
export class MessagingModule {}

Topic Exchange 라우팅

@Injectable()
export class OrderConsumer {
  // order.created, order.updated 등 order.* 패턴 매칭
  @RabbitSubscribe({
    exchange: 'orders',
    routingKey: 'order.created',
    queue: 'order-created-queue',
    queueOptions: {
      durable: true,
      deadLetterExchange: 'orders.dlx',      // 실패 시 DLX로
      deadLetterRoutingKey: 'order.failed',
      messageTtl: 60000,                      // 60초 TTL
    },
  })
  async handleOrderCreated(msg: OrderCreatedEvent) {
    await this.inventoryService.reserve(msg.items);
    await this.paymentService.charge(msg.userId, msg.total);
  }

  // 와일드카드 라우팅: order.* 전체 구독
  @RabbitSubscribe({
    exchange: 'orders',
    routingKey: 'order.*',
    queue: 'order-audit-queue',
  })
  async auditAllOrderEvents(msg: any, amqpMsg: ConsumeMessage) {
    await this.auditService.log({
      routingKey: amqpMsg.fields.routingKey,
      payload: msg,
      timestamp: new Date(),
    });
  }
}

메시지 발행

@Injectable()
export class OrderPublisher {
  constructor(private readonly amqpConnection: AmqpConnection) {}

  async publishOrderCreated(order: Order) {
    await this.amqpConnection.publish('orders', 'order.created', {
      orderId: order.id,
      userId: order.userId,
      items: order.items,
      total: order.total,
    }, {
      persistent: true,     // 메시지 디스크 저장
      messageId: randomUUID(),
      timestamp: Date.now(),
      headers: {
        'x-retry-count': 0,
        'x-source': 'order-service',
      },
    });
  }
}

Dead Letter Queue: 실패 메시지 처리

// DLQ Consumer: 실패한 메시지를 별도로 처리
@RabbitSubscribe({
  exchange: 'orders.dlx',
  routingKey: '#',           // 모든 라우팅 키
  queue: 'order-dead-letter-queue',
})
async handleDeadLetter(msg: any, amqpMsg: ConsumeMessage) {
  const retryCount = (amqpMsg.properties.headers?.['x-retry-count'] ?? 0) as number;

  if (retryCount < 3) {
    // 재시도: 원래 큐로 재발행 (지수 백오프)
    const delay = Math.pow(2, retryCount) * 1000;
    await sleep(delay);
    await this.amqpConnection.publish('orders', amqpMsg.fields.routingKey, msg, {
      headers: { ...amqpMsg.properties.headers, 'x-retry-count': retryCount + 1 },
    });
  } else {
    // 최대 재시도 초과 → 알림 + DB 저장
    await this.alertService.notify(`DLQ 최대 재시도 초과: ${amqpMsg.fields.routingKey}`);
    await this.failedMessageRepo.save({
      routingKey: amqpMsg.fields.routingKey,
      payload: JSON.stringify(msg),
      error: amqpMsg.properties.headers?.['x-death']?.[0]?.reason,
    });
  }
}

멱등성 보장: 중복 메시지 처리

네트워크 문제로 동일 메시지가 두 번 전달될 수 있다. 멱등성(Idempotency)을 보장해야 한다.

@Injectable()
export class IdempotentConsumer {
  constructor(private readonly redis: Redis) {}

  @RabbitSubscribe({ exchange: 'orders', routingKey: 'order.created', queue: 'order-process' })
  async handle(msg: OrderCreatedEvent, amqpMsg: ConsumeMessage) {
    const messageId = amqpMsg.properties.messageId;
    const key = `processed:${messageId}`;

    // Redis SET NX로 중복 체크
    const isNew = await this.redis.set(key, '1', 'EX', 86400, 'NX');
    if (!isNew) {
      return;  // 이미 처리된 메시지 → 스킵
    }

    await this.orderService.process(msg);
  }
}

헬스체크와 모니터링

// Terminus 헬스체크에 RabbitMQ 상태 추가
@Injectable()
export class RabbitHealthIndicator extends HealthIndicator {
  constructor(private readonly amqpConnection: AmqpConnection) { super(); }

  async isHealthy(key: string): Promise<HealthIndicatorResult> {
    try {
      const channel = this.amqpConnection.channel;
      await channel.checkQueue('orders_queue');
      return this.getStatus(key, true);
    } catch {
      return this.getStatus(key, false);
    }
  }
}

NestJS Terminus 헬스체크와 통합하면 RabbitMQ 연결 상태를 /health 엔드포인트로 모니터링할 수 있다.

운영 팁

  • prefetchCount 튜닝 — 기본값 1은 안전하지만 느리다. CPU 바운드 작업은 코어 수, I/O 바운드 작업은 10~50으로 설정한다.
  • persistent 메시지 — 중요한 메시지는 persistent: true로 디스크에 저장한다. 성능은 떨어지지만 RabbitMQ 재시작 시 유실을 방지한다.
  • Exchange 타입 선택 — Direct(1:1), Topic(패턴 라우팅), Fanout(브로드캐스트), Headers(헤더 매칭). 대부분 Topic Exchange면 충분하다.
  • Connection Recovery@golevelup은 자동 재연결을 지원하지만, Graceful Shutdown으로 처리 중인 메시지의 ACK를 보장해야 한다.
  • 메시지 직렬화 — JSON이 기본이지만, 대용량 바이너리는 Protocol Buffers를 고려한다.

정리

NestJS + RabbitMQ 조합은 비동기 마이크로서비스 통신의 표준 패턴이다. 단순한 이벤트 발행은 내장 트랜스포트로, Exchange 라우팅·DLQ·멱등성 같은 고급 요구사항은 @golevelup/nestjs-rabbitmq로 해결한다. 핵심은 세 가지다: DLQ로 실패 메시지를 격리하고, 멱등성으로 중복을 처리하고, prefetchCount로 부하를 제어한다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux