RSocket이란?
RSocket은 Netflix가 설계한 바이너리 애플리케이션 프로토콜로, TCP·WebSocket 위에서 동작하며 4가지 인터랙션 모델을 지원합니다. HTTP의 요청-응답 패턴만으로는 한계가 있는 실시간 스트리밍, 양방향 통신, 백프레셔 제어가 필요한 시스템에서 강력한 대안입니다. Spring Framework 5.2부터 공식 지원하며, Spring Boot와의 통합이 매우 자연스럽습니다.
4가지 인터랙션 모델
| 모델 | 흐름 | 용도 |
|---|---|---|
| Request-Response | 1 → 1 | 일반 API 호출 |
| Fire-and-Forget | 1 → 0 | 로그 전송, 메트릭 수집 |
| Request-Stream | 1 → N | 실시간 피드, 가격 스트리밍 |
| Channel | N ↔ N | 양방향 채팅, 게임 통신 |
프로젝트 설정
// build.gradle.kts (Spring Boot 3.x)
dependencies {
implementation("org.springframework.boot:spring-boot-starter-rsocket")
implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation("org.springframework.security:spring-security-rsocket")
implementation("org.springframework.security:spring-security-messaging")
}
# application.yml
spring:
rsocket:
server:
port: 7000
transport: tcp # tcp 또는 websocket
# websocket 사용 시:
# transport: websocket
# mapping-path: /rsocket
Request-Response: 기본 패턴
HTTP GET처럼 단일 요청-단일 응답입니다. @MessageMapping으로 라우팅합니다.
// 서버 컨트롤러
@Controller
public class ProductRSocketController {
private final ProductService productService;
public ProductRSocketController(ProductService productService) {
this.productService = productService;
}
@MessageMapping("product.find")
public Mono<ProductDto> findProduct(ProductRequest request) {
return productService.findById(request.getId());
}
@MessageMapping("product.create")
public Mono<ProductDto> createProduct(CreateProductRequest request) {
return productService.create(request);
}
}
// 클라이언트
@Service
public class ProductRSocketClient {
private final RSocketRequester requester;
public ProductRSocketClient(RSocketRequester.Builder builder) {
this.requester = builder
.tcp("localhost", 7000);
}
public Mono<ProductDto> findProduct(Long id) {
return requester
.route("product.find")
.data(new ProductRequest(id))
.retrieveMono(ProductDto.class);
}
}
Fire-and-Forget: 비동기 전송
응답을 기다리지 않으므로 로그, 메트릭, 이벤트 전송에 적합합니다.
// 서버
@MessageMapping("analytics.track")
public Mono<Void> trackEvent(AnalyticsEvent event) {
return analyticsService.save(event); // 반환 타입 Mono<Void>
}
// 클라이언트
public Mono<Void> trackPageView(String userId, String page) {
return requester
.route("analytics.track")
.data(new AnalyticsEvent(userId, page, Instant.now()))
.send(); // send() = fire-and-forget
}
Request-Stream: 실시간 스트리밍
하나의 요청에 대해 서버가 연속적으로 데이터를 스트리밍합니다. 주가 피드, 알림 스트림에 적합합니다.
// 서버: 실시간 주가 스트리밍
@MessageMapping("stock.prices")
public Flux<StockPrice> streamPrices(StockSubscription sub) {
return Flux.interval(Duration.ofSeconds(1))
.flatMap(tick -> stockService.getPrice(sub.getSymbol()))
.distinctUntilChanged()
.doOnCancel(() -> log.info("클라이언트 구독 취소: {}",
sub.getSymbol()));
}
// 클라이언트: 스트림 구독
public Flux<StockPrice> subscribePrices(String symbol) {
return requester
.route("stock.prices")
.data(new StockSubscription(symbol))
.retrieveFlux(StockPrice.class);
}
Channel: 양방향 스트리밍
클라이언트와 서버가 동시에 스트림을 주고받습니다. 채팅, 게임, 협업 에디터에 적합합니다.
// 서버: 양방향 채팅 채널
@MessageMapping("chat.{roomId}")
public Flux<ChatMessage> chat(
@DestinationVariable String roomId,
Flux<ChatMessage> inbound) {
// 인바운드 메시지를 Sink에 저장
return inbound
.doOnNext(msg -> {
msg.setRoomId(roomId);
msg.setTimestamp(Instant.now());
chatRepository.save(msg).subscribe();
roomSinks.get(roomId).tryEmitNext(msg);
})
.thenMany(roomSinks.get(roomId).asFlux());
}
// 클라이언트: 채널 연결
public Flux<ChatMessage> joinChat(String roomId,
Flux<ChatMessage> outbound) {
return requester
.route("chat." + roomId)
.data(outbound)
.retrieveFlux(ChatMessage.class);
}
백프레셔 제어
RSocket의 핵심 장점은 프로토콜 레벨 백프레셔입니다. Reactive Streams 스펙을 네트워크 계층에서 구현하여, 소비자가 처리 가능한 만큼만 데이터를 요청합니다.
// 클라이언트에서 백프레셔 제어
requester
.route("stock.prices")
.data(new StockSubscription("AAPL"))
.retrieveFlux(StockPrice.class)
.limitRate(10) // 한 번에 10개만 요청
.delayElements(Duration.ofMillis(100)) // 소비 속도 제한
.subscribe(price -> process(price));
// 서버에서 Flux 생성 시 백프레셔 반영
@MessageMapping("data.bulk")
public Flux<DataChunk> bulkStream(BulkRequest req) {
return Flux.fromIterable(dataService.loadAll(req))
.onBackpressureBuffer(256) // 버퍼 크기 제한
.doOnRequest(n -> log.debug("클라이언트가 {}개 요청", n));
}
RSocket Security
Spring Security와 통합하여 인증·인가를 적용합니다.
@Configuration
@EnableRSocketSecurity
public class RSocketSecurityConfig {
@Bean
public PayloadSocketAcceptorInterceptor authorization(
RSocketSecurity security) {
return security
.authorizePayload(auth -> auth
.route("admin.*").hasRole("ADMIN")
.route("stock.prices").authenticated()
.route("analytics.track").permitAll()
.anyRequest().authenticated()
.anyExchange().permitAll()
)
.simpleAuthentication(Customizer.withDefaults())
.build();
}
@Bean
public MapReactiveUserDetailsService userDetailsService() {
UserDetails user = User.withDefaultPasswordEncoder()
.username("client1")
.password("secret")
.roles("USER")
.build();
return new MapReactiveUserDetailsService(user);
}
}
// 클라이언트 인증
RSocketRequester requester = builder
.setupMetadata(
new UsernamePasswordMetadata("client1", "secret"),
MimeTypeUtils.parseMimeType(
WellKnownMimeType.MESSAGE_RSOCKET_AUTHENTICATION.getString()))
.tcp("localhost", 7000);
Resume: 연결 복구
네트워크 단절 후 자동 재연결과 스트림 복구를 지원합니다.
// 서버: Resume 활성화
@Bean
public RSocketServerCustomizer rSocketResume() {
return server -> server.resume(
new Resume()
.sessionDuration(Duration.ofMinutes(5))
.retry(Retry.backoff(5, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(16)))
);
}
// 클라이언트: Resume 설정
RSocketRequester requester = builder
.rsocketConnector(connector -> connector
.resume(new Resume()
.sessionDuration(Duration.ofMinutes(5))
.retry(Retry.backoff(Long.MAX_VALUE,
Duration.ofSeconds(1))))
.reconnect(Retry.backoff(Long.MAX_VALUE,
Duration.ofSeconds(1))))
.tcp("localhost", 7000);
RSocketRequester 빈 설정
운영 환경에서 재사용 가능한 Requester 빈을 구성합니다.
@Configuration
public class RSocketClientConfig {
@Bean
public RSocketRequester rSocketRequester(
RSocketRequester.Builder builder,
RSocketStrategies strategies) {
return builder
.rsocketStrategies(strategies)
.rsocketConnector(connector -> connector
.reconnect(Retry.backoff(10, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(30)))
.payloadDecoder(PayloadDecoder.ZERO_COPY))
.dataMimeType(MimeTypeUtils.APPLICATION_JSON)
.tcp("rsocket-server.internal", 7000);
}
@Bean
public RSocketStrategies rSocketStrategies() {
return RSocketStrategies.builder()
.encoders(encoders ->
encoders.add(new Jackson2CborEncoder()))
.decoders(decoders ->
decoders.add(new Jackson2CborDecoder()))
.build();
}
}
테스트 전략
RSocketRequester를 테스트 슬라이스에서 활용합니다.
@SpringBootTest
class RSocketIntegrationTest {
private static RSocketRequester requester;
@BeforeAll
static void setup(@Autowired RSocketRequester.Builder builder,
@LocalRSocketServerPort int port) {
requester = builder.tcp("localhost", port);
}
@Test
void requestResponse_findProduct() {
ProductDto result = requester
.route("product.find")
.data(new ProductRequest(1L))
.retrieveMono(ProductDto.class)
.block();
assertThat(result).isNotNull();
assertThat(result.getId()).isEqualTo(1L);
}
@Test
void requestStream_stockPrices() {
List<StockPrice> prices = requester
.route("stock.prices")
.data(new StockSubscription("AAPL"))
.retrieveFlux(StockPrice.class)
.take(5)
.collectList()
.block(Duration.ofSeconds(10));
assertThat(prices).hasSize(5);
}
@Test
void fireAndForget_analytics() {
requester
.route("analytics.track")
.data(new AnalyticsEvent("user1", "/home", Instant.now()))
.send()
.block();
// void 반환 - 에러 없으면 성공
}
@Test
void channel_bidirectionalChat() {
Flux<ChatMessage> outbound = Flux.just(
new ChatMessage("user1", "Hello"),
new ChatMessage("user1", "World")
).delayElements(Duration.ofMillis(500));
List<ChatMessage> received = requester
.route("chat.room1")
.data(outbound)
.retrieveFlux(ChatMessage.class)
.take(2)
.collectList()
.block(Duration.ofSeconds(5));
assertThat(received).hasSize(2);
}
}
HTTP vs RSocket 비교
| 항목 | HTTP/REST | RSocket |
|---|---|---|
| 프로토콜 | 텍스트 기반 | 바이너리 프레임 |
| 인터랙션 | 요청-응답만 | 4가지 모델 |
| 백프레셔 | 없음 | 프로토콜 레벨 |
| 멀티플렉싱 | HTTP/2 필요 | 기본 지원 |
| 연결 복구 | 별도 구현 | Resume 내장 |
| 오버헤드 | 헤더 무거움 | 최소 프레임 |
마치며
Spring RSocket은 마이크로서비스 간 통신, 실시간 스트리밍, IoT 데이터 수집 등 HTTP만으로는 부족한 시나리오에서 강력한 선택입니다. 4가지 인터랙션 모델로 요구사항에 맞는 통신 패턴을 선택하고, 프로토콜 레벨 백프레셔로 안정적인 스트리밍을 구현하며, Resume으로 연결 복구까지 자동화할 수 있습니다. Spring Security 통합과 CBOR 인코딩으로 운영 레벨 보안과 성능도 확보됩니다.