Spring WebFlux 리액티브 입문

Spring WebFlux란? 리액티브 웹 프레임워크의 핵심

Spring WebFlux는 Spring 5에서 도입된 논블로킹(Non-Blocking) 리액티브 웹 프레임워크다. 기존 Spring MVC가 서블릿 기반 동기 처리 모델이라면, WebFlux는 Reactive Streams 스펙 위에 구축된 완전한 비동기 처리 모델이다. 내부적으로 Project ReactorMonoFlux를 핵심 타입으로 사용한다.

이 글에서는 WebFlux의 동작 원리, Mono/Flux 활용법, 함수형 엔드포인트, 에러 처리, 그리고 실무 성능 최적화까지 깊이 있게 다룬다.

Mono와 Flux: 리액티브 타입의 이해

Mono<T>는 0~1개의 요소를 비동기로 발행하는 Publisher이고, Flux<T>는 0~N개의 요소를 발행하는 Publisher다. 이 두 타입이 WebFlux의 모든 데이터 흐름을 지배한다.

// Mono: 단일 값 반환
Mono<User> findById(Long id) {
    return userRepository.findById(id);  // R2DBC 리포지토리
}

// Flux: 다건 스트림 반환
Flux<User> findAll() {
    return userRepository.findAll();
}

// 변환 연산자 체이닝
Mono<UserDto> getUserDto(Long id) {
    return userRepository.findById(id)
        .map(user -> new UserDto(user.getName(), user.getEmail()))
        .switchIfEmpty(Mono.error(new UserNotFoundException(id)));
}

핵심 원칙: subscribe()가 호출되기 전까지 아무 일도 일어나지 않는다(lazy evaluation). WebFlux에서는 프레임워크가 자동으로 subscribe하므로 컨트롤러에서 직접 호출할 필요가 없다.

어노테이션 기반 컨트롤러 vs 함수형 엔드포인트

WebFlux는 두 가지 프로그래밍 모델을 제공한다. 첫째는 Spring MVC와 유사한 어노테이션 기반, 둘째는 함수형 라우터(RouterFunction)다.

어노테이션 기반

@RestController
@RequestMapping("/api/users")
public class UserController {

    private final UserService userService;

    @GetMapping("/{id}")
    public Mono<ResponseEntity<UserDto>> getUser(@PathVariable Long id) {
        return userService.findById(id)
            .map(ResponseEntity::ok)
            .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    @GetMapping
    public Flux<UserDto> getAllUsers() {
        return userService.findAll();
    }

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<UserDto> createUser(@Valid @RequestBody Mono<CreateUserRequest> request) {
        return request.flatMap(userService::create);
    }
}

함수형 라우터

@Configuration
public class UserRouter {

    @Bean
    public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
        return RouterFunctions.route()
            .path("/api/users", builder -> builder
                .GET("/{id}", handler::getUser)
                .GET("", handler::getAllUsers)
                .POST("", handler::createUser)
            )
            .filter((request, next) -> {
                long start = System.nanoTime();
                return next.handle(request)
                    .doOnSuccess(res -> log.info("{}ms", 
                        (System.nanoTime() - start) / 1_000_000));
            })
            .build();
    }
}

@Component
public class UserHandler {

    public Mono<ServerResponse> getUser(ServerRequest request) {
        Long id = Long.valueOf(request.pathVariable("id"));
        return userService.findById(id)
            .flatMap(user -> ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(user))
            .switchIfEmpty(ServerResponse.notFound().build());
    }
}

함수형 모델은 라우팅 로직을 명시적으로 제어할 수 있어 복잡한 조건부 라우팅이나 필터 체이닝에 유리하다. 어노테이션 모델은 Spring MVC 경험자에게 친숙하다. 실무에서는 두 모델을 혼용하는 것도 가능하다.

핵심 연산자: flatMap, zip, concat

리액티브 프로그래밍의 진짜 힘은 연산자 조합에 있다. 가장 자주 쓰이는 패턴을 정리한다.

// flatMap: 비동기 체이닝 (1:1 변환 + 구독)
Mono<OrderResponse> placeOrder(OrderRequest req) {
    return userService.findById(req.getUserId())
        .flatMap(user -> inventoryService.reserve(req.getItems())
            .flatMap(reservation -> paymentService.charge(user, req.getTotal())
                .map(payment -> new OrderResponse(user, reservation, payment))
            )
        );
}

// Mono.zip: 독립적인 비동기 호출 병렬 실행
Mono<DashboardDto> getDashboard(Long userId) {
    Mono<UserProfile> profile = userService.getProfile(userId);
    Mono<List<Order>> orders = orderService.getRecent(userId).collectList();
    Mono<WalletBalance> balance = walletService.getBalance(userId);

    return Mono.zip(profile, orders, balance)
        .map(tuple -> new DashboardDto(
            tuple.getT1(), tuple.getT2(), tuple.getT3()
        ));
}

// Flux.concat vs Flux.merge
// concat: 순서 보장 (직렬)
Flux<Event> orderedEvents = Flux.concat(
    eventStore.getEvents("2024-01"),
    eventStore.getEvents("2024-02")
);

// merge: 순서 무관 (병렬, 더 빠름)
Flux<Notification> allNotifications = Flux.merge(
    emailService.getNotifications(),
    smsService.getNotifications(),
    pushService.getNotifications()
);
연산자 용도 실행 방식
map 동기 변환 즉시
flatMap 비동기 변환 + 구독 비동기
zip 독립 호출 병렬 결합 병렬
concat 순차 스트림 결합 직렬
merge 병렬 스트림 결합 병렬
switchIfEmpty 빈 결과 대체 조건부

에러 처리: onErrorResume과 retry

리액티브 스트림에서 에러 처리는 try-catch가 아닌 연산자 기반이다. 에러가 발생하면 스트림이 종료되므로, 적절한 폴백이나 재시도 로직이 필수다.

// onErrorResume: 에러 시 대체 스트림
Mono<Product> getProductWithFallback(String id) {
    return primaryCatalog.findById(id)
        .onErrorResume(WebClientResponseException.class, ex -> {
            log.warn("Primary catalog failed: {}", ex.getMessage());
            return fallbackCatalog.findById(id);
        });
}

// retry + backoff: 일시적 장애 대응
Mono<ExchangeRate> getRate(String pair) {
    return webClient.get()
        .uri("/rates/{pair}", pair)
        .retrieve()
        .bodyToMono(ExchangeRate.class)
        .retryWhen(Retry.backoff(3, Duration.ofMillis(500))
            .filter(ex -> ex instanceof WebClientRequestException)
            .onRetryExhaustedThrow((spec, signal) ->
                new ServiceUnavailableException("Exchange rate service down"))
        )
        .timeout(Duration.ofSeconds(5))
        .onErrorResume(TimeoutException.class, 
            ex -> Mono.just(ExchangeRate.cached(pair)));
}

// 글로벌 에러 핸들러
@Component
@Order(-2)
public class GlobalErrorHandler extends AbstractErrorWebExceptionHandler {

    @Override
    protected RouterFunction<ServerResponse> getRoutingFunction(
            ErrorAttributes attrs) {
        return RouterFunctions.route(
            RequestPredicates.all(), this::renderError);
    }

    private Mono<ServerResponse> renderError(ServerRequest request) {
        Throwable error = getError(request);
        HttpStatus status = determineStatus(error);
        return ServerResponse.status(status)
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(new ErrorResponse(status.value(), error.getMessage()));
    }
}

WebClient: 리액티브 HTTP 클라이언트

WebFlux 환경에서는 RestTemplate 대신 WebClient를 사용해야 한다. WebClient는 논블로킹이며, 커넥션 풀을 효율적으로 관리한다.

@Configuration
public class WebClientConfig {

    @Bean
    public WebClient webClient() {
        HttpClient httpClient = HttpClient.create()
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
            .responseTimeout(Duration.ofSeconds(5))
            .doOnConnected(conn -> conn
                .addHandlerLast(new ReadTimeoutHandler(5))
                .addHandlerLast(new WriteTimeoutHandler(5)));

        return WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            .baseUrl("https://api.example.com")
            .defaultHeader(HttpHeaders.CONTENT_TYPE, 
                MediaType.APPLICATION_JSON_VALUE)
            .filter(ExchangeFilterFunction.ofRequestProcessor(req -> {
                log.debug("Request: {} {}", req.method(), req.url());
                return Mono.just(req);
            }))
            .build();
    }
}

// 사용 예: 외부 API 호출 + 변환
Mono<PaymentResult> processPayment(PaymentRequest req) {
    return webClient.post()
        .uri("/v1/payments")
        .bodyValue(req)
        .retrieve()
        .onStatus(HttpStatusCode::is4xxClientError, response ->
            response.bodyToMono(String.class)
                .flatMap(body -> Mono.error(
                    new PaymentValidationException(body))))
        .bodyToMono(PaymentResult.class);
}

R2DBC: 리액티브 데이터 액세스

WebFlux의 논블로킹 이점을 DB 레이어까지 확장하려면 R2DBC(Reactive Relational Database Connectivity)를 사용해야 한다. JDBC는 블로킹이므로 WebFlux와 함께 사용하면 이벤트 루프를 차단한다.

// build.gradle
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
    runtimeOnly 'io.r2dbc:r2dbc-postgresql'
}

// 리포지토리 인터페이스
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
    Flux<User> findByEmailContaining(String keyword);

    @Query("SELECT u.* FROM users u JOIN orders o ON u.id = o.user_id " +
           "WHERE o.total > :minTotal GROUP BY u.id")
    Flux<User> findHighValueCustomers(@Param("minTotal") BigDecimal minTotal);
}

// R2DBC 트랜잭션 (@Transactional은 동일하게 동작)
@Service
public class OrderService {

    @Transactional
    public Mono<Order> placeOrder(CreateOrderRequest req) {
        return inventoryRepository.decrementStock(req.getProductId(), req.getQty())
            .then(orderRepository.save(Order.from(req)))
            .flatMap(order -> eventPublisher.publish(
                new OrderPlacedEvent(order.getId()))
                .thenReturn(order));
    }
}

Server-Sent Events(SSE)와 스트리밍

WebFlux의 강점 중 하나는 실시간 데이터 스트리밍이다. SSE를 활용하면 클라이언트에 지속적으로 데이터를 푸시할 수 있다.

@GetMapping(value = "/stream/prices", 
            produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<PriceUpdate>> streamPrices() {
    return priceService.getPriceStream()
        .map(price -> ServerSentEvent.<PriceUpdate>builder()
            .id(String.valueOf(price.getTimestamp()))
            .event("price-update")
            .data(price)
            .retry(Duration.ofSeconds(3))
            .build())
        .doOnCancel(() -> log.info("Client disconnected from price stream"));
}

성능 최적화: Scheduler와 블로킹 격리

실무에서 가장 흔한 실수는 리액티브 스레드에서 블로킹 코드를 실행하는 것이다. Netty의 이벤트 루프 스레드가 차단되면 전체 서버 처리량이 급락한다.

// ❌ 이벤트 루프에서 블로킹 — 절대 금지
Mono<String> bad() {
    return Mono.fromCallable(() -> {
        Thread.sleep(1000);  // 이벤트 루프 차단!
        return blockingHttpCall();
    });
}

// ✅ 블로킹 작업은 boundedElastic 스케줄러로 격리
Mono<String> good() {
    return Mono.fromCallable(() -> blockingHttpCall())
        .subscribeOn(Schedulers.boundedElastic());
}

// Blockhound로 블로킹 호출 탐지 (테스트 환경)
// build.gradle: testImplementation 'io.projectreactor.tools:blockhound:1.0.9.RELEASE'
@BeforeAll
static void setupBlockhound() {
    BlockHound.install();
}
Scheduler 용도 스레드 수
parallel() CPU 집약 연산 CPU 코어 수
boundedElastic() 블로킹 I/O 격리 최대 10 × CPU
single() 순차 실행 보장 1
immediate() 현재 스레드 유지 0 (호출 스레드)

Spring MVC vs WebFlux: 언제 무엇을 선택할까

WebFlux가 항상 더 나은 것은 아니다. 적재적소가 핵심이다.

기준 Spring MVC Spring WebFlux
동시 연결 수 수백~수천 수만~수십만
스레드 모델 Thread-per-request Event loop
학습 곡선 낮음 높음
디버깅 스택 트레이스 명확 비동기 스택 추적 어려움
적합한 시나리오 CRUD, 관리자 페이지 실시간 스트리밍, 게이트웨이, 고동시성 API
DB 드라이버 JDBC/JPA R2DBC

WebFlux를 선택해야 하는 경우: 마이크로서비스 간 대량의 비동기 호출이 발생하거나, SSE/WebSocket 기반 실시간 기능이 핵심이거나, 제한된 리소스로 높은 동시성을 처리해야 할 때다. 단순 CRUD API라면 Spring MVC가 생산성과 유지보수 면에서 더 유리하다.

테스트: WebTestClient 활용

@WebFluxTest(UserController.class)
class UserControllerTest {

    @Autowired
    private WebTestClient webTestClient;

    @MockBean
    private UserService userService;

    @Test
    void shouldReturnUser() {
        given(userService.findById(1L))
            .willReturn(Mono.just(new UserDto("Alice", "alice@test.com")));

        webTestClient.get()
            .uri("/api/users/1")
            .exchange()
            .expectStatus().isOk()
            .expectBody(UserDto.class)
            .value(user -> {
                assertThat(user.getName()).isEqualTo("Alice");
                assertThat(user.getEmail()).isEqualTo("alice@test.com");
            });
    }

    @Test
    void shouldStreamPrices() {
        Flux<PriceUpdate> prices = Flux.just(
            new PriceUpdate("BTC", 50000),
            new PriceUpdate("ETH", 3000)
        );
        given(priceService.getPriceStream()).willReturn(prices);

        webTestClient.get()
            .uri("/stream/prices")
            .accept(MediaType.TEXT_EVENT_STREAM)
            .exchange()
            .expectStatus().isOk()
            .returnResult(PriceUpdate.class)
            .getResponseBody()
            .as(StepVerifier::create)
            .expectNextCount(2)
            .verifyComplete();
    }
}

실무 체크리스트

WebFlux 프로젝트를 운영 환경에 배포하기 전 반드시 점검해야 할 사항들이다:

  • 블로킹 코드 탐지: BlockHound를 CI 파이프라인에 포함시켜 이벤트 루프 차단을 사전에 감지한다
  • 커넥션 풀 설정: R2DBC ConnectionPool의 maxSize를 DB 커넥션 한도에 맞게 조정한다
  • 타임아웃 설정: WebClient, R2DBC, SSE 모두에 적절한 타임아웃을 건다
  • 메모리 누수 방지: 무한 Flux에는 반드시 .take().timeout()을 걸어 자원 해제를 보장한다
  • 배압(Backpressure): onBackpressureBuffer(), onBackpressureDrop() 등으로 빠른 생산자-느린 소비자 문제를 제어한다
  • 관측성: Micrometer + Prometheus로 이벤트 루프 활용률과 커넥션 풀 상태를 모니터링한다

Spring WebFlux는 높은 동시성과 실시간 처리가 필요한 시스템에서 강력한 선택지다. 하지만 리액티브 패러다임의 학습 곡선과 디버깅 복잡성을 감수할 준비가 되어 있어야 한다. Resilience4j와 결합하면 장애 전파 차단까지 완성할 수 있다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux