K8s Indexed Job 병렬 배치

K8s Indexed Job이란?

Kubernetes 1.24에서 GA된 Indexed Job은 각 Pod에 고유한 인덱스(0, 1, 2…)를 부여하는 병렬 배치 처리 패턴입니다. 기존 Job의 단순 병렬 실행과 달리, 각 Pod가 자신의 인덱스를 환경변수 JOB_COMPLETION_INDEX로 인식하여 데이터를 분할 처리할 수 있습니다. 대규모 데이터 파이프라인, 분산 렌더링, 배치 마이그레이션에서 핵심적인 패턴입니다.

기본 Indexed Job 생성

completionMode: Indexed를 설정하면 각 Pod에 고유 인덱스가 할당됩니다:

# indexed-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: data-processor
spec:
  completionMode: Indexed    # 핵심: Indexed 모드 활성화
  completions: 10             # 총 10개 인덱스 (0~9)
  parallelism: 3              # 동시 실행 Pod 수
  template:
    spec:
      restartPolicy: Never
      containers:
      - name: worker
        image: python:3.12-slim
        command:
        - python3
        - -c
        - |
          import os
          index = int(os.environ['JOB_COMPLETION_INDEX'])
          total = 10
          
          # 인덱스 기반 데이터 분할
          # 예: 100만 건을 10개로 나눠 처리
          chunk_size = 1_000_000 // total
          start = index * chunk_size
          end = start + chunk_size
          
          print(f"Worker {index}: processing rows {start}-{end}")
          # 실제 데이터 처리 로직
        env:
        - name: TOTAL_COMPLETIONS
          value: "10"
# 실행 및 확인
kubectl apply -f indexed-job.yaml

# 각 Pod의 인덱스 확인
kubectl get pods -l job-name=data-processor 
  -o custom-columns='NAME:.metadata.name,INDEX:.metadata.annotations.batch.kubernetes.io/job-completion-index,STATUS:.status.phase'

# 출력 예:
# NAME                       INDEX   STATUS
# data-processor-0-abc12     0       Succeeded
# data-processor-1-def34     1       Running
# data-processor-2-ghi56     2       Running

NonIndexed vs Indexed: 동작 차이

항목 NonIndexed (기본) Indexed
Pod 식별 구분 없음 JOB_COMPLETION_INDEX 환경변수
완료 기준 성공 Pod 수 ≥ completions 각 인덱스별 1회 성공 필수
재시도 아무 Pod나 재실행 실패한 인덱스만 재실행
데이터 분할 외부 큐 필요 인덱스로 자체 분할 가능
적합한 용도 독립적 동일 작업 데이터 샤딩, 분산 처리

실전 패턴 1: S3 파일 분산 처리

S3 버킷의 파일 목록을 인덱스별로 분할 처리하는 패턴입니다:

apiVersion: batch/v1
kind: Job
metadata:
  name: s3-file-processor
spec:
  completionMode: Indexed
  completions: 20
  parallelism: 5
  backoffLimit: 3              # 인덱스별 최대 재시도 3회
  template:
    spec:
      restartPolicy: Never
      serviceAccountName: s3-reader
      containers:
      - name: processor
        image: my-app:latest
        command: ["/app/process-files"]
        env:
        - name: WORKER_INDEX
          valueFrom:
            fieldRef:
              fieldPath: metadata.annotations['batch.kubernetes.io/job-completion-index']
        - name: TOTAL_WORKERS
          value: "20"
        - name: S3_BUCKET
          value: "data-lake-raw"
        resources:
          requests:
            cpu: "500m"
            memory: "512Mi"
          limits:
            cpu: "1"
            memory: "1Gi"
# process-files 스크립트 예시 (Go/Python)
import boto3, os

index = int(os.environ['WORKER_INDEX'])
total = int(os.environ['TOTAL_WORKERS'])
bucket = os.environ['S3_BUCKET']

s3 = boto3.client('s3')
paginator = s3.get_paginator('list_objects_v2')

all_keys = []
for page in paginator.paginate(Bucket=bucket, Prefix='2026/03/'):
    all_keys.extend([obj['Key'] for obj in page.get('Contents', [])])

# 해시 기반 분할: 각 워커가 자기 몫만 처리
my_keys = [k for k in all_keys if hash(k) % total == index]

print(f"Worker {index}: processing {len(my_keys)} files out of {len(all_keys)}")
for key in my_keys:
    process_file(bucket, key)

실전 패턴 2: DB 배치 마이그레이션

대용량 테이블의 데이터를 ID 범위로 분할하여 병렬 마이그레이션합니다:

apiVersion: batch/v1
kind: Job
metadata:
  name: db-migration
spec:
  completionMode: Indexed
  completions: 50              # 50개 청크로 분할
  parallelism: 10              # 동시 10개 커넥션
  backoffLimitPerIndex: 2      # K8s 1.29+: 인덱스별 재시도 제한
  template:
    spec:
      restartPolicy: Never
      containers:
      - name: migrator
        image: migration-tool:latest
        env:
        - name: CHUNK_INDEX
          valueFrom:
            fieldRef:
              fieldPath: metadata.annotations['batch.kubernetes.io/job-completion-index']
        - name: TOTAL_CHUNKS
          value: "50"
        - name: TABLE_NAME
          value: "user_events"
        - name: TOTAL_ROWS
          value: "50000000"       # 5천만 건
        - name: DB_URL
          valueFrom:
            secretKeyRef:
              name: db-credentials
              key: url
        command:
        - /bin/sh
        - -c
        - |
          CHUNK_SIZE=$((TOTAL_ROWS / TOTAL_CHUNKS))
          START_ID=$((CHUNK_INDEX * CHUNK_SIZE + 1))
          END_ID=$(((CHUNK_INDEX + 1) * CHUNK_SIZE))
          
          echo "Migrating $TABLE_NAME rows $START_ID to $END_ID"
          
          psql "$DB_URL" -c "
            INSERT INTO user_events_new
            SELECT id, user_id, event_type, payload, 
                   created_at, now() as migrated_at
            FROM $TABLE_NAME
            WHERE id BETWEEN $START_ID AND $END_ID
            ON CONFLICT (id) DO NOTHING;
          "
          
          echo "Chunk $CHUNK_INDEX completed"

backoffLimitPerIndex: 인덱스별 재시도 제어

Kubernetes 1.29에서 추가된 backoffLimitPerIndex인덱스별 독립적인 재시도 횟수를 설정합니다. 기존 backoffLimit는 전체 Job 합산이어서 하나의 인덱스가 반복 실패하면 전체 Job이 중단되는 문제가 있었습니다:

apiVersion: batch/v1
kind: Job
metadata:
  name: resilient-processor
spec:
  completionMode: Indexed
  completions: 100
  parallelism: 10
  backoffLimitPerIndex: 3      # 각 인덱스별 최대 3회 재시도
  maxFailedIndexes: 5          # 최대 5개 인덱스 실패 허용
  template:
    spec:
      restartPolicy: Never
      containers:
      - name: worker
        image: processor:latest

# 동작 방식:
# - 인덱스 7이 3번 실패 → 인덱스 7만 FailedIndex로 표시
# - 나머지 인덱스는 계속 실행
# - 5개 인덱스가 실패하면 전체 Job 중단
# - 기존 backoffLimit: 인덱스 7이 3번 실패하면 전체 Job 중단 위험
# 실패한 인덱스 확인
kubectl get job resilient-processor -o jsonpath='{.status.failedIndexes}'
# 출력: "3,17,42"  → 인덱스 3, 17, 42가 실패

# 성공/실패 인덱스 상세 확인
kubectl get job resilient-processor -o jsonpath='{.status.completedIndexes}'
# 출력: "0-2,4-16,18-41,43-99"

Pod Failure Policy: 세밀한 실패 제어

podFailurePolicy와 Indexed Job을 결합하면 특정 에러 코드에 따라 재시도 여부를 제어할 수 있습니다:

apiVersion: batch/v1
kind: Job
metadata:
  name: smart-processor
spec:
  completionMode: Indexed
  completions: 50
  parallelism: 5
  backoffLimitPerIndex: 3
  podFailurePolicy:
    rules:
    # OOM 에러 → 해당 인덱스 즉시 실패 처리 (재시도 무의미)
    - action: FailIndex
      onExitCodes:
        containerName: worker
        operator: In
        values: [137]            # OOMKilled

    # 일시적 에러 → 재시도
    - action: Ignore
      onPodConditions:
      - type: DisruptionTarget   # 노드 드레인 등 외부 요인

    # 설정 에러 → 전체 Job 중단
    - action: FailJob
      onExitCodes:
        containerName: worker
        operator: In
        values: [2]              # 설정 파일 누락 등
  template:
    spec:
      restartPolicy: Never
      containers:
      - name: worker
        image: processor:latest
        command:
        - /bin/sh
        - -c
        - |
          # Exit 코드로 에러 유형 구분
          if [ ! -f /config/settings.yaml ]; then
            echo "Config missing" && exit 2    # → FailJob
          fi
          
          # 메모리 부족하면 137(OOMKilled) → FailIndex
          # 네트워크 에러면 1 → 재시도
          /app/process --index=$JOB_COMPLETION_INDEX

CronJob + Indexed Job 조합

정기 배치를 Indexed Job으로 실행하는 패턴입니다:

apiVersion: batch/v1
kind: CronJob
metadata:
  name: daily-etl
spec:
  schedule: "0 2 * * *"         # 매일 새벽 2시
  concurrencyPolicy: Forbid     # 이전 Job 완료 전 새 Job 금지
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 5
  jobTemplate:
    spec:
      completionMode: Indexed
      completions: 24            # 시간대별 24개 청크
      parallelism: 6
      backoffLimitPerIndex: 2
      ttlSecondsAfterFinished: 86400  # 완료 후 24시간 뒤 정리
      template:
        spec:
          restartPolicy: Never
          containers:
          - name: etl
            image: etl-pipeline:latest
            env:
            - name: HOUR_SLOT
              valueFrom:
                fieldRef:
                  fieldPath: metadata.annotations['batch.kubernetes.io/job-completion-index']
            command:
            - /app/etl
            - --hour=$(HOUR_SLOT)
            - --date=yesterday
            resources:
              requests:
                cpu: "1"
                memory: "2Gi"

모니터링과 디버깅

# Job 진행 상황 실시간 확인
kubectl get job data-processor -w

# 완료된 인덱스 목록
kubectl get job data-processor 
  -o jsonpath='{.status.completedIndexes}' | tr ',' 'n'

# 특정 인덱스의 Pod 로그 확인
kubectl logs -l job-name=data-processor 
  --selector batch.kubernetes.io/job-completion-index=7

# 실패 인덱스 재실행 (Job 삭제 후 재생성)
kubectl delete job data-processor
kubectl apply -f indexed-job.yaml

# Prometheus 메트릭으로 Job 모니터링
# kube_job_status_active: 실행 중 Pod 수
# kube_job_status_succeeded: 완료된 Pod 수
# kube_job_complete: Job 완료 여부
# kube_job_failed: Job 실패 여부

# Grafana 대시보드 쿼리 예시
# 완료율: kube_job_status_succeeded / kube_job_spec_completions * 100
# 예상 완료 시간: (kube_job_spec_completions - kube_job_status_succeeded) 
#                 / rate(kube_job_status_succeeded[5m])

Elastic Indexed Job (K8s 1.27+)

실행 중에 completions 수를 동적으로 변경할 수 있는 Elastic Indexed Job입니다:

# Elastic Indexed Job: completions 필드 생략
apiVersion: batch/v1
kind: Job
metadata:
  name: elastic-processor
  annotations:
    batch.kubernetes.io/job-completion-index: "true"
spec:
  completionMode: Indexed
  parallelism: 5
  # completions 생략 → Elastic 모드
  # parallelism 값이 동적으로 completions 역할
  template:
    spec:
      restartPolicy: Never
      containers:
      - name: worker
        image: processor:latest

# 실행 중 스케일 조정
kubectl patch job elastic-processor 
  --type=merge -p '{"spec":{"parallelism":10}}'

# 또는 kubectl scale
kubectl scale job elastic-processor --replicas=15

핵심 정리

기능 최소 버전 용도
Indexed Job 1.24 GA 인덱스 기반 데이터 분할 처리
backoffLimitPerIndex 1.29 GA 인덱스별 독립 재시도
maxFailedIndexes 1.29 GA 부분 실패 허용 임계치
podFailurePolicy 1.26 GA 에러 코드별 재시도 제어
Elastic Indexed Job 1.27 Beta 동적 병렬도 조정

Indexed Job은 외부 메시지 큐 없이 K8s 네이티브로 데이터 분할 병렬 처리를 구현하는 핵심 패턴입니다. backoffLimitPerIndexpodFailurePolicy를 결합하면 대규모 배치에서도 하나의 청크 실패가 전체를 중단시키지 않는 견고한 파이프라인을 구축할 수 있습니다. K8s Job·CronJob 배치 처리Spring Batch Chunk 병렬 처리도 함께 참고하세요.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux