Spring WebFlux란?
Spring WebFlux는 Spring 5에서 도입된 리액티브 웹 프레임워크입니다. 전통적인 Spring MVC가 스레드-per-요청 모델이라면, WebFlux는 논블로킹 이벤트 루프 기반으로 적은 스레드로 대량의 동시 요청을 처리합니다. Netty를 기본 서버로 사용하며, Mono와 Flux라는 리액티브 타입이 핵심입니다.
Mono와 Flux 핵심 이해
Mono<T>는 0~1개, Flux<T>는 0~N개의 요소를 비동기로 방출하는 Publisher입니다. 구독(subscribe)하기 전까지는 아무 일도 일어나지 않는 Cold Stream이 기본입니다.
// Mono — 단일 값
Mono<User> user = userRepository.findById(id);
// Flux — 다중 값
Flux<User> users = userRepository.findAll();
// 변환 체이닝
Mono<UserDto> dto = userRepository.findById(id)
.map(user -> new UserDto(user.getName(), user.getEmail()))
.switchIfEmpty(Mono.error(new NotFoundException("User not found")));
핵심 원칙: 리액티브 체인 안에서 .block()을 호출하면 안 됩니다. 이벤트 루프 스레드가 블로킹되어 전체 성능이 무너집니다.
Annotated Controller vs Functional Endpoint
WebFlux는 두 가지 프로그래밍 모델을 지원합니다.
Annotated Controller (익숙한 방식)
@RestController
@RequestMapping("/api/users")
public class UserController {
private final UserService userService;
@GetMapping("/{id}")
public Mono<ResponseEntity<UserDto>> getUser(@PathVariable String id) {
return userService.findById(id)
.map(ResponseEntity::ok)
.defaultIfEmpty(ResponseEntity.notFound().build());
}
@GetMapping
public Flux<UserDto> getAllUsers(
@RequestParam(defaultValue = "0") int page,
@RequestParam(defaultValue = "20") int size) {
return userService.findAll(page, size);
}
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
public Mono<UserDto> createUser(@Valid @RequestBody Mono<CreateUserRequest> request) {
// RequestBody를 Mono로 받으면 역직렬화도 논블로킹
return request.flatMap(userService::create);
}
}
Functional Endpoint (라우터 함수 방식)
@Configuration
public class UserRouter {
@Bean
public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
return RouterFunctions.route()
.path("/api/users", builder -> builder
.GET("/{id}", handler::getUser)
.GET("", handler::getAllUsers)
.POST("", handler::createUser)
)
.filter(this::errorHandlingFilter)
.build();
}
private Mono<ServerResponse> errorHandlingFilter(
ServerRequest request, HandlerFunction<ServerResponse> next) {
return next.handle(request)
.onErrorResume(NotFoundException.class, e ->
ServerResponse.notFound().build())
.onErrorResume(Exception.class, e ->
ServerResponse.status(500)
.bodyValue(Map.of("error", e.getMessage())));
}
}
@Component
public class UserHandler {
private final UserService userService;
public Mono<ServerResponse> getUser(ServerRequest request) {
String id = request.pathVariable("id");
return userService.findById(id)
.flatMap(user -> ServerResponse.ok().bodyValue(user))
.switchIfEmpty(ServerResponse.notFound().build());
}
public Mono<ServerResponse> getAllUsers(ServerRequest request) {
Flux<UserDto> users = userService.findAll();
return ServerResponse.ok().body(users, UserDto.class);
}
public Mono<ServerResponse> createUser(ServerRequest request) {
return request.bodyToMono(CreateUserRequest.class)
.flatMap(userService::create)
.flatMap(user -> ServerResponse
.created(URI.create("/api/users/" + user.getId()))
.bodyValue(user));
}
}
Functional Endpoint는 라우팅과 핸들러가 분리되어 있어 테스트와 조합이 유연합니다. 복잡한 라우팅 로직이 필요할 때 적합합니다.
WebFlux + R2DBC: 완전 논블로킹 DB 접근
JDBC는 블로킹 API입니다. WebFlux의 성능을 살리려면 R2DBC(Reactive Relational Database Connectivity)를 사용해야 합니다.
// build.gradle
implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
implementation 'io.r2dbc:r2dbc-postgresql'
// Entity
@Table("users")
public class User {
@Id
private Long id;
private String name;
private String email;
private LocalDateTime createdAt;
}
// Repository — ReactiveCrudRepository 상속
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
Flux<User> findByNameContaining(String name);
@Query("SELECT * FROM users WHERE email = :email")
Mono<User> findByEmail(String email);
@Query("SELECT * FROM users ORDER BY created_at DESC LIMIT :limit OFFSET :offset")
Flux<User> findAllPaged(int limit, long offset);
}
R2DBC 주의사항:
- JPA의 Lazy Loading, Cascade, 양방향 관계가 없습니다. 관계 조회는 직접 구현해야 합니다.
- 트랜잭션은
@Transactional이 동작하지만, 리액티브 체인이 끊기지 않아야 합니다. - 스키마 자동 생성(DDL auto)이 없으므로 Flyway나 Liquibase로 마이그레이션을 관리하세요.
WebClient: 리액티브 HTTP 클라이언트
RestTemplate은 블로킹입니다. WebFlux 환경에서는 반드시 WebClient를 사용하세요.
@Configuration
public class WebClientConfig {
@Bean
public WebClient webClient() {
return WebClient.builder()
.baseUrl("https://api.external.com")
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.filter(ExchangeFilterFunctions.basicAuthentication("user", "pass"))
.filter(this::loggingFilter)
.build();
}
private Mono<ClientResponse> loggingFilter(ClientRequest request, ExchangeFunction next) {
log.info("Request: {} {}", request.method(), request.url());
return next.exchange(request)
.doOnNext(response -> log.info("Response: {}", response.statusCode()));
}
}
@Service
public class ExternalApiService {
private final WebClient webClient;
// 단일 조회
public Mono<ExternalData> fetchData(String id) {
return webClient.get()
.uri("/data/{id}", id)
.retrieve()
.onStatus(HttpStatusCode::is4xxClientError, response ->
response.bodyToMono(String.class)
.flatMap(body -> Mono.error(new ClientException(body))))
.bodyToMono(ExternalData.class)
.timeout(Duration.ofSeconds(5))
.retryWhen(Retry.backoff(3, Duration.ofMillis(500))
.filter(e -> e instanceof WebClientResponseException.ServiceUnavailable));
}
// 병렬 호출
public Mono<AggregatedResult> fetchParallel(String userId) {
Mono<Profile> profile = fetchProfile(userId);
Mono<List<Order>> orders = fetchOrders(userId).collectList();
Mono<Settings> settings = fetchSettings(userId);
return Mono.zip(profile, orders, settings)
.map(tuple -> new AggregatedResult(
tuple.getT1(), tuple.getT2(), tuple.getT3()
));
}
}
Mono.zip은 여러 비동기 호출을 병렬로 실행하고 모든 결과가 도착하면 합칩니다. 순차 호출 대비 응답 시간이 크게 단축됩니다.
에러 처리 패턴
리액티브 스트림에서는 try-catch 대신 연산자 기반 에러 처리를 사용합니다.
// 1. onErrorResume — 대체 값 반환
userRepository.findById(id)
.onErrorResume(DataAccessException.class, e -> {
log.error("DB error", e);
return Mono.empty();
});
// 2. onErrorMap — 에러 변환
userRepository.findById(id)
.onErrorMap(DataAccessException.class, e ->
new ServiceException("사용자 조회 실패", e));
// 3. doOnError — 사이드 이펙트 (로깅 등)
userRepository.findById(id)
.doOnError(e -> log.error("Error: {}", e.getMessage()))
.onErrorResume(e -> Mono.empty());
// 4. Global Error Handler
@ControllerAdvice
public class GlobalErrorHandler {
@ExceptionHandler(NotFoundException.class)
@ResponseStatus(HttpStatus.NOT_FOUND)
public Mono<ErrorResponse> handleNotFound(NotFoundException e) {
return Mono.just(new ErrorResponse(404, e.getMessage()));
}
@ExceptionHandler(WebExchangeBindException.class)
@ResponseStatus(HttpStatus.BAD_REQUEST)
public Mono<ErrorResponse> handleValidation(WebExchangeBindException e) {
List<String> errors = e.getFieldErrors().stream()
.map(f -> f.getField() + ": " + f.getDefaultMessage())
.toList();
return Mono.just(new ErrorResponse(400, "Validation failed", errors));
}
}
WebFilter: 리액티브 필터 체인
Spring MVC의 Filter에 해당하는 것이 WebFilter입니다. 인증, 로깅, CORS 등 공통 관심사를 처리합니다.
@Component
@Order(1)
public class RequestLoggingFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
long start = System.currentTimeMillis();
String path = exchange.getRequest().getPath().value();
return chain.filter(exchange)
.doFinally(signal -> {
long duration = System.currentTimeMillis() - start;
HttpStatusCode status = exchange.getResponse().getStatusCode();
log.info("{} {} {} {}ms", exchange.getRequest().getMethod(),
path, status, duration);
});
}
}
@Component
@Order(0)
public class JwtAuthFilter implements WebFilter {
private final JwtUtil jwtUtil;
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
String path = exchange.getRequest().getPath().value();
// 공개 경로 스킵
if (path.startsWith("/api/auth/") || path.startsWith("/actuator/")) {
return chain.filter(exchange);
}
String token = extractToken(exchange.getRequest());
if (token == null) {
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
return exchange.getResponse().setComplete();
}
return jwtUtil.validate(token)
.flatMap(claims -> {
exchange.getAttributes().put("userId", claims.getSubject());
return chain.filter(exchange);
})
.onErrorResume(e -> {
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
return exchange.getResponse().setComplete();
});
}
}
SSE(Server-Sent Events)와 스트리밍
WebFlux의 강점 중 하나는 실시간 데이터 스트리밍입니다.
@GetMapping(value = "/stream/notifications", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Notification>> streamNotifications(
@RequestAttribute String userId) {
return notificationService.subscribe(userId)
.map(notification -> ServerSentEvent.<Notification>builder()
.id(notification.getId())
.event(notification.getType())
.data(notification)
.retry(Duration.ofSeconds(5))
.build());
}
클라이언트는 EventSource API로 간단하게 구독할 수 있습니다. WebSocket보다 구현이 단순하고, HTTP/2 환경에서 효율적입니다.
MVC vs WebFlux 선택 기준
| 기준 | Spring MVC | Spring WebFlux |
|---|---|---|
| 동시 연결 수 | 수백~수천 | 수만~수십만 |
| DB 드라이버 | JDBC (풍부한 생태계) | R2DBC (제한적) |
| 학습 곡선 | 낮음 | 높음 (리액티브 사고방식) |
| 디버깅 | 스택 트레이스 명확 | 비동기 체인으로 복잡 |
| 적합한 케이스 | CRUD, 관리자 페이지 | 실시간 스트리밍, API 게이트웨이, IoT |
결론: “모든 프로젝트에 WebFlux”는 오버 엔지니어링입니다. 높은 동시성이 필요하거나, 마이크로서비스 간 논블로킹 통신이 핵심인 경우에 선택하세요. 기존 Spring Transaction이나 Spring AOP에 익숙하다면, WebFlux에서도 유사하게 적용할 수 있지만 리액티브 체인 안에서의 동작 방식은 다릅니다.
정리
Spring WebFlux는 논블로킹 I/O로 높은 동시성을 달성하는 강력한 도구입니다. Mono/Flux 연산자 체이닝, R2DBC를 통한 완전 논블로킹 DB 접근, WebClient의 병렬 HTTP 호출, SSE 스트리밍까지 — 리액티브 스택의 핵심 패턴을 익히면 대규모 트래픽 환경에서 확실한 차이를 만들 수 있습니다.