Spring Kafka 컨슈머·프로듀서 심화

Spring Kafka란

Spring for Apache Kafka는 Spring 생태계에서 Kafka를 사용하기 위한 공식 프로젝트입니다. 저수준 Kafka Client를 추상화하여 @KafkaListener 어노테이션 기반의 선언적 컨슈머, KafkaTemplate을 통한 프로듀서, 그리고 에러 핸들링·재시도·DLT(Dead Letter Topic) 등 프로덕션 필수 기능을 제공합니다.

기본 설정

Spring Boot의 자동 설정으로 최소한의 프로퍼티만으로 시작할 수 있습니다:

# build.gradle.kts
dependencies {
    implementation("org.springframework.kafka:spring-kafka")
}
# application.yml
spring:
  kafka:
    bootstrap-servers: kafka-1:9092,kafka-2:9092,kafka-3:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all                    # 모든 replica 확인
      retries: 3
      properties:
        enable.idempotence: true   # 중복 전송 방지
        max.in.flight.requests.per.connection: 5
    consumer:
      group-id: order-service
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false    # 수동 커밋
      properties:
        spring.json.trusted.packages: "com.example.event"
    listener:
      ack-mode: manual             # 수동 ACK
      concurrency: 3               # 파티션별 컨슈머 스레드

프로듀서: KafkaTemplate

메시지 발행은 KafkaTemplate으로 처리합니다. 동기·비동기 방식 모두 지원합니다:

@Service
@RequiredArgsConstructor
public class OrderEventPublisher {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    // 비동기 발행 (기본)
    public void publishOrderCreated(OrderCreatedEvent event) {
        CompletableFuture<SendResult<String, Object>> future =
            kafkaTemplate.send(
                "order.created",           // 토픽
                event.getOrderId(),        // 키 (파티셔닝 기준)
                event                      // 값
            );

        future.whenComplete((result, ex) -> {
            if (ex != null) {
                log.error("메시지 발행 실패: topic={}, key={}",
                    "order.created", event.getOrderId(), ex);
            } else {
                RecordMetadata meta = result.getRecordMetadata();
                log.info("메시지 발행: topic={}, partition={}, offset={}",
                    meta.topic(), meta.partition(), meta.offset());
            }
        });
    }

    // 헤더 포함 발행
    public void publishWithHeaders(OrderEvent event) {
        ProducerRecord<String, Object> record = new ProducerRecord<>(
            "order.events", event.getOrderId(), event
        );
        record.headers()
            .add("eventType", event.getType().getBytes())
            .add("source", "order-service".getBytes())
            .add("correlationId", UUID.randomUUID().toString().getBytes());

        kafkaTemplate.send(record);
    }
}

컨슈머: @KafkaListener

선언적 어노테이션으로 컨슈머를 정의합니다:

@Component
@RequiredArgsConstructor
public class OrderEventConsumer {

    private final PaymentService paymentService;

    @KafkaListener(
        topics = "order.created",
        groupId = "payment-service",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderCreated(
            @Payload OrderCreatedEvent event,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            Acknowledgment ack) {

        log.info("주문 이벤트 수신: orderId={}, topic={}, partition={}, offset={}",
            event.getOrderId(), topic, partition, offset);

        try {
            paymentService.processPayment(event);
            ack.acknowledge();  // 수동 커밋
        } catch (Exception e) {
            log.error("처리 실패: orderId={}", event.getOrderId(), e);
            // ACK하지 않으면 재처리됨
            throw e;
        }
    }

    // 배치 컨슈머 — 대량 처리 시 성능 향상
    @KafkaListener(
        topics = "order.analytics",
        groupId = "analytics-service",
        batch = "true"
    )
    public void handleBatch(
            List<ConsumerRecord<String, AnalyticsEvent>> records,
            Acknowledgment ack) {

        log.info("배치 수신: {} 건", records.size());

        List<AnalyticsEvent> events = records.stream()
            .map(ConsumerRecord::value)
            .toList();

        analyticsService.bulkInsert(events);
        ack.acknowledge();
    }
}

에러 핸들링과 재시도

Spring Kafka 3.x에서는 DefaultErrorHandler@RetryableTopic으로 정교한 에러 처리가 가능합니다:

@Configuration
public class KafkaErrorConfig {

    @Bean
    public DefaultErrorHandler errorHandler(
            KafkaOperations<String, Object> kafkaOps) {

        // DLT(Dead Letter Topic)로 실패 메시지 전달
        DeadLetterPublishingRecoverer recoverer =
            new DeadLetterPublishingRecoverer(kafkaOps,
                (record, ex) -> new TopicPartition(
                    record.topic() + ".DLT", record.partition()
                ));

        // 지수 백오프 재시도: 1초 → 2초 → 4초, 최대 3회
        ExponentialBackOff backOff = new ExponentialBackOff(1000L, 2.0);
        backOff.setMaxElapsedTime(15000L);

        DefaultErrorHandler handler =
            new DefaultErrorHandler(recoverer, backOff);

        // 특정 예외는 재시도하지 않음
        handler.addNotRetryableExceptions(
            DeserializationException.class,
            IllegalArgumentException.class
        );

        return handler;
    }
}

@RetryableTopic: 선언적 재시도

Spring Kafka 3.x의 @RetryableTopic은 재시도 토픽을 자동 생성합니다:

@RetryableTopic(
    attempts = "4",
    backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 10000),
    topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
    dltStrategy = DltStrategy.FAIL_ON_ERROR,
    include = { TransientException.class }
)
@KafkaListener(topics = "payment.process", groupId = "payment-service")
public void processPayment(PaymentEvent event) {
    // 실패 시 자동으로 payment.process-retry-0 → retry-1 → retry-2 → DLT
    paymentGateway.charge(event);
}

@DltHandler
public void handleDlt(PaymentEvent event,
                      @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    log.error("DLT 도착: event={}, topic={}", event, topic);
    alertService.notify("결제 처리 최종 실패: " + event.getPaymentId());
}

이 설정이 자동 생성하는 토픽 구조:

토픽 재시도 대기 설명
payment.process 원본 토픽
payment.process-retry-0 1초 1차 재시도
payment.process-retry-1 2초 2차 재시도
payment.process-retry-2 4초 3차 재시도
payment.process-dlt 최종 실패 (Dead Letter)

트랜잭셔널 프로듀서

DB 트랜잭션과 Kafka 메시지 발행의 원자성을 보장하려면 Transactional Outbox 패턴 또는 Kafka 트랜잭션을 사용합니다:

# application.yml
spring:
  kafka:
    producer:
      transaction-id-prefix: order-tx-  # 트랜잭션 활성화
@Service
@RequiredArgsConstructor
public class OrderService {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final OrderRepository orderRepository;

    @Transactional
    public Order createOrder(CreateOrderRequest request) {
        Order order = orderRepository.save(
            Order.from(request)
        );

        // Kafka 트랜잭션 내에서 발행
        kafkaTemplate.executeInTransaction(ops -> {
            ops.send("order.created",
                order.getId(), OrderCreatedEvent.from(order));
            ops.send("order.analytics",
                order.getId(), AnalyticsEvent.orderCreated(order));
            return null;
        });

        return order;
    }
}

컨슈머 동시성과 파티셔닝

Kafka의 병렬 처리는 파티션 수 = 최대 동시 컨슈머 수입니다. Spring Kafka의 concurrency 설정을 이해해야 합니다:

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object>
            kafkaListenerContainerFactory(
                ConsumerFactory<String, Object> consumerFactory) {

        var factory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(6);  // 6개 파티션 → 6개 스레드

        // 배치 리스너 활성화
        factory.setBatchListener(true);

        // 수동 ACK
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        // 컨슈머별 인터셉터
        factory.setRecordInterceptor((record, consumer) -> {
            MDC.put("correlationId",
                new String(record.headers()
                    .lastHeader("correlationId").value()));
            return record;
        });

        return factory;
    }
}

모니터링과 헬스체크

Spring Actuator와 Micrometer를 통해 Kafka 메트릭을 수집합니다:

# application.yml
management:
  health:
    kafka:
      enabled: true           # Kafka 헬스체크 활성화
  endpoints:
    web:
      exposure:
        include: health,metrics

# Micrometer로 수집되는 주요 메트릭
# spring.kafka.consumer.fetch.manager.records.consumed.total
# spring.kafka.consumer.fetch.manager.records.lag
# spring.kafka.producer.record.send.total
# spring.kafka.producer.record.error.total
// 커스텀 Consumer Lag 모니터링
@Component
@RequiredArgsConstructor
public class KafkaLagMonitor {

    private final KafkaListenerEndpointRegistry registry;

    @Scheduled(fixedRate = 30000)
    public void checkLag() {
        registry.getListenerContainers().forEach(container -> {
            Map<String, Map<MetricName, ? extends Metric>> metrics =
                container.metrics();

            metrics.forEach((clientId, metricMap) -> {
                metricMap.entrySet().stream()
                    .filter(e -> e.getKey().name().equals("records-lag-max"))
                    .forEach(e -> {
                        double lag = e.getValue().metricValue() instanceof Double d ? d : 0;
                        if (lag > 10000) {
                            log.warn("Consumer lag 경고: client={}, lag={}",
                                clientId, lag);
                        }
                    });
            });
        });
    }
}

테스트: Testcontainers + EmbeddedKafka

@SpringBootTest
@EmbeddedKafka(
    partitions = 3,
    topics = { "order.created", "order.created.DLT" },
    brokerProperties = { "listeners=PLAINTEXT://localhost:9092" }
)
class OrderEventTest {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    private OrderEventConsumer consumer;

    @Test
    void shouldProcessOrderEvent() throws Exception {
        var event = new OrderCreatedEvent("ORD-001", 50000L);
        kafkaTemplate.send("order.created", event.getOrderId(), event).get();

        // 컨슈머 처리 대기
        await().atMost(Duration.ofSeconds(10))
            .untilAsserted(() ->
                verify(paymentService).processPayment(
                    argThat(e -> e.getOrderId().equals("ORD-001"))
                )
            );
    }
}

정리

Spring Kafka는 @KafkaListener의 선언적 컨슈머, @RetryableTopic의 자동 재시도 토픽 체인, 트랜잭셔널 프로듀서 등 프로덕션 레벨의 이벤트 드리븐 아키텍처를 구축하는 데 필요한 모든 도구를 제공합니다. 핵심은 수동 ACK + DLT + Consumer Lag 모니터링의 조합으로, 메시지 유실 없는 안정적인 비동기 처리 파이프라인을 완성하는 것입니다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux