Spring Kafka란
Spring for Apache Kafka는 Spring 생태계에서 Kafka를 사용하기 위한 공식 프로젝트입니다. 저수준 Kafka Client를 추상화하여 @KafkaListener 어노테이션 기반의 선언적 컨슈머, KafkaTemplate을 통한 프로듀서, 그리고 에러 핸들링·재시도·DLT(Dead Letter Topic) 등 프로덕션 필수 기능을 제공합니다.
기본 설정
Spring Boot의 자동 설정으로 최소한의 프로퍼티만으로 시작할 수 있습니다:
# build.gradle.kts
dependencies {
implementation("org.springframework.kafka:spring-kafka")
}
# application.yml
spring:
kafka:
bootstrap-servers: kafka-1:9092,kafka-2:9092,kafka-3:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all # 모든 replica 확인
retries: 3
properties:
enable.idempotence: true # 중복 전송 방지
max.in.flight.requests.per.connection: 5
consumer:
group-id: order-service
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
auto-offset-reset: earliest
enable-auto-commit: false # 수동 커밋
properties:
spring.json.trusted.packages: "com.example.event"
listener:
ack-mode: manual # 수동 ACK
concurrency: 3 # 파티션별 컨슈머 스레드
프로듀서: KafkaTemplate
메시지 발행은 KafkaTemplate으로 처리합니다. 동기·비동기 방식 모두 지원합니다:
@Service
@RequiredArgsConstructor
public class OrderEventPublisher {
private final KafkaTemplate<String, Object> kafkaTemplate;
// 비동기 발행 (기본)
public void publishOrderCreated(OrderCreatedEvent event) {
CompletableFuture<SendResult<String, Object>> future =
kafkaTemplate.send(
"order.created", // 토픽
event.getOrderId(), // 키 (파티셔닝 기준)
event // 값
);
future.whenComplete((result, ex) -> {
if (ex != null) {
log.error("메시지 발행 실패: topic={}, key={}",
"order.created", event.getOrderId(), ex);
} else {
RecordMetadata meta = result.getRecordMetadata();
log.info("메시지 발행: topic={}, partition={}, offset={}",
meta.topic(), meta.partition(), meta.offset());
}
});
}
// 헤더 포함 발행
public void publishWithHeaders(OrderEvent event) {
ProducerRecord<String, Object> record = new ProducerRecord<>(
"order.events", event.getOrderId(), event
);
record.headers()
.add("eventType", event.getType().getBytes())
.add("source", "order-service".getBytes())
.add("correlationId", UUID.randomUUID().toString().getBytes());
kafkaTemplate.send(record);
}
}
컨슈머: @KafkaListener
선언적 어노테이션으로 컨슈머를 정의합니다:
@Component
@RequiredArgsConstructor
public class OrderEventConsumer {
private final PaymentService paymentService;
@KafkaListener(
topics = "order.created",
groupId = "payment-service",
containerFactory = "kafkaListenerContainerFactory"
)
public void handleOrderCreated(
@Payload OrderCreatedEvent event,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
Acknowledgment ack) {
log.info("주문 이벤트 수신: orderId={}, topic={}, partition={}, offset={}",
event.getOrderId(), topic, partition, offset);
try {
paymentService.processPayment(event);
ack.acknowledge(); // 수동 커밋
} catch (Exception e) {
log.error("처리 실패: orderId={}", event.getOrderId(), e);
// ACK하지 않으면 재처리됨
throw e;
}
}
// 배치 컨슈머 — 대량 처리 시 성능 향상
@KafkaListener(
topics = "order.analytics",
groupId = "analytics-service",
batch = "true"
)
public void handleBatch(
List<ConsumerRecord<String, AnalyticsEvent>> records,
Acknowledgment ack) {
log.info("배치 수신: {} 건", records.size());
List<AnalyticsEvent> events = records.stream()
.map(ConsumerRecord::value)
.toList();
analyticsService.bulkInsert(events);
ack.acknowledge();
}
}
에러 핸들링과 재시도
Spring Kafka 3.x에서는 DefaultErrorHandler와 @RetryableTopic으로 정교한 에러 처리가 가능합니다:
@Configuration
public class KafkaErrorConfig {
@Bean
public DefaultErrorHandler errorHandler(
KafkaOperations<String, Object> kafkaOps) {
// DLT(Dead Letter Topic)로 실패 메시지 전달
DeadLetterPublishingRecoverer recoverer =
new DeadLetterPublishingRecoverer(kafkaOps,
(record, ex) -> new TopicPartition(
record.topic() + ".DLT", record.partition()
));
// 지수 백오프 재시도: 1초 → 2초 → 4초, 최대 3회
ExponentialBackOff backOff = new ExponentialBackOff(1000L, 2.0);
backOff.setMaxElapsedTime(15000L);
DefaultErrorHandler handler =
new DefaultErrorHandler(recoverer, backOff);
// 특정 예외는 재시도하지 않음
handler.addNotRetryableExceptions(
DeserializationException.class,
IllegalArgumentException.class
);
return handler;
}
}
@RetryableTopic: 선언적 재시도
Spring Kafka 3.x의 @RetryableTopic은 재시도 토픽을 자동 생성합니다:
@RetryableTopic(
attempts = "4",
backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 10000),
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
dltStrategy = DltStrategy.FAIL_ON_ERROR,
include = { TransientException.class }
)
@KafkaListener(topics = "payment.process", groupId = "payment-service")
public void processPayment(PaymentEvent event) {
// 실패 시 자동으로 payment.process-retry-0 → retry-1 → retry-2 → DLT
paymentGateway.charge(event);
}
@DltHandler
public void handleDlt(PaymentEvent event,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.error("DLT 도착: event={}, topic={}", event, topic);
alertService.notify("결제 처리 최종 실패: " + event.getPaymentId());
}
이 설정이 자동 생성하는 토픽 구조:
| 토픽 | 재시도 대기 | 설명 |
|---|---|---|
| payment.process | – | 원본 토픽 |
| payment.process-retry-0 | 1초 | 1차 재시도 |
| payment.process-retry-1 | 2초 | 2차 재시도 |
| payment.process-retry-2 | 4초 | 3차 재시도 |
| payment.process-dlt | – | 최종 실패 (Dead Letter) |
트랜잭셔널 프로듀서
DB 트랜잭션과 Kafka 메시지 발행의 원자성을 보장하려면 Transactional Outbox 패턴 또는 Kafka 트랜잭션을 사용합니다:
# application.yml
spring:
kafka:
producer:
transaction-id-prefix: order-tx- # 트랜잭션 활성화
@Service
@RequiredArgsConstructor
public class OrderService {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final OrderRepository orderRepository;
@Transactional
public Order createOrder(CreateOrderRequest request) {
Order order = orderRepository.save(
Order.from(request)
);
// Kafka 트랜잭션 내에서 발행
kafkaTemplate.executeInTransaction(ops -> {
ops.send("order.created",
order.getId(), OrderCreatedEvent.from(order));
ops.send("order.analytics",
order.getId(), AnalyticsEvent.orderCreated(order));
return null;
});
return order;
}
}
컨슈머 동시성과 파티셔닝
Kafka의 병렬 처리는 파티션 수 = 최대 동시 컨슈머 수입니다. Spring Kafka의 concurrency 설정을 이해해야 합니다:
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object>
kafkaListenerContainerFactory(
ConsumerFactory<String, Object> consumerFactory) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(6); // 6개 파티션 → 6개 스레드
// 배치 리스너 활성화
factory.setBatchListener(true);
// 수동 ACK
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// 컨슈머별 인터셉터
factory.setRecordInterceptor((record, consumer) -> {
MDC.put("correlationId",
new String(record.headers()
.lastHeader("correlationId").value()));
return record;
});
return factory;
}
}
모니터링과 헬스체크
Spring Actuator와 Micrometer를 통해 Kafka 메트릭을 수집합니다:
# application.yml
management:
health:
kafka:
enabled: true # Kafka 헬스체크 활성화
endpoints:
web:
exposure:
include: health,metrics
# Micrometer로 수집되는 주요 메트릭
# spring.kafka.consumer.fetch.manager.records.consumed.total
# spring.kafka.consumer.fetch.manager.records.lag
# spring.kafka.producer.record.send.total
# spring.kafka.producer.record.error.total
// 커스텀 Consumer Lag 모니터링
@Component
@RequiredArgsConstructor
public class KafkaLagMonitor {
private final KafkaListenerEndpointRegistry registry;
@Scheduled(fixedRate = 30000)
public void checkLag() {
registry.getListenerContainers().forEach(container -> {
Map<String, Map<MetricName, ? extends Metric>> metrics =
container.metrics();
metrics.forEach((clientId, metricMap) -> {
metricMap.entrySet().stream()
.filter(e -> e.getKey().name().equals("records-lag-max"))
.forEach(e -> {
double lag = e.getValue().metricValue() instanceof Double d ? d : 0;
if (lag > 10000) {
log.warn("Consumer lag 경고: client={}, lag={}",
clientId, lag);
}
});
});
});
}
}
테스트: Testcontainers + EmbeddedKafka
@SpringBootTest
@EmbeddedKafka(
partitions = 3,
topics = { "order.created", "order.created.DLT" },
brokerProperties = { "listeners=PLAINTEXT://localhost:9092" }
)
class OrderEventTest {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@Autowired
private OrderEventConsumer consumer;
@Test
void shouldProcessOrderEvent() throws Exception {
var event = new OrderCreatedEvent("ORD-001", 50000L);
kafkaTemplate.send("order.created", event.getOrderId(), event).get();
// 컨슈머 처리 대기
await().atMost(Duration.ofSeconds(10))
.untilAsserted(() ->
verify(paymentService).processPayment(
argThat(e -> e.getOrderId().equals("ORD-001"))
)
);
}
}
정리
Spring Kafka는 @KafkaListener의 선언적 컨슈머, @RetryableTopic의 자동 재시도 토픽 체인, 트랜잭셔널 프로듀서 등 프로덕션 레벨의 이벤트 드리븐 아키텍처를 구축하는 데 필요한 모든 도구를 제공합니다. 핵심은 수동 ACK + DLT + Consumer Lag 모니터링의 조합으로, 메시지 유실 없는 안정적인 비동기 처리 파이프라인을 완성하는 것입니다.