NestJS + Kafka란?
Apache Kafka는 대규모 이벤트 스트리밍의 사실상 표준 플랫폼입니다. NestJS는 @nestjs/microservices 패키지를 통해 Kafka를 네이티브 Transport로 지원하며, Producer·Consumer·Consumer Group을 데코레이터 기반으로 선언적 구현할 수 있습니다. 이 글에서는 NestJS에서 Kafka를 활용한 이벤트 기반 마이크로서비스 아키텍처를 코드 레벨로 심화 분석합니다.
프로젝트 설정
// 패키지 설치
npm install @nestjs/microservices kafkajs
// package.json 주요 의존성
{
"@nestjs/microservices": "^10.x",
"kafkajs": "^2.2.4"
}
Kafka Transport 연결 설정
Hybrid Application으로 HTTP와 Kafka를 동시에 사용하는 것이 일반적입니다.
// main.ts — Hybrid Application
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
// Kafka Microservice 연결
app.connectMicroservice<MicroserviceOptions>({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'order-service',
brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
ssl: true,
sasl: {
mechanism: 'plain',
username: process.env.KAFKA_USER,
password: process.env.KAFKA_PASSWORD,
},
retry: {
initialRetryTime: 300,
retries: 10,
},
},
consumer: {
groupId: 'order-service-group',
sessionTimeout: 30000,
heartbeatInterval: 3000,
maxBytesPerPartition: 1048576, // 1MB
retry: {
retries: 5,
},
},
producer: {
allowAutoTopicCreation: false,
idempotent: true, // 멱등성 보장
maxInFlightRequests: 5,
},
},
});
await app.startAllMicroservices();
await app.listen(3000);
}
bootstrap();
Producer: 이벤트 발행
ClientKafka를 주입받아 토픽에 메시지를 발행합니다.
// kafka-client.module.ts
@Module({
imports: [
ClientsModule.register([
{
name: 'KAFKA_SERVICE',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'order-producer',
brokers: ['kafka-1:9092', 'kafka-2:9092'],
},
producer: {
idempotent: true,
},
},
},
]),
],
exports: [ClientsModule],
})
export class KafkaClientModule {}
// order.service.ts
@Injectable()
export class OrderService implements OnModuleInit {
constructor(
@Inject('KAFKA_SERVICE')
private readonly kafkaClient: ClientKafka,
private readonly orderRepo: OrderRepository,
) {}
async onModuleInit() {
// 응답 토픽 구독 (Request-Reply 패턴 사용 시)
this.kafkaClient.subscribeToResponseOf('order.process');
await this.kafkaClient.connect();
}
async createOrder(dto: CreateOrderDto): Promise<Order> {
const order = await this.orderRepo.save(
this.orderRepo.create(dto),
);
// 이벤트 발행 (Fire-and-Forget)
this.kafkaClient.emit('order.created', {
key: order.id.toString(), // 파티션 키
value: {
orderId: order.id,
userId: order.userId,
items: order.items,
totalAmount: order.totalAmount,
createdAt: order.createdAt.toISOString(),
},
headers: {
'correlation-id': randomUUID(),
'source': 'order-service',
'event-type': 'OrderCreated',
},
});
return order;
}
// Request-Reply 패턴: 응답 대기
async processOrder(orderId: string): Promise<ProcessResult> {
return firstValueFrom(
this.kafkaClient.send('order.process', {
key: orderId,
value: { orderId, action: 'process' },
}),
);
}
}
Consumer: 이벤트 소비
@EventPattern과 @MessagePattern으로 토픽별 핸들러를 등록합니다.
// payment.controller.ts
@Controller()
export class PaymentController {
constructor(private readonly paymentService: PaymentService) {}
// Fire-and-Forget: emit()으로 발행된 이벤트
@EventPattern('order.created')
async handleOrderCreated(
@Payload() data: OrderCreatedEvent,
@Ctx() context: KafkaContext,
) {
const topic = context.getTopic();
const partition = context.getPartition();
const offset = context.getMessage().offset;
const headers = context.getMessage().headers;
console.log(`[${topic}] partition=${partition} offset=${offset}`);
console.log(`correlation-id: ${headers['correlation-id']}`);
await this.paymentService.processPayment({
orderId: data.orderId,
amount: data.totalAmount,
userId: data.userId,
});
}
// Request-Reply: send()로 발행된 메시지 → 응답 반환
@MessagePattern('order.process')
async handleProcessOrder(
@Payload() data: ProcessOrderMessage,
@Ctx() context: KafkaContext,
): Promise<ProcessResult> {
const result = await this.paymentService.execute(data.orderId);
return {
success: true,
paymentId: result.id,
processedAt: new Date().toISOString(),
};
}
}
직렬화 커스터마이징
기본 JSON 직렬화를 확장하여 스키마 검증, 압축 등을 적용합니다.
// custom-serializer.ts
import { Serializer } from '@nestjs/microservices';
export class KafkaEventSerializer implements Serializer {
serialize(value: any) {
return {
...value,
value: {
...value.value,
_metadata: {
version: '1.0',
timestamp: new Date().toISOString(),
service: process.env.SERVICE_NAME,
},
},
};
}
}
// custom-deserializer.ts
import { Deserializer, IncomingEvent } from '@nestjs/microservices';
export class KafkaEventDeserializer implements Deserializer {
deserialize(message: any, options?: Record<string, any>): IncomingEvent {
const { key, value, headers, timestamp } = message;
// JSON 파싱 + 헤더 디코딩
const decodedHeaders = Object.entries(headers || {}).reduce(
(acc, [k, v]) => ({
...acc,
[k]: v?.toString(),
}),
{},
);
return {
pattern: undefined, // NestJS가 토픽으로 매칭
data: {
key: key?.toString(),
value: typeof value === 'string' ? JSON.parse(value) : value,
headers: decodedHeaders,
timestamp,
},
};
}
}
// main.ts에서 적용
app.connectMicroservice<MicroserviceOptions>({
transport: Transport.KAFKA,
options: {
// ... client/consumer 설정
serializer: new KafkaEventSerializer(),
deserializer: new KafkaEventDeserializer(),
},
});
에러 처리와 DLQ
소비 실패 시 Dead Letter Queue(DLQ)로 라우팅하여 메시지 유실을 방지합니다.
// kafka-exception.filter.ts
@Catch()
export class KafkaExceptionFilter implements ExceptionFilter {
constructor(
@Inject('KAFKA_SERVICE')
private readonly kafkaClient: ClientKafka,
) {}
async catch(exception: any, host: ArgumentsHost) {
const ctx = host.switchToRpc().getContext<KafkaContext>();
const originalMessage = ctx.getMessage();
const topic = ctx.getTopic();
const retryCount = parseInt(
originalMessage.headers?.['retry-count']?.toString() || '0',
);
if (retryCount < 3) {
// 재시도: 원래 토픽에 재발행
this.kafkaClient.emit(topic, {
key: originalMessage.key?.toString(),
value: originalMessage.value,
headers: {
...originalMessage.headers,
'retry-count': (retryCount + 1).toString(),
'last-error': exception.message,
'retry-at': new Date().toISOString(),
},
});
} else {
// DLQ로 전송
this.kafkaClient.emit(`${topic}.dlq`, {
key: originalMessage.key?.toString(),
value: {
originalTopic: topic,
originalMessage: originalMessage.value,
error: exception.message,
stack: exception.stack,
failedAt: new Date().toISOString(),
retryCount,
},
});
}
}
}
// Controller에서 적용
@Controller()
@UseFilters(KafkaExceptionFilter)
export class PaymentController {
// ...
}
Consumer Group 관리
여러 인스턴스가 같은 groupId를 사용하면 Kafka가 파티션을 자동 분배합니다. 운영에서 고려할 설정을 정리합니다.
| 설정 | 권장값 | 설명 |
|---|---|---|
| sessionTimeout | 30초 | 컨슈머 장애 감지 시간 |
| heartbeatInterval | 3초 | sessionTimeout의 1/10 |
| maxBytesPerPartition | 1MB | 파티션별 최대 fetch 크기 |
| 인스턴스 수 | ≤ 파티션 수 | 초과 시 유휴 컨슈머 발생 |
멱등성 보장 패턴
Kafka의 at-least-once 전달 특성상 중복 처리를 방지해야 합니다.
@Injectable()
export class IdempotentHandler {
constructor(
private readonly redis: RedisService,
) {}
async processOnce<T>(
eventId: string,
ttlSeconds: number,
handler: () => Promise<T>,
): Promise<T | null> {
const key = `idempotent:${eventId}`;
const acquired = await this.redis.set(key, '1', 'EX', ttlSeconds, 'NX');
if (!acquired) {
// 이미 처리됨 → 스킵
return null;
}
try {
return await handler();
} catch (error) {
// 실패 시 키 삭제 → 재처리 허용
await this.redis.del(key);
throw error;
}
}
}
// 사용
@EventPattern('order.created')
async handleOrderCreated(
@Payload() data: OrderCreatedEvent,
@Ctx() context: KafkaContext,
) {
const eventId = context.getMessage().headers['correlation-id']?.toString();
await this.idempotentHandler.processOnce(
eventId,
86400, // 24시간 TTL
() => this.paymentService.processPayment(data),
);
}
테스트 전략
Jest와 Docker Compose로 통합 테스트를 구성합니다.
// kafka.e2e-spec.ts
describe('Kafka Integration', () => {
let app: INestApplication;
let kafkaClient: ClientKafka;
beforeAll(async () => {
const module = await Test.createTestingModule({
imports: [AppModule],
}).compile();
app = module.createNestApplication();
app.connectMicroservice<MicroserviceOptions>({
transport: Transport.KAFKA,
options: {
client: { brokers: ['localhost:9092'] },
consumer: { groupId: 'test-group-' + randomUUID() },
},
});
await app.startAllMicroservices();
await app.init();
kafkaClient = app.get('KAFKA_SERVICE');
await kafkaClient.connect();
});
it('order.created → payment processed', async () => {
const orderId = randomUUID();
kafkaClient.emit('order.created', {
key: orderId,
value: {
orderId,
userId: 'user-1',
totalAmount: 50000,
},
headers: { 'correlation-id': randomUUID() },
});
// 비동기 처리 대기
await waitForCondition(
() => paymentRepo.findByOrderId(orderId),
{ timeout: 10000, interval: 500 },
);
const payment = await paymentRepo.findByOrderId(orderId);
expect(payment).toBeDefined();
expect(payment.amount).toBe(50000);
});
afterAll(async () => {
await kafkaClient.close();
await app.close();
});
});
운영 체크리스트
| 항목 | 확인 사항 |
|---|---|
| Producer 멱등성 | idempotent: true 설정 |
| DLQ 모니터링 | .dlq 토픽 메시지 알림 설정 |
| 파티션 키 | 순서 보장 필요 시 동일 키 사용 |
| Consumer 수 ≤ 파티션 수 | 유휴 인스턴스 방지 |
| 스키마 버전 관리 | 헤더에 version 포함 |
| Consumer Lag 모니터링 | Lag 급증 시 스케일아웃 |
마치며
NestJS + Kafka 조합은 이벤트 기반 마이크로서비스의 강력한 기반입니다. @EventPattern과 @MessagePattern으로 소비자를 선언적으로 구성하고, 커스텀 Serializer로 스키마를 관리하며, DLQ와 멱등성 패턴으로 메시지 유실 없는 안정적 이벤트 처리를 구현할 수 있습니다. Hybrid Application 구성으로 기존 HTTP API와 Kafka를 자연스럽게 공존시키는 것이 실무에서 가장 일반적인 패턴입니다.