Spring Outbox 패턴 이벤트 발행

Transactional Outbox 패턴이란?

마이크로서비스에서 DB 트랜잭션과 메시지 발행을 원자적으로 처리하는 것은 대표적인 분산 시스템 난제입니다. 주문을 DB에 저장한 뒤 Kafka에 이벤트를 발행하는데, Kafka 전송이 실패하면 데이터 불일치가 발생합니다. Outbox 패턴은 이벤트를 DB의 outbox 테이블에 함께 저장한 뒤, 별도 프로세스가 이를 읽어 메시지 브로커에 발행하는 방식으로 이 문제를 해결합니다.

기본 아키텍처

Outbox 패턴의 흐름은 다음과 같습니다:

1. 비즈니스 트랜잭션 내에서:
   - orders 테이블에 INSERT
   - outbox_events 테이블에 INSERT (같은 트랜잭션)

2. Outbox Relay (별도 프로세스)가:
   - outbox_events를 폴링 또는 CDC로 읽기
   - Kafka/RabbitMQ에 이벤트 발행
   - 발행 완료 후 outbox 레코드 마킹/삭제

DB 트랜잭션이 커밋되면 비즈니스 데이터와 이벤트가 함께 저장되므로, 이벤트 유실이 불가능합니다.

Outbox 테이블 설계

@Entity
@Table(name = "outbox_events", indexes = {
    @Index(name = "idx_outbox_status", columnList = "status"),
    @Index(name = "idx_outbox_created", columnList = "createdAt")
})
public class OutboxEvent {

    @Id
    @GeneratedValue(strategy = GenerationType.UUID)
    private UUID id;

    @Column(nullable = false, length = 100)
    private String aggregateType;    // "Order", "Payment"

    @Column(nullable = false, length = 100)
    private String aggregateId;      // 주문 ID 등

    @Column(nullable = false, length = 100)
    private String eventType;        // "OrderCreated", "OrderCancelled"

    @Column(nullable = false, columnDefinition = "jsonb")
    private String payload;          // 이벤트 데이터 (JSON)

    @Column(nullable = false, length = 20)
    @Enumerated(EnumType.STRING)
    private OutboxStatus status = OutboxStatus.PENDING;

    @Column(nullable = false)
    private LocalDateTime createdAt = LocalDateTime.now();

    private LocalDateTime publishedAt;

    private int retryCount = 0;
}

public enum OutboxStatus {
    PENDING, PUBLISHED, FAILED
}

비즈니스 로직에서 Outbox 저장

@Service
@RequiredArgsConstructor
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    @Transactional
    public Order createOrder(CreateOrderRequest request) {
        // 1. 비즈니스 로직
        Order order = Order.builder()
                .userId(request.getUserId())
                .totalAmount(request.getTotalAmount())
                .status(OrderStatus.CREATED)
                .build();
        orderRepository.save(order);

        // 2. Outbox 이벤트 저장 (같은 트랜잭션!)
        OrderCreatedEvent event = new OrderCreatedEvent(
                order.getId(),
                order.getUserId(),
                order.getTotalAmount()
        );

        OutboxEvent outbox = OutboxEvent.builder()
                .aggregateType("Order")
                .aggregateId(order.getId().toString())
                .eventType("OrderCreated")
                .payload(objectMapper.writeValueAsString(event))
                .build();
        outboxRepository.save(outbox);

        return order;
    }
}

핵심은 @Transactional 하나로 주문과 이벤트를 함께 저장하는 것입니다. 이 트랜잭션 관리에 대한 심화 내용은 Spring Transaction 전파 전략을 참고하세요.

Outbox Publisher: 폴링 방식

가장 단순한 구현은 주기적 폴링입니다:

@Component
@RequiredArgsConstructor
@Slf4j
public class OutboxPollingPublisher {

    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Scheduled(fixedDelay = 1000)  // 1초마다 폴링
    @Transactional
    public void publishPendingEvents() {
        List<OutboxEvent> events = outboxRepository
                .findTop100ByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);

        for (OutboxEvent event : events) {
            try {
                String topic = "events." +
                    event.getAggregateType().toLowerCase();

                kafkaTemplate.send(topic,
                    event.getAggregateId(),
                    event.getPayload()
                ).get(5, TimeUnit.SECONDS);  // 동기 대기

                event.setStatus(OutboxStatus.PUBLISHED);
                event.setPublishedAt(LocalDateTime.now());

            } catch (Exception e) {
                log.error("Outbox publish failed: {}", event.getId(), e);
                event.setRetryCount(event.getRetryCount() + 1);

                if (event.getRetryCount() >= 5) {
                    event.setStatus(OutboxStatus.FAILED);
                }
            }
        }
    }
}

Repository와 정리 작업

public interface OutboxRepository extends JpaRepository<OutboxEvent, UUID> {

    List<OutboxEvent> findTop100ByStatusOrderByCreatedAtAsc(
        OutboxStatus status);

    // 실패 이벤트 재시도 대상
    @Query("SELECT o FROM OutboxEvent o WHERE o.status = 'PENDING' " +
           "AND o.retryCount < :maxRetry " +
           "ORDER BY o.createdAt ASC")
    List<OutboxEvent> findRetryable(@Param("maxRetry") int maxRetry,
                                     Pageable pageable);

    // 오래된 발행 완료 이벤트 정리
    @Modifying
    @Query("DELETE FROM OutboxEvent o WHERE o.status = 'PUBLISHED' " +
           "AND o.publishedAt < :before")
    int deletePublishedBefore(@Param("before") LocalDateTime before);
}

// 정리 스케줄러
@Scheduled(cron = "0 0 3 * * *")  // 매일 새벽 3시
@Transactional
public void cleanupOldEvents() {
    LocalDateTime threshold = LocalDateTime.now().minusDays(7);
    int deleted = outboxRepository.deletePublishedBefore(threshold);
    log.info("Cleaned up {} old outbox events", deleted);
}

Debezium CDC 방식: 폴링 대안

폴링의 한계(지연, DB 부하)를 해결하려면 Debezium CDC(Change Data Capture)를 사용합니다. DB의 WAL(Write-Ahead Log)을 읽어 실시간으로 이벤트를 캡처합니다:

# Debezium Outbox Connector 설정
{
  "name": "outbox-connector",
  "config": {
    "connector.class":
      "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "secret",
    "database.dbname": "orders_db",
    "topic.prefix": "order-service",

    "transforms": "outbox",
    "transforms.outbox.type":
      "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.topic.replacement":
      "events.${routedByValue}",

    "table.include.list": "public.outbox_events"
  }
}

Debezium의 EventRouter SMT(Single Message Transform)는 outbox 테이블의 레코드를 자동으로 적절한 Kafka 토픽으로 라우팅합니다.

방식 지연 DB 부하 복잡도 적합한 경우
폴링 1~5초 중간 낮음 소규모, 빠른 구현
Debezium CDC 밀리초 낮음 높음 대규모, 실시간 요구

Outbox 서비스 추상화

반복 코드를 줄이기 위해 Outbox 발행을 서비스로 추상화합니다:

@Service
@RequiredArgsConstructor
public class OutboxService {

    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    public void publish(String aggregateType, String aggregateId,
                        String eventType, Object event) {
        try {
            OutboxEvent outbox = OutboxEvent.builder()
                    .aggregateType(aggregateType)
                    .aggregateId(aggregateId)
                    .eventType(eventType)
                    .payload(objectMapper.writeValueAsString(event))
                    .build();
            outboxRepository.save(outbox);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Event serialization failed", e);
        }
    }
}

// 사용
@Transactional
public Order createOrder(CreateOrderRequest request) {
    Order order = orderRepository.save(/* ... */);

    outboxService.publish(
        "Order", order.getId().toString(),
        "OrderCreated",
        new OrderCreatedEvent(order)
    );

    return order;
}

멱등성 보장: 컨슈머 측

Outbox 패턴은 최소 1회(at-least-once) 발행을 보장합니다. 따라서 컨슈머는 중복 메시지를 처리할 수 있어야 합니다:

@Component
@RequiredArgsConstructor
public class OrderEventConsumer {

    private final ProcessedEventRepository processedEventRepo;

    @KafkaListener(topics = "events.order")
    @Transactional
    public void handleOrderEvent(
            @Header("event-id") String eventId,
            @Payload String payload) {

        // 이미 처리된 이벤트인지 확인
        if (processedEventRepo.existsById(eventId)) {
            log.info("Duplicate event skipped: {}", eventId);
            return;
        }

        // 비즈니스 로직 처리
        processOrder(payload);

        // 처리 완료 기록
        processedEventRepo.save(new ProcessedEvent(eventId));
    }
}

이 멱등성 패턴은 Spring Kafka 컨슈머·프로듀서에서 다룬 에러 핸들링과 함께 사용하면 더욱 견고한 시스템을 구축할 수 있습니다.

모니터링과 알림

@Component
@RequiredArgsConstructor
public class OutboxMetrics {

    private final OutboxRepository outboxRepository;
    private final MeterRegistry registry;

    @Scheduled(fixedRate = 30_000)
    public void recordMetrics() {
        long pending = outboxRepository.countByStatus(OutboxStatus.PENDING);
        long failed = outboxRepository.countByStatus(OutboxStatus.FAILED);

        registry.gauge("outbox.pending.count", pending);
        registry.gauge("outbox.failed.count", failed);

        // 5분 이상 PENDING인 이벤트가 있으면 경고
        if (pending > 1000) {
            log.warn("Outbox backlog: {} pending events", pending);
        }
    }
}

운영 베스트 프랙티스

  • Outbox 테이블 파티셔닝: 대용량 환경에서는 날짜별 파티션으로 정리 성능 향상
  • 배치 발행: 이벤트를 하나씩 보내지 말고 배치로 모아 전송하세요
  • 순서 보장: 같은 aggregateId의 이벤트 순서가 중요하면 Kafka 파티션 키로 aggregateId를 사용
  • FAILED 이벤트 알림: status=FAILED인 이벤트가 쌓이면 즉시 알림
  • payload 크기 제한: 큰 데이터는 참조(ID)만 넣고 컨슈머가 조회하는 패턴 사용
  • 인덱스: (status, createdAt) 복합 인덱스 필수

마무리

Transactional Outbox 패턴은 마이크로서비스에서 데이터 일관성과 이벤트 발행을 동시에 보장하는 가장 실용적인 방법입니다. 폴링 방식으로 빠르게 시작하고, 규모가 커지면 Debezium CDC로 전환할 수 있습니다. 컨슈머 측의 멱등성 처리와 함께 적용하면 분산 시스템에서도 신뢰할 수 있는 이벤트 기반 아키텍처를 구축할 수 있습니다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux