Spring Batch 대용량 처리 가이드

Spring Batch란? 대용량 데이터 처리의 표준

Spring Batch는 대용량 데이터를 안정적으로 처리하기 위한 배치 프레임워크다. 수백만~수억 건의 데이터를 읽고, 변환하고, 쓰는 작업을 청크(Chunk) 단위 트랜잭션으로 처리하며, 실패 시 재시도·스킵·재시작 메커니즘을 제공한다. Spring Batch 5(Spring Boot 3)부터는 Java 17 필수, @EnableBatchProcessing 자동 구성 변경 등 주요 업데이트가 있었다.

이 글에서는 Job/Step 구조, Chunk 처리 모델, ItemReader·Processor·Writer 구현, 재시도·스킵 전략, 파티셔닝, 그리고 실무 운영 패턴까지 다룬다.

핵심 아키텍처: Job → Step → Chunk

Spring Batch의 실행 단위는 Job > Step > Chunk 3계층이다.

개념 역할 트랜잭션 범위
Job 배치 작업의 최상위 단위
Step Job 내의 독립 실행 단계
Chunk N건씩 읽고→변환→쓰기 Chunk 단위 커밋
Tasklet 단일 작업 (파일 삭제, API 호출 등) Tasklet 단위 커밋
// Spring Batch 5 기본 구성 (Spring Boot 3)
@Configuration
public class BatchConfig {

    @Bean
    public Job userMigrationJob(JobRepository jobRepository,
                                 Step extractStep,
                                 Step transformStep,
                                 Step loadStep) {
        return new JobBuilder("userMigrationJob", jobRepository)
            .incrementer(new RunIdIncrementer())
            .start(extractStep)
            .next(transformStep)
            .next(loadStep)
            .listener(jobCompletionListener())
            .build();
    }

    @Bean
    public Step extractStep(JobRepository jobRepository,
                            PlatformTransactionManager txManager,
                            ItemReader<UserEntity> reader,
                            ItemProcessor<UserEntity, UserDto> processor,
                            ItemWriter<UserDto> writer) {
        return new StepBuilder("extractStep", jobRepository)
            .<UserEntity, UserDto>chunk(500, txManager)  // 500건씩 처리
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .faultTolerant()
            .retryLimit(3)
            .retry(TransientDataAccessException.class)
            .skipLimit(100)
            .skip(FlatFileParseException.class)
            .build();
    }
}

Chunk 사이즈 500은 500건을 읽고 변환한 뒤 한 번에 DB에 쓰고 커밋한다는 의미다. 중간에 실패하면 해당 청크만 롤백된다.

ItemReader: 데이터 소스별 구현

Spring Batch는 DB, 파일, API 등 다양한 데이터 소스에 대한 Reader를 기본 제공한다.

// 1. DB 커서 기반 Reader (대용량에 적합)
@Bean
@StepScope
public JdbcCursorItemReader<UserEntity> dbReader(DataSource dataSource) {
    return new JdbcCursorItemReaderBuilder<UserEntity>()
        .name("userDbReader")
        .dataSource(dataSource)
        .sql("SELECT id, name, email, created_at FROM users " +
             "WHERE created_at >= :startDate ORDER BY id")
        .preparedStatementSetter(ps ->
            ps.setDate(1, Date.valueOf(startDate)))
        .rowMapper(new BeanPropertyRowMapper<>(UserEntity.class))
        .fetchSize(1000)        // JDBC fetch size (메모리 제어)
        .build();
}

// 2. 페이징 Reader (커서 미지원 DB용)
@Bean
@StepScope
public JdbcPagingItemReader<UserEntity> pagingReader(
        DataSource dataSource,
        @Value("#{jobParameters['startDate']}") String startDate) {
    Map<String, Object> params = Map.of("startDate", startDate);

    return new JdbcPagingItemReaderBuilder<UserEntity>()
        .name("userPagingReader")
        .dataSource(dataSource)
        .selectClause("SELECT id, name, email")
        .fromClause("FROM users")
        .whereClause("WHERE created_at >= :startDate")
        .sortKeys(Map.of("id", Order.ASCENDING))
        .parameterValues(params)
        .pageSize(500)          // chunk size와 동일하게
        .rowMapper(new BeanPropertyRowMapper<>(UserEntity.class))
        .build();
}

// 3. CSV 파일 Reader
@Bean
@StepScope
public FlatFileItemReader<UserCsv> csvReader(
        @Value("#{jobParameters['inputFile']}") Resource file) {
    return new FlatFileItemReaderBuilder<UserCsv>()
        .name("userCsvReader")
        .resource(file)
        .encoding("UTF-8")
        .linesToSkip(1)         // 헤더 스킵
        .delimited()
        .names("name", "email", "phone")
        .targetType(UserCsv.class)
        .build();
}
Reader 용도 메모리 사용
JdbcCursorItemReader 대용량 DB 조회 낮음 (fetchSize 제어)
JdbcPagingItemReader 페이징 기반 조회 페이지 단위
FlatFileItemReader CSV/TSV 파일 라인 단위
JsonItemReader JSON 배열 파일 아이템 단위

ItemProcessor: 변환과 필터링

// 단일 Processor
@Component
public class UserProcessor implements ItemProcessor<UserEntity, UserDto> {

    @Override
    public UserDto process(UserEntity item) {
        // null 반환 시 해당 아이템 필터링 (Writer로 전달되지 않음)
        if (item.getEmail() == null || !item.isActive()) {
            return null;
        }

        return UserDto.builder()
            .id(item.getId())
            .name(item.getName().trim().toUpperCase())
            .email(item.getEmail().toLowerCase())
            .tier(calculateTier(item))
            .migratedAt(Instant.now())
            .build();
    }
}

// 복합 Processor (체이닝)
@Bean
public CompositeItemProcessor<UserEntity, UserDto> compositeProcessor() {
    return new CompositeItemProcessorBuilder<UserEntity, UserDto>()
        .delegates(List.of(
            validationProcessor(),   // 유효성 검사
            enrichmentProcessor(),   // 외부 API 보강
            transformProcessor()     // 변환
        ))
        .build();
}

ItemWriter: 대량 쓰기

// JDBC Batch Writer (최고 성능)
@Bean
public JdbcBatchItemWriter<UserDto> dbWriter(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<UserDto>()
        .dataSource(dataSource)
        .sql("INSERT INTO users_v2 (id, name, email, tier, migrated_at) " +
             "VALUES (:id, :name, :email, :tier, :migratedAt) " +
             "ON CONFLICT (id) DO UPDATE SET " +
             "name = EXCLUDED.name, tier = EXCLUDED.tier")
        .beanMapped()
        .build();
}

// 복합 Writer: 여러 대상에 동시 쓰기
@Bean
public CompositeItemWriter<UserDto> compositeWriter() {
    return new CompositeItemWriterBuilder<UserDto>()
        .delegates(List.of(
            dbWriter(),              // DB 저장
            elasticsearchWriter(),   // 검색 인덱스 동기화
            kafkaWriter()            // 이벤트 발행
        ))
        .build();
}

// CSV 파일 Writer (리포트 생성)
@Bean
@StepScope
public FlatFileItemWriter<UserDto> csvWriter(
        @Value("#{jobParameters['outputFile']}") Resource file) {
    return new FlatFileItemWriterBuilder<UserDto>()
        .name("userCsvWriter")
        .resource(file)
        .delimited()
        .names("id", "name", "email", "tier")
        .headerCallback(writer -> writer.write("ID,Name,Email,Tier"))
        .build();
}

재시도·스킵·재시작: 장애 복원력

실무 배치에서 100% 성공은 불가능하다. 네트워크 순단, 데이터 오류, 외부 API 타임아웃 등을 faultTolerant()로 대응한다.

@Bean
public Step resilientStep(JobRepository jobRepository,
                          PlatformTransactionManager txManager) {
    return new StepBuilder("resilientStep", jobRepository)
        .<Input, Output>chunk(500, txManager)
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .faultTolerant()
        // 재시도: 일시적 장애
        .retryLimit(3)
        .retry(TransientDataAccessException.class)
        .retry(WebClientResponseException.ServiceUnavailable.class)
        .backOffPolicy(new ExponentialBackOffPolicy())  // 지수 백오프
        // 스킵: 복구 불가능한 데이터 오류
        .skipLimit(1000)
        .skip(FlatFileParseException.class)
        .skip(ValidationException.class)
        .noSkip(DatabaseCorruptionException.class)  // 절대 스킵 불가
        // 스킵 로깅
        .listener(new SkipListener<Input, Output>() {
            @Override
            public void onSkipInRead(Throwable t) {
                log.warn("Skipped read: {}", t.getMessage());
                metricsService.incrementSkipCount("read");
            }
            @Override
            public void onSkipInProcess(Input item, Throwable t) {
                log.warn("Skipped process: id={}, error={}", 
                    item.getId(), t.getMessage());
                failedItemRepository.save(item, t);
            }
        })
        .build();
}

재시작(Restart)은 Spring Batch의 강력한 기능이다. JobRepository에 실행 상태가 저장되므로, 실패한 Job을 동일 파라미터로 재실행하면 마지막 성공 청크 다음부터 이어서 처리한다.

# 재시작: 동일 파라미터로 다시 실행
java -jar batch.jar --spring.batch.job.name=userMigrationJob 
  startDate=2025-01-01

# ExecutionContext에 진행 상태 저장
# → Reader가 마지막으로 읽은 위치부터 재개

파티셔닝: 대규모 병렬 처리

단일 스레드로 1억 건을 처리하면 수 시간이 걸린다. 파티셔닝으로 데이터를 분할하여 병렬 처리한다.

// Partitioner: 데이터 범위 분할
@Component
public class UserIdPartitioner implements Partitioner {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Long min = jdbcTemplate.queryForObject(
            "SELECT MIN(id) FROM users", Long.class);
        Long max = jdbcTemplate.queryForObject(
            "SELECT MAX(id) FROM users", Long.class);

        long range = (max - min) / gridSize + 1;
        Map<String, ExecutionContext> partitions = new HashMap<>();

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext ctx = new ExecutionContext();
            ctx.putLong("minId", min + (i * range));
            ctx.putLong("maxId", min + ((i + 1) * range) - 1);
            partitions.put("partition" + i, ctx);
        }
        return partitions;
    }
}

// 파티션 Step 구성
@Bean
public Step partitionedStep(JobRepository jobRepository,
                            Step workerStep) {
    return new StepBuilder("partitionedStep", jobRepository)
        .partitioner("workerStep", userIdPartitioner())
        .step(workerStep)
        .gridSize(8)                              // 8개 파티션
        .taskExecutor(batchTaskExecutor())         // 병렬 실행
        .build();
}

@Bean
public TaskExecutor batchTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(8);
    executor.setMaxPoolSize(8);
    executor.setQueueCapacity(0);
    executor.setThreadNamePrefix("batch-partition-");
    return executor;
}

// Worker Step: 파티션 범위로 필터링
@Bean
@StepScope
public JdbcPagingItemReader<UserEntity> partitionReader(
        DataSource dataSource,
        @Value("#{stepExecutionContext['minId']}") Long minId,
        @Value("#{stepExecutionContext['maxId']}") Long maxId) {
    return new JdbcPagingItemReaderBuilder<UserEntity>()
        .name("partitionReader")
        .dataSource(dataSource)
        .selectClause("SELECT *")
        .fromClause("FROM users")
        .whereClause("WHERE id BETWEEN :minId AND :maxId")
        .sortKeys(Map.of("id", Order.ASCENDING))
        .parameterValues(Map.of("minId", minId, "maxId", maxId))
        .pageSize(500)
        .build();
}

조건부 Flow: Step 분기 처리

@Bean
public Job conditionalJob(JobRepository jobRepository) {
    return new JobBuilder("conditionalJob", jobRepository)
        .start(validationStep())
            .on("FAILED").to(errorNotificationStep())  // 실패 시 알림
            .on("COMPLETED").to(processingStep())      // 성공 시 처리
        .from(processingStep())
            .on("*").to(reportStep())                  // 항상 리포트
        .end()
        .build();
}

// Step의 ExitStatus로 분기 제어
@Bean
public Step validationStep(JobRepository jobRepository,
                           PlatformTransactionManager txManager) {
    return new StepBuilder("validationStep", jobRepository)
        .tasklet((contribution, chunkContext) -> {
            boolean isValid = dataValidator.validate();
            if (!isValid) {
                contribution.setExitStatus(ExitStatus.FAILED);
            }
            return RepeatStatus.FINISHED;
        }, txManager)
        .build();
}

JobParameter와 스케줄링

// REST API로 Job 실행
@RestController
@RequestMapping("/api/batch")
public class BatchController {

    private final JobLauncher jobLauncher;
    private final Job userMigrationJob;

    @PostMapping("/migrate")
    public ResponseEntity<String> launchMigration(
            @RequestParam String startDate) throws Exception {
        JobParameters params = new JobParametersBuilder()
            .addString("startDate", startDate)
            .addLong("timestamp", System.currentTimeMillis())  // 유니크 보장
            .toJobParameters();

        JobExecution execution = jobLauncher.run(userMigrationJob, params);
        return ResponseEntity.ok("Job started: " + execution.getId());
    }

    @GetMapping("/status/{executionId}")
    public ResponseEntity<BatchStatus> getStatus(
            @PathVariable Long executionId) {
        JobExecution execution = jobExplorer.getJobExecution(executionId);
        return ResponseEntity.ok(execution.getStatus());
    }
}

// Spring Scheduler 연동
@Component
public class BatchScheduler {

    @Scheduled(cron = "0 0 2 * * *")  // 매일 새벽 2시
    public void runDailyMigration() {
        String yesterday = LocalDate.now().minusDays(1).toString();
        JobParameters params = new JobParametersBuilder()
            .addString("startDate", yesterday)
            .addLong("run.id", System.currentTimeMillis())
            .toJobParameters();
        jobLauncher.run(userMigrationJob, params);
    }
}

운영 체크리스트

  • Chunk 사이즈 튜닝: 너무 작으면 커밋 오버헤드, 너무 크면 메모리 부족과 긴 트랜잭션. 100~1000 사이에서 벤치마크한다
  • JobRepository DB: 메타데이터 테이블(BATCH_JOB_EXECUTION 등)은 운영 DB와 분리하고, 주기적으로 오래된 실행 기록을 정리한다
  • 멱등성: 재시작 시 중복 처리가 발생하지 않도록 Writer에 UPSERT를 사용하거나 처리 완료 플래그를 관리한다
  • 모니터링: Micrometer로 청크 처리 시간, 스킵 건수, Job 실행 시간을 메트릭으로 노출한다
  • 메모리: fetchSizepageSize를 chunk size에 맞춰 설정한다. 커서 Reader의 fetchSize가 chunk보다 크면 불필요한 메모리를 사용한다
  • 장애 대응: Resilience4j와 조합하여 외부 API 호출 시 서킷브레이커를 적용한다

Spring Batch는 대용량 데이터 처리의 복잡성을 청크 기반 트랜잭션, 재시도·스킵, 파티셔닝으로 체계적으로 해결한다. 핵심은 적절한 청크 사이즈 튜닝멱등한 Writer 설계, 그리고 파티셔닝을 통한 병렬화다. 이 세 가지만 잡으면 수억 건의 데이터도 안정적으로 처리할 수 있다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux