Spring RSocket 양방향 통신

RSocket이란?

RSocket은 Netflix가 설계한 바이너리 애플리케이션 프로토콜로, TCP·WebSocket 위에서 동작하며 4가지 인터랙션 모델을 지원합니다. HTTP의 요청-응답 패턴만으로는 한계가 있는 실시간 스트리밍, 양방향 통신, 백프레셔 제어가 필요한 시스템에서 강력한 대안입니다. Spring Framework 5.2부터 공식 지원하며, Spring Boot와의 통합이 매우 자연스럽습니다.

4가지 인터랙션 모델

모델 흐름 용도
Request-Response 1 → 1 일반 API 호출
Fire-and-Forget 1 → 0 로그 전송, 메트릭 수집
Request-Stream 1 → N 실시간 피드, 가격 스트리밍
Channel N ↔ N 양방향 채팅, 게임 통신

프로젝트 설정

// build.gradle.kts (Spring Boot 3.x)
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-rsocket")
    implementation("org.springframework.boot:spring-boot-starter-webflux")
    implementation("org.springframework.security:spring-security-rsocket")
    implementation("org.springframework.security:spring-security-messaging")
}
# application.yml
spring:
  rsocket:
    server:
      port: 7000
      transport: tcp    # tcp 또는 websocket
      # websocket 사용 시:
      # transport: websocket
      # mapping-path: /rsocket

Request-Response: 기본 패턴

HTTP GET처럼 단일 요청-단일 응답입니다. @MessageMapping으로 라우팅합니다.

// 서버 컨트롤러
@Controller
public class ProductRSocketController {

    private final ProductService productService;

    public ProductRSocketController(ProductService productService) {
        this.productService = productService;
    }

    @MessageMapping("product.find")
    public Mono<ProductDto> findProduct(ProductRequest request) {
        return productService.findById(request.getId());
    }

    @MessageMapping("product.create")
    public Mono<ProductDto> createProduct(CreateProductRequest request) {
        return productService.create(request);
    }
}
// 클라이언트
@Service
public class ProductRSocketClient {

    private final RSocketRequester requester;

    public ProductRSocketClient(RSocketRequester.Builder builder) {
        this.requester = builder
            .tcp("localhost", 7000);
    }

    public Mono<ProductDto> findProduct(Long id) {
        return requester
            .route("product.find")
            .data(new ProductRequest(id))
            .retrieveMono(ProductDto.class);
    }
}

Fire-and-Forget: 비동기 전송

응답을 기다리지 않으므로 로그, 메트릭, 이벤트 전송에 적합합니다.

// 서버
@MessageMapping("analytics.track")
public Mono<Void> trackEvent(AnalyticsEvent event) {
    return analyticsService.save(event);  // 반환 타입 Mono<Void>
}

// 클라이언트
public Mono<Void> trackPageView(String userId, String page) {
    return requester
        .route("analytics.track")
        .data(new AnalyticsEvent(userId, page, Instant.now()))
        .send();  // send() = fire-and-forget
}

Request-Stream: 실시간 스트리밍

하나의 요청에 대해 서버가 연속적으로 데이터를 스트리밍합니다. 주가 피드, 알림 스트림에 적합합니다.

// 서버: 실시간 주가 스트리밍
@MessageMapping("stock.prices")
public Flux<StockPrice> streamPrices(StockSubscription sub) {
    return Flux.interval(Duration.ofSeconds(1))
        .flatMap(tick -> stockService.getPrice(sub.getSymbol()))
        .distinctUntilChanged()
        .doOnCancel(() -> log.info("클라이언트 구독 취소: {}",
            sub.getSymbol()));
}

// 클라이언트: 스트림 구독
public Flux<StockPrice> subscribePrices(String symbol) {
    return requester
        .route("stock.prices")
        .data(new StockSubscription(symbol))
        .retrieveFlux(StockPrice.class);
}

Channel: 양방향 스트리밍

클라이언트와 서버가 동시에 스트림을 주고받습니다. 채팅, 게임, 협업 에디터에 적합합니다.

// 서버: 양방향 채팅 채널
@MessageMapping("chat.{roomId}")
public Flux<ChatMessage> chat(
        @DestinationVariable String roomId,
        Flux<ChatMessage> inbound) {

    // 인바운드 메시지를 Sink에 저장
    return inbound
        .doOnNext(msg -> {
            msg.setRoomId(roomId);
            msg.setTimestamp(Instant.now());
            chatRepository.save(msg).subscribe();
            roomSinks.get(roomId).tryEmitNext(msg);
        })
        .thenMany(roomSinks.get(roomId).asFlux());
}

// 클라이언트: 채널 연결
public Flux<ChatMessage> joinChat(String roomId,
                                    Flux<ChatMessage> outbound) {
    return requester
        .route("chat." + roomId)
        .data(outbound)
        .retrieveFlux(ChatMessage.class);
}

백프레셔 제어

RSocket의 핵심 장점은 프로토콜 레벨 백프레셔입니다. Reactive Streams 스펙을 네트워크 계층에서 구현하여, 소비자가 처리 가능한 만큼만 데이터를 요청합니다.

// 클라이언트에서 백프레셔 제어
requester
    .route("stock.prices")
    .data(new StockSubscription("AAPL"))
    .retrieveFlux(StockPrice.class)
    .limitRate(10)          // 한 번에 10개만 요청
    .delayElements(Duration.ofMillis(100))  // 소비 속도 제한
    .subscribe(price -> process(price));

// 서버에서 Flux 생성 시 백프레셔 반영
@MessageMapping("data.bulk")
public Flux<DataChunk> bulkStream(BulkRequest req) {
    return Flux.fromIterable(dataService.loadAll(req))
        .onBackpressureBuffer(256)     // 버퍼 크기 제한
        .doOnRequest(n -> log.debug("클라이언트가 {}개 요청", n));
}

RSocket Security

Spring Security와 통합하여 인증·인가를 적용합니다.

@Configuration
@EnableRSocketSecurity
public class RSocketSecurityConfig {

    @Bean
    public PayloadSocketAcceptorInterceptor authorization(
            RSocketSecurity security) {
        return security
            .authorizePayload(auth -> auth
                .route("admin.*").hasRole("ADMIN")
                .route("stock.prices").authenticated()
                .route("analytics.track").permitAll()
                .anyRequest().authenticated()
                .anyExchange().permitAll()
            )
            .simpleAuthentication(Customizer.withDefaults())
            .build();
    }

    @Bean
    public MapReactiveUserDetailsService userDetailsService() {
        UserDetails user = User.withDefaultPasswordEncoder()
            .username("client1")
            .password("secret")
            .roles("USER")
            .build();
        return new MapReactiveUserDetailsService(user);
    }
}

// 클라이언트 인증
RSocketRequester requester = builder
    .setupMetadata(
        new UsernamePasswordMetadata("client1", "secret"),
        MimeTypeUtils.parseMimeType(
            WellKnownMimeType.MESSAGE_RSOCKET_AUTHENTICATION.getString()))
    .tcp("localhost", 7000);

Resume: 연결 복구

네트워크 단절 후 자동 재연결과 스트림 복구를 지원합니다.

// 서버: Resume 활성화
@Bean
public RSocketServerCustomizer rSocketResume() {
    return server -> server.resume(
        new Resume()
            .sessionDuration(Duration.ofMinutes(5))
            .retry(Retry.backoff(5, Duration.ofSeconds(1))
                .maxBackoff(Duration.ofSeconds(16)))
    );
}

// 클라이언트: Resume 설정
RSocketRequester requester = builder
    .rsocketConnector(connector -> connector
        .resume(new Resume()
            .sessionDuration(Duration.ofMinutes(5))
            .retry(Retry.backoff(Long.MAX_VALUE,
                Duration.ofSeconds(1))))
        .reconnect(Retry.backoff(Long.MAX_VALUE,
            Duration.ofSeconds(1))))
    .tcp("localhost", 7000);

RSocketRequester 빈 설정

운영 환경에서 재사용 가능한 Requester 빈을 구성합니다.

@Configuration
public class RSocketClientConfig {

    @Bean
    public RSocketRequester rSocketRequester(
            RSocketRequester.Builder builder,
            RSocketStrategies strategies) {
        return builder
            .rsocketStrategies(strategies)
            .rsocketConnector(connector -> connector
                .reconnect(Retry.backoff(10, Duration.ofSeconds(1))
                    .maxBackoff(Duration.ofSeconds(30)))
                .payloadDecoder(PayloadDecoder.ZERO_COPY))
            .dataMimeType(MimeTypeUtils.APPLICATION_JSON)
            .tcp("rsocket-server.internal", 7000);
    }

    @Bean
    public RSocketStrategies rSocketStrategies() {
        return RSocketStrategies.builder()
            .encoders(encoders ->
                encoders.add(new Jackson2CborEncoder()))
            .decoders(decoders ->
                decoders.add(new Jackson2CborDecoder()))
            .build();
    }
}

테스트 전략

RSocketRequester테스트 슬라이스에서 활용합니다.

@SpringBootTest
class RSocketIntegrationTest {

    private static RSocketRequester requester;

    @BeforeAll
    static void setup(@Autowired RSocketRequester.Builder builder,
                      @LocalRSocketServerPort int port) {
        requester = builder.tcp("localhost", port);
    }

    @Test
    void requestResponse_findProduct() {
        ProductDto result = requester
            .route("product.find")
            .data(new ProductRequest(1L))
            .retrieveMono(ProductDto.class)
            .block();

        assertThat(result).isNotNull();
        assertThat(result.getId()).isEqualTo(1L);
    }

    @Test
    void requestStream_stockPrices() {
        List<StockPrice> prices = requester
            .route("stock.prices")
            .data(new StockSubscription("AAPL"))
            .retrieveFlux(StockPrice.class)
            .take(5)
            .collectList()
            .block(Duration.ofSeconds(10));

        assertThat(prices).hasSize(5);
    }

    @Test
    void fireAndForget_analytics() {
        requester
            .route("analytics.track")
            .data(new AnalyticsEvent("user1", "/home", Instant.now()))
            .send()
            .block();
        // void 반환 - 에러 없으면 성공
    }

    @Test
    void channel_bidirectionalChat() {
        Flux<ChatMessage> outbound = Flux.just(
            new ChatMessage("user1", "Hello"),
            new ChatMessage("user1", "World")
        ).delayElements(Duration.ofMillis(500));

        List<ChatMessage> received = requester
            .route("chat.room1")
            .data(outbound)
            .retrieveFlux(ChatMessage.class)
            .take(2)
            .collectList()
            .block(Duration.ofSeconds(5));

        assertThat(received).hasSize(2);
    }
}

HTTP vs RSocket 비교

항목 HTTP/REST RSocket
프로토콜 텍스트 기반 바이너리 프레임
인터랙션 요청-응답만 4가지 모델
백프레셔 없음 프로토콜 레벨
멀티플렉싱 HTTP/2 필요 기본 지원
연결 복구 별도 구현 Resume 내장
오버헤드 헤더 무거움 최소 프레임

마치며

Spring RSocket은 마이크로서비스 간 통신, 실시간 스트리밍, IoT 데이터 수집 등 HTTP만으로는 부족한 시나리오에서 강력한 선택입니다. 4가지 인터랙션 모델로 요구사항에 맞는 통신 패턴을 선택하고, 프로토콜 레벨 백프레셔로 안정적인 스트리밍을 구현하며, Resume으로 연결 복구까지 자동화할 수 있습니다. Spring Security 통합과 CBOR 인코딩으로 운영 레벨 보안과 성능도 확보됩니다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux