NestJS gRPC 마이크로서비스

NestJS gRPC 마이크로서비스란?

NestJS는 gRPC(Google Remote Procedure Call)를 네이티브 Transport로 지원하여, HTTP REST 대비 바이너리 직렬화(Protocol Buffers), 양방향 스트리밍, 강력한 타입 계약을 활용한 고성능 마이크로서비스 통신을 구현할 수 있습니다.

이 글에서는 Proto 정의와 코드 생성, 서버/클라이언트 설정, 스트리밍 패턴(Server/Client/Bidirectional), 인터셉터와 메타데이터, 헬스체크와 로드밸런싱까지 실무 운영 패턴을 다룹니다.

Proto 파일 정의와 코드 생성

// proto/order.proto
syntax = "proto3";

package order;

service OrderService {
  // Unary RPC
  rpc CreateOrder(CreateOrderRequest) returns (OrderResponse);
  rpc GetOrder(GetOrderRequest) returns (OrderResponse);
  rpc ListOrders(ListOrdersRequest) returns (ListOrdersResponse);

  // Server Streaming: 주문 상태 실시간 추적
  rpc TrackOrder(TrackOrderRequest) returns (stream OrderStatusUpdate);

  // Client Streaming: 대량 주문 일괄 생성
  rpc BulkCreateOrders(stream CreateOrderRequest) returns (BulkCreateResponse);

  // Bidirectional Streaming: 실시간 주문 채팅
  rpc OrderChat(stream ChatMessage) returns (stream ChatMessage);
}

message CreateOrderRequest {
  string customer_id = 1;
  repeated OrderItem items = 2;
  Address shipping_address = 3;
}

message OrderItem {
  string product_id = 1;
  int32 quantity = 2;
  double unit_price = 3;
}

message Address {
  string street = 1;
  string city = 2;
  string zip_code = 3;
  string country = 4;
}

message OrderResponse {
  string id = 1;
  string customer_id = 2;
  OrderStatus status = 3;
  double total_amount = 4;
  google.protobuf.Timestamp created_at = 5;
}

enum OrderStatus {
  PENDING = 0;
  CONFIRMED = 1;
  SHIPPING = 2;
  DELIVERED = 3;
  CANCELLED = 4;
}

message OrderStatusUpdate {
  string order_id = 1;
  OrderStatus status = 2;
  string message = 3;
  google.protobuf.Timestamp timestamp = 4;
}

ts-proto로 타입 생성

// package.json scripts
{
  "proto:generate": "protoc --plugin=./node_modules/.bin/protoc-gen-ts_proto --ts_proto_out=./src/generated --ts_proto_opt=nestJs=true,addGrpcMetadata=true,outputEncodeMethods=false,outputJsonMethods=false,outputClientImpl=false proto/*.proto"
}

// 생성된 타입 인터페이스 (src/generated/order.ts)
export interface OrderServiceController {
  createOrder(
    request: CreateOrderRequest,
    metadata?: Metadata,
  ): Promise<OrderResponse> | Observable<OrderResponse>;

  getOrder(
    request: GetOrderRequest,
    metadata?: Metadata,
  ): Promise<OrderResponse> | Observable<OrderResponse>;

  trackOrder(
    request: TrackOrderRequest,
    metadata?: Metadata,
  ): Observable<OrderStatusUpdate>;

  bulkCreateOrders(
    request: Observable<CreateOrderRequest>,
    metadata?: Metadata,
  ): Promise<BulkCreateResponse> | Observable<BulkCreateResponse>;
}

gRPC 서버 설정

// main.ts
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { join } from 'path';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);

  // gRPC 마이크로서비스 연결
  app.connectMicroservice<MicroserviceOptions>({
    transport: Transport.GRPC,
    options: {
      package: 'order',
      protoPath: join(__dirname, '../proto/order.proto'),
      url: '0.0.0.0:5000',
      // 최대 메시지 크기 (기본 4MB)
      maxReceiveMessageLength: 10 * 1024 * 1024,
      maxSendMessageLength: 10 * 1024 * 1024,
      // 채널 옵션
      channelOptions: {
        'grpc.keepalive_time_ms': 30000,
        'grpc.keepalive_timeout_ms': 5000,
        'grpc.keepalive_permit_without_calls': 1,
        'grpc.max_reconnect_backoff_ms': 5000,
      },
      // TLS 설정
      credentials: ServerCredentials.createSsl(
        fs.readFileSync('ca.pem'),
        [{
          cert_chain: fs.readFileSync('server-cert.pem'),
          private_key: fs.readFileSync('server-key.pem'),
        }],
        true, // 클라이언트 인증서 검증
      ),
    },
  });

  // HTTP + gRPC 동시 실행 (하이브리드)
  await app.startAllMicroservices();
  await app.listen(3000);
}
bootstrap();

