Spring WebSocket STOMP 심화

Spring WebSocket과 STOMP

실시간 채팅, 알림, 라이브 대시보드 같은 기능에는 HTTP 폴링 대신 WebSocket이 필수다. Spring은 저수준 WebSocket API 위에 STOMP(Simple Text Oriented Messaging Protocol) 서브프로토콜을 제공하여, pub/sub 기반의 메시지 브로커 패턴으로 실시간 통신을 구현할 수 있다.

구분 Raw WebSocket STOMP over WebSocket
메시지 형식 자유 (직접 파싱) 프레임 기반 (SEND, SUBSCRIBE, MESSAGE)
라우팅 수동 구현 destination 기반 자동 라우팅
브로커 없음 내장/외부 브로커 (RabbitMQ, ActiveMQ)
인증 핸드셰이크에서 직접 처리 CONNECT 프레임 + ChannelInterceptor

기본 설정

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // /topic: 브로드캐스트 (1:N)
        // /queue: 개인 메시지 (1:1)
        config.enableSimpleBroker("/topic", "/queue")
              .setHeartbeatValue(new long[]{10000, 10000}) // 10초 하트비트
              .setTaskScheduler(heartbeatScheduler());

        // 클라이언트 → 서버 메시지 prefix
        config.setApplicationDestinationPrefixes("/app");

        // 특정 사용자에게 보낼 때 prefix
        config.setUserDestinationPrefix("/user");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")
                .setAllowedOriginPatterns("https://*.example.com")
                .withSockJS()  // SockJS 폴백
                .setStreamBytesLimit(512 * 1024)
                .setHttpMessageCacheSize(1000);
    }

    @Bean
    public TaskScheduler heartbeatScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(1);
        scheduler.setThreadNamePrefix("ws-heartbeat-");
        return scheduler;
    }
}

메시지 핸들러: @MessageMapping

STOMP 메시지를 처리하는 컨트롤러는 REST 컨트롤러와 유사한 어노테이션 패턴을 따른다. @MessageMapping이 요청을, @SendTo가 응답 destination을 정의한다.

@Controller
@Slf4j
public class ChatController {

    private final SimpMessagingTemplate messagingTemplate;

    // 클라이언트: SEND /app/chat.send
    // → /topic/chat.room.{roomId}로 브로드캐스트
    @MessageMapping("/chat.send")
    public void sendMessage(@Payload ChatMessage message,
                            SimpMessageHeaderAccessor headerAccessor) {
        String userId = headerAccessor.getUser().getName();
        message.setSenderId(userId);
        message.setTimestamp(Instant.now());

        messagingTemplate.convertAndSend(
            "/topic/chat.room." + message.getRoomId(),
            message
        );
    }

    // 1:1 개인 메시지
    @MessageMapping("/chat.direct")
    public void directMessage(@Payload DirectMessage message,
                              Principal principal) {
        message.setFrom(principal.getName());

        // /user/{targetUserId}/queue/direct로 전달
        messagingTemplate.convertAndSendToUser(
            message.getTargetUserId(),
            "/queue/direct",
            message
        );
    }

    // 응답을 직접 반환하는 패턴
    @MessageMapping("/chat.typing")
    @SendTo("/topic/chat.typing")
    public TypingEvent handleTyping(@Payload TypingEvent event,
                                    Principal principal) {
        event.setUserId(principal.getName());
        return event;
    }
}

인증과 보안: ChannelInterceptor

WebSocket 연결의 인증은 STOMP CONNECT 프레임에서 처리한다. JWT 토큰을 검증하고 Principal을 설정하는 패턴이 가장 일반적이다.

@Component
@Slf4j
public class AuthChannelInterceptor implements ChannelInterceptor {

    private final JwtTokenProvider tokenProvider;

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = MessageHeaderAccessor
            .getAccessor(message, StompHeaderAccessor.class);

        if (StompCommand.CONNECT.equals(accessor.getCommand())) {
            String token = accessor.getFirstNativeHeader("Authorization");

            if (token == null || !token.startsWith("Bearer ")) {
                throw new MessageDeliveryException("Missing auth token");
            }

            try {
                String jwt = token.substring(7);
                Authentication auth = tokenProvider.getAuthentication(jwt);
                accessor.setUser(auth);
                log.info("WebSocket 인증 성공: {}", auth.getName());
            } catch (Exception e) {
                throw new MessageDeliveryException("Invalid token: " + e.getMessage());
            }
        }

        // SUBSCRIBE 시 권한 검증
        if (StompCommand.SUBSCRIBE.equals(accessor.getCommand())) {
            String destination = accessor.getDestination();
            Principal user = accessor.getUser();

            if (destination != null && destination.startsWith("/topic/chat.room.")) {
                String roomId = destination.replace("/topic/chat.room.", "");
                if (!hasRoomAccess(user.getName(), roomId)) {
                    throw new MessageDeliveryException("채팅방 접근 권한 없음");
                }
            }
        }

        return message;
    }
}

// 인터셉터 등록
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
    registration.interceptors(authChannelInterceptor);
}

Spring Method Security 심화에서 다룬 메서드 보안과 결합하면 WebSocket 핸들러에도 @PreAuthorize를 적용할 수 있다.

외부 브로커 연동: RabbitMQ

