왜 NATS인가?
NestJS 마이크로서비스 트랜스포트로 Redis, Kafka, RabbitMQ 등이 있지만, NATS는 독보적인 장점을 가집니다. Go로 작성된 초경량 메시지 브로커로, 단일 바이너리에 설정 파일 하나면 됩니다. 메모리 10MB 미만으로 동작하며, 100만+ msg/sec 처리가 가능합니다. 클러스터링, JetStream(영속성), Request-Reply 패턴을 네이티브로 지원하여 마이크로서비스 간 통신에 최적화되어 있습니다.
NestJS + NATS 기본 설정
// main.ts — NATS 마이크로서비스 부트스트랩
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';
async function bootstrap() {
// HTTP 서버 (API Gateway 역할)
const app = await NestFactory.create(AppModule);
// NATS 마이크로서비스 연결
app.connectMicroservice<MicroserviceOptions>({
transport: Transport.NATS,
options: {
servers: ['nats://nats-1:4222', 'nats://nats-2:4222'],
queue: 'order-service', // Queue Group (로드밸런싱)
maxReconnectAttempts: -1, // 무한 재연결
reconnectTimeWait: 2000,
token: process.env.NATS_TOKEN,
},
});
await app.startAllMicroservices();
await app.listen(3000);
}
bootstrap();
queue 옵션이 핵심입니다. 같은 Queue Group의 서비스 인스턴스 중 하나만 메시지를 수신하므로, 수평 확장 시 자동 로드밸런싱됩니다.
Request-Reply 패턴
동기식 요청-응답이 필요한 경우 @MessagePattern을 사용합니다.
// order.controller.ts — 메시지 핸들러 (서버 측)
@Controller()
export class OrderController {
constructor(private readonly orderService: OrderService) {}
@MessagePattern('order.create')
async createOrder(data: CreateOrderDto): Promise<Order> {
return this.orderService.create(data);
}
@MessagePattern('order.findById')
async findOrder(data: { id: number }): Promise<Order> {
const order = await this.orderService.findById(data.id);
if (!order) {
throw new RpcException({
status: 404,
message: `Order ${data.id} not found`,
});
}
return order;
}
@MessagePattern('order.list')
async listOrders(data: { userId: number; page: number }) {
return this.orderService.findByUser(data.userId, data.page);
}
}
// api-gateway — 클라이언트 측
@Module({
imports: [
ClientsModule.register([
{
name: 'ORDER_SERVICE',
transport: Transport.NATS,
options: {
servers: ['nats://nats:4222'],
queue: 'order-service',
},
},
]),
],
})
export class GatewayModule {}
@Controller('orders')
export class OrderGatewayController {
constructor(
@Inject('ORDER_SERVICE')
private readonly orderClient: ClientProxy,
) {}
@Post()
createOrder(@Body() dto: CreateOrderDto) {
// send(): Request-Reply (Observable 반환)
return this.orderClient.send('order.create', dto);
}
@Get(':id')
findOrder(@Param('id', ParseIntPipe) id: number) {
return this.orderClient.send('order.findById', { id });
}
}
Event 패턴: Fire-and-Forget
응답이 필요 없는 이벤트 발행에는 @EventPattern과 emit()을 사용합니다.
// 이벤트 발행 (프로듀서)
@Injectable()
export class OrderService {
constructor(
@Inject('NATS_CLIENT')
private readonly client: ClientProxy,
) {}
async create(dto: CreateOrderDto): Promise<Order> {
const order = await this.orderRepo.save(dto);
// 이벤트 발행 — 응답 대기 없음
this.client.emit('order.created', {
orderId: order.id,
userId: order.userId,
totalAmount: order.totalAmount,
createdAt: new Date().toISOString(),
});
return order;
}
}
// 이벤트 수신 (컨슈머) — 알림 서비스
@Controller()
export class NotificationHandler {
@EventPattern('order.created')
async handleOrderCreated(data: OrderCreatedEvent) {
await this.emailService.sendOrderConfirmation(
data.userId,
data.orderId,
);
console.log(`주문 확인 이메일 발송: ${data.orderId}`);
}
}
// 이벤트 수신 (컨슈머) — 재고 서비스
@Controller()
export class InventoryHandler {
@EventPattern('order.created')
async handleOrderCreated(data: OrderCreatedEvent) {
await this.inventoryService.decrementStock(data.orderId);
console.log(`재고 차감: ${data.orderId}`);
}
}
emit()은 Pub/Sub 방식이므로 모든 구독자가 메시지를 수신합니다. 단, 같은 Queue Group 내에서는 하나의 인스턴스만 처리합니다.
JetStream: 메시지 영속성
기본 NATS는 at-most-once 전달입니다. 메시지 손실이 허용되지 않으면 JetStream을 사용합니다.
// JetStream 커스텀 트랜스포트 전략
import { connect, JetStreamClient, StringCodec } from 'nats';
@Injectable()
export class JetStreamService implements OnModuleInit {
private js: JetStreamClient;
private sc = StringCodec();
async onModuleInit() {
const nc = await connect({
servers: 'nats://nats:4222',
});
const jsm = await nc.jetstreamManager();
// 스트림 생성 (없으면)
try {
await jsm.streams.add({
name: 'ORDERS',
subjects: ['orders.>'], // orders.* 모든 서브젝트
retention: 'limits', // limits | interest | workqueue
max_msgs: 1000000,
max_age: 7 * 24 * 3600 * 1e9, // 7일 보관 (나노초)
storage: 'file',
num_replicas: 3, // 클러스터 내 3중 복제
});
} catch (e) {
// 이미 존재하면 무시
}
this.js = nc.jetstream();
}
// 메시지 발행 (영속)
async publish(subject: string, data: any): Promise<void> {
await this.js.publish(
subject,
this.sc.encode(JSON.stringify(data)),
);
}
// 컨슈머 생성 및 메시지 소비
async subscribe(
stream: string,
consumer: string,
handler: (data: any) => Promise<void>,
) {
const sub = await this.js.pullSubscribe(`orders.>`, {
durable: consumer,
ack_policy: 'explicit', // 수동 ACK
max_deliver: 5, // 최대 5번 재전달
ack_wait: 30 * 1e9, // 30초 내 ACK
});
const pull = async () => {
for await (const msg of sub) {
try {
const data = JSON.parse(this.sc.decode(msg.data));
await handler(data);
msg.ack(); // 성공 시 ACK
} catch (e) {
msg.nak(); // 실패 시 NAK → 재전달
}
}
};
pull();
}
}
에러 처리와 타임아웃
// RPC 예외 필터
@Catch(RpcException)
export class NatsExceptionFilter implements RpcExceptionFilter {
catch(exception: RpcException, host: ArgumentsHost) {
const error = exception.getError() as any;
return throwError(() => ({
status: error.status || 500,
message: error.message || 'Internal error',
timestamp: new Date().toISOString(),
}));
}
}
// 타임아웃 설정
@Controller('orders')
export class OrderGatewayController {
@Get(':id')
findOrder(@Param('id') id: number) {
return this.orderClient
.send('order.findById', { id })
.pipe(
timeout(5000), // 5초 타임아웃
catchError(err => {
if (err instanceof TimeoutError) {
throw new GatewayTimeoutException(
'Order service did not respond'
);
}
throw new InternalServerErrorException(err.message);
}),
);
}
}
운영 팁
- Queue Group: 서비스명을 Queue Group으로 사용하면 자동 로드밸런싱
- Subject 네이밍:
서비스.리소스.액션패턴 (예:order.payment.completed) - 헬스체크:
nats-server --signal health로 브로커 상태 확인 - 모니터링: NATS 기본 제공 HTTP 모니터링 포트 8222 (
/connz,/subsz) - 클러스터: 최소 3노드 클러스터로 고가용성 확보
관련 글
- NestJS Microservices Transport — TCP, Redis, NATS 등 트랜스포트 레이어 비교
- NestJS gRPC 마이크로서비스 — NATS 대비 gRPC의 장단점과 사용 시나리오