Spring Batch 청크 처리 심화

Spring Batch란?

Spring Batch는 대용량 데이터 처리를 위한 프레임워크입니다. 수백만 건의 데이터 마이그레이션, 정산 처리, 리포트 생성 등 배치 작업에 필요한 트랜잭션 관리, 청크 기반 처리, 재시작·재시도, 병렬 처리를 표준화된 방식으로 제공합니다.

핵심 구조: Job → Step → Chunk

Job (배치 작업 전체)
├── Step 1: 데이터 추출 → 변환 → 적재
│   ├── ItemReader   (읽기)
│   ├── ItemProcessor (변환)
│   └── ItemWriter   (쓰기)
├── Step 2: 리포트 생성
└── Step 3: 알림 발송

Job은 여러 Step으로 구성되고, 각 Step은 Chunk 단위로 데이터를 처리합니다. 1000건을 100건씩 10번 나눠 처리하면, 실패 시 해당 청크만 재처리하면 됩니다.

기본 Chunk 기반 Step

@Configuration
@RequiredArgsConstructor
public class OrderSettlementJobConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager txManager;
    private final DataSource dataSource;

    @Bean
    public Job settlementJob() {
        return new JobBuilder("settlementJob", jobRepository)
            .start(settlementStep())
            .next(reportStep())
            .build();
    }

    @Bean
    public Step settlementStep() {
        return new StepBuilder("settlementStep", jobRepository)
            .<Order, Settlement>chunk(100, txManager)  // 100건씩 처리
            .reader(orderReader())
            .processor(settlementProcessor())
            .writer(settlementWriter())
            .faultTolerant()
            .retryLimit(3)
            .retry(DeadlockLoserDataAccessException.class)
            .skipLimit(10)
            .skip(InvalidOrderException.class)
            .build();
    }
}

chunk(100)은 100건을 읽어서 변환하고 한 번에 쓴다는 의미입니다. faultTolerant()로 재시도(retry)와 건너뛰기(skip) 정책을 설정합니다.

ItemReader: 다양한 데이터 소스

// JPA 커서 기반 Reader (메모리 효율적)
@Bean
public JpaCursorItemReader<Order> orderReader() {
    return new JpaCursorItemReaderBuilder<Order>()
        .name("orderReader")
        .entityManagerFactory(emf)
        .queryString(
            "SELECT o FROM Order o WHERE o.status = 'COMPLETED' " +
            "AND o.settlementDate IS NULL")
        .build();
}

// JDBC 페이징 Reader (대용량에 적합)
@Bean
public JdbcPagingItemReader<Order> orderPagingReader() {
    Map<String, Order> sortKeys = Map.of("id", Order.ASCENDING);

    return new JdbcPagingItemReaderBuilder<Order>()
        .name("orderPagingReader")
        .dataSource(dataSource)
        .selectClause("SELECT id, amount, created_at")
        .fromClause("FROM orders")
        .whereClause("WHERE status = 'COMPLETED'")
        .sortKeys(sortKeys)
        .pageSize(100)
        .rowMapper(new BeanPropertyRowMapper<>(Order.class))
        .build();
}

// CSV 파일 Reader
@Bean
public FlatFileItemReader<OrderCsv> csvReader() {
    return new FlatFileItemReaderBuilder<OrderCsv>()
        .name("csvReader")
        .resource(new ClassPathResource("orders.csv"))
        .delimited()
        .names("orderId", "amount", "date")
        .targetType(OrderCsv.class)
        .linesToSkip(1)  // 헤더 건너뛰기
        .build();
}

JpaCursorItemReader는 커서를 열어 한 건씩 가져오므로 메모리 사용이 일정합니다. JdbcPagingItemReader는 페이지 단위로 가져와 대용량에서도 안정적입니다.

ItemProcessor: 비즈니스 로직

@Component
public class SettlementProcessor
    implements ItemProcessor<Order, Settlement> {

    @Override
    public Settlement process(Order order) throws Exception {
        // null 반환 시 해당 아이템 건너뜀 (필터링)
        if (order.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
            return null;
        }

        BigDecimal fee = order.getAmount()
            .multiply(new BigDecimal("0.03"));  // 3% 수수료
        BigDecimal settlementAmount = order.getAmount().subtract(fee);

        return Settlement.builder()
            .orderId(order.getId())
            .originalAmount(order.getAmount())
            .fee(fee)
            .settlementAmount(settlementAmount)
            .processedAt(LocalDateTime.now())
            .build();
    }
}

// Processor 체이닝
@Bean
public CompositeItemProcessor<Order, Settlement> compositeProcessor() {
    return new CompositeItemProcessorBuilder<Order, Settlement>()
        .delegates(List.of(
            validationProcessor(),   // 검증
            enrichmentProcessor(),   // 데이터 보강
            settlementProcessor()    // 정산 계산
        ))
        .build();
}

ItemWriter: 벌크 쓰기

// JPA Writer
@Bean
public JpaItemWriter<Settlement> settlementWriter() {
    JpaItemWriter<Settlement> writer = new JpaItemWriter<>();
    writer.setEntityManagerFactory(emf);
    return writer;
}

// JDBC Batch Writer (더 빠름)
@Bean
public JdbcBatchItemWriter<Settlement> jdbcWriter() {
    return new JdbcBatchItemWriterBuilder<Settlement>()
        .dataSource(dataSource)
        .sql("INSERT INTO settlement " +
             "(order_id, amount, fee, processed_at) " +
             "VALUES (:orderId, :amount, :fee, :processedAt)")
        .beanMapped()
        .build();
}

// Composite Writer: 여러 곳에 쓰기
@Bean
public CompositeItemWriter<Settlement> compositeWriter() {
    return new CompositeItemWriterBuilder<Settlement>()
        .delegates(List.of(
            jdbcWriter(),        // DB 저장
            kafkaWriter()        // Kafka 이벤트 발행
        ))
        .build();
}

Job 파라미터와 스케줄링

// JobParameter로 동적 값 전달
@Bean
@StepScope  // 런타임에 파라미터 바인딩
public JpaCursorItemReader<Order> orderReader(
        @Value("#{jobParameters['targetDate']}") String targetDate) {
    return new JpaCursorItemReaderBuilder<Order>()
        .name("orderReader")
        .entityManagerFactory(emf)
        .queryString(
            "SELECT o FROM Order o WHERE o.createdAt >= :date")
        .parameterValues(Map.of("date",
            LocalDate.parse(targetDate).atStartOfDay()))
        .build();
}

// 스케줄러에서 Job 실행
@Component
@RequiredArgsConstructor
public class BatchScheduler {

    private final JobLauncher jobLauncher;
    private final Job settlementJob;

    @Scheduled(cron = "0 0 2 * * *")  // 매일 새벽 2시
    public void runSettlement() throws Exception {
        JobParameters params = new JobParametersBuilder()
            .addString("targetDate",
                LocalDate.now().minusDays(1).toString())
            .addLong("timestamp", System.currentTimeMillis())
            .toJobParameters();

        jobLauncher.run(settlementJob, params);
    }
}

@StepScope는 Step 실행 시점에 빈을 생성해 JobParameter를 주입받을 수 있게 합니다. Spring Scheduler 분산 락과 함께 사용하면 멀티 인스턴스 환경에서도 배치가 중복 실행되지 않습니다.

병렬 처리: 멀티스레드 Step

@Bean
public Step parallelSettlementStep() {
    return new StepBuilder("parallelStep", jobRepository)
        .<Order, Settlement>chunk(100, txManager)
        .reader(orderReader())       // thread-safe Reader 필수!
        .processor(processor())
        .writer(writer())
        .taskExecutor(taskExecutor())
        .throttleLimit(4)            // 최대 4개 스레드
        .build();
}

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(8);
    executor.setQueueCapacity(25);
    executor.setThreadNamePrefix("batch-");
    return executor;
}

// Partitioning: 데이터를 분할해 병렬 처리
@Bean
public Step partitionedStep() {
    return new StepBuilder("partitionedStep", jobRepository)
        .partitioner("workerStep", rangePartitioner())
        .step(workerStep())
        .gridSize(10)                // 10개 파티션
        .taskExecutor(taskExecutor())
        .build();
}

주의: 멀티스레드 Step에서는 ItemReader가 thread-safe해야 합니다. JdbcPagingItemReader는 thread-safe하지만, JpaCursorItemReader는 아닙니다. Partitioning이 더 안전한 병렬 처리 방식입니다.

재시작과 재처리

Spring Batch는 BATCH_JOB_EXECUTION 테이블에 실행 상태를 저장합니다. 실패한 Job을 같은 파라미터로 재실행하면 실패 지점부터 재개됩니다:

상태 재시작 시 동작
COMPLETED 같은 파라미터로 재실행 불가 (이미 성공)
FAILED 마지막 성공 청크 다음부터 재개
STOPPED 중단 지점부터 재개
// allowStartIfComplete: 성공한 Step도 재실행 허용
@Bean
public Step idempotentStep() {
    return new StepBuilder("idempotentStep", jobRepository)
        .<Order, Settlement>chunk(100, txManager)
        .reader(reader())
        .writer(writer())
        .allowStartIfComplete(true)
        .build();
}

실전 팁

  • JPA보다 JDBC: 배치에서는 JPA의 1차 캐시가 메모리를 잡아먹음. JDBC Writer가 2~5배 빠름
  • chunk size 튜닝: 너무 작으면 커밋 오버헤드, 너무 크면 메모리 부족. 100~500이 일반적
  • 메타 테이블 정리: BATCH_* 테이블이 계속 커지므로 주기적 정리 필요
  • 멱등성 보장: 재시작 시 중복 처리되지 않도록 UPSERT 또는 트랜잭션 격리 활용

마무리

Spring Batch는 대용량 데이터 처리의 표준입니다. Chunk 기반으로 메모리 효율적인 처리, Skip/Retry로 장애 복원력, Partitioning으로 병렬 처리, JobRepository로 실행 이력과 재시작을 관리합니다. Reader/Processor/Writer를 조합하고, StepScope로 동적 파라미터를 바인딩하면 유연하고 안정적인 배치 파이프라인을 구축할 수 있습니다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux