Spring CompletableFuture 병렬 처리

CompletableFuture란?

CompletableFuture는 Java 8에서 도입된 비동기 프로그래밍 API로, 여러 작업을 병렬로 실행하고 결과를 조합하는 강력한 기능을 제공합니다. Spring에서 외부 API 호출, DB 쿼리, 파일 처리 등을 병렬화하면 응답 시간을 획기적으로 줄일 수 있습니다.

Spring @Async 비동기 처리 심화에서 다룬 @Async가 메서드 단위 비동기라면, CompletableFuture는 작업 단위 비동기와 조합에 특화되어 있습니다.

기본 패턴: supplyAsync와 thenApply

@Service
@RequiredArgsConstructor
public class ProductDetailService {

    private final ProductRepository productRepo;
    private final ReviewClient reviewClient;
    private final RecommendClient recommendClient;
    private final Executor asyncExecutor;

    // 순차 실행: 총 900ms
    public ProductDetail getDetailSequential(Long productId) {
        Product product = productRepo.findById(productId).orElseThrow();  // 100ms
        List<Review> reviews = reviewClient.getReviews(productId);        // 300ms
        List<Product> recommended = recommendClient.get(productId);       // 500ms
        return new ProductDetail(product, reviews, recommended);
    }

    // 병렬 실행: 총 ~500ms (가장 느린 작업 기준)
    public ProductDetail getDetailParallel(Long productId) {
        CompletableFuture<Product> productFuture =
            CompletableFuture.supplyAsync(
                () -> productRepo.findById(productId).orElseThrow(),
                asyncExecutor);

        CompletableFuture<List<Review>> reviewsFuture =
            CompletableFuture.supplyAsync(
                () -> reviewClient.getReviews(productId),
                asyncExecutor);

        CompletableFuture<List<Product>> recommendFuture =
            CompletableFuture.supplyAsync(
                () -> recommendClient.get(productId),
                asyncExecutor);

        // 모든 작업 완료 대기 후 조합
        CompletableFuture.allOf(productFuture, reviewsFuture, recommendFuture)
            .join();

        return new ProductDetail(
            productFuture.join(),
            reviewsFuture.join(),
            recommendFuture.join()
        );
    }
}

Executor 설정

반드시 커스텀 Executor를 사용해야 합니다. 기본 ForkJoinPool.commonPool()은 CPU 코어 수 – 1개 스레드로, I/O 작업에서 병목이 됩니다.

@Configuration
public class AsyncConfig {

    @Bean("asyncExecutor")
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(30);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("async-");
        executor.setRejectedExecutionHandler(
            new ThreadPoolExecutor.CallerRunsPolicy());  // 풀 가득 차면 호출 스레드에서 실행
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(30);
        executor.initialize();
        return executor;
    }

    // Virtual Thread 사용 (Java 21+)
    @Bean("virtualExecutor")
    @ConditionalOnProperty(name = "app.virtual-threads", havingValue = "true")
    public Executor virtualExecutor() {
        return Executors.newVirtualThreadPerTaskExecutor();
    }
}
Executor 적합한 작업 주의
ThreadPoolTaskExecutor 범용 I/O 작업 풀 사이즈 튜닝 필요
ForkJoinPool CPU 연산 집약적 I/O 블로킹 시 스레드 고갈
Virtual Thread Executor 대량 I/O 병렬 처리 Java 21+, synchronized 주의

조합 패턴: thenCombine, thenCompose, allOf

@Service
public class OrderSummaryService {

    // thenCombine: 두 결과를 합치기
    public CompletableFuture<OrderSummary> getSummary(Long userId) {
        CompletableFuture<UserProfile> profile =
            CompletableFuture.supplyAsync(() -> userClient.getProfile(userId));

        CompletableFuture<List<Order>> orders =
            CompletableFuture.supplyAsync(() -> orderRepo.findByUserId(userId));

        return profile.thenCombine(orders, OrderSummary::new);
    }

    // thenCompose: 순차 의존 (결과를 다음 작업에 전달)
    public CompletableFuture<ShippingEstimate> getEstimate(Long orderId) {
        return CompletableFuture
            .supplyAsync(() -> orderRepo.findById(orderId).orElseThrow())
            .thenCompose(order ->
                CompletableFuture.supplyAsync(
                    () -> shippingClient.estimate(order.getAddress())))
            ;
    }

    // allOf + 부분 실패 허용
    public DashboardData getDashboard(Long userId) {
        CompletableFuture<List<Order>> orders =
            supplyWithFallback(() -> orderClient.recent(userId), List.of());

        CompletableFuture<Integer> points =
            supplyWithFallback(() -> pointClient.balance(userId), 0);

        CompletableFuture<List<Coupon>> coupons =
            supplyWithFallback(() -> couponClient.available(userId), List.of());

        CompletableFuture.allOf(orders, points, coupons).join();

        return new DashboardData(
            orders.join(), points.join(), coupons.join());
    }

    private <T> CompletableFuture<T> supplyWithFallback(
            Supplier<T> supplier, T fallback) {
        return CompletableFuture.supplyAsync(supplier, asyncExecutor)
            .exceptionally(ex -> {
                log.warn("Fallback used: {}", ex.getMessage());
                return fallback;
            });
    }
}
메서드 용도 반환
thenApply 결과 변환 (map) CompletableFuture<U>
thenCompose 결과로 다음 비동기 작업 (flatMap) CompletableFuture<U>
thenCombine 두 Future 결과 합치기 CompletableFuture<V>
allOf 모든 작업 완료 대기 CompletableFuture<Void>
anyOf 가장 먼저 완료된 작업 CompletableFuture<Object>

타임아웃과 에러 처리

// Java 9+ orTimeout / completeOnTimeout
public ProductDetail getDetailWithTimeout(Long productId) {
    CompletableFuture<List<Review>> reviews =
        CompletableFuture.supplyAsync(
            () -> reviewClient.getReviews(productId), asyncExecutor)
        .orTimeout(3, TimeUnit.SECONDS);  // 3초 초과 시 TimeoutException

    CompletableFuture<List<Product>> recommended =
        CompletableFuture.supplyAsync(
            () -> recommendClient.get(productId), asyncExecutor)
        .completeOnTimeout(List.of(), 2, TimeUnit.SECONDS);  // 2초 초과 시 빈 리스트

    // 에러 처리 체인
    CompletableFuture<List<Review>> safeReviews = reviews
        .exceptionally(ex -> {
            if (ex.getCause() instanceof TimeoutException) {
                log.warn("Review API timeout");
            }
            return List.of();
        });

    CompletableFuture.allOf(safeReviews, recommended).join();

    Product product = productRepo.findById(productId).orElseThrow();
    return new ProductDetail(product, safeReviews.join(), recommended.join());
}

@Async와 CompletableFuture 통합

Spring Virtual Thread 실전에서 다룬 Virtual Thread와도 조합할 수 있습니다.

@Service
public class NotificationService {

    @Async("asyncExecutor")
    public CompletableFuture<Boolean> sendEmail(String to, String content) {
        // Spring이 자동으로 비동기 실행 + CompletableFuture 반환
        boolean result = emailClient.send(to, content);
        return CompletableFuture.completedFuture(result);
    }

    @Async("asyncExecutor")
    public CompletableFuture<Boolean> sendPush(String deviceToken, String msg) {
        boolean result = pushClient.send(deviceToken, msg);
        return CompletableFuture.completedFuture(result);
    }
}

// 호출 측
@Service
@RequiredArgsConstructor
public class OrderNotifier {
    private final NotificationService notificationService;

    public void notifyOrderComplete(Order order) {
        CompletableFuture<Boolean> email =
            notificationService.sendEmail(order.getEmail(), "주문 완료");
        CompletableFuture<Boolean> push =
            notificationService.sendPush(order.getDeviceToken(), "주문 완료");

        // 둘 다 완료 대기 (비동기 병렬)
        CompletableFuture.allOf(email, push)
            .orTimeout(5, TimeUnit.SECONDS)
            .exceptionally(ex -> null)
            .join();
    }
}

주의사항

주의 문제 해결
join() 호출 위치 너무 일찍 join하면 순차 실행됨 allOf 후 마지막에 join
트랜잭션 컨텍스트 다른 스레드에선 @Transactional 무효 읽기 작업만 병렬화
SecurityContext 다른 스레드에서 인증 정보 유실 DelegatingSecurityContextExecutor
예외 무시 exceptionally 없으면 예외 삼킴 반드시 에러 핸들링 체인 추가

정리

CompletableFuture는 독립적인 I/O 작업을 병렬화하여 응답 시간을 줄이는 핵심 도구입니다. 커스텀 Executor로 스레드 풀을 관리하고, exceptionally/completeOnTimeout으로 부분 실패를 허용하며, allOf로 결과를 조합하는 것이 실전 패턴입니다. 트랜잭션과 SecurityContext는 스레드 경계를 넘지 못하므로, 읽기 작업 위주로 병렬화하세요.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux