Spring Kafka Streams 상태 처리

Spring Kafka Streams란?

Spring Kafka Streams는 Apache Kafka의 Streams DSL을 Spring Boot 환경에서 손쉽게 사용할 수 있도록 통합한 라이브러리입니다. 기존 Consumer/Producer 기반 처리와 달리, KStreamKTable을 활용한 실시간 스트림 처리와 State Store를 통한 상태 관리가 핵심입니다. 별도 클러스터 없이 애플리케이션 내부에서 실시간 집계, 조인, 윈도우 연산을 수행할 수 있어 마이크로서비스 아키텍처에서 매우 유용합니다.

의존성 설정

Spring Boot 3.x 기준으로 spring-kafka에 Kafka Streams가 포함되어 있습니다.

<!-- build.gradle.kts -->
dependencies {
    implementation("org.springframework.kafka:spring-kafka")
    implementation("org.apache.kafka:kafka-streams")
}

application.yml에서 Kafka Streams 기본 설정을 정의합니다.

spring:
  kafka:
    bootstrap-servers: localhost:9092
    streams:
      application-id: order-analytics
      properties:
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        state.dir: /tmp/kafka-streams
        commit.interval.ms: 1000

KStream vs KTable 핵심 개념

KStream은 이벤트의 무한 스트림이고, KTable은 키 기준 최신 값만 유지하는 변경 로그(changelog)입니다. 이 두 가지 추상화를 이해하는 것이 Kafka Streams의 시작점입니다.

구분 KStream KTable
의미 이벤트 로그 (append-only) 상태 테이블 (upsert)
같은 키 재전송 별도 레코드로 처리 기존 값 덮어쓰기
사용 사례 주문 이벤트, 클릭 로그 사용자 프로필, 재고 현황
null 값 일반 레코드 tombstone (삭제)

StreamsBuilder 설정

Spring Boot에서 Kafka Streams 토폴로지를 정의하는 방법입니다. @EnableKafkaStreams를 활성화하고 StreamsBuilder 빈을 주입받아 사용합니다.

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean
    public KStream<String, String> orderStream(StreamsBuilder builder) {
        KStream<String, String> stream = builder.stream("orders",
            Consumed.with(Serdes.String(), Serdes.String()));

        // 주문 이벤트를 파싱하여 카테고리별로 분기
        stream
            .filter((key, value) -> value != null && !value.isEmpty())
            .mapValues(value -> {
                JsonNode node = objectMapper.readTree(value);
                return node.toString();
            })
            .to("orders-validated");

        return stream;
    }
}

Windowed Aggregation: 시간 윈도우 집계

실시간 분석에서 가장 많이 사용되는 패턴은 시간 윈도우 기반 집계입니다. Kafka Streams는 세 가지 윈도우를 제공합니다.

  • Tumbling Window: 고정 크기, 겹치지 않는 윈도우
  • Hopping Window: 고정 크기, 일정 간격으로 슬라이딩
  • Session Window: 활동 기반, 비활성 갭으로 구분
@Bean
public KTable<Windowed<String>, Long> orderCountByWindow(StreamsBuilder builder) {
    KStream<String, String> orders = builder.stream("orders",
        Consumed.with(Serdes.String(), Serdes.String()));

    // 5분 텀블링 윈도우로 카테고리별 주문 수 집계
    KTable<Windowed<String>, Long> counts = orders
        .groupBy((key, value) -> extractCategory(value),
            Grouped.with(Serdes.String(), Serdes.String()))
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
        .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
            "order-count-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Long()));

    // 결과를 토픽으로 전송
    counts.toStream()
        .map((windowedKey, count) -> KeyValue.pair(
            windowedKey.key(),
            String.format("{"category":"%s","count":%d,"windowStart":"%s","windowEnd":"%s"}",
                windowedKey.key(), count,
                Instant.ofEpochMilli(windowedKey.window().start()),
                Instant.ofEpochMilli(windowedKey.window().end()))))
        .to("order-counts", Produced.with(Serdes.String(), Serdes.String()));

    return counts;
}

State Store와 Interactive Query

Kafka Streams의 State Store는 집계 결과를 로컬에 저장하는 내장 데이터베이스(RocksDB)입니다. Interactive Query를 통해 외부에서 이 상태를 직접 조회할 수 있어, 별도 DB 없이도 실시간 대시보드를 구현할 수 있습니다.

@RestController
@RequiredArgsConstructor
public class OrderAnalyticsController {

    private final StreamsBuilderFactoryBean factoryBean;

    @GetMapping("/analytics/orders/{category}")
    public ResponseEntity<Map<String, Object>> getOrderCount(
            @PathVariable String category) {

        KafkaStreams streams = factoryBean.getKafkaStreams();
        if (streams == null) {
            return ResponseEntity.status(503).build();
        }

        // State Store에서 직접 조회
        ReadOnlyKeyValueStore<String, Long> store =
            streams.store(StoreQueryParameters.fromNameAndType(
                "order-count-store",
                QueryableStoreTypes.keyValueStore()));

        Long count = store.get(category);
        return ResponseEntity.ok(Map.of(
            "category", category,
            "count", count != null ? count : 0,
            "timestamp", Instant.now().toString()
        ));
    }

    @GetMapping("/analytics/orders")
    public ResponseEntity<List<Map<String, Object>>> getAllCounts() {
        KafkaStreams streams = factoryBean.getKafkaStreams();
        ReadOnlyKeyValueStore<String, Long> store =
            streams.store(StoreQueryParameters.fromNameAndType(
                "order-count-store",
                QueryableStoreTypes.keyValueStore()));

        List<Map<String, Object>> results = new ArrayList<>();
        try (KeyValueIterator<String, Long> iter = store.all()) {
            while (iter.hasNext()) {
                KeyValue<String, Long> entry = iter.next();
                results.add(Map.of(
                    "category", entry.key,
                    "count", entry.value
                ));
            }
        }
        return ResponseEntity.ok(results);
    }
}

KStream-KTable 조인

실시간 데이터 보강(enrichment)의 핵심 패턴은 KStream-KTable 조인입니다. 이벤트 스트림에 참조 데이터를 결합하여 풍부한 데이터를 생성합니다.

@Bean
public KStream<String, String> enrichedOrderStream(StreamsBuilder builder) {
    // 주문 이벤트 스트림
    KStream<String, OrderEvent> orders = builder.stream("orders",
        Consumed.with(Serdes.String(), orderEventSerde));

    // 상품 정보 테이블 (최신 상태 유지)
    KTable<String, ProductInfo> products = builder.table("products",
        Consumed.with(Serdes.String(), productInfoSerde),
        Materialized.as("product-store"));

    // 주문에 상품 정보 결합
    KStream<String, EnrichedOrder> enriched = orders
        .selectKey((key, order) -> order.getProductId())
        .join(products,
            (order, product) -> EnrichedOrder.builder()
                .orderId(order.getOrderId())
                .productName(product.getName())
                .price(product.getPrice())
                .quantity(order.getQuantity())
                .totalAmount(product.getPrice() * order.getQuantity())
                .build(),
            Joined.with(Serdes.String(), orderEventSerde, productInfoSerde));

    enriched.to("enriched-orders",
        Produced.with(Serdes.String(), enrichedOrderSerde));

    return enriched.mapValues(EnrichedOrder::toJson);
}

커스텀 Serde 구현

실무에서는 JSON 객체를 다루므로 커스텀 Serde가 필수입니다. Jackson 기반 범용 Serde를 만들어 재사용합니다.

public class JsonSerde<T> implements Serde<T> {

    private final ObjectMapper mapper = new ObjectMapper();
    private final Class<T> targetType;

    public JsonSerde(Class<T> targetType) {
        this.targetType = targetType;
    }

    @Override
    public Serializer<T> serializer() {
        return (topic, data) -> {
            try {
                return mapper.writeValueAsBytes(data);
            } catch (JsonProcessingException e) {
                throw new SerializationException("직렬화 실패", e);
            }
        };
    }

    @Override
    public Deserializer<T> deserializer() {
        return (topic, data) -> {
            if (data == null) return null;
            try {
                return mapper.readValue(data, targetType);
            } catch (IOException e) {
                throw new SerializationException("역직렬화 실패", e);
            }
        };
    }
}

사용 시에는 new JsonSerde<>(OrderEvent.class)처럼 타입을 지정하면 됩니다.

에러 핸들링 전략

Kafka Streams에서 역직렬화 실패나 처리 오류는 스트림 전체를 중단시킬 수 있습니다. 프로덕션 환경에서는 반드시 에러 핸들러를 설정해야 합니다.

spring:
  kafka:
    streams:
      properties:
        default.deserialization.exception.handler:
          org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
        default.production.exception.handler:
          org.apache.kafka.streams.errors.DefaultProductionExceptionHandler

커스텀 에러 핸들러로 Dead Letter Queue(DLQ)에 실패 레코드를 전송하는 패턴도 자주 사용됩니다.

public class DlqDeserializationHandler implements DeserializationExceptionHandler {

    private KafkaProducer<byte[], byte[]> dlqProducer;

    @Override
    public void configure(Map<String, ?> configs) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            configs.get("bootstrap.servers"));
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            ByteArraySerializer.class);
        this.dlqProducer = new KafkaProducer<>(props);
    }

    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context,
            ConsumerRecord<byte[], byte[]> record, Exception exception) {

        // DLQ로 실패 레코드 전송
        ProducerRecord<byte[], byte[]> dlqRecord =
            new ProducerRecord<>("dlq-" + record.topic(),
                record.key(), record.value());
        dlqRecord.headers().add("error", exception.getMessage().getBytes());
        dlqProducer.send(dlqRecord);

        return DeserializationHandlerResponse.CONTINUE;
    }
}

Exactly-Once 처리 보장

Kafka Streams는 Exactly-Once Semantics(EOS)를 기본 지원합니다. processing.guarantee 설정 하나로 활성화할 수 있습니다.

spring:
  kafka:
    streams:
      properties:
        processing.guarantee: exactly_once_v2
        # EOS v2는 Kafka 2.5+ 필요
        # 트랜잭션 기반으로 읽기-처리-쓰기를 원자적으로 수행

EOS를 활성화하면 State Store 업데이트와 출력 토픽 전송이 하나의 트랜잭션으로 묶여, 장애 복구 시에도 정확히 한 번만 처리됩니다. 다만 처리량이 약 10~20% 감소할 수 있으므로 요구사항에 따라 선택합니다.

상태 복구와 Standby Replica

State Store는 RocksDB에 로컬 저장되므로, 인스턴스 재시작 시 Kafka의 changelog 토픽에서 복구합니다. Standby Replica를 설정하면 복구 시간을 대폭 줄일 수 있습니다.

spring:
  kafka:
    streams:
      properties:
        num.standby.replicas: 1
        # 핫 스탠바이로 State Store 복제본 유지
        # 장애 시 즉시 다른 인스턴스가 인계

통합 테스트: TopologyTestDriver

Kafka Streams의 큰 장점 중 하나는 실제 Kafka 브로커 없이 테스트할 수 있다는 것입니다. TopologyTestDriver를 사용하면 인메모리에서 전체 토폴로지를 검증할 수 있습니다.

@Test
void testOrderCountAggregation() {
    StreamsBuilder builder = new StreamsBuilder();
    // 토폴로지 구성 (위의 orderCountByWindow 로직 재사용)
    buildTopology(builder);

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

    try (TopologyTestDriver driver = new TopologyTestDriver(
            builder.build(), props)) {

        TestInputTopic<String, String> inputTopic = driver.createInputTopic(
            "orders", new StringSerializer(), new StringSerializer());

        TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(
            "order-counts", new StringDeserializer(), new StringDeserializer());

        // 주문 이벤트 전송
        inputTopic.pipeInput("order-1", "{"category":"electronics","amount":100}");
        inputTopic.pipeInput("order-2", "{"category":"electronics","amount":200}");
        inputTopic.pipeInput("order-3", "{"category":"books","amount":50}");

        // 집계 결과 검증
        List<KeyValue<String, String>> results = outputTopic.readKeyValuesToList();
        assertThat(results).isNotEmpty();
        assertThat(results.stream()
            .filter(kv -> kv.key.equals("electronics"))
            .count()).isGreaterThanOrEqualTo(1);
    }
}

프로덕션 모니터링

Kafka Streams는 다양한 JMX 메트릭을 제공합니다. Micrometer와 연동하면 Grafana 대시보드로 실시간 모니터링이 가능합니다.

@Component
@RequiredArgsConstructor
public class KafkaStreamsHealthIndicator implements HealthIndicator {

    private final StreamsBuilderFactoryBean factoryBean;

    @Override
    public Health health() {
        KafkaStreams streams = factoryBean.getKafkaStreams();
        if (streams == null) {
            return Health.down().withDetail("reason", "not initialized").build();
        }

        KafkaStreams.State state = streams.state();
        if (state == KafkaStreams.State.RUNNING ||
            state == KafkaStreams.State.REBALANCING) {
            return Health.up()
                .withDetail("state", state.name())
                .withDetail("threadCount", streams.metadataForLocalThreads().size())
                .build();
        }

        return Health.down()
            .withDetail("state", state.name())
            .build();
    }
}

실전 팁과 주의사항

  • 파티션 수 = 최대 병렬도: Kafka Streams 인스턴스 수는 입력 토픽의 파티션 수를 초과할 수 없습니다. 확장을 고려해 충분한 파티션을 미리 설정하세요.
  • State Store 디스크: RocksDB는 로컬 디스크를 사용합니다. K8s 환경에서는 PersistentVolume을 마운트하여 상태를 보존하세요.
  • Rebalancing 최소화: max.poll.interval.mssession.timeout.ms를 적절히 조정하여 불필요한 리밸런싱을 방지합니다.
  • 토폴로지 변경 시 주의: State Store 이름이나 토폴로지 구조를 변경하면 기존 상태가 호환되지 않을 수 있습니다. application-id를 변경하거나 내부 토픽을 초기화해야 합니다.
  • GlobalKTable: 모든 파티션의 데이터를 로컬에 복제하는 GlobalKTable은 참조 데이터 조인에 유용하지만, 데이터가 클 경우 메모리 부담이 커질 수 있습니다.

마무리

Spring Kafka Streams는 별도 인프라 없이 애플리케이션 내부에서 실시간 스트림 처리를 구현할 수 있는 강력한 도구입니다. KStream과 KTable의 이중 추상화, State Store를 통한 상태 관리, Interactive Query를 활용한 실시간 조회까지 — 이 모든 것이 Spring Boot의 자동 설정 위에서 동작합니다. 기존 Kafka Consumer/Producer 패턴과 함께 사용하면 다양한 실시간 처리 요구사항을 커버할 수 있습니다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux