Spring Batch Partitioning이란?
Spring Batch의 Partitioning은 대량의 데이터를 논리적으로 분할하고, 각 파티션을 독립된 StepExecution으로 병렬 처리하는 기법이다. Chunk 기반 병렬 처리(멀티스레드 Step)와 달리, 파티셔닝은 데이터 자체를 나눠서 각 Worker가 자기 범위만 처리하므로 Reader의 thread-safety 문제가 없다.
이 글에서는 Partitioner 구현, PartitionHandler 설정, 로컬/원격 파티셔닝, 동적 파티션 크기 조정, 그리고 장애 복구 패턴까지 실전 중심으로 다룬다.
파티셔닝 아키텍처
파티셔닝은 Manager Step과 Worker Step으로 구성된다. Manager는 데이터를 분할하고 각 Worker에게 할당하며, Worker는 독립적으로 자기 파티션을 처리한다.
┌─────────────────────────────────────────┐
│ Manager Step │
│ ┌─────────────────────────────────┐ │
│ │ Partitioner │ │
│ │ (데이터 범위 분할) │ │
│ └──────────┬──────────────────────┘ │
│ │ ExecutionContext × N │
│ ┌──────────┴──────────────────────┐ │
│ │ PartitionHandler │ │
│ │ (Worker에게 파티션 분배) │ │
│ └──┬───────┬───────┬──────────────┘ │
│ │ │ │ │
│ ┌──▼──┐ ┌──▼──┐ ┌──▼──┐ │
│ │ W-1 │ │ W-2 │ │ W-3 │ Worker Steps │
│ │0-999│ │1000-│ │2000-│ │
│ │ │ │1999 │ │2999 │ │
│ └─────┘ └─────┘ └─────┘ │
└─────────────────────────────────────────┘
Partitioner 구현
Partitioner는 전체 데이터를 논리적 범위로 나누고, 각 범위를 ExecutionContext에 담아 반환한다.
1. 범위 기반 Partitioner (ID Range)
@Component
public class OrderIdRangePartitioner implements Partitioner {
@Autowired
private JdbcTemplate jdbcTemplate;
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// 전체 범위 조회
Long minId = jdbcTemplate.queryForObject(
"SELECT MIN(id) FROM orders WHERE status = 'PENDING'", Long.class);
Long maxId = jdbcTemplate.queryForObject(
"SELECT MAX(id) FROM orders WHERE status = 'PENDING'", Long.class);
if (minId == null || maxId == null) {
return Collections.emptyMap();
}
long range = (maxId - minId) / gridSize + 1;
Map<String, ExecutionContext> partitions = new HashMap<>();
for (int i = 0; i < gridSize; i++) {
ExecutionContext ctx = new ExecutionContext();
long start = minId + (i * range);
long end = Math.min(start + range - 1, maxId);
ctx.putLong("minId", start);
ctx.putLong("maxId", end);
ctx.putString("partitionName", "partition-" + i);
partitions.put("partition-" + i, ctx);
}
return partitions;
}
}
2. 날짜 기반 Partitioner
@Component
public class DateRangePartitioner implements Partitioner {
@Value("${batch.start-date}")
private LocalDate startDate;
@Value("${batch.end-date}")
private LocalDate endDate;
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
long totalDays = ChronoUnit.DAYS.between(startDate, endDate);
long daysPerPartition = Math.max(totalDays / gridSize, 1);
Map<String, ExecutionContext> partitions = new LinkedHashMap<>();
LocalDate current = startDate;
int idx = 0;
while (current.isBefore(endDate)) {
ExecutionContext ctx = new ExecutionContext();
LocalDate partEnd = current.plusDays(daysPerPartition);
if (partEnd.isAfter(endDate)) partEnd = endDate;
ctx.putString("startDate", current.toString());
ctx.putString("endDate", partEnd.toString());
partitions.put("date-" + idx++, ctx);
current = partEnd;
}
return partitions;
}
}
3. 파일 기반 Partitioner (멀티 파일 처리)
@Component
public class MultiFilePartitioner implements Partitioner {
@Value("${batch.input-dir}")
private String inputDir;
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> partitions = new HashMap<>();
try {
Resource[] resources = new PathMatchingResourcePatternResolver()
.getResources("file:" + inputDir + "/*.csv");
for (int i = 0; i < resources.length; i++) {
ExecutionContext ctx = new ExecutionContext();
ctx.putString("fileName", resources[i].getFile().getAbsolutePath());
partitions.put("file-" + i, ctx);
}
} catch (IOException e) {
throw new RuntimeException("파일 스캔 실패", e);
}
return partitions;
}
}
PartitionHandler 설정
로컬 TaskExecutor 기반 (단일 JVM)
@Configuration
@EnableBatchProcessing
public class PartitionJobConfig {
@Bean
public Job partitionJob(JobRepository jobRepository,
Step managerStep) {
return new JobBuilder("partitionJob", jobRepository)
.start(managerStep)
.build();
}
@Bean
public Step managerStep(JobRepository jobRepository,
Partitioner partitioner,
Step workerStep,
PlatformTransactionManager txManager) {
return new StepBuilder("managerStep", jobRepository)
.partitioner("workerStep", partitioner)
.step(workerStep)
.gridSize(8) // 파티션 수
.taskExecutor(partitionTaskExecutor())
.build();
}
@Bean
public TaskExecutor partitionTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(0); // 큐 없이 즉시 실행
executor.setThreadNamePrefix("partition-");
executor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Bean
@StepScope
public Step workerStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<Order> reader,
ItemProcessor<Order, ProcessedOrder> processor,
ItemWriter<ProcessedOrder> writer) {
return new StepBuilder("workerStep", jobRepository)
.<Order, ProcessedOrder>chunk(500, txManager)
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant()
.retryLimit(3)
.retry(DeadlockLoserDataAccessException.class)
.build();
}
}
Worker Step의 Reader: @StepScope 필수
@Bean
@StepScope
public JdbcPagingItemReader<Order> orderReader(
DataSource dataSource,
@Value("#{stepExecutionContext['minId']}") Long minId,
@Value("#{stepExecutionContext['maxId']}") Long maxId) {
Map<String, Object> params = new HashMap<>();
params.put("minId", minId);
params.put("maxId", maxId);
return new JdbcPagingItemReaderBuilder<Order>()
.name("orderReader")
.dataSource(dataSource)
.selectClause("SELECT id, customer_id, amount, status")
.fromClause("FROM orders")
.whereClause("WHERE id BETWEEN :minId AND :maxId AND status = 'PENDING'")
.sortKeys(Map.of("id", Order.ASCENDING))
.parameterValues(params)
.pageSize(500)
.rowMapper(new BeanPropertyRowMapper<>(Order.class))
.build();
}
핵심 포인트: @StepScope를 사용해야 각 Worker가 자기 파티션의 ExecutionContext에서 minId/maxId를 주입받는다. 이 어노테이션 없으면 모든 Worker가 같은 범위를 읽게 된다.
gridSize 동적 계산
고정 gridSize 대신 데이터량에 따라 동적으로 파티션 수를 결정하면 효율적이다.
@Component
public class DynamicGridSizeCalculator {
@Autowired
private JdbcTemplate jdbcTemplate;
private static final int RECORDS_PER_PARTITION = 10_000;
private static final int MIN_PARTITIONS = 1;
private static final int MAX_PARTITIONS = 32;
public int calculate() {
Long totalRecords = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM orders WHERE status = 'PENDING'",
Long.class);
if (totalRecords == null || totalRecords == 0) return MIN_PARTITIONS;
int calculated = (int) Math.ceil(
(double) totalRecords / RECORDS_PER_PARTITION);
// CPU 코어 수 고려
int availableCores = Runtime.getRuntime().availableProcessors();
return Math.min(
Math.max(Math.min(calculated, availableCores * 2), MIN_PARTITIONS),
MAX_PARTITIONS
);
}
}
// Job 설정에서 사용
@Bean
public Step managerStep(JobRepository jobRepository,
Partitioner partitioner,
Step workerStep,
DynamicGridSizeCalculator gridCalc) {
return new StepBuilder("managerStep", jobRepository)
.partitioner("workerStep", partitioner)
.step(workerStep)
.gridSize(gridCalc.calculate())
.taskExecutor(partitionTaskExecutor())
.build();
}
데이터 Skew 방지
ID 범위로 분할하면 삭제된 레코드 때문에 파티션 간 데이터 불균형이 생길 수 있다. 실제 레코드 수 기반 분할로 해결한다.
@Component
public class BalancedPartitioner implements Partitioner {
@Autowired
private JdbcTemplate jdbcTemplate;
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// 실제 ID 목록에서 균등 분할 지점 계산
List<Long> boundaryIds = jdbcTemplate.queryForList(
"""
WITH numbered AS (
SELECT id, ROW_NUMBER() OVER (ORDER BY id) AS rn,
COUNT(*) OVER () AS total
FROM orders WHERE status = 'PENDING'
)
SELECT id FROM numbered
WHERE rn % GREATEST(total / ?, 1) = 0
ORDER BY id
""",
Long.class, gridSize);
Map<String, ExecutionContext> partitions = new LinkedHashMap<>();
Long prevId = 0L;
for (int i = 0; i < boundaryIds.size(); i++) {
ExecutionContext ctx = new ExecutionContext();
ctx.putLong("minId", prevId + 1);
ctx.putLong("maxId", boundaryIds.get(i));
partitions.put("partition-" + i, ctx);
prevId = boundaryIds.get(i);
}
// 마지막 파티션: 나머지 처리
if (!partitions.isEmpty()) {
ExecutionContext last = new ExecutionContext();
last.putLong("minId", prevId + 1);
last.putLong("maxId", Long.MAX_VALUE);
partitions.put("partition-" + boundaryIds.size(), last);
}
return partitions;
}
}
장애 복구: Restartability
파티셔닝의 가장 큰 장점 중 하나는 실패한 파티션만 재시작할 수 있다는 것이다.
// 실패한 Job 재시작 시 COMPLETED 파티션은 스킵
// FAILED 파티션만 다시 실행됨
@Bean
public Step workerStep(JobRepository jobRepository,
PlatformTransactionManager txManager) {
return new StepBuilder("workerStep", jobRepository)
.<Order, ProcessedOrder>chunk(500, txManager)
.reader(orderReader(null, null, null))
.processor(orderProcessor())
.writer(orderWriter())
.faultTolerant()
.skipLimit(100)
.skip(DataIntegrityViolationException.class)
.retryLimit(3)
.retry(TransientDataAccessException.class)
.listener(new StepExecutionListener() {
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
log.info("Partition [{}] 완료: read={}, write={}, skip={}",
stepExecution.getStepName(),
stepExecution.getReadCount(),
stepExecution.getWriteCount(),
stepExecution.getSkipCount());
return stepExecution.getExitStatus();
}
})
.build();
}
모니터링: 파티션별 진행률 추적
@Component
public class PartitionProgressListener implements StepExecutionListener {
private final AtomicInteger completedPartitions = new AtomicInteger(0);
private int totalPartitions;
public void setTotalPartitions(int total) {
this.totalPartitions = total;
this.completedPartitions.set(0);
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
int completed = completedPartitions.incrementAndGet();
double progress = (double) completed / totalPartitions * 100;
log.info("[{}/{}] {:.1f}% 완료 | {} read={} write={} skip={} | 소요={}ms",
completed, totalPartitions, progress,
stepExecution.getStepName(),
stepExecution.getReadCount(),
stepExecution.getWriteCount(),
stepExecution.getSkipCount(),
Duration.between(
stepExecution.getStartTime(),
stepExecution.getEndTime()).toMillis());
return stepExecution.getExitStatus();
}
}
성능 비교: 파티셔닝 vs 멀티스레드 Step
| 항목 | 멀티스레드 Step | Partitioning |
|---|---|---|
| Reader thread-safety | 필수 (SynchronizedItemReader) | 불필요 (각자 독립 Reader) |
| Restart | 처음부터 다시 | 실패 파티션만 재시작 |
| 모니터링 | 전체 StepExecution 1개 | 파티션별 StepExecution |
| DB 부하 | 단일 쿼리에 동시 접근 | 범위별 분산 쿼리 |
| 확장 | 단일 JVM 내 | 원격 Worker 가능 |
실전 팁: 커넥션 풀 사이징
파티션 수만큼 동시에 DB 커넥션을 사용하므로, HikariCP 풀 크기를 반드시 조정해야 한다.
# application.yml
spring:
datasource:
hikari:
# gridSize + Manager + 여유분
maximum-pool-size: ${PARTITION_POOL_SIZE:12}
minimum-idle: 4
connection-timeout: 30000
# 배치 전용 DataSource 분리 권장
batch:
jdbc:
isolation-level-for-create: ISOLATION_READ_COMMITTED
마무리
Spring Batch Partitioning은 대량 데이터 처리의 핵심 패턴이다. 멀티스레드 Step과 달리 Reader의 thread-safety를 신경 쓸 필요 없고, 실패한 파티션만 재시작할 수 있어 운영 안정성이 크게 향상된다.
실무 적용 시 핵심 체크리스트:
- @StepScope 필수: Worker Step의 Reader/Writer에 반드시 적용
- 데이터 Skew 주의: ID 범위 대신 실제 레코드 수 기반 분할 고려
- gridSize 최적화: CPU 코어 수 × 2를 상한으로, 데이터량에 비례
- 커넥션 풀: gridSize + α로 HikariCP 설정
Spring Boot HikariCP 커넥션 풀 글에서 풀 사이징 전략을 참고하고, Spring JPA Batch Insert 최적화와 결합하면 Writer 성능까지 극대화할 수 있다.