Simple Broker는 단일 서버에서만 동작한다. 멀티 인스턴스 환경에서는 외부 메시지 브로커가 필수다. RabbitMQ STOMP 플러그인을 사용하면 수평 확장이 가능하다.

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/topic", "/queue")
              .setRelayHost("rabbitmq.internal")
              .setRelayPort(61613)  // STOMP 포트
              .setClientLogin("app")
              .setClientPasscode("secret")
              .setSystemLogin("system")
              .setSystemPasscode("secret")
              .setSystemHeartbeatSendInterval(10000)
              .setSystemHeartbeatReceiveInterval(10000);

        config.setApplicationDestinationPrefixes("/app");
        config.setUserDestinationPrefix("/user");
    }
}

// 아키텍처:
// Client A → WS Server 1 → RabbitMQ → WS Server 2 → Client B
//                                    → WS Server 3 → Client C
// 모든 서버 인스턴스가 같은 브로커를 공유하므로
// 어떤 서버에 연결되어도 메시지를 수신할 수 있다

세션 관리와 이벤트 처리

연결·해제 이벤트를 감지하면 온라인 사용자 목록, 접속 로그 등을 구현할 수 있다.

@Component
@Slf4j
public class WebSocketEventListener {

    private final SimpMessagingTemplate messagingTemplate;
    private final ConcurrentMap<String, UserSession> sessions = new ConcurrentHashMap<>();

    @EventListener
    public void handleConnect(SessionConnectedEvent event) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());
        String sessionId = accessor.getSessionId();
        String userId = accessor.getUser().getName();

        sessions.put(sessionId, new UserSession(userId, Instant.now()));
        log.info("WebSocket 연결: user={}, session={}", userId, sessionId);

        // 온라인 상태 브로드캐스트
        messagingTemplate.convertAndSend("/topic/presence",
            new PresenceEvent(userId, "ONLINE"));
    }

    @EventListener
    public void handleDisconnect(SessionDisconnectEvent event) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());
        String sessionId = accessor.getSessionId();
        UserSession session = sessions.remove(sessionId);

        if (session != null) {
            log.info("WebSocket 해제: user={}, duration={}s",
                session.getUserId(),
                Duration.between(session.getConnectedAt(), Instant.now()).getSeconds());

            messagingTemplate.convertAndSend("/topic/presence",
                new PresenceEvent(session.getUserId(), "OFFLINE"));
        }
    }

    // 온라인 사용자 목록 API
    public Set<String> getOnlineUsers() {
        return sessions.values().stream()
            .map(UserSession::getUserId)
            .collect(Collectors.toSet());
    }
}

에러 핸들링

WebSocket 메시지 처리 중 발생하는 예외를 클라이언트에게 전달하려면 @MessageExceptionHandler를 사용한다.

@Controller
public class ChatController {

    @MessageExceptionHandler
    @SendToUser("/queue/errors")
    public ErrorResponse handleException(Exception e) {
        log.error("WebSocket 메시지 처리 실패", e);
        return new ErrorResponse(e.getMessage());
    }

    @MessageExceptionHandler(AccessDeniedException.class)
    @SendToUser("/queue/errors")
    public ErrorResponse handleAccessDenied(AccessDeniedException e) {
        return new ErrorResponse("권한이 없습니다: " + e.getMessage());
    }
}

// 클라이언트 구독:
// stompClient.subscribe('/user/queue/errors', (msg) => {
//   showNotification(JSON.parse(msg.body).message);
// });

테스트 전략

WebSocket 통합 테스트는 WebSocketStompClient를 사용해 실제 연결을 맺고 메시지를 주고받는다. Spring Testcontainers 통합 테스트에서 다룬 패턴과 결합하면 RabbitMQ 브로커까지 포함한 전체 통합 테스트가 가능하다.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class ChatWebSocketTest {

    @LocalServerPort
    private int port;

    @Test
    void 채팅_메시지_전송_수신() throws Exception {
        WebSocketStompClient client = new WebSocketStompClient(
            new StandardWebSocketClient());
        client.setMessageConverter(new MappingJackson2MessageConverter());

        BlockingQueue<ChatMessage> received = new LinkedBlockingDeque<>();

        StompSession session = client.connectAsync(
            "ws://localhost:" + port + "/ws",
            new StompSessionHandlerAdapter() {}
        ).get(5, TimeUnit.SECONDS);

        // 구독
        session.subscribe("/topic/chat.room.test", new StompFrameHandler() {
            @Override
            public Type getPayloadType(StompHeaders headers) {
                return ChatMessage.class;
            }
            @Override
            public void handleFrame(StompHeaders headers, Object payload) {
                received.offer((ChatMessage) payload);
            }
        });

        // 메시지 전송
        ChatMessage msg = new ChatMessage("test", "Hello!");
        session.send("/app/chat.send", msg);

        // 수신 확인
        ChatMessage result = received.poll(5, TimeUnit.SECONDS);
        assertThat(result).isNotNull();
        assertThat(result.getContent()).isEqualTo("Hello!");
    }
}

마무리

Spring WebSocket + STOMP는 실시간 기능 구현의 표준 스택이다. Simple Broker로 빠르게 프로토타이핑하고, 스케일아웃이 필요할 때 RabbitMQ Relay로 전환하면 된다. ChannelInterceptor 기반 인증, 세션 이벤트 관리, 에러 핸들링까지 갖추면 프로덕션 레벨의 실시간 시스템이 완성된다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux