Spring WebFlux 리액티브 심화

Spring WebFlux란?

Spring WebFlux는 Spring 5에서 도입된 리액티브 웹 프레임워크입니다. 전통적인 Spring MVC가 스레드-per-요청 모델이라면, WebFlux는 논블로킹 이벤트 루프 기반으로 적은 스레드로 대량의 동시 요청을 처리합니다. Netty를 기본 서버로 사용하며, MonoFlux라는 리액티브 타입이 핵심입니다.

Mono와 Flux 핵심 이해

Mono<T>는 0~1개, Flux<T>는 0~N개의 요소를 비동기로 방출하는 Publisher입니다. 구독(subscribe)하기 전까지는 아무 일도 일어나지 않는 Cold Stream이 기본입니다.

// Mono — 단일 값
Mono<User> user = userRepository.findById(id);

// Flux — 다중 값
Flux<User> users = userRepository.findAll();

// 변환 체이닝
Mono<UserDto> dto = userRepository.findById(id)
    .map(user -> new UserDto(user.getName(), user.getEmail()))
    .switchIfEmpty(Mono.error(new NotFoundException("User not found")));

핵심 원칙: 리액티브 체인 안에서 .block()을 호출하면 안 됩니다. 이벤트 루프 스레드가 블로킹되어 전체 성능이 무너집니다.

Annotated Controller vs Functional Endpoint

WebFlux는 두 가지 프로그래밍 모델을 지원합니다.

Annotated Controller (익숙한 방식)

@RestController
@RequestMapping("/api/users")
public class UserController {

    private final UserService userService;

    @GetMapping("/{id}")
    public Mono<ResponseEntity<UserDto>> getUser(@PathVariable String id) {
        return userService.findById(id)
            .map(ResponseEntity::ok)
            .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    @GetMapping
    public Flux<UserDto> getAllUsers(
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "20") int size) {
        return userService.findAll(page, size);
    }

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<UserDto> createUser(@Valid @RequestBody Mono<CreateUserRequest> request) {
        // RequestBody를 Mono로 받으면 역직렬화도 논블로킹
        return request.flatMap(userService::create);
    }
}

Functional Endpoint (라우터 함수 방식)

@Configuration
public class UserRouter {

    @Bean
    public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
        return RouterFunctions.route()
            .path("/api/users", builder -> builder
                .GET("/{id}", handler::getUser)
                .GET("", handler::getAllUsers)
                .POST("", handler::createUser)
            )
            .filter(this::errorHandlingFilter)
            .build();
    }

    private Mono<ServerResponse> errorHandlingFilter(
            ServerRequest request, HandlerFunction<ServerResponse> next) {
        return next.handle(request)
            .onErrorResume(NotFoundException.class, e ->
                ServerResponse.notFound().build())
            .onErrorResume(Exception.class, e ->
                ServerResponse.status(500)
                    .bodyValue(Map.of("error", e.getMessage())));
    }
}

@Component
public class UserHandler {

    private final UserService userService;

    public Mono<ServerResponse> getUser(ServerRequest request) {
        String id = request.pathVariable("id");
        return userService.findById(id)
            .flatMap(user -> ServerResponse.ok().bodyValue(user))
            .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> getAllUsers(ServerRequest request) {
        Flux<UserDto> users = userService.findAll();
        return ServerResponse.ok().body(users, UserDto.class);
    }

    public Mono<ServerResponse> createUser(ServerRequest request) {
        return request.bodyToMono(CreateUserRequest.class)
            .flatMap(userService::create)
            .flatMap(user -> ServerResponse
                .created(URI.create("/api/users/" + user.getId()))
                .bodyValue(user));
    }
}

Functional Endpoint는 라우팅과 핸들러가 분리되어 있어 테스트와 조합이 유연합니다. 복잡한 라우팅 로직이 필요할 때 적합합니다.

WebFlux + R2DBC: 완전 논블로킹 DB 접근

JDBC는 블로킹 API입니다. WebFlux의 성능을 살리려면 R2DBC(Reactive Relational Database Connectivity)를 사용해야 합니다.

// build.gradle
implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
implementation 'io.r2dbc:r2dbc-postgresql'

// Entity
@Table("users")
public class User {
    @Id
    private Long id;
    private String name;
    private String email;
    private LocalDateTime createdAt;
}

// Repository — ReactiveCrudRepository 상속
public interface UserRepository extends ReactiveCrudRepository<User, Long> {

    Flux<User> findByNameContaining(String name);

    @Query("SELECT * FROM users WHERE email = :email")
    Mono<User> findByEmail(String email);

    @Query("SELECT * FROM users ORDER BY created_at DESC LIMIT :limit OFFSET :offset")
    Flux<User> findAllPaged(int limit, long offset);
}

R2DBC 주의사항:

  • JPA의 Lazy Loading, Cascade, 양방향 관계가 없습니다. 관계 조회는 직접 구현해야 합니다.
  • 트랜잭션은 @Transactional이 동작하지만, 리액티브 체인이 끊기지 않아야 합니다.
  • 스키마 자동 생성(DDL auto)이 없으므로 Flyway나 Liquibase로 마이그레이션을 관리하세요.

WebClient: 리액티브 HTTP 클라이언트

RestTemplate은 블로킹입니다. WebFlux 환경에서는 반드시 WebClient를 사용하세요.

@Configuration
public class WebClientConfig {

    @Bean
    public WebClient webClient() {
        return WebClient.builder()
            .baseUrl("https://api.external.com")
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .filter(ExchangeFilterFunctions.basicAuthentication("user", "pass"))
            .filter(this::loggingFilter)
            .build();
    }

    private Mono<ClientResponse> loggingFilter(ClientRequest request, ExchangeFunction next) {
        log.info("Request: {} {}", request.method(), request.url());
        return next.exchange(request)
            .doOnNext(response -> log.info("Response: {}", response.statusCode()));
    }
}

@Service
public class ExternalApiService {

    private final WebClient webClient;

    // 단일 조회
    public Mono<ExternalData> fetchData(String id) {
        return webClient.get()
            .uri("/data/{id}", id)
            .retrieve()
            .onStatus(HttpStatusCode::is4xxClientError, response ->
                response.bodyToMono(String.class)
                    .flatMap(body -> Mono.error(new ClientException(body))))
            .bodyToMono(ExternalData.class)
            .timeout(Duration.ofSeconds(5))
            .retryWhen(Retry.backoff(3, Duration.ofMillis(500))
                .filter(e -> e instanceof WebClientResponseException.ServiceUnavailable));
    }

    // 병렬 호출
    public Mono<AggregatedResult> fetchParallel(String userId) {
        Mono<Profile> profile = fetchProfile(userId);
        Mono<List<Order>> orders = fetchOrders(userId).collectList();
        Mono<Settings> settings = fetchSettings(userId);

        return Mono.zip(profile, orders, settings)
            .map(tuple -> new AggregatedResult(
                tuple.getT1(), tuple.getT2(), tuple.getT3()
            ));
    }
}

Mono.zip은 여러 비동기 호출을 병렬로 실행하고 모든 결과가 도착하면 합칩니다. 순차 호출 대비 응답 시간이 크게 단축됩니다.

에러 처리 패턴

리액티브 스트림에서는 try-catch 대신 연산자 기반 에러 처리를 사용합니다.

// 1. onErrorResume — 대체 값 반환
userRepository.findById(id)
    .onErrorResume(DataAccessException.class, e -> {
        log.error("DB error", e);
        return Mono.empty();
    });

// 2. onErrorMap — 에러 변환
userRepository.findById(id)
    .onErrorMap(DataAccessException.class, e ->
        new ServiceException("사용자 조회 실패", e));

// 3. doOnError — 사이드 이펙트 (로깅 등)
userRepository.findById(id)
    .doOnError(e -> log.error("Error: {}", e.getMessage()))
    .onErrorResume(e -> Mono.empty());

// 4. Global Error Handler
@ControllerAdvice
public class GlobalErrorHandler {

    @ExceptionHandler(NotFoundException.class)
    @ResponseStatus(HttpStatus.NOT_FOUND)
    public Mono<ErrorResponse> handleNotFound(NotFoundException e) {
        return Mono.just(new ErrorResponse(404, e.getMessage()));
    }

    @ExceptionHandler(WebExchangeBindException.class)
    @ResponseStatus(HttpStatus.BAD_REQUEST)
    public Mono<ErrorResponse> handleValidation(WebExchangeBindException e) {
        List<String> errors = e.getFieldErrors().stream()
            .map(f -> f.getField() + ": " + f.getDefaultMessage())
            .toList();
        return Mono.just(new ErrorResponse(400, "Validation failed", errors));
    }
}

WebFilter: 리액티브 필터 체인

Spring MVC의 Filter에 해당하는 것이 WebFilter입니다. 인증, 로깅, CORS 등 공통 관심사를 처리합니다.

@Component
@Order(1)
public class RequestLoggingFilter implements WebFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        long start = System.currentTimeMillis();
        String path = exchange.getRequest().getPath().value();

        return chain.filter(exchange)
            .doFinally(signal -> {
                long duration = System.currentTimeMillis() - start;
                HttpStatusCode status = exchange.getResponse().getStatusCode();
                log.info("{} {} {} {}ms", exchange.getRequest().getMethod(),
                    path, status, duration);
            });
    }
}

@Component
@Order(0)
public class JwtAuthFilter implements WebFilter {

    private final JwtUtil jwtUtil;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        String path = exchange.getRequest().getPath().value();

        // 공개 경로 스킵
        if (path.startsWith("/api/auth/") || path.startsWith("/actuator/")) {
            return chain.filter(exchange);
        }

        String token = extractToken(exchange.getRequest());
        if (token == null) {
            exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
            return exchange.getResponse().setComplete();
        }

        return jwtUtil.validate(token)
            .flatMap(claims -> {
                exchange.getAttributes().put("userId", claims.getSubject());
                return chain.filter(exchange);
            })
            .onErrorResume(e -> {
                exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
                return exchange.getResponse().setComplete();
            });
    }
}

SSE(Server-Sent Events)와 스트리밍

WebFlux의 강점 중 하나는 실시간 데이터 스트리밍입니다.

@GetMapping(value = "/stream/notifications", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Notification>> streamNotifications(
        @RequestAttribute String userId) {
    return notificationService.subscribe(userId)
        .map(notification -> ServerSentEvent.<Notification>builder()
            .id(notification.getId())
            .event(notification.getType())
            .data(notification)
            .retry(Duration.ofSeconds(5))
            .build());
}

클라이언트는 EventSource API로 간단하게 구독할 수 있습니다. WebSocket보다 구현이 단순하고, HTTP/2 환경에서 효율적입니다.

MVC vs WebFlux 선택 기준

기준 Spring MVC Spring WebFlux
동시 연결 수 수백~수천 수만~수십만
DB 드라이버 JDBC (풍부한 생태계) R2DBC (제한적)
학습 곡선 낮음 높음 (리액티브 사고방식)
디버깅 스택 트레이스 명확 비동기 체인으로 복잡
적합한 케이스 CRUD, 관리자 페이지 실시간 스트리밍, API 게이트웨이, IoT

결론: “모든 프로젝트에 WebFlux”는 오버 엔지니어링입니다. 높은 동시성이 필요하거나, 마이크로서비스 간 논블로킹 통신이 핵심인 경우에 선택하세요. 기존 Spring Transaction이나 Spring AOP에 익숙하다면, WebFlux에서도 유사하게 적용할 수 있지만 리액티브 체인 안에서의 동작 방식은 다릅니다.

정리

Spring WebFlux는 논블로킹 I/O로 높은 동시성을 달성하는 강력한 도구입니다. Mono/Flux 연산자 체이닝, R2DBC를 통한 완전 논블로킹 DB 접근, WebClient의 병렬 HTTP 호출, SSE 스트리밍까지 — 리액티브 스택의 핵심 패턴을 익히면 대규모 트래픽 환경에서 확실한 차이를 만들 수 있습니다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux