Spring Kafka Streams란?
Spring Kafka Streams는 Apache Kafka의 Streams DSL을 Spring Boot 환경에서 손쉽게 사용할 수 있도록 통합한 라이브러리입니다. 기존 Consumer/Producer 기반 처리와 달리, KStream과 KTable을 활용한 실시간 스트림 처리와 State Store를 통한 상태 관리가 핵심입니다. 별도 클러스터 없이 애플리케이션 내부에서 실시간 집계, 조인, 윈도우 연산을 수행할 수 있어 마이크로서비스 아키텍처에서 매우 유용합니다.
의존성 설정
Spring Boot 3.x 기준으로 spring-kafka에 Kafka Streams가 포함되어 있습니다.
<!-- build.gradle.kts -->
dependencies {
implementation("org.springframework.kafka:spring-kafka")
implementation("org.apache.kafka:kafka-streams")
}
application.yml에서 Kafka Streams 기본 설정을 정의합니다.
spring:
kafka:
bootstrap-servers: localhost:9092
streams:
application-id: order-analytics
properties:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
state.dir: /tmp/kafka-streams
commit.interval.ms: 1000
KStream vs KTable 핵심 개념
KStream은 이벤트의 무한 스트림이고, KTable은 키 기준 최신 값만 유지하는 변경 로그(changelog)입니다. 이 두 가지 추상화를 이해하는 것이 Kafka Streams의 시작점입니다.
| 구분 | KStream | KTable |
|---|---|---|
| 의미 | 이벤트 로그 (append-only) | 상태 테이블 (upsert) |
| 같은 키 재전송 | 별도 레코드로 처리 | 기존 값 덮어쓰기 |
| 사용 사례 | 주문 이벤트, 클릭 로그 | 사용자 프로필, 재고 현황 |
| null 값 | 일반 레코드 | tombstone (삭제) |
StreamsBuilder 설정
Spring Boot에서 Kafka Streams 토폴로지를 정의하는 방법입니다. @EnableKafkaStreams를 활성화하고 StreamsBuilder 빈을 주입받아 사용합니다.
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean
public KStream<String, String> orderStream(StreamsBuilder builder) {
KStream<String, String> stream = builder.stream("orders",
Consumed.with(Serdes.String(), Serdes.String()));
// 주문 이벤트를 파싱하여 카테고리별로 분기
stream
.filter((key, value) -> value != null && !value.isEmpty())
.mapValues(value -> {
JsonNode node = objectMapper.readTree(value);
return node.toString();
})
.to("orders-validated");
return stream;
}
}
Windowed Aggregation: 시간 윈도우 집계
실시간 분석에서 가장 많이 사용되는 패턴은 시간 윈도우 기반 집계입니다. Kafka Streams는 세 가지 윈도우를 제공합니다.
- Tumbling Window: 고정 크기, 겹치지 않는 윈도우
- Hopping Window: 고정 크기, 일정 간격으로 슬라이딩
- Session Window: 활동 기반, 비활성 갭으로 구분
@Bean
public KTable<Windowed<String>, Long> orderCountByWindow(StreamsBuilder builder) {
KStream<String, String> orders = builder.stream("orders",
Consumed.with(Serdes.String(), Serdes.String()));
// 5분 텀블링 윈도우로 카테고리별 주문 수 집계
KTable<Windowed<String>, Long> counts = orders
.groupBy((key, value) -> extractCategory(value),
Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
"order-count-store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()));
// 결과를 토픽으로 전송
counts.toStream()
.map((windowedKey, count) -> KeyValue.pair(
windowedKey.key(),
String.format("{"category":"%s","count":%d,"windowStart":"%s","windowEnd":"%s"}",
windowedKey.key(), count,
Instant.ofEpochMilli(windowedKey.window().start()),
Instant.ofEpochMilli(windowedKey.window().end()))))
.to("order-counts", Produced.with(Serdes.String(), Serdes.String()));
return counts;
}
State Store와 Interactive Query
Kafka Streams의 State Store는 집계 결과를 로컬에 저장하는 내장 데이터베이스(RocksDB)입니다. Interactive Query를 통해 외부에서 이 상태를 직접 조회할 수 있어, 별도 DB 없이도 실시간 대시보드를 구현할 수 있습니다.
@RestController
@RequiredArgsConstructor
public class OrderAnalyticsController {
private final StreamsBuilderFactoryBean factoryBean;
@GetMapping("/analytics/orders/{category}")
public ResponseEntity<Map<String, Object>> getOrderCount(
@PathVariable String category) {
KafkaStreams streams = factoryBean.getKafkaStreams();
if (streams == null) {
return ResponseEntity.status(503).build();
}
// State Store에서 직접 조회
ReadOnlyKeyValueStore<String, Long> store =
streams.store(StoreQueryParameters.fromNameAndType(
"order-count-store",
QueryableStoreTypes.keyValueStore()));
Long count = store.get(category);
return ResponseEntity.ok(Map.of(
"category", category,
"count", count != null ? count : 0,
"timestamp", Instant.now().toString()
));
}
@GetMapping("/analytics/orders")
public ResponseEntity<List<Map<String, Object>>> getAllCounts() {
KafkaStreams streams = factoryBean.getKafkaStreams();
ReadOnlyKeyValueStore<String, Long> store =
streams.store(StoreQueryParameters.fromNameAndType(
"order-count-store",
QueryableStoreTypes.keyValueStore()));
List<Map<String, Object>> results = new ArrayList<>();
try (KeyValueIterator<String, Long> iter = store.all()) {
while (iter.hasNext()) {
KeyValue<String, Long> entry = iter.next();
results.add(Map.of(
"category", entry.key,
"count", entry.value
));
}
}
return ResponseEntity.ok(results);
}
}
KStream-KTable 조인
실시간 데이터 보강(enrichment)의 핵심 패턴은 KStream-KTable 조인입니다. 이벤트 스트림에 참조 데이터를 결합하여 풍부한 데이터를 생성합니다.
@Bean
public KStream<String, String> enrichedOrderStream(StreamsBuilder builder) {
// 주문 이벤트 스트림
KStream<String, OrderEvent> orders = builder.stream("orders",
Consumed.with(Serdes.String(), orderEventSerde));
// 상품 정보 테이블 (최신 상태 유지)
KTable<String, ProductInfo> products = builder.table("products",
Consumed.with(Serdes.String(), productInfoSerde),
Materialized.as("product-store"));
// 주문에 상품 정보 결합
KStream<String, EnrichedOrder> enriched = orders
.selectKey((key, order) -> order.getProductId())
.join(products,
(order, product) -> EnrichedOrder.builder()
.orderId(order.getOrderId())
.productName(product.getName())
.price(product.getPrice())
.quantity(order.getQuantity())
.totalAmount(product.getPrice() * order.getQuantity())
.build(),
Joined.with(Serdes.String(), orderEventSerde, productInfoSerde));
enriched.to("enriched-orders",
Produced.with(Serdes.String(), enrichedOrderSerde));
return enriched.mapValues(EnrichedOrder::toJson);
}
커스텀 Serde 구현
실무에서는 JSON 객체를 다루므로 커스텀 Serde가 필수입니다. Jackson 기반 범용 Serde를 만들어 재사용합니다.
public class JsonSerde<T> implements Serde<T> {
private final ObjectMapper mapper = new ObjectMapper();
private final Class<T> targetType;
public JsonSerde(Class<T> targetType) {
this.targetType = targetType;
}
@Override
public Serializer<T> serializer() {
return (topic, data) -> {
try {
return mapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
throw new SerializationException("직렬화 실패", e);
}
};
}
@Override
public Deserializer<T> deserializer() {
return (topic, data) -> {
if (data == null) return null;
try {
return mapper.readValue(data, targetType);
} catch (IOException e) {
throw new SerializationException("역직렬화 실패", e);
}
};
}
}
사용 시에는 new JsonSerde<>(OrderEvent.class)처럼 타입을 지정하면 됩니다.
에러 핸들링 전략
Kafka Streams에서 역직렬화 실패나 처리 오류는 스트림 전체를 중단시킬 수 있습니다. 프로덕션 환경에서는 반드시 에러 핸들러를 설정해야 합니다.
spring:
kafka:
streams:
properties:
default.deserialization.exception.handler:
org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
default.production.exception.handler:
org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
커스텀 에러 핸들러로 Dead Letter Queue(DLQ)에 실패 레코드를 전송하는 패턴도 자주 사용됩니다.
public class DlqDeserializationHandler implements DeserializationExceptionHandler {
private KafkaProducer<byte[], byte[]> dlqProducer;
@Override
public void configure(Map<String, ?> configs) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
configs.get("bootstrap.servers"));
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
this.dlqProducer = new KafkaProducer<>(props);
}
@Override
public DeserializationHandlerResponse handle(ProcessorContext context,
ConsumerRecord<byte[], byte[]> record, Exception exception) {
// DLQ로 실패 레코드 전송
ProducerRecord<byte[], byte[]> dlqRecord =
new ProducerRecord<>("dlq-" + record.topic(),
record.key(), record.value());
dlqRecord.headers().add("error", exception.getMessage().getBytes());
dlqProducer.send(dlqRecord);
return DeserializationHandlerResponse.CONTINUE;
}
}
Exactly-Once 처리 보장
Kafka Streams는 Exactly-Once Semantics(EOS)를 기본 지원합니다. processing.guarantee 설정 하나로 활성화할 수 있습니다.
spring:
kafka:
streams:
properties:
processing.guarantee: exactly_once_v2
# EOS v2는 Kafka 2.5+ 필요
# 트랜잭션 기반으로 읽기-처리-쓰기를 원자적으로 수행
EOS를 활성화하면 State Store 업데이트와 출력 토픽 전송이 하나의 트랜잭션으로 묶여, 장애 복구 시에도 정확히 한 번만 처리됩니다. 다만 처리량이 약 10~20% 감소할 수 있으므로 요구사항에 따라 선택합니다.
상태 복구와 Standby Replica
State Store는 RocksDB에 로컬 저장되므로, 인스턴스 재시작 시 Kafka의 changelog 토픽에서 복구합니다. Standby Replica를 설정하면 복구 시간을 대폭 줄일 수 있습니다.
spring:
kafka:
streams:
properties:
num.standby.replicas: 1
# 핫 스탠바이로 State Store 복제본 유지
# 장애 시 즉시 다른 인스턴스가 인계
통합 테스트: TopologyTestDriver
Kafka Streams의 큰 장점 중 하나는 실제 Kafka 브로커 없이 테스트할 수 있다는 것입니다. TopologyTestDriver를 사용하면 인메모리에서 전체 토폴로지를 검증할 수 있습니다.
@Test
void testOrderCountAggregation() {
StreamsBuilder builder = new StreamsBuilder();
// 토폴로지 구성 (위의 orderCountByWindow 로직 재사용)
buildTopology(builder);
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
try (TopologyTestDriver driver = new TopologyTestDriver(
builder.build(), props)) {
TestInputTopic<String, String> inputTopic = driver.createInputTopic(
"orders", new StringSerializer(), new StringSerializer());
TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(
"order-counts", new StringDeserializer(), new StringDeserializer());
// 주문 이벤트 전송
inputTopic.pipeInput("order-1", "{"category":"electronics","amount":100}");
inputTopic.pipeInput("order-2", "{"category":"electronics","amount":200}");
inputTopic.pipeInput("order-3", "{"category":"books","amount":50}");
// 집계 결과 검증
List<KeyValue<String, String>> results = outputTopic.readKeyValuesToList();
assertThat(results).isNotEmpty();
assertThat(results.stream()
.filter(kv -> kv.key.equals("electronics"))
.count()).isGreaterThanOrEqualTo(1);
}
}
프로덕션 모니터링
Kafka Streams는 다양한 JMX 메트릭을 제공합니다. Micrometer와 연동하면 Grafana 대시보드로 실시간 모니터링이 가능합니다.
@Component
@RequiredArgsConstructor
public class KafkaStreamsHealthIndicator implements HealthIndicator {
private final StreamsBuilderFactoryBean factoryBean;
@Override
public Health health() {
KafkaStreams streams = factoryBean.getKafkaStreams();
if (streams == null) {
return Health.down().withDetail("reason", "not initialized").build();
}
KafkaStreams.State state = streams.state();
if (state == KafkaStreams.State.RUNNING ||
state == KafkaStreams.State.REBALANCING) {
return Health.up()
.withDetail("state", state.name())
.withDetail("threadCount", streams.metadataForLocalThreads().size())
.build();
}
return Health.down()
.withDetail("state", state.name())
.build();
}
}
실전 팁과 주의사항
- 파티션 수 = 최대 병렬도: Kafka Streams 인스턴스 수는 입력 토픽의 파티션 수를 초과할 수 없습니다. 확장을 고려해 충분한 파티션을 미리 설정하세요.
- State Store 디스크: RocksDB는 로컬 디스크를 사용합니다. K8s 환경에서는 PersistentVolume을 마운트하여 상태를 보존하세요.
- Rebalancing 최소화:
max.poll.interval.ms와session.timeout.ms를 적절히 조정하여 불필요한 리밸런싱을 방지합니다. - 토폴로지 변경 시 주의: State Store 이름이나 토폴로지 구조를 변경하면 기존 상태가 호환되지 않을 수 있습니다. application-id를 변경하거나 내부 토픽을 초기화해야 합니다.
- GlobalKTable: 모든 파티션의 데이터를 로컬에 복제하는 GlobalKTable은 참조 데이터 조인에 유용하지만, 데이터가 클 경우 메모리 부담이 커질 수 있습니다.
마무리
Spring Kafka Streams는 별도 인프라 없이 애플리케이션 내부에서 실시간 스트림 처리를 구현할 수 있는 강력한 도구입니다. KStream과 KTable의 이중 추상화, State Store를 통한 상태 관리, Interactive Query를 활용한 실시간 조회까지 — 이 모든 것이 Spring Boot의 자동 설정 위에서 동작합니다. 기존 Kafka Consumer/Producer 패턴과 함께 사용하면 다양한 실시간 처리 요구사항을 커버할 수 있습니다.