Spring Batch란? 대용량 데이터 처리의 표준
Spring Batch는 대용량 데이터를 안정적으로 처리하기 위한 배치 프레임워크다. 수백만~수억 건의 데이터를 읽고, 변환하고, 쓰는 작업을 청크(Chunk) 단위 트랜잭션으로 처리하며, 실패 시 재시도·스킵·재시작 메커니즘을 제공한다. Spring Batch 5(Spring Boot 3)부터는 Java 17 필수, @EnableBatchProcessing 자동 구성 변경 등 주요 업데이트가 있었다.
이 글에서는 Job/Step 구조, Chunk 처리 모델, ItemReader·Processor·Writer 구현, 재시도·스킵 전략, 파티셔닝, 그리고 실무 운영 패턴까지 다룬다.
핵심 아키텍처: Job → Step → Chunk
Spring Batch의 실행 단위는 Job > Step > Chunk 3계층이다.
| 개념 | 역할 | 트랜잭션 범위 |
|---|---|---|
Job |
배치 작업의 최상위 단위 | — |
Step |
Job 내의 독립 실행 단계 | — |
Chunk |
N건씩 읽고→변환→쓰기 | Chunk 단위 커밋 |
Tasklet |
단일 작업 (파일 삭제, API 호출 등) | Tasklet 단위 커밋 |
// Spring Batch 5 기본 구성 (Spring Boot 3)
@Configuration
public class BatchConfig {
@Bean
public Job userMigrationJob(JobRepository jobRepository,
Step extractStep,
Step transformStep,
Step loadStep) {
return new JobBuilder("userMigrationJob", jobRepository)
.incrementer(new RunIdIncrementer())
.start(extractStep)
.next(transformStep)
.next(loadStep)
.listener(jobCompletionListener())
.build();
}
@Bean
public Step extractStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
ItemReader<UserEntity> reader,
ItemProcessor<UserEntity, UserDto> processor,
ItemWriter<UserDto> writer) {
return new StepBuilder("extractStep", jobRepository)
.<UserEntity, UserDto>chunk(500, txManager) // 500건씩 처리
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant()
.retryLimit(3)
.retry(TransientDataAccessException.class)
.skipLimit(100)
.skip(FlatFileParseException.class)
.build();
}
}
Chunk 사이즈 500은 500건을 읽고 변환한 뒤 한 번에 DB에 쓰고 커밋한다는 의미다. 중간에 실패하면 해당 청크만 롤백된다.
ItemReader: 데이터 소스별 구현
Spring Batch는 DB, 파일, API 등 다양한 데이터 소스에 대한 Reader를 기본 제공한다.
// 1. DB 커서 기반 Reader (대용량에 적합)
@Bean
@StepScope
public JdbcCursorItemReader<UserEntity> dbReader(DataSource dataSource) {
return new JdbcCursorItemReaderBuilder<UserEntity>()
.name("userDbReader")
.dataSource(dataSource)
.sql("SELECT id, name, email, created_at FROM users " +
"WHERE created_at >= :startDate ORDER BY id")
.preparedStatementSetter(ps ->
ps.setDate(1, Date.valueOf(startDate)))
.rowMapper(new BeanPropertyRowMapper<>(UserEntity.class))
.fetchSize(1000) // JDBC fetch size (메모리 제어)
.build();
}
// 2. 페이징 Reader (커서 미지원 DB용)
@Bean
@StepScope
public JdbcPagingItemReader<UserEntity> pagingReader(
DataSource dataSource,
@Value("#{jobParameters['startDate']}") String startDate) {
Map<String, Object> params = Map.of("startDate", startDate);
return new JdbcPagingItemReaderBuilder<UserEntity>()
.name("userPagingReader")
.dataSource(dataSource)
.selectClause("SELECT id, name, email")
.fromClause("FROM users")
.whereClause("WHERE created_at >= :startDate")
.sortKeys(Map.of("id", Order.ASCENDING))
.parameterValues(params)
.pageSize(500) // chunk size와 동일하게
.rowMapper(new BeanPropertyRowMapper<>(UserEntity.class))
.build();
}
// 3. CSV 파일 Reader
@Bean
@StepScope
public FlatFileItemReader<UserCsv> csvReader(
@Value("#{jobParameters['inputFile']}") Resource file) {
return new FlatFileItemReaderBuilder<UserCsv>()
.name("userCsvReader")
.resource(file)
.encoding("UTF-8")
.linesToSkip(1) // 헤더 스킵
.delimited()
.names("name", "email", "phone")
.targetType(UserCsv.class)
.build();
}
| Reader | 용도 | 메모리 사용 |
|---|---|---|
JdbcCursorItemReader |
대용량 DB 조회 | 낮음 (fetchSize 제어) |
JdbcPagingItemReader |
페이징 기반 조회 | 페이지 단위 |
FlatFileItemReader |
CSV/TSV 파일 | 라인 단위 |
JsonItemReader |
JSON 배열 파일 | 아이템 단위 |
ItemProcessor: 변환과 필터링
// 단일 Processor
@Component
public class UserProcessor implements ItemProcessor<UserEntity, UserDto> {
@Override
public UserDto process(UserEntity item) {
// null 반환 시 해당 아이템 필터링 (Writer로 전달되지 않음)
if (item.getEmail() == null || !item.isActive()) {
return null;
}
return UserDto.builder()
.id(item.getId())
.name(item.getName().trim().toUpperCase())
.email(item.getEmail().toLowerCase())
.tier(calculateTier(item))
.migratedAt(Instant.now())
.build();
}
}
// 복합 Processor (체이닝)
@Bean
public CompositeItemProcessor<UserEntity, UserDto> compositeProcessor() {
return new CompositeItemProcessorBuilder<UserEntity, UserDto>()
.delegates(List.of(
validationProcessor(), // 유효성 검사
enrichmentProcessor(), // 외부 API 보강
transformProcessor() // 변환
))
.build();
}
ItemWriter: 대량 쓰기
// JDBC Batch Writer (최고 성능)
@Bean
public JdbcBatchItemWriter<UserDto> dbWriter(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<UserDto>()
.dataSource(dataSource)
.sql("INSERT INTO users_v2 (id, name, email, tier, migrated_at) " +
"VALUES (:id, :name, :email, :tier, :migratedAt) " +
"ON CONFLICT (id) DO UPDATE SET " +
"name = EXCLUDED.name, tier = EXCLUDED.tier")
.beanMapped()
.build();
}
// 복합 Writer: 여러 대상에 동시 쓰기
@Bean
public CompositeItemWriter<UserDto> compositeWriter() {
return new CompositeItemWriterBuilder<UserDto>()
.delegates(List.of(
dbWriter(), // DB 저장
elasticsearchWriter(), // 검색 인덱스 동기화
kafkaWriter() // 이벤트 발행
))
.build();
}
// CSV 파일 Writer (리포트 생성)
@Bean
@StepScope
public FlatFileItemWriter<UserDto> csvWriter(
@Value("#{jobParameters['outputFile']}") Resource file) {
return new FlatFileItemWriterBuilder<UserDto>()
.name("userCsvWriter")
.resource(file)
.delimited()
.names("id", "name", "email", "tier")
.headerCallback(writer -> writer.write("ID,Name,Email,Tier"))
.build();
}
재시도·스킵·재시작: 장애 복원력
실무 배치에서 100% 성공은 불가능하다. 네트워크 순단, 데이터 오류, 외부 API 타임아웃 등을 faultTolerant()로 대응한다.
@Bean
public Step resilientStep(JobRepository jobRepository,
PlatformTransactionManager txManager) {
return new StepBuilder("resilientStep", jobRepository)
.<Input, Output>chunk(500, txManager)
.reader(reader())
.processor(processor())
.writer(writer())
.faultTolerant()
// 재시도: 일시적 장애
.retryLimit(3)
.retry(TransientDataAccessException.class)
.retry(WebClientResponseException.ServiceUnavailable.class)
.backOffPolicy(new ExponentialBackOffPolicy()) // 지수 백오프
// 스킵: 복구 불가능한 데이터 오류
.skipLimit(1000)
.skip(FlatFileParseException.class)
.skip(ValidationException.class)
.noSkip(DatabaseCorruptionException.class) // 절대 스킵 불가
// 스킵 로깅
.listener(new SkipListener<Input, Output>() {
@Override
public void onSkipInRead(Throwable t) {
log.warn("Skipped read: {}", t.getMessage());
metricsService.incrementSkipCount("read");
}
@Override
public void onSkipInProcess(Input item, Throwable t) {
log.warn("Skipped process: id={}, error={}",
item.getId(), t.getMessage());
failedItemRepository.save(item, t);
}
})
.build();
}
재시작(Restart)은 Spring Batch의 강력한 기능이다. JobRepository에 실행 상태가 저장되므로, 실패한 Job을 동일 파라미터로 재실행하면 마지막 성공 청크 다음부터 이어서 처리한다.
# 재시작: 동일 파라미터로 다시 실행
java -jar batch.jar --spring.batch.job.name=userMigrationJob
startDate=2025-01-01
# ExecutionContext에 진행 상태 저장
# → Reader가 마지막으로 읽은 위치부터 재개
파티셔닝: 대규모 병렬 처리
단일 스레드로 1억 건을 처리하면 수 시간이 걸린다. 파티셔닝으로 데이터를 분할하여 병렬 처리한다.
// Partitioner: 데이터 범위 분할
@Component
public class UserIdPartitioner implements Partitioner {
@Autowired
private JdbcTemplate jdbcTemplate;
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Long min = jdbcTemplate.queryForObject(
"SELECT MIN(id) FROM users", Long.class);
Long max = jdbcTemplate.queryForObject(
"SELECT MAX(id) FROM users", Long.class);
long range = (max - min) / gridSize + 1;
Map<String, ExecutionContext> partitions = new HashMap<>();
for (int i = 0; i < gridSize; i++) {
ExecutionContext ctx = new ExecutionContext();
ctx.putLong("minId", min + (i * range));
ctx.putLong("maxId", min + ((i + 1) * range) - 1);
partitions.put("partition" + i, ctx);
}
return partitions;
}
}
// 파티션 Step 구성
@Bean
public Step partitionedStep(JobRepository jobRepository,
Step workerStep) {
return new StepBuilder("partitionedStep", jobRepository)
.partitioner("workerStep", userIdPartitioner())
.step(workerStep)
.gridSize(8) // 8개 파티션
.taskExecutor(batchTaskExecutor()) // 병렬 실행
.build();
}
@Bean
public TaskExecutor batchTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(0);
executor.setThreadNamePrefix("batch-partition-");
return executor;
}
// Worker Step: 파티션 범위로 필터링
@Bean
@StepScope
public JdbcPagingItemReader<UserEntity> partitionReader(
DataSource dataSource,
@Value("#{stepExecutionContext['minId']}") Long minId,
@Value("#{stepExecutionContext['maxId']}") Long maxId) {
return new JdbcPagingItemReaderBuilder<UserEntity>()
.name("partitionReader")
.dataSource(dataSource)
.selectClause("SELECT *")
.fromClause("FROM users")
.whereClause("WHERE id BETWEEN :minId AND :maxId")
.sortKeys(Map.of("id", Order.ASCENDING))
.parameterValues(Map.of("minId", minId, "maxId", maxId))
.pageSize(500)
.build();
}
조건부 Flow: Step 분기 처리
@Bean
public Job conditionalJob(JobRepository jobRepository) {
return new JobBuilder("conditionalJob", jobRepository)
.start(validationStep())
.on("FAILED").to(errorNotificationStep()) // 실패 시 알림
.on("COMPLETED").to(processingStep()) // 성공 시 처리
.from(processingStep())
.on("*").to(reportStep()) // 항상 리포트
.end()
.build();
}
// Step의 ExitStatus로 분기 제어
@Bean
public Step validationStep(JobRepository jobRepository,
PlatformTransactionManager txManager) {
return new StepBuilder("validationStep", jobRepository)
.tasklet((contribution, chunkContext) -> {
boolean isValid = dataValidator.validate();
if (!isValid) {
contribution.setExitStatus(ExitStatus.FAILED);
}
return RepeatStatus.FINISHED;
}, txManager)
.build();
}
JobParameter와 스케줄링
// REST API로 Job 실행
@RestController
@RequestMapping("/api/batch")
public class BatchController {
private final JobLauncher jobLauncher;
private final Job userMigrationJob;
@PostMapping("/migrate")
public ResponseEntity<String> launchMigration(
@RequestParam String startDate) throws Exception {
JobParameters params = new JobParametersBuilder()
.addString("startDate", startDate)
.addLong("timestamp", System.currentTimeMillis()) // 유니크 보장
.toJobParameters();
JobExecution execution = jobLauncher.run(userMigrationJob, params);
return ResponseEntity.ok("Job started: " + execution.getId());
}
@GetMapping("/status/{executionId}")
public ResponseEntity<BatchStatus> getStatus(
@PathVariable Long executionId) {
JobExecution execution = jobExplorer.getJobExecution(executionId);
return ResponseEntity.ok(execution.getStatus());
}
}
// Spring Scheduler 연동
@Component
public class BatchScheduler {
@Scheduled(cron = "0 0 2 * * *") // 매일 새벽 2시
public void runDailyMigration() {
String yesterday = LocalDate.now().minusDays(1).toString();
JobParameters params = new JobParametersBuilder()
.addString("startDate", yesterday)
.addLong("run.id", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(userMigrationJob, params);
}
}
운영 체크리스트
- Chunk 사이즈 튜닝: 너무 작으면 커밋 오버헤드, 너무 크면 메모리 부족과 긴 트랜잭션. 100~1000 사이에서 벤치마크한다
- JobRepository DB: 메타데이터 테이블(
BATCH_JOB_EXECUTION등)은 운영 DB와 분리하고, 주기적으로 오래된 실행 기록을 정리한다 - 멱등성: 재시작 시 중복 처리가 발생하지 않도록 Writer에 UPSERT를 사용하거나 처리 완료 플래그를 관리한다
- 모니터링: Micrometer로 청크 처리 시간, 스킵 건수, Job 실행 시간을 메트릭으로 노출한다
- 메모리:
fetchSize와pageSize를 chunk size에 맞춰 설정한다. 커서 Reader의 fetchSize가 chunk보다 크면 불필요한 메모리를 사용한다 - 장애 대응: Resilience4j와 조합하여 외부 API 호출 시 서킷브레이커를 적용한다
Spring Batch는 대용량 데이터 처리의 복잡성을 청크 기반 트랜잭션, 재시도·스킵, 파티셔닝으로 체계적으로 해결한다. 핵심은 적절한 청크 사이즈 튜닝과 멱등한 Writer 설계, 그리고 파티셔닝을 통한 병렬화다. 이 세 가지만 잡으면 수억 건의 데이터도 안정적으로 처리할 수 있다.