컨트롤러: Unary + 스트리밍 구현

import { Controller } from '@nestjs/common';
import { GrpcMethod, GrpcStreamMethod, GrpcStreamCall } from '@nestjs/microservices';
import { Metadata, ServerUnaryCall, ServerWritableStream } from '@grpc/grpc-js';
import { Observable, Subject, from, interval } from 'rxjs';
import { map, takeUntil } from 'rxjs/operators';

@Controller()
export class OrderController {
  constructor(private readonly orderService: OrderService) {}

  // Unary RPC
  @GrpcMethod('OrderService', 'CreateOrder')
  async createOrder(
    data: CreateOrderRequest,
    metadata: Metadata,
    call: ServerUnaryCall<any, any>,
  ): Promise<OrderResponse> {
    // 메타데이터에서 인증 토큰 추출
    const token = metadata.get('authorization')[0] as string;
    const userId = await this.authService.verify(token);

    const order = await this.orderService.create({
      ...data,
      customerId: userId,
    });

    return {
      id: order.id,
      customerId: order.customerId,
      status: OrderStatus.PENDING,
      totalAmount: order.totalAmount,
      createdAt: Timestamp.fromDate(order.createdAt),
    };
  }

  // Server Streaming: 주문 상태 실시간 추적
  @GrpcMethod('OrderService', 'TrackOrder')
  trackOrder(
    data: TrackOrderRequest,
    metadata: Metadata,
  ): Observable<OrderStatusUpdate> {
    return new Observable((subscriber) => {
      // EventEmitter로 주문 상태 변경 구독
      const handler = (update: OrderStatusUpdate) => {
        if (update.orderId === data.orderId) {
          subscriber.next(update);
          if (update.status === OrderStatus.DELIVERED ||
              update.status === OrderStatus.CANCELLED) {
            subscriber.complete();
          }
        }
      };

      this.eventEmitter.on('order.status.changed', handler);

      // 구독 해제 (클라이언트 연결 종료 시)
      return () => {
        this.eventEmitter.off('order.status.changed', handler);
      };
    });
  }

  // Client Streaming: 대량 주문 일괄 생성
  @GrpcStreamMethod('OrderService', 'BulkCreateOrders')
  bulkCreateOrders(
    messages: Observable<CreateOrderRequest>,
  ): Observable<BulkCreateResponse> {
    const subject = new Subject<BulkCreateResponse>();
    const orders: OrderResponse[] = [];
    let errorCount = 0;

    messages.subscribe({
      next: async (request) => {
        try {
          const order = await this.orderService.create(request);
          orders.push(order);
        } catch {
          errorCount++;
        }
      },
      complete: () => {
        subject.next({
          totalCreated: orders.length,
          totalFailed: errorCount,
          orderIds: orders.map((o) => o.id),
        });
        subject.complete();
      },
    });

    return subject.asObservable();
  }
}

gRPC 클라이언트 설정

// order-client.module.ts
@Module({
  imports: [
    ClientsModule.registerAsync([
      {
        name: 'ORDER_PACKAGE',
        useFactory: (config: ConfigService) => ({
          transport: Transport.GRPC,
          options: {
            package: 'order',
            protoPath: join(__dirname, '../proto/order.proto'),
            url: config.get('ORDER_SERVICE_URL', 'localhost:5000'),
            channelOptions: {
              'grpc.service_config': JSON.stringify({
                loadBalancingConfig: [{ round_robin: {} }],
                methodConfig: [{
                  name: [{ service: 'order.OrderService' }],
                  timeout: { seconds: 10 },
                  retryPolicy: {
                    maxAttempts: 3,
                    initialBackoff: '0.1s',
                    maxBackoff: '1s',
                    backoffMultiplier: 2,
                    retryableStatusCodes: ['UNAVAILABLE', 'DEADLINE_EXCEEDED'],
                  },
                }],
              }),
            },
          },
        }),
        inject: [ConfigService],
      },
    ]),
  ],
  exports: [ClientsModule],
})
export class OrderClientModule {}

// 클라이언트 사용
@Injectable()
export class OrderGatewayService implements OnModuleInit {
  private orderService: OrderServiceClient;

  constructor(@Inject('ORDER_PACKAGE') private client: ClientGrpc) {}

  onModuleInit() {
    this.orderService = this.client.getService<OrderServiceClient>('OrderService');
  }

  async getOrder(id: string): Promise<OrderResponse> {
    return firstValueFrom(
      this.orderService.getOrder({ orderId: id }),
    );
  }

  // Server Streaming 소비
  trackOrder(orderId: string): Observable<OrderStatusUpdate> {
    return this.orderService.trackOrder({ orderId });
  }
}

인터셉터: 인증·로깅·에러 처리

// gRPC 서버 인터셉터 (NestJS Interceptor)
@Injectable()
export class GrpcLoggingInterceptor implements NestInterceptor {
  private readonly logger = new Logger('gRPC');

  intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
    const rpcContext = context.switchToRpc();
    const metadata = context.getArgByIndex(1) as Metadata;
    const handler = context.getHandler().name;
    const start = Date.now();

    // 요청 ID 전파
    const requestId = metadata.get('x-request-id')[0] as string
      || randomUUID();

    return next.handle().pipe(
      tap({
        next: () => {
          this.logger.log(
            `${handler} OK ${Date.now() - start}ms [${requestId}]`,
          );
        },
        error: (err) => {
          this.logger.error(
            `${handler} ERROR ${Date.now() - start}ms [${requestId}]: ${err.message}`,
          );
        },
      }),
    );
  }
}

// gRPC 에러 매핑 필터
@Catch()
export class GrpcExceptionFilter implements RpcExceptionFilter {
  catch(exception: any): Observable<never> {
    const grpcError = this.mapToGrpcError(exception);
    return throwError(() => grpcError);
  }

  private mapToGrpcError(exception: any): RpcException {
    if (exception instanceof NotFoundException) {
      return new RpcException({
        code: status.NOT_FOUND,
        message: exception.message,
      });
    }
    if (exception instanceof BadRequestException) {
      return new RpcException({
        code: status.INVALID_ARGUMENT,
        message: exception.message,
      });
    }
    if (exception instanceof UnauthorizedException) {
      return new RpcException({
        code: status.UNAUTHENTICATED,
        message: exception.message,
      });
    }
    return new RpcException({
      code: status.INTERNAL,
      message: 'Internal server error',
    });
  }
}

헬스체크와 리플렉션

NestJS Terminus 헬스체크에서 다뤘던 HTTP 헬스체크와 마찬가지로, gRPC도 표준 헬스체크 프로토콜을 지원합니다.

// gRPC Health Check 구현
import { GrpcOptions } from '@nestjs/microservices';

const grpcOptions: GrpcOptions = {
  transport: Transport.GRPC,
  options: {
    package: ['order', 'grpc.health.v1'],
    protoPath: [
      join(__dirname, '../proto/order.proto'),
      join(__dirname, '../proto/health.proto'),
    ],
    url: '0.0.0.0:5000',
  },
};

// K8s gRPC 헬스체크 (v1.24+)
// livenessProbe:
//   grpc:
//     port: 5000
//   initialDelaySeconds: 10

// gRPC 리플렉션 (grpcurl 디버깅용)
// npm install @grpc/reflection
import { ReflectionService } from '@grpc/reflection';

// main.ts에서 리플렉션 서비스 추가
const server = app.getMicroservice(0);
const reflectionService = new ReflectionService(packageDefinition);
reflectionService.addToServer(server.server);

운영 베스트 프랙티스

항목 권장 이유
Proto 버전 관리 별도 레포 + npm 패키지 서비스 간 계약 일관성
Deadline 설정 모든 RPC에 필수 연쇄 타임아웃 방지
Keepalive 30초 간격 로드밸런서 연결 유지
리트라이 UNAVAILABLE만 멱등성 보장되는 경우만
메시지 크기 4MB 이하 유지 대용량은 스트리밍 활용
하위 호환성 필드 추가만, 삭제/변경 금지 Proto3 호환성 규칙

마무리

NestJS gRPC 마이크로서비스는 Proto 기반 타입 계약, 세 가지 스트리밍 패턴, 인터셉터 체인, 서비스 리플렉션까지 활용하면 고성능 서비스 간 통신을 구현할 수 있습니다. NestJS Interceptor 실전 패턴과 결합하여 로깅, 인증, 에러 매핑을 체계화하는 것이 운영의 핵심입니다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux