NestJS NATS 마이크로서비스 심화

왜 NATS인가?

NestJS 마이크로서비스 트랜스포트로 Redis, Kafka, RabbitMQ 등이 있지만, NATS는 독보적인 장점을 가집니다. Go로 작성된 초경량 메시지 브로커로, 단일 바이너리에 설정 파일 하나면 됩니다. 메모리 10MB 미만으로 동작하며, 100만+ msg/sec 처리가 가능합니다. 클러스터링, JetStream(영속성), Request-Reply 패턴을 네이티브로 지원하여 마이크로서비스 간 통신에 최적화되어 있습니다.

NestJS + NATS 기본 설정

// main.ts — NATS 마이크로서비스 부트스트랩
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';

async function bootstrap() {
  // HTTP 서버 (API Gateway 역할)
  const app = await NestFactory.create(AppModule);

  // NATS 마이크로서비스 연결
  app.connectMicroservice<MicroserviceOptions>({
    transport: Transport.NATS,
    options: {
      servers: ['nats://nats-1:4222', 'nats://nats-2:4222'],
      queue: 'order-service',  // Queue Group (로드밸런싱)
      maxReconnectAttempts: -1, // 무한 재연결
      reconnectTimeWait: 2000,
      token: process.env.NATS_TOKEN,
    },
  });

  await app.startAllMicroservices();
  await app.listen(3000);
}
bootstrap();

queue 옵션이 핵심입니다. 같은 Queue Group의 서비스 인스턴스 중 하나만 메시지를 수신하므로, 수평 확장 시 자동 로드밸런싱됩니다.

Request-Reply 패턴

동기식 요청-응답이 필요한 경우 @MessagePattern을 사용합니다.

// order.controller.ts — 메시지 핸들러 (서버 측)
@Controller()
export class OrderController {
  constructor(private readonly orderService: OrderService) {}

  @MessagePattern('order.create')
  async createOrder(data: CreateOrderDto): Promise<Order> {
    return this.orderService.create(data);
  }

  @MessagePattern('order.findById')
  async findOrder(data: { id: number }): Promise<Order> {
    const order = await this.orderService.findById(data.id);
    if (!order) {
      throw new RpcException({
        status: 404,
        message: `Order ${data.id} not found`,
      });
    }
    return order;
  }

  @MessagePattern('order.list')
  async listOrders(data: { userId: number; page: number }) {
    return this.orderService.findByUser(data.userId, data.page);
  }
}
// api-gateway — 클라이언트 측
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'ORDER_SERVICE',
        transport: Transport.NATS,
        options: {
          servers: ['nats://nats:4222'],
          queue: 'order-service',
        },
      },
    ]),
  ],
})
export class GatewayModule {}

@Controller('orders')
export class OrderGatewayController {
  constructor(
    @Inject('ORDER_SERVICE')
    private readonly orderClient: ClientProxy,
  ) {}

  @Post()
  createOrder(@Body() dto: CreateOrderDto) {
    // send(): Request-Reply (Observable 반환)
    return this.orderClient.send('order.create', dto);
  }

  @Get(':id')
  findOrder(@Param('id', ParseIntPipe) id: number) {
    return this.orderClient.send('order.findById', { id });
  }
}

Event 패턴: Fire-and-Forget

응답이 필요 없는 이벤트 발행에는 @EventPatternemit()을 사용합니다.

// 이벤트 발행 (프로듀서)
@Injectable()
export class OrderService {
  constructor(
    @Inject('NATS_CLIENT')
    private readonly client: ClientProxy,
  ) {}

  async create(dto: CreateOrderDto): Promise<Order> {
    const order = await this.orderRepo.save(dto);

    // 이벤트 발행 — 응답 대기 없음
    this.client.emit('order.created', {
      orderId: order.id,
      userId: order.userId,
      totalAmount: order.totalAmount,
      createdAt: new Date().toISOString(),
    });

    return order;
  }
}

// 이벤트 수신 (컨슈머) — 알림 서비스
@Controller()
export class NotificationHandler {
  @EventPattern('order.created')
  async handleOrderCreated(data: OrderCreatedEvent) {
    await this.emailService.sendOrderConfirmation(
      data.userId,
      data.orderId,
    );
    console.log(`주문 확인 이메일 발송: ${data.orderId}`);
  }
}

// 이벤트 수신 (컨슈머) — 재고 서비스
@Controller()
export class InventoryHandler {
  @EventPattern('order.created')
  async handleOrderCreated(data: OrderCreatedEvent) {
    await this.inventoryService.decrementStock(data.orderId);
    console.log(`재고 차감: ${data.orderId}`);
  }
}

emit()은 Pub/Sub 방식이므로 모든 구독자가 메시지를 수신합니다. 단, 같은 Queue Group 내에서는 하나의 인스턴스만 처리합니다.

JetStream: 메시지 영속성

기본 NATS는 at-most-once 전달입니다. 메시지 손실이 허용되지 않으면 JetStream을 사용합니다.

// JetStream 커스텀 트랜스포트 전략
import { connect, JetStreamClient, StringCodec } from 'nats';

@Injectable()
export class JetStreamService implements OnModuleInit {
  private js: JetStreamClient;
  private sc = StringCodec();

  async onModuleInit() {
    const nc = await connect({
      servers: 'nats://nats:4222',
    });
    const jsm = await nc.jetstreamManager();

    // 스트림 생성 (없으면)
    try {
      await jsm.streams.add({
        name: 'ORDERS',
        subjects: ['orders.>'],   // orders.* 모든 서브젝트
        retention: 'limits',       // limits | interest | workqueue
        max_msgs: 1000000,
        max_age: 7 * 24 * 3600 * 1e9, // 7일 보관 (나노초)
        storage: 'file',
        num_replicas: 3,           // 클러스터 내 3중 복제
      });
    } catch (e) {
      // 이미 존재하면 무시
    }

    this.js = nc.jetstream();
  }

  // 메시지 발행 (영속)
  async publish(subject: string, data: any): Promise<void> {
    await this.js.publish(
      subject,
      this.sc.encode(JSON.stringify(data)),
    );
  }

  // 컨슈머 생성 및 메시지 소비
  async subscribe(
    stream: string,
    consumer: string,
    handler: (data: any) => Promise<void>,
  ) {
    const sub = await this.js.pullSubscribe(`orders.>`, {
      durable: consumer,
      ack_policy: 'explicit',  // 수동 ACK
      max_deliver: 5,          // 최대 5번 재전달
      ack_wait: 30 * 1e9,     // 30초 내 ACK
    });

    const pull = async () => {
      for await (const msg of sub) {
        try {
          const data = JSON.parse(this.sc.decode(msg.data));
          await handler(data);
          msg.ack();  // 성공 시 ACK
        } catch (e) {
          msg.nak();  // 실패 시 NAK → 재전달
        }
      }
    };
    pull();
  }
}

에러 처리와 타임아웃

// RPC 예외 필터
@Catch(RpcException)
export class NatsExceptionFilter implements RpcExceptionFilter {
  catch(exception: RpcException, host: ArgumentsHost) {
    const error = exception.getError() as any;
    return throwError(() => ({
      status: error.status || 500,
      message: error.message || 'Internal error',
      timestamp: new Date().toISOString(),
    }));
  }
}

// 타임아웃 설정
@Controller('orders')
export class OrderGatewayController {
  @Get(':id')
  findOrder(@Param('id') id: number) {
    return this.orderClient
      .send('order.findById', { id })
      .pipe(
        timeout(5000),  // 5초 타임아웃
        catchError(err => {
          if (err instanceof TimeoutError) {
            throw new GatewayTimeoutException(
              'Order service did not respond'
            );
          }
          throw new InternalServerErrorException(err.message);
        }),
      );
  }
}

운영 팁

  • Queue Group: 서비스명을 Queue Group으로 사용하면 자동 로드밸런싱
  • Subject 네이밍: 서비스.리소스.액션 패턴 (예: order.payment.completed)
  • 헬스체크: nats-server --signal health로 브로커 상태 확인
  • 모니터링: NATS 기본 제공 HTTP 모니터링 포트 8222 (/connz, /subsz)
  • 클러스터: 최소 3노드 클러스터로 고가용성 확보

관련 글

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux