Spring Batch Chunk 병렬 처리

Spring Batch란?

Spring Batch는 대용량 데이터를 청크(Chunk) 단위로 처리하는 배치 프레임워크입니다. ETL, 정산, 리포트 생성 등 엔터프라이즈 환경에서 반드시 필요한 기술이며, Job → Step → (Reader → Processor → Writer) 구조로 동작합니다. 이 글에서는 Spring Batch 5 기준으로 Chunk 처리, Partitioning, Skip/Retry 전략까지 실무에서 바로 쓸 수 있는 심화 내용을 다룹니다.

핵심 아키텍처: Job, Step, Chunk

Spring Batch의 실행 단위는 Job입니다. Job은 하나 이상의 Step으로 구성되며, 각 Step은 Tasklet 또는 Chunk 기반으로 동작합니다. Chunk 기반 Step은 ItemReader → ItemProcessor → ItemWriter 흐름을 따릅니다.

@Configuration
public class OrderBatchConfig {

    @Bean
    public Job orderSettlementJob(JobRepository jobRepository,
                                   Step settlementStep) {
        return new JobBuilder("orderSettlementJob", jobRepository)
                .start(settlementStep)
                .build();
    }

    @Bean
    public Step settlementStep(JobRepository jobRepository,
                                PlatformTransactionManager tx,
                                ItemReader<Order> reader,
                                ItemProcessor<Order, Settlement> processor,
                                ItemWriter<Settlement> writer) {
        return new StepBuilder("settlementStep", jobRepository)
                .<Order, Settlement>chunk(500, tx)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
}

chunk(500, tx)는 500건씩 읽어서 처리 후 한 번에 커밋한다는 의미입니다. 청크 크기는 메모리와 트랜잭션 부하를 고려해 100~1000 사이에서 조정합니다.

ItemReader 심화: 커서 vs 페이징

대량 데이터를 읽는 방식은 크게 커서(Cursor)페이징(Paging) 두 가지입니다.

방식 특징 적합한 경우
JdbcCursorItemReader DB 커서로 한 건씩 스트리밍, 커넥션 유지 단일 스레드, 대용량 순차 처리
JdbcPagingItemReader LIMIT/OFFSET 쿼리로 페이지 단위 조회 멀티 스레드, 재시작 안정성 필요
JpaPagingItemReader JPA 기반 페이징, 엔티티 매핑 자동 JPA 프로젝트에서 간편하게 사용
@Bean
public JdbcPagingItemReader<Order> orderReader(DataSource dataSource) {
    Map<String, Order> sortKeys = new HashMap<>();
    sortKeys.put("order_id", Order.ASCENDING);

    return new JdbcPagingItemReaderBuilder<Order>()
            .name("orderReader")
            .dataSource(dataSource)
            .selectClause("SELECT order_id, user_id, amount, status")
            .fromClause("FROM orders")
            .whereClause("WHERE status = 'COMPLETED' AND settled = false")
            .sortKeys(sortKeys)
            .rowMapper(new OrderRowMapper())
            .pageSize(500)  // chunk size와 동일하게
            .build();
}

팁: pageSizechunkSize를 동일하게 맞추면 불필요한 쿼리 호출을 줄일 수 있습니다. 페이징 리더에서는 반드시 정렬 키를 지정해야 합니다.

ItemProcessor: 비즈니스 로직 분리

Processor는 변환·필터링·검증 로직을 담당합니다. null을 반환하면 해당 아이템을 스킵합니다.

@Component
public class SettlementProcessor
        implements ItemProcessor<Order, Settlement> {

    @Override
    public Settlement process(Order order) {
        // 이미 정산된 주문 필터링
        if (order.isSettled()) {
            return null;  // 스킵
        }

        BigDecimal fee = order.getAmount()
                .multiply(BigDecimal.valueOf(0.033));  // 3.3% 수수료
        BigDecimal net = order.getAmount().subtract(fee);

        return Settlement.builder()
                .orderId(order.getId())
                .grossAmount(order.getAmount())
                .fee(fee)
                .netAmount(net)
                .settledAt(LocalDateTime.now())
                .build();
    }
}

여러 Processor를 체인으로 연결하려면 CompositeItemProcessor를 사용합니다:

@Bean
public CompositeItemProcessor<Order, Settlement> compositeProcessor(
        ValidationProcessor validator,
        SettlementProcessor calculator) {
    return new CompositeItemProcessorBuilder<Order, Settlement>()
            .delegates(List.of(validator, calculator))
            .build();
}

Skip/Retry 전략: 장애 내성 확보

실무에서 배치 처리 중 일부 레코드가 실패하는 것은 흔한 일입니다. 전체 Job을 중단하지 않고 Skip하거나 Retry하는 전략이 필수입니다.

@Bean
public Step resilientStep(JobRepository jobRepository,
                           PlatformTransactionManager tx) {
    return new StepBuilder("resilientStep", jobRepository)
            .<Order, Settlement>chunk(500, tx)
            .reader(orderReader())
            .processor(settlementProcessor())
            .writer(settlementWriter())
            // Skip 전략
            .faultTolerant()
            .skipLimit(100)
            .skip(DataIntegrityViolationException.class)
            .skip(InvalidOrderException.class)
            .noSkip(SystemException.class)
            // Retry 전략
            .retryLimit(3)
            .retry(DeadlockLoserDataAccessException.class)
            .retry(OptimisticLockingFailureException.class)
            // Skip 로깅
            .listener(new SkipListener<Order, Settlement>() {
                @Override
                public void onSkipInProcess(Order item, Throwable t) {
                    log.warn("Skipped order: {} - {}", item.getId(), t.getMessage());
                }
            })
            .build();
}

skipLimit(100)은 최대 100건까지 스킵을 허용합니다. noSkip()으로 절대 스킵하면 안 되는 예외를 명시할 수 있습니다. Retry는 Resilience4j 서킷브레이커와 함께 사용하면 외부 API 호출 시 더욱 효과적입니다.

Partitioning: 멀티 스레드 병렬 처리

단일 스레드로 수천만 건을 처리하면 시간이 오래 걸립니다. Partitioning은 데이터를 논리적으로 분할해 여러 스레드가 동시에 처리하는 방식입니다.

@Bean
public Step partitionedStep(JobRepository jobRepository,
                             Step workerStep) {
    return new StepBuilder("partitionedStep", jobRepository)
            .partitioner("workerStep", rangePartitioner())
            .step(workerStep)
            .gridSize(8)          // 8개 파티션
            .taskExecutor(batchExecutor())
            .build();
}

@Bean
public Partitioner rangePartitioner() {
    return gridSize -> {
        Map<String, ExecutionContext> partitions = new HashMap<>();
        long min = orderRepository.findMinId();
        long max = orderRepository.findMaxId();
        long range = (max - min) / gridSize + 1;

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext ctx = new ExecutionContext();
            ctx.putLong("minId", min + (range * i));
            ctx.putLong("maxId", min + (range * (i + 1)) - 1);
            partitions.put("partition" + i, ctx);
        }
        return partitions;
    };
}

@Bean
public TaskExecutor batchExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(8);
    executor.setMaxPoolSize(8);
    executor.setQueueCapacity(0);
    executor.setThreadNamePrefix("batch-");
    return executor;
}

Worker Step에서는 @StepScope로 파티션별 파라미터를 주입받습니다:

@Bean
@StepScope
public JdbcPagingItemReader<Order> partitionedReader(
        @Value("#{stepExecutionContext['minId']}") Long minId,
        @Value("#{stepExecutionContext['maxId']}") Long maxId,
        DataSource dataSource) {
    return new JdbcPagingItemReaderBuilder<Order>()
            .name("partitionedReader")
            .dataSource(dataSource)
            .selectClause("SELECT *")
            .fromClause("FROM orders")
            .whereClause("WHERE order_id BETWEEN :minId AND :maxId AND status = 'COMPLETED'")
            .sortKeys(Map.of("order_id", Order.ASCENDING))
            .parameterValues(Map.of("minId", minId, "maxId", maxId))
            .pageSize(500)
            .build();
}

JobParameter와 멱등성

배치 Job은 멱등(idempotent)하게 설계해야 합니다. 같은 파라미터로 재실행해도 결과가 동일해야 합니다.

@Bean
public Job dailySettlementJob(JobRepository jobRepository, Step step) {
    return new JobBuilder("dailySettlement", jobRepository)
            .start(step)
            .incrementer(new RunIdIncrementer())  // 재실행 허용
            .validator(new DefaultJobParametersValidator(
                    new String[]{"targetDate"},   // 필수 파라미터
                    new String[]{"dryRun"}        // 선택 파라미터
            ))
            .build();
}

// 실행 시
JobParameters params = new JobParametersBuilder()
        .addString("targetDate", "2026-03-13")
        .addString("dryRun", "false")
        .toJobParameters();
jobLauncher.run(dailySettlementJob, params);

Writer에서 UPSERT(INSERT ON CONFLICT UPDATE)를 사용하면 중복 실행에도 안전합니다. 이 패턴은 Spring @Async 비동기 처리와 함께 비동기 배치 실행에도 응용됩니다.

Spring Batch 5 주요 변경점

Spring Batch 5(Spring Boot 3+)에서는 중요한 변경사항이 있습니다:

  • JobBuilderFactory/StepBuilderFactory 제거JobBuilder, StepBuilderJobRepository 직접 전달
  • @EnableBatchProcessing 선택적 → Spring Boot 자동 구성으로 대부분 불필요
  • Jakarta EE 9+ 전환javax.*jakarta.* 패키지
  • 메타데이터 테이블 → 신규 컬럼 추가, 마이그레이션 스크립트 필요

실무 운영 팁

1. 모니터링과 알림

@Component
public class JobCompletionListener extends JobExecutionListenerSupport {

    @Override
    public void afterJob(JobExecution exec) {
        long readCount = exec.getStepExecutions().stream()
                .mapToLong(StepExecution::getReadCount).sum();
        long skipCount = exec.getStepExecutions().stream()
                .mapToLong(StepExecution::getSkipCount).sum();
        Duration duration = Duration.between(
                exec.getStartTime(), exec.getEndTime());

        log.info("Job {} completed: status={}, read={}, skip={}, duration={}s",
                exec.getJobInstance().getJobName(),
                exec.getStatus(), readCount, skipCount,
                duration.toSeconds());

        if (exec.getStatus() == BatchStatus.FAILED) {
            alertService.send("배치 실패: " + exec.getAllFailureExceptions());
        }
    }
}

2. 청크 크기 튜닝 가이드

데이터 규모 권장 청크 크기 비고
~10만 건 100~500 단일 스레드로 충분
10만~1000만 건 500~1000 Partitioning 고려
1000만 건 이상 1000+ Partitioning 필수, 커서 방식 검토

마무리

Spring Batch는 단순한 반복 처리를 넘어, 장애 내성(Skip/Retry), 병렬 처리(Partitioning), 멱등성(JobParameter)을 모두 갖춘 엔터프라이즈급 배치 프레임워크입니다. 청크 크기 튜닝, Partitioner 설계, Skip/Retry 전략을 실무 요구사항에 맞게 조합하면 수천만 건의 데이터도 안정적으로 처리할 수 있습니다. Spring Boot 3 환경에서는 Batch 5의 변경점을 숙지하고 마이그레이션하는 것이 중요합니다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux