NestJS Kafka 이벤트 스트리밍

NestJS + Kafka란?

Apache Kafka는 대규모 이벤트 스트리밍의 사실상 표준 플랫폼입니다. NestJS는 @nestjs/microservices 패키지를 통해 Kafka를 네이티브 Transport로 지원하며, Producer·Consumer·Consumer Group을 데코레이터 기반으로 선언적 구현할 수 있습니다. 이 글에서는 NestJS에서 Kafka를 활용한 이벤트 기반 마이크로서비스 아키텍처를 코드 레벨로 심화 분석합니다.

프로젝트 설정

// 패키지 설치
npm install @nestjs/microservices kafkajs

// package.json 주요 의존성
{
  "@nestjs/microservices": "^10.x",
  "kafkajs": "^2.2.4"
}

Kafka Transport 연결 설정

Hybrid Application으로 HTTP와 Kafka를 동시에 사용하는 것이 일반적입니다.

// main.ts — Hybrid Application
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);

  // Kafka Microservice 연결
  app.connectMicroservice<MicroserviceOptions>({
    transport: Transport.KAFKA,
    options: {
      client: {
        clientId: 'order-service',
        brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
        ssl: true,
        sasl: {
          mechanism: 'plain',
          username: process.env.KAFKA_USER,
          password: process.env.KAFKA_PASSWORD,
        },
        retry: {
          initialRetryTime: 300,
          retries: 10,
        },
      },
      consumer: {
        groupId: 'order-service-group',
        sessionTimeout: 30000,
        heartbeatInterval: 3000,
        maxBytesPerPartition: 1048576, // 1MB
        retry: {
          retries: 5,
        },
      },
      producer: {
        allowAutoTopicCreation: false,
        idempotent: true,        // 멱등성 보장
        maxInFlightRequests: 5,
      },
    },
  });

  await app.startAllMicroservices();
  await app.listen(3000);
}
bootstrap();

Producer: 이벤트 발행

ClientKafka를 주입받아 토픽에 메시지를 발행합니다.

// kafka-client.module.ts
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'KAFKA_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'order-producer',
            brokers: ['kafka-1:9092', 'kafka-2:9092'],
          },
          producer: {
            idempotent: true,
          },
        },
      },
    ]),
  ],
  exports: [ClientsModule],
})
export class KafkaClientModule {}

// order.service.ts
@Injectable()
export class OrderService implements OnModuleInit {
  constructor(
    @Inject('KAFKA_SERVICE')
    private readonly kafkaClient: ClientKafka,
    private readonly orderRepo: OrderRepository,
  ) {}

  async onModuleInit() {
    // 응답 토픽 구독 (Request-Reply 패턴 사용 시)
    this.kafkaClient.subscribeToResponseOf('order.process');
    await this.kafkaClient.connect();
  }

  async createOrder(dto: CreateOrderDto): Promise<Order> {
    const order = await this.orderRepo.save(
      this.orderRepo.create(dto),
    );

    // 이벤트 발행 (Fire-and-Forget)
    this.kafkaClient.emit('order.created', {
      key: order.id.toString(),       // 파티션 키
      value: {
        orderId: order.id,
        userId: order.userId,
        items: order.items,
        totalAmount: order.totalAmount,
        createdAt: order.createdAt.toISOString(),
      },
      headers: {
        'correlation-id': randomUUID(),
        'source': 'order-service',
        'event-type': 'OrderCreated',
      },
    });

    return order;
  }

  // Request-Reply 패턴: 응답 대기
  async processOrder(orderId: string): Promise<ProcessResult> {
    return firstValueFrom(
      this.kafkaClient.send('order.process', {
        key: orderId,
        value: { orderId, action: 'process' },
      }),
    );
  }
}

Consumer: 이벤트 소비

@EventPattern@MessagePattern으로 토픽별 핸들러를 등록합니다.

// payment.controller.ts
@Controller()
export class PaymentController {
  constructor(private readonly paymentService: PaymentService) {}

  // Fire-and-Forget: emit()으로 발행된 이벤트
  @EventPattern('order.created')
  async handleOrderCreated(
    @Payload() data: OrderCreatedEvent,
    @Ctx() context: KafkaContext,
  ) {
    const topic = context.getTopic();
    const partition = context.getPartition();
    const offset = context.getMessage().offset;
    const headers = context.getMessage().headers;

    console.log(`[${topic}] partition=${partition} offset=${offset}`);
    console.log(`correlation-id: ${headers['correlation-id']}`);

    await this.paymentService.processPayment({
      orderId: data.orderId,
      amount: data.totalAmount,
      userId: data.userId,
    });
  }

  // Request-Reply: send()로 발행된 메시지 → 응답 반환
  @MessagePattern('order.process')
  async handleProcessOrder(
    @Payload() data: ProcessOrderMessage,
    @Ctx() context: KafkaContext,
  ): Promise<ProcessResult> {
    const result = await this.paymentService.execute(data.orderId);
    return {
      success: true,
      paymentId: result.id,
      processedAt: new Date().toISOString(),
    };
  }
}

직렬화 커스터마이징

기본 JSON 직렬화를 확장하여 스키마 검증, 압축 등을 적용합니다.

// custom-serializer.ts
import { Serializer } from '@nestjs/microservices';

export class KafkaEventSerializer implements Serializer {
  serialize(value: any) {
    return {
      ...value,
      value: {
        ...value.value,
        _metadata: {
          version: '1.0',
          timestamp: new Date().toISOString(),
          service: process.env.SERVICE_NAME,
        },
      },
    };
  }
}

// custom-deserializer.ts
import { Deserializer, IncomingEvent } from '@nestjs/microservices';

export class KafkaEventDeserializer implements Deserializer {
  deserialize(message: any, options?: Record<string, any>): IncomingEvent {
    const { key, value, headers, timestamp } = message;

    // JSON 파싱 + 헤더 디코딩
    const decodedHeaders = Object.entries(headers || {}).reduce(
      (acc, [k, v]) => ({
        ...acc,
        [k]: v?.toString(),
      }),
      {},
    );

    return {
      pattern: undefined, // NestJS가 토픽으로 매칭
      data: {
        key: key?.toString(),
        value: typeof value === 'string' ? JSON.parse(value) : value,
        headers: decodedHeaders,
        timestamp,
      },
    };
  }
}

// main.ts에서 적용
app.connectMicroservice<MicroserviceOptions>({
  transport: Transport.KAFKA,
  options: {
    // ... client/consumer 설정
    serializer: new KafkaEventSerializer(),
    deserializer: new KafkaEventDeserializer(),
  },
});

에러 처리와 DLQ

소비 실패 시 Dead Letter Queue(DLQ)로 라우팅하여 메시지 유실을 방지합니다.

// kafka-exception.filter.ts
@Catch()
export class KafkaExceptionFilter implements ExceptionFilter {
  constructor(
    @Inject('KAFKA_SERVICE')
    private readonly kafkaClient: ClientKafka,
  ) {}

  async catch(exception: any, host: ArgumentsHost) {
    const ctx = host.switchToRpc().getContext<KafkaContext>();
    const originalMessage = ctx.getMessage();
    const topic = ctx.getTopic();
    const retryCount = parseInt(
      originalMessage.headers?.['retry-count']?.toString() || '0',
    );

    if (retryCount < 3) {
      // 재시도: 원래 토픽에 재발행
      this.kafkaClient.emit(topic, {
        key: originalMessage.key?.toString(),
        value: originalMessage.value,
        headers: {
          ...originalMessage.headers,
          'retry-count': (retryCount + 1).toString(),
          'last-error': exception.message,
          'retry-at': new Date().toISOString(),
        },
      });
    } else {
      // DLQ로 전송
      this.kafkaClient.emit(`${topic}.dlq`, {
        key: originalMessage.key?.toString(),
        value: {
          originalTopic: topic,
          originalMessage: originalMessage.value,
          error: exception.message,
          stack: exception.stack,
          failedAt: new Date().toISOString(),
          retryCount,
        },
      });
    }
  }
}

// Controller에서 적용
@Controller()
@UseFilters(KafkaExceptionFilter)
export class PaymentController {
  // ...
}

Consumer Group 관리

여러 인스턴스가 같은 groupId를 사용하면 Kafka가 파티션을 자동 분배합니다. 운영에서 고려할 설정을 정리합니다.

설정 권장값 설명
sessionTimeout 30초 컨슈머 장애 감지 시간
heartbeatInterval 3초 sessionTimeout의 1/10
maxBytesPerPartition 1MB 파티션별 최대 fetch 크기
인스턴스 수 ≤ 파티션 수 초과 시 유휴 컨슈머 발생

멱등성 보장 패턴

Kafka의 at-least-once 전달 특성상 중복 처리를 방지해야 합니다.

@Injectable()
export class IdempotentHandler {
  constructor(
    private readonly redis: RedisService,
  ) {}

  async processOnce<T>(
    eventId: string,
    ttlSeconds: number,
    handler: () => Promise<T>,
  ): Promise<T | null> {
    const key = `idempotent:${eventId}`;
    const acquired = await this.redis.set(key, '1', 'EX', ttlSeconds, 'NX');

    if (!acquired) {
      // 이미 처리됨 → 스킵
      return null;
    }

    try {
      return await handler();
    } catch (error) {
      // 실패 시 키 삭제 → 재처리 허용
      await this.redis.del(key);
      throw error;
    }
  }
}

// 사용
@EventPattern('order.created')
async handleOrderCreated(
  @Payload() data: OrderCreatedEvent,
  @Ctx() context: KafkaContext,
) {
  const eventId = context.getMessage().headers['correlation-id']?.toString();

  await this.idempotentHandler.processOnce(
    eventId,
    86400, // 24시간 TTL
    () => this.paymentService.processPayment(data),
  );
}

테스트 전략

JestDocker Compose로 통합 테스트를 구성합니다.

// kafka.e2e-spec.ts
describe('Kafka Integration', () => {
  let app: INestApplication;
  let kafkaClient: ClientKafka;

  beforeAll(async () => {
    const module = await Test.createTestingModule({
      imports: [AppModule],
    }).compile();

    app = module.createNestApplication();
    app.connectMicroservice<MicroserviceOptions>({
      transport: Transport.KAFKA,
      options: {
        client: { brokers: ['localhost:9092'] },
        consumer: { groupId: 'test-group-' + randomUUID() },
      },
    });

    await app.startAllMicroservices();
    await app.init();

    kafkaClient = app.get('KAFKA_SERVICE');
    await kafkaClient.connect();
  });

  it('order.created → payment processed', async () => {
    const orderId = randomUUID();

    kafkaClient.emit('order.created', {
      key: orderId,
      value: {
        orderId,
        userId: 'user-1',
        totalAmount: 50000,
      },
      headers: { 'correlation-id': randomUUID() },
    });

    // 비동기 처리 대기
    await waitForCondition(
      () => paymentRepo.findByOrderId(orderId),
      { timeout: 10000, interval: 500 },
    );

    const payment = await paymentRepo.findByOrderId(orderId);
    expect(payment).toBeDefined();
    expect(payment.amount).toBe(50000);
  });

  afterAll(async () => {
    await kafkaClient.close();
    await app.close();
  });
});

운영 체크리스트

항목 확인 사항
Producer 멱등성 idempotent: true 설정
DLQ 모니터링 .dlq 토픽 메시지 알림 설정
파티션 키 순서 보장 필요 시 동일 키 사용
Consumer 수 ≤ 파티션 수 유휴 인스턴스 방지
스키마 버전 관리 헤더에 version 포함
Consumer Lag 모니터링 Lag 급증 시 스케일아웃

마치며

NestJS + Kafka 조합은 이벤트 기반 마이크로서비스의 강력한 기반입니다. @EventPattern@MessagePattern으로 소비자를 선언적으로 구성하고, 커스텀 Serializer로 스키마를 관리하며, DLQ와 멱등성 패턴으로 메시지 유실 없는 안정적 이벤트 처리를 구현할 수 있습니다. Hybrid Application 구성으로 기존 HTTP API와 Kafka를 자연스럽게 공존시키는 것이 실무에서 가장 일반적인 패턴입니다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux