NestJS + RabbitMQ 통합
NestJS는 @nestjs/microservices에 내장된 RabbitMQ 트랜스포트와 커뮤니티 패키지 @golevelup/nestjs-rabbitmq 두 가지 방식으로 RabbitMQ를 지원한다. 내장 트랜스포트는 단순한 요청-응답/이벤트 패턴에 적합하고, @golevelup 패키지는 Exchange, Routing Key, Dead Letter Queue 같은 AMQP 고급 기능을 활용할 때 적합하다.
내장 트랜스포트: 기본 설정
// main.ts — Microservice 모드
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.RMQ,
options: {
urls: ['amqp://user:password@rabbitmq:5672'],
queue: 'orders_queue',
queueOptions: {
durable: true, // 서버 재시작 시 큐 유지
},
prefetchCount: 10, // 동시 처리 메시지 수 제한
noAck: false, // 수동 ACK 모드
},
},
);
await app.listen();
하이브리드 모드: HTTP + RabbitMQ 동시 운영
// HTTP + Microservice 동시 실행
const app = await NestFactory.create(AppModule);
app.connectMicroservice<MicroserviceOptions>({
transport: Transport.RMQ,
options: {
urls: ['amqp://rabbitmq:5672'],
queue: 'orders_queue',
queueOptions: { durable: true },
},
});
await app.startAllMicroservices();
await app.listen(3000); // HTTP도 동시에
메시지 패턴: Request-Response vs Event
// --- Consumer (수신측) ---
@Controller()
export class OrdersController {
// Request-Response: 응답을 기다리는 패턴
@MessagePattern('order.create')
async createOrder(@Payload() data: CreateOrderDto, @Ctx() context: RmqContext) {
const channel = context.getChannelRef();
const message = context.getMessage();
try {
const order = await this.orderService.create(data);
channel.ack(message); // 수동 ACK
return order; // 응답 반환
} catch (error) {
channel.nack(message, false, false); // reject, requeue=false
throw error;
}
}
// Event-Based: 응답 없이 발행만
@EventPattern('order.shipped')
async handleShipped(@Payload() data: OrderShippedEvent, @Ctx() context: RmqContext) {
const channel = context.getChannelRef();
const message = context.getMessage();
await this.notificationService.sendShippingEmail(data);
channel.ack(message);
}
}
// --- Producer (발신측) ---
@Injectable()
export class OrderProducer {
constructor(@Inject('ORDER_SERVICE') private client: ClientProxy) {}
// Request-Response
createOrder(dto: CreateOrderDto): Observable<Order> {
return this.client.send('order.create', dto);
}
// Event (fire-and-forget)
emitShipped(event: OrderShippedEvent) {
this.client.emit('order.shipped', event);
}
}
@golevelup/nestjs-rabbitmq: AMQP 심화
실무에서는 Exchange 라우팅, Dead Letter Queue, 메시지 TTL 등 AMQP 고급 기능이 필요하다. @golevelup/nestjs-rabbitmq는 이를 데코레이터 기반으로 깔끔하게 지원한다.
// module 설정
@Module({
imports: [
RabbitMQModule.forRoot(RabbitMQModule, {
exchanges: [
{ name: 'orders', type: 'topic' }, // Topic Exchange
{ name: 'orders.dlx', type: 'fanout' }, // Dead Letter Exchange
],
uri: 'amqp://user:password@rabbitmq:5672',
connectionInitOptions: { wait: true, timeout: 10000 },
channels: {
default: { prefetchCount: 10, default: true },
bulk: { prefetchCount: 1 }, // 무거운 작업용 채널
},
}),
],
})
export class MessagingModule {}
Topic Exchange 라우팅
@Injectable()
export class OrderConsumer {
// order.created, order.updated 등 order.* 패턴 매칭
@RabbitSubscribe({
exchange: 'orders',
routingKey: 'order.created',
queue: 'order-created-queue',
queueOptions: {
durable: true,
deadLetterExchange: 'orders.dlx', // 실패 시 DLX로
deadLetterRoutingKey: 'order.failed',
messageTtl: 60000, // 60초 TTL
},
})
async handleOrderCreated(msg: OrderCreatedEvent) {
await this.inventoryService.reserve(msg.items);
await this.paymentService.charge(msg.userId, msg.total);
}
// 와일드카드 라우팅: order.* 전체 구독
@RabbitSubscribe({
exchange: 'orders',
routingKey: 'order.*',
queue: 'order-audit-queue',
})
async auditAllOrderEvents(msg: any, amqpMsg: ConsumeMessage) {
await this.auditService.log({
routingKey: amqpMsg.fields.routingKey,
payload: msg,
timestamp: new Date(),
});
}
}
메시지 발행
@Injectable()
export class OrderPublisher {
constructor(private readonly amqpConnection: AmqpConnection) {}
async publishOrderCreated(order: Order) {
await this.amqpConnection.publish('orders', 'order.created', {
orderId: order.id,
userId: order.userId,
items: order.items,
total: order.total,
}, {
persistent: true, // 메시지 디스크 저장
messageId: randomUUID(),
timestamp: Date.now(),
headers: {
'x-retry-count': 0,
'x-source': 'order-service',
},
});
}
}
Dead Letter Queue: 실패 메시지 처리
// DLQ Consumer: 실패한 메시지를 별도로 처리
@RabbitSubscribe({
exchange: 'orders.dlx',
routingKey: '#', // 모든 라우팅 키
queue: 'order-dead-letter-queue',
})
async handleDeadLetter(msg: any, amqpMsg: ConsumeMessage) {
const retryCount = (amqpMsg.properties.headers?.['x-retry-count'] ?? 0) as number;
if (retryCount < 3) {
// 재시도: 원래 큐로 재발행 (지수 백오프)
const delay = Math.pow(2, retryCount) * 1000;
await sleep(delay);
await this.amqpConnection.publish('orders', amqpMsg.fields.routingKey, msg, {
headers: { ...amqpMsg.properties.headers, 'x-retry-count': retryCount + 1 },
});
} else {
// 최대 재시도 초과 → 알림 + DB 저장
await this.alertService.notify(`DLQ 최대 재시도 초과: ${amqpMsg.fields.routingKey}`);
await this.failedMessageRepo.save({
routingKey: amqpMsg.fields.routingKey,
payload: JSON.stringify(msg),
error: amqpMsg.properties.headers?.['x-death']?.[0]?.reason,
});
}
}
멱등성 보장: 중복 메시지 처리
네트워크 문제로 동일 메시지가 두 번 전달될 수 있다. 멱등성(Idempotency)을 보장해야 한다.
@Injectable()
export class IdempotentConsumer {
constructor(private readonly redis: Redis) {}
@RabbitSubscribe({ exchange: 'orders', routingKey: 'order.created', queue: 'order-process' })
async handle(msg: OrderCreatedEvent, amqpMsg: ConsumeMessage) {
const messageId = amqpMsg.properties.messageId;
const key = `processed:${messageId}`;
// Redis SET NX로 중복 체크
const isNew = await this.redis.set(key, '1', 'EX', 86400, 'NX');
if (!isNew) {
return; // 이미 처리된 메시지 → 스킵
}
await this.orderService.process(msg);
}
}
헬스체크와 모니터링
// Terminus 헬스체크에 RabbitMQ 상태 추가
@Injectable()
export class RabbitHealthIndicator extends HealthIndicator {
constructor(private readonly amqpConnection: AmqpConnection) { super(); }
async isHealthy(key: string): Promise<HealthIndicatorResult> {
try {
const channel = this.amqpConnection.channel;
await channel.checkQueue('orders_queue');
return this.getStatus(key, true);
} catch {
return this.getStatus(key, false);
}
}
}
NestJS Terminus 헬스체크와 통합하면 RabbitMQ 연결 상태를 /health 엔드포인트로 모니터링할 수 있다.
운영 팁
- prefetchCount 튜닝 — 기본값 1은 안전하지만 느리다. CPU 바운드 작업은 코어 수, I/O 바운드 작업은 10~50으로 설정한다.
- persistent 메시지 — 중요한 메시지는
persistent: true로 디스크에 저장한다. 성능은 떨어지지만 RabbitMQ 재시작 시 유실을 방지한다. - Exchange 타입 선택 — Direct(1:1), Topic(패턴 라우팅), Fanout(브로드캐스트), Headers(헤더 매칭). 대부분 Topic Exchange면 충분하다.
- Connection Recovery —
@golevelup은 자동 재연결을 지원하지만, Graceful Shutdown으로 처리 중인 메시지의 ACK를 보장해야 한다. - 메시지 직렬화 — JSON이 기본이지만, 대용량 바이너리는 Protocol Buffers를 고려한다.
정리
NestJS + RabbitMQ 조합은 비동기 마이크로서비스 통신의 표준 패턴이다. 단순한 이벤트 발행은 내장 트랜스포트로, Exchange 라우팅·DLQ·멱등성 같은 고급 요구사항은 @golevelup/nestjs-rabbitmq로 해결한다. 핵심은 세 가지다: DLQ로 실패 메시지를 격리하고, 멱등성으로 중복을 처리하고, prefetchCount로 부하를 제어한다.