Spring MVC 비동기: SSE·DeferredResult

Spring MVC 비동기 처리란?

Spring MVC는 동기 서블릿 기반이지만, 비동기 요청 처리를 위한 여러 메커니즘을 제공한다. 서블릿 스레드를 빠르게 반환하고, 별도 스레드에서 응답을 생성하면 동시 처리 용량이 극적으로 향상된다. WebFlux 없이도 기존 Spring MVC에서 비동기 패턴을 활용할 수 있다.

이 글에서는 Callable, DeferredResult, StreamingResponseBody, SSE(Server-Sent Events)의 실전 사용법과 차이점을 다룬다.

Callable: 간단한 비동기 위임

Callable<T>를 반환하면 Spring이 내부 TaskExecutor에서 실행하고, 서블릿 스레드는 즉시 반환된다.

@RestController
@RequestMapping("/api/reports")
public class ReportController {

    private final ReportService reportService;

    @GetMapping("/{id}")
    public Callable<ReportDto> getReport(@PathVariable Long id) {
        // 서블릿 스레드 즉시 반환
        // TaskExecutor 스레드에서 실행
        return () -> {
            Thread.sleep(3000); // 오래 걸리는 작업 시뮬레이션
            return reportService.generateReport(id);
        };
    }
}

TaskExecutor 설정으로 비동기 스레드 풀을 제어한다:

@Configuration
@EnableAsync
public class AsyncMvcConfig implements WebMvcConfigurer {

    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("mvc-async-");
        executor.initialize();

        configurer.setTaskExecutor(executor);
        configurer.setDefaultTimeout(30000); // 30초 타임아웃
    }
}

DeferredResult: 외부 이벤트 기반 응답

DeferredResult는 Callable과 달리 아무 스레드에서나 결과를 설정할 수 있다. 메시지 큐 콜백, 이벤트 리스너, 다른 서비스의 비동기 응답 등에 적합하다.

@RestController
@RequestMapping("/api/orders")
public class OrderAsyncController {

    private final ConcurrentMap<String, DeferredResult<OrderStatus>> pendingResults
        = new ConcurrentHashMap<>();

    @PostMapping("/{id}/process")
    public DeferredResult<OrderStatus> processOrder(@PathVariable String id) {
        DeferredResult<OrderStatus> result = new DeferredResult<>(30000L);

        // 타임아웃 핸들러
        result.onTimeout(() -> {
            pendingResults.remove(id);
            result.setErrorResult(
                ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT)
                    .body("주문 처리 시간 초과")
            );
        });

        // 완료 시 정리
        result.onCompletion(() -> pendingResults.remove(id));

        // 대기 목록에 등록
        pendingResults.put(id, result);

        // 비동기 처리 트리거 (메시지 큐 발행 등)
        orderProcessingService.submitAsync(id);

        return result; // 서블릿 스레드 즉시 반환
    }

    // 다른 스레드/서비스에서 결과 설정
    @EventListener
    public void onOrderProcessed(OrderProcessedEvent event) {
        DeferredResult<OrderStatus> result = pendingResults.get(event.getOrderId());
        if (result != null) {
            result.setResult(event.getStatus());
        }
    }
}

Long Polling 패턴: DeferredResult 활용

클라이언트가 서버에 요청을 걸어두고, 새 데이터가 생기면 응답하는 Long Polling 패턴을 DeferredResult로 깔끔하게 구현할 수 있다.

@RestController
@RequestMapping("/api/notifications")
public class NotificationLongPollController {

    private final Map<String, Queue<DeferredResult<List<Notification>>>> waiters
        = new ConcurrentHashMap<>();

    @GetMapping("/poll")
    public DeferredResult<List<Notification>> poll(
            @RequestParam String userId,
            @RequestParam(defaultValue = "30000") long timeout) {

        DeferredResult<List<Notification>> result = new DeferredResult<>(timeout);

        // 타임아웃 시 빈 목록 반환 (클라이언트 재연결 유도)
        result.onTimeout(() -> result.setResult(Collections.emptyList()));

        waiters.computeIfAbsent(userId, k -> new ConcurrentLinkedQueue<>())
               .add(result);

        result.onCompletion(() ->
            waiters.getOrDefault(userId, new ConcurrentLinkedQueue<>()).remove(result)
        );

        return result;
    }

    // 알림 발생 시 대기 중인 클라이언트에게 즉시 응답
    public void pushNotification(String userId, Notification notification) {
        Queue<DeferredResult<List<Notification>>> queue = waiters.get(userId);
        if (queue != null) {
            DeferredResult<List<Notification>> waiter;
            while ((waiter = queue.poll()) != null) {
                waiter.setResult(List.of(notification));
            }
        }
    }
}

StreamingResponseBody: 대용량 스트리밍

대용량 파일 다운로드나 CSV 내보내기 등에서 메모리를 절약하며 스트리밍 응답을 생성한다.

@RestController
@RequestMapping("/api/export")
public class ExportController {

    private final OrderRepository orderRepository;

    @GetMapping("/orders")
    public ResponseEntity<StreamingResponseBody> exportOrders(
            @RequestParam @DateTimeFormat(iso = ISO.DATE) LocalDate from,
            @RequestParam @DateTimeFormat(iso = ISO.DATE) LocalDate to) {

        StreamingResponseBody body = outputStream -> {
            try (Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) {
                // CSV 헤더
                writer.write("id,customer,amount,status,daten");
                writer.flush();

                // 스트림으로 처리 — 메모리에 전체 데이터 로드 안 함
                try (Stream<Order> orders = orderRepository.streamByDateRange(from, to)) {
                    orders.forEach(order -> {
                        try {
                            writer.write(String.format("%d,%s,%.2f,%s,%sn",
                                order.getId(),
                                order.getCustomer(),
                                order.getAmount(),
                                order.getStatus(),
                                order.getCreatedAt()
                            ));
                            writer.flush();
                        } catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                    });
                }
            }
        };

        return ResponseEntity.ok()
            .header(HttpHeaders.CONTENT_DISPOSITION,
                "attachment; filename=orders-export.csv")
            .contentType(MediaType.parseMediaType("text/csv"))
            .body(body);
    }
}

SSE: Server-Sent Events 실시간 스트리밍

SSE는 HTTP 연결을 유지하며 서버에서 클라이언트로 단방향 이벤트를 푸시하는 표준 프로토콜이다. Spring MVC의 SseEmitter로 구현한다.

@RestController
@RequestMapping("/api/sse")
public class SseController {

    private final CopyOnWriteArrayList<SseEmitter> emitters = new CopyOnWriteArrayList<>();

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter stream(@RequestParam String userId) {
        SseEmitter emitter = new SseEmitter(60_000L); // 60초 타임아웃

        emitter.onCompletion(() -> emitters.remove(emitter));
        emitter.onTimeout(() -> {
            emitter.complete();
            emitters.remove(emitter);
        });
        emitter.onError(ex -> {
            emitter.completeWithError(ex);
            emitters.remove(emitter);
        });

        emitters.add(emitter);

        // 연결 직후 초기 이벤트 전송
        try {
            emitter.send(SseEmitter.event()
                .name("connected")
                .data(Map.of("message", "SSE 연결 성공", "userId", userId))
                .id(UUID.randomUUID().toString())
                .reconnectTime(5000) // 재연결 간격 5초
            );
        } catch (IOException e) {
            emitter.completeWithError(e);
        }

        return emitter;
    }

    // 이벤트 브로드캐스트
    public void broadcast(String eventName, Object data) {
        List<SseEmitter> deadEmitters = new ArrayList<>();

        emitters.forEach(emitter -> {
            try {
                emitter.send(SseEmitter.event()
                    .name(eventName)
                    .data(data, MediaType.APPLICATION_JSON)
                    .id(UUID.randomUUID().toString())
                );
            } catch (IOException e) {
                deadEmitters.add(emitter);
            }
        });

        emitters.removeAll(deadEmitters);
    }
}

SSE + Redis Pub/Sub: 분산 환경 대응

서버가 여러 대일 때 SSE 이벤트를 모든 인스턴스에 전파하려면 Redis Pub/Sub을 결합한다.

@Service
@RequiredArgsConstructor
public class DistributedSseService {

    private final StringRedisTemplate redisTemplate;
    private final SseController sseController;
    private final ObjectMapper objectMapper;

    // 이벤트 발행 → Redis 채널에 전파
    public void publishEvent(String eventName, Object data) {
        try {
            String json = objectMapper.writeValueAsString(
                Map.of("event", eventName, "data", data)
            );
            redisTemplate.convertAndSend("sse:events", json);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    // Redis 구독 → 로컬 SSE 브로드캐스트
    @Bean
    public MessageListenerAdapter sseMessageListener() {
        return new MessageListenerAdapter((MessageListener) (message, pattern) -> {
            try {
                Map<String, Object> event = objectMapper.readValue(
                    message.getBody(), new TypeReference<>() {}
                );
                sseController.broadcast(
                    (String) event.get("event"),
                    event.get("data")
                );
            } catch (IOException e) {
                log.error("SSE Redis 메시지 파싱 실패", e);
            }
        });
    }
}

비동기 방식 비교 정리

방식 특징 적합한 상황
Callable TaskExecutor에 위임, 단순 단일 느린 작업 비동기화
DeferredResult 아무 스레드에서 결과 설정 가능 이벤트 기반 응답, Long Polling
StreamingResponseBody OutputStream에 직접 쓰기 대용량 파일 다운로드, CSV 내보내기
SseEmitter 서버→클라이언트 단방향 스트림 실시간 알림, 대시보드, 진행률
WebFlux 완전 비동기 리액티브 전체 스택 비동기가 필요할 때

주의점과 안티패턴

안티패턴 문제점 해결책
타임아웃 미설정 연결 누수, 메모리 고갈 DeferredResult/SseEmitter에 타임아웃 필수
SseEmitter 미정리 끊긴 연결에 계속 전송 시도 onCompletion/onError에서 제거
Callable 무한정 큐잉 OOM 위험 QueueCapacity + RejectedExecutionHandler
SecurityContext 전파 누락 비동기 스레드에서 인증 정보 없음 DelegatingSecurityContextExecutor 사용

마무리

Spring MVC의 비동기 메커니즘은 WebFlux로 전환하지 않고도 서블릿 스레드 고갈 문제를 해결하는 실용적 선택이다. Callable은 단순 위임, DeferredResult는 이벤트 기반, StreamingResponseBody는 대용량 응답, SseEmitter는 실시간 푸시에 각각 최적화되어 있다. Redis Pub/Sub과 결합하면 분산 환경에서도 실시간 이벤트 전파가 가능하고, Micrometer 커스텀 메트릭으로 비동기 요청의 대기 시간과 타임아웃 비율을 모니터링할 수 있다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux