Spring Batch Partitioning 심화

Spring Batch Partitioning이란?

Spring Batch의 Partitioning은 대량의 데이터를 논리적으로 분할하고, 각 파티션을 독립된 StepExecution으로 병렬 처리하는 기법이다. Chunk 기반 병렬 처리(멀티스레드 Step)와 달리, 파티셔닝은 데이터 자체를 나눠서 각 Worker가 자기 범위만 처리하므로 Reader의 thread-safety 문제가 없다.

이 글에서는 Partitioner 구현, PartitionHandler 설정, 로컬/원격 파티셔닝, 동적 파티션 크기 조정, 그리고 장애 복구 패턴까지 실전 중심으로 다룬다.

파티셔닝 아키텍처

파티셔닝은 Manager StepWorker Step으로 구성된다. Manager는 데이터를 분할하고 각 Worker에게 할당하며, Worker는 독립적으로 자기 파티션을 처리한다.

┌─────────────────────────────────────────┐
│           Manager Step                   │
│  ┌─────────────────────────────────┐    │
│  │  Partitioner                     │    │
│  │  (데이터 범위 분할)              │    │
│  └──────────┬──────────────────────┘    │
│             │ ExecutionContext × N       │
│  ┌──────────┴──────────────────────┐    │
│  │  PartitionHandler               │    │
│  │  (Worker에게 파티션 분배)        │    │
│  └──┬───────┬───────┬──────────────┘    │
│     │       │       │                    │
│  ┌──▼──┐ ┌──▼──┐ ┌──▼──┐               │
│  │ W-1 │ │ W-2 │ │ W-3 │  Worker Steps │
│  │0-999│ │1000-│ │2000-│               │
│  │     │ │1999 │ │2999 │               │
│  └─────┘ └─────┘ └─────┘               │
└─────────────────────────────────────────┘

Partitioner 구현

Partitioner는 전체 데이터를 논리적 범위로 나누고, 각 범위를 ExecutionContext에 담아 반환한다.

1. 범위 기반 Partitioner (ID Range)

@Component
public class OrderIdRangePartitioner implements Partitioner {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // 전체 범위 조회
        Long minId = jdbcTemplate.queryForObject(
            "SELECT MIN(id) FROM orders WHERE status = 'PENDING'", Long.class);
        Long maxId = jdbcTemplate.queryForObject(
            "SELECT MAX(id) FROM orders WHERE status = 'PENDING'", Long.class);

        if (minId == null || maxId == null) {
            return Collections.emptyMap();
        }

        long range = (maxId - minId) / gridSize + 1;
        Map<String, ExecutionContext> partitions = new HashMap<>();

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext ctx = new ExecutionContext();
            long start = minId + (i * range);
            long end = Math.min(start + range - 1, maxId);

            ctx.putLong("minId", start);
            ctx.putLong("maxId", end);
            ctx.putString("partitionName", "partition-" + i);

            partitions.put("partition-" + i, ctx);
        }

        return partitions;
    }
}

2. 날짜 기반 Partitioner

@Component
public class DateRangePartitioner implements Partitioner {

    @Value("${batch.start-date}")
    private LocalDate startDate;

    @Value("${batch.end-date}")
    private LocalDate endDate;

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        long totalDays = ChronoUnit.DAYS.between(startDate, endDate);
        long daysPerPartition = Math.max(totalDays / gridSize, 1);

        Map<String, ExecutionContext> partitions = new LinkedHashMap<>();
        LocalDate current = startDate;
        int idx = 0;

        while (current.isBefore(endDate)) {
            ExecutionContext ctx = new ExecutionContext();
            LocalDate partEnd = current.plusDays(daysPerPartition);
            if (partEnd.isAfter(endDate)) partEnd = endDate;

            ctx.putString("startDate", current.toString());
            ctx.putString("endDate", partEnd.toString());
            partitions.put("date-" + idx++, ctx);

            current = partEnd;
        }

        return partitions;
    }
}

3. 파일 기반 Partitioner (멀티 파일 처리)

@Component
public class MultiFilePartitioner implements Partitioner {

    @Value("${batch.input-dir}")
    private String inputDir;

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> partitions = new HashMap<>();

        try {
            Resource[] resources = new PathMatchingResourcePatternResolver()
                .getResources("file:" + inputDir + "/*.csv");

            for (int i = 0; i < resources.length; i++) {
                ExecutionContext ctx = new ExecutionContext();
                ctx.putString("fileName", resources[i].getFile().getAbsolutePath());
                partitions.put("file-" + i, ctx);
            }
        } catch (IOException e) {
            throw new RuntimeException("파일 스캔 실패", e);
        }

        return partitions;
    }
}

PartitionHandler 설정

로컬 TaskExecutor 기반 (단일 JVM)

@Configuration
@EnableBatchProcessing
public class PartitionJobConfig {

    @Bean
    public Job partitionJob(JobRepository jobRepository,
                            Step managerStep) {
        return new JobBuilder("partitionJob", jobRepository)
            .start(managerStep)
            .build();
    }

    @Bean
    public Step managerStep(JobRepository jobRepository,
                            Partitioner partitioner,
                            Step workerStep,
                            PlatformTransactionManager txManager) {
        return new StepBuilder("managerStep", jobRepository)
            .partitioner("workerStep", partitioner)
            .step(workerStep)
            .gridSize(8)  // 파티션 수
            .taskExecutor(partitionTaskExecutor())
            .build();
    }

    @Bean
    public TaskExecutor partitionTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(0);  // 큐 없이 즉시 실행
        executor.setThreadNamePrefix("partition-");
        executor.setRejectedExecutionHandler(
            new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    @Bean
    @StepScope
    public Step workerStep(JobRepository jobRepository,
                           PlatformTransactionManager txManager,
                           ItemReader<Order> reader,
                           ItemProcessor<Order, ProcessedOrder> processor,
                           ItemWriter<ProcessedOrder> writer) {
        return new StepBuilder("workerStep", jobRepository)
            .<Order, ProcessedOrder>chunk(500, txManager)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .faultTolerant()
            .retryLimit(3)
            .retry(DeadlockLoserDataAccessException.class)
            .build();
    }
}

Worker Step의 Reader: @StepScope 필수

@Bean
@StepScope
public JdbcPagingItemReader<Order> orderReader(
        DataSource dataSource,
        @Value("#{stepExecutionContext['minId']}") Long minId,
        @Value("#{stepExecutionContext['maxId']}") Long maxId) {

    Map<String, Object> params = new HashMap<>();
    params.put("minId", minId);
    params.put("maxId", maxId);

    return new JdbcPagingItemReaderBuilder<Order>()
        .name("orderReader")
        .dataSource(dataSource)
        .selectClause("SELECT id, customer_id, amount, status")
        .fromClause("FROM orders")
        .whereClause("WHERE id BETWEEN :minId AND :maxId AND status = 'PENDING'")
        .sortKeys(Map.of("id", Order.ASCENDING))
        .parameterValues(params)
        .pageSize(500)
        .rowMapper(new BeanPropertyRowMapper<>(Order.class))
        .build();
}

핵심 포인트: @StepScope를 사용해야 각 Worker가 자기 파티션의 ExecutionContext에서 minId/maxId를 주입받는다. 이 어노테이션 없으면 모든 Worker가 같은 범위를 읽게 된다.

gridSize 동적 계산

고정 gridSize 대신 데이터량에 따라 동적으로 파티션 수를 결정하면 효율적이다.

@Component
public class DynamicGridSizeCalculator {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    private static final int RECORDS_PER_PARTITION = 10_000;
    private static final int MIN_PARTITIONS = 1;
    private static final int MAX_PARTITIONS = 32;

    public int calculate() {
        Long totalRecords = jdbcTemplate.queryForObject(
            "SELECT COUNT(*) FROM orders WHERE status = 'PENDING'",
            Long.class);

        if (totalRecords == null || totalRecords == 0) return MIN_PARTITIONS;

        int calculated = (int) Math.ceil(
            (double) totalRecords / RECORDS_PER_PARTITION);

        // CPU 코어 수 고려
        int availableCores = Runtime.getRuntime().availableProcessors();

        return Math.min(
            Math.max(Math.min(calculated, availableCores * 2), MIN_PARTITIONS),
            MAX_PARTITIONS
        );
    }
}

// Job 설정에서 사용
@Bean
public Step managerStep(JobRepository jobRepository,
                        Partitioner partitioner,
                        Step workerStep,
                        DynamicGridSizeCalculator gridCalc) {
    return new StepBuilder("managerStep", jobRepository)
        .partitioner("workerStep", partitioner)
        .step(workerStep)
        .gridSize(gridCalc.calculate())
        .taskExecutor(partitionTaskExecutor())
        .build();
}

데이터 Skew 방지

ID 범위로 분할하면 삭제된 레코드 때문에 파티션 간 데이터 불균형이 생길 수 있다. 실제 레코드 수 기반 분할로 해결한다.

@Component
public class BalancedPartitioner implements Partitioner {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // 실제 ID 목록에서 균등 분할 지점 계산
        List<Long> boundaryIds = jdbcTemplate.queryForList(
            """
            WITH numbered AS (
                SELECT id, ROW_NUMBER() OVER (ORDER BY id) AS rn,
                       COUNT(*) OVER () AS total
                FROM orders WHERE status = 'PENDING'
            )
            SELECT id FROM numbered
            WHERE rn % GREATEST(total / ?, 1) = 0
            ORDER BY id
            """,
            Long.class, gridSize);

        Map<String, ExecutionContext> partitions = new LinkedHashMap<>();
        Long prevId = 0L;

        for (int i = 0; i < boundaryIds.size(); i++) {
            ExecutionContext ctx = new ExecutionContext();
            ctx.putLong("minId", prevId + 1);
            ctx.putLong("maxId", boundaryIds.get(i));
            partitions.put("partition-" + i, ctx);
            prevId = boundaryIds.get(i);
        }

        // 마지막 파티션: 나머지 처리
        if (!partitions.isEmpty()) {
            ExecutionContext last = new ExecutionContext();
            last.putLong("minId", prevId + 1);
            last.putLong("maxId", Long.MAX_VALUE);
            partitions.put("partition-" + boundaryIds.size(), last);
        }

        return partitions;
    }
}

장애 복구: Restartability

파티셔닝의 가장 큰 장점 중 하나는 실패한 파티션만 재시작할 수 있다는 것이다.

// 실패한 Job 재시작 시 COMPLETED 파티션은 스킵
// FAILED 파티션만 다시 실행됨

@Bean
public Step workerStep(JobRepository jobRepository,
                       PlatformTransactionManager txManager) {
    return new StepBuilder("workerStep", jobRepository)
        .<Order, ProcessedOrder>chunk(500, txManager)
        .reader(orderReader(null, null, null))
        .processor(orderProcessor())
        .writer(orderWriter())
        .faultTolerant()
        .skipLimit(100)
        .skip(DataIntegrityViolationException.class)
        .retryLimit(3)
        .retry(TransientDataAccessException.class)
        .listener(new StepExecutionListener() {
            @Override
            public ExitStatus afterStep(StepExecution stepExecution) {
                log.info("Partition [{}] 완료: read={}, write={}, skip={}",
                    stepExecution.getStepName(),
                    stepExecution.getReadCount(),
                    stepExecution.getWriteCount(),
                    stepExecution.getSkipCount());
                return stepExecution.getExitStatus();
            }
        })
        .build();
}

모니터링: 파티션별 진행률 추적

@Component
public class PartitionProgressListener implements StepExecutionListener {

    private final AtomicInteger completedPartitions = new AtomicInteger(0);
    private int totalPartitions;

    public void setTotalPartitions(int total) {
        this.totalPartitions = total;
        this.completedPartitions.set(0);
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        int completed = completedPartitions.incrementAndGet();
        double progress = (double) completed / totalPartitions * 100;

        log.info("[{}/{}] {:.1f}% 완료 | {} read={} write={} skip={} | 소요={}ms",
            completed, totalPartitions, progress,
            stepExecution.getStepName(),
            stepExecution.getReadCount(),
            stepExecution.getWriteCount(),
            stepExecution.getSkipCount(),
            Duration.between(
                stepExecution.getStartTime(),
                stepExecution.getEndTime()).toMillis());

        return stepExecution.getExitStatus();
    }
}

성능 비교: 파티셔닝 vs 멀티스레드 Step

항목 멀티스레드 Step Partitioning
Reader thread-safety 필수 (SynchronizedItemReader) 불필요 (각자 독립 Reader)
Restart 처음부터 다시 실패 파티션만 재시작
모니터링 전체 StepExecution 1개 파티션별 StepExecution
DB 부하 단일 쿼리에 동시 접근 범위별 분산 쿼리
확장 단일 JVM 내 원격 Worker 가능

실전 팁: 커넥션 풀 사이징

파티션 수만큼 동시에 DB 커넥션을 사용하므로, HikariCP 풀 크기를 반드시 조정해야 한다.

# application.yml
spring:
  datasource:
    hikari:
      # gridSize + Manager + 여유분
      maximum-pool-size: ${PARTITION_POOL_SIZE:12}
      minimum-idle: 4
      connection-timeout: 30000
      # 배치 전용 DataSource 분리 권장
  batch:
    jdbc:
      isolation-level-for-create: ISOLATION_READ_COMMITTED

마무리

Spring Batch Partitioning은 대량 데이터 처리의 핵심 패턴이다. 멀티스레드 Step과 달리 Reader의 thread-safety를 신경 쓸 필요 없고, 실패한 파티션만 재시작할 수 있어 운영 안정성이 크게 향상된다.

실무 적용 시 핵심 체크리스트:

  • @StepScope 필수: Worker Step의 Reader/Writer에 반드시 적용
  • 데이터 Skew 주의: ID 범위 대신 실제 레코드 수 기반 분할 고려
  • gridSize 최적화: CPU 코어 수 × 2를 상한으로, 데이터량에 비례
  • 커넥션 풀: gridSize + α로 HikariCP 설정

Spring Boot HikariCP 커넥션 풀 글에서 풀 사이징 전략을 참고하고, Spring JPA Batch Insert 최적화와 결합하면 Writer 성능까지 극대화할 수 있다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux