如何实现 Spring WebSocket 通信 – wiki基地


Spring WebSocket 深入详解:构建实时通信应用的利器

在现代 Web 应用中,实时通信扮演着越来越重要的角色,从在线聊天、实时数据更新、协作编辑到金融交易、游戏状态同步,都离不开高效、低延迟的双向通信机制。传统的 HTTP 请求-响应模式在这种场景下显得力不从心,因为它本质上是客户端驱动的、无状态的短连接。为了克服这些限制,WebSocket 应运而生。

WebSocket 协议(RFC 6455)提供了一种在单个 TCP 连接上进行全双工通信的机制。它允许服务器主动向客户端推送数据,也允许客户端随时向服务器发送数据,极大地降低了通信延迟和网络开销。

Spring 框架,作为 Java 生态系统中最流行的应用开发框架之一,对 WebSocket 提供了强大而灵活的支持。通过 spring-websocketspring-messaging 模块,开发者可以轻松地在 Spring 应用中集成和管理 WebSocket 连接,并利用 STOMP(Simple Text Oriented Messaging Protocol)等高级协议简化消息处理。

本文将深入探讨如何使用 Spring Boot(基于 Spring 框架)来实现 WebSocket 通信,涵盖从基本概念、环境搭建、核心配置、消息处理到客户端交互的方方面面,旨在为你提供一个全面而详细的实践指南。

一、 WebSocket 与 STOMP 基础

1. WebSocket 协议

  • 全双工通信: 客户端和服务器可以在建立连接后,随时相互发送数据,无需等待对方响应。
  • 持久连接: 一旦建立,WebSocket 连接会保持打开状态,直到显式关闭或网络中断,避免了 HTTP 频繁建立和断开连接的开销。
  • 低延迟: 数据传输无需携带冗余的 HTTP 头信息,减少了网络负载,提高了传输效率。
  • 协议升级: WebSocket 连接始于一个标准的 HTTP/S 请求(包含 Upgrade: websocketConnection: Upgrade 头),服务器同意后,协议切换到 WebSocket。

2. 为何需要 STOMP?

WebSocket 本身定义的是一种底层的双向通信协议,它只规范了数据帧(Frame)的格式和传输机制,但并未规定消息内容的具体格式或语义。这意味着开发者需要自行设计消息协议,例如使用 JSON 或自定义格式,并处理消息的路由、订阅、确认等逻辑。

STOMP(Simple Text Oriented Messaging Protocol)是一种基于文本的消息传递协议,它借鉴了传统消息队列(如 JMS、AMQP)的概念,定义了如 CONNECTSUBSCRIBESENDMESSAGEUNSUBSCRIBEDISCONNECT 等命令(Command)和消息头(Header),使得 WebSocket 通信更具结构化和语义化。

Spring WebSocket 对 STOMP 的支持优势:

  • 简化开发: 提供了类似 @MessageMapping@SendTo 等注解,将 WebSocket 消息处理映射到 Controller 方法,如同处理 HTTP 请求一样方便。
  • 消息代理 (Message Broker): Spring 支持内置的简单内存消息代理,也支持集成外部功能更强大的消息中间件(如 RabbitMQ, ActiveMQ),实现消息的发布/订阅模式、点对点通信,以及更好的扩展性和可靠性。
  • 用户目标 (User Destinations): 能够方便地向特定用户发送消息,而无需开发者手动管理用户与 WebSocket Session 的映射。
  • SockJS 回退: 对于不支持 WebSocket 的旧浏览器或受网络限制的环境,Spring 可以无缝集成 SockJS,提供 WebSocket 的模拟实现(如 XHR streaming, XHR polling 等),确保应用的兼容性。

二、 环境搭建与项目初始化

我们将使用 Spring Boot 来快速搭建项目。

1. 创建 Spring Boot 项目

可以通过 Spring Initializr (start.spring.io) 或 IDE(如 IntelliJ IDEA, Eclipse)创建一个新的 Spring Boot 项目。

2. 添加依赖

pom.xml (Maven) 或 build.gradle (Gradle) 文件中添加 WebSocket 相关的依赖:

Maven (pom.xml):

“`xml



org.springframework.boot
spring-boot-starter-web

<!-- Spring Boot WebSocket Starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

<!-- (可选) 前端库依赖,方便演示 -->
<!-- WebJars for SockJS client -->
<dependency>
    <groupId>org.webjars</groupId>
    <artifactId>sockjs-client</artifactId>
    <version>1.5.1</version> <!-- 使用合适的版本 -->
</dependency>
<!-- WebJars for STOMP over WebSocket client -->
<dependency>
    <groupId>org.webjars</groupId>
    <artifactId>stomp-websocket</artifactId>
    <version>2.3.4</version> <!-- 使用合适的版本 -->
</dependency>
<!-- (可选) WebJars for Bootstrap and jQuery for UI -->
<dependency>
    <groupId>org.webjars</groupId>
    <artifactId>bootstrap</artifactId>
    <version>5.2.3</version> <!-- 使用合适的版本 -->
</dependency>
<dependency>
    <groupId>org.webjars</groupId>
    <artifactId>jquery</artifactId>
    <version>3.6.4</version> <!-- 使用合适的版本 -->
</dependency>

<!-- (可选) Lombok for reducing boilerplate code -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

<!-- Spring Boot Test Starter (用于测试) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>


“`

Gradle (build.gradle):

“`gradle
dependencies {
implementation ‘org.springframework.boot:spring-boot-starter-web’
implementation ‘org.springframework.boot:spring-boot-starter-websocket’

// (可选) 前端库依赖
implementation 'org.webjars:sockjs-client:1.5.1' // 使用合适的版本
implementation 'org.webjars:stomp-websocket:2.3.4' // 使用合适的版本
implementation 'org.webjars:bootstrap:5.2.3' // 使用合适的版本
implementation 'org.webjars:jquery:3.6.4' // 使用合适的版本

// (可选) Lombok
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'

testImplementation 'org.springframework.boot:spring-boot-starter-test'

}
“`

确保刷新项目依赖。

三、 配置 WebSocket (服务端)

核心配置在于创建一个实现 WebSocketMessageBrokerConfigurer 接口的配置类。

1. 创建配置类

“`java
package com.example.websocketdemo.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker // 启用 WebSocket 消息代理
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

/**
 * 注册 STOMP 端点 (Endpoints),供客户端连接。
 * @param registry StompEndpointRegistry
 */
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
    // 注册一个名为 "/ws" 的 STOMP 端点。
    // 这是客户端尝试建立 WebSocket 连接的 HTTP URL。
    // withSockJS() 启用 SockJS 回退选项,以便在浏览器不支持 WebSocket 时使用替代传输方式。
    registry.addEndpoint("/ws")
            .setAllowedOrigins("*") // 允许所有来源的连接 (生产环境应配置具体来源)
            .withSockJS();
}

/**
 * 配置消息代理 (Message Broker)。
 * @param registry MessageBrokerRegistry
 */
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
    // 配置一个或多个用于处理 "简单" 消息代理的前缀。
    // 这里配置了 "/topic" 和 "/queue" 两个前缀。
    // 发送到这些前缀的目的地的消息将由内存中的简单代理处理,并广播给订阅了相应目的地的客户端。
    // "/topic" 通常用于发布/订阅模式 (一对多)
    // "/queue" 通常用于点对点消息 (一对一,常与用户目标结合使用)
    registry.enableSimpleBroker("/topic", "/queue");

    // 配置应用程序目标的前缀。
    // 客户端发送的消息,如果目的地前缀是 "/app",则这些消息将被路由到 @MessageMapping 注解的方法进行处理。
    // 例如,客户端发送消息到 "/app/hello",消息将路由到带有 @MessageMapping("/hello") 的控制器方法。
    registry.setApplicationDestinationPrefixes("/app");

    // (可选) 配置用户目标的前缀。
    // 当使用 @SendToUser 或 SimpMessagingTemplate.convertAndSendToUser 时,
    // Spring 会将目标地址修改为包含用户唯一标识和此前缀的形式 (例如 "/user/{sessionId}/queue/private")
    // 默认是 "/user"。
    // registry.setUserDestinationPrefix("/user");
}

}
“`

配置详解:

  • @Configuration: 表明这是一个 Spring 配置类。
  • @EnableWebSocketMessageBroker: 启用 WebSocket 消息处理功能,并配置一个基于消息代理的 STOMP 支持。
  • registerStompEndpoints(StompEndpointRegistry registry):
    • registry.addEndpoint("/ws"): 定义了客户端连接 WebSocket 服务器的入口点。客户端将通过 http://yourhost:port/ws (或 https://...) 发起握手请求。
    • .setAllowedOrigins("*"): 设置允许跨域连接的来源。在生产环境中,应将其限制为你的前端应用的实际域。
    • .withSockJS(): 启用 SockJS 支持。SockJS 是一个 JavaScript 库,它提供了一个类似 WebSocket 的对象,并在必要时(如浏览器不支持 WebSocket 或网络代理阻止)智能地选择最佳的可用传输方式(如 WebSocket, XHR-Streaming, XHR-Polling 等)。这极大地提高了应用的兼容性。
  • configureMessageBroker(MessageBrokerRegistry registry):
    • registry.enableSimpleBroker("/topic", "/queue"): 配置了一个简单的内存消息代理。所有目的地以 /topic/queue 开头的消息,将由这个内置代理处理。它会将消息路由到所有订阅了相应主题的客户端。
      • /topic: 通常用于实现广播或群发消息。
      • /queue: 通常用于实现点对点消息,尤其是结合用户目标时。
    • registry.setApplicationDestinationPrefixes("/app"): 指定了应用处理消息的目标前缀。当客户端发送 STOMP 消息时,如果其 destination/app 开头(例如 /app/chat),则该消息会被路由到 Spring 应用中带有 @MessageMapping 注解的方法进行处理。
    • registry.setUserDestinationPrefix("/user"): (可选) 配置用户特定消息的目标前缀。当使用 @SendToUserSimpMessagingTemplate.convertAndSendToUser 发送消息给特定用户时,Spring 会自动将目标地址转换为类似 /user/{username}/queue/private 的形式,确保只有该用户能收到。默认前缀是 /user

外部消息代理 (可选):

对于需要更高可用性、可扩展性和持久化消息的应用,可以配置外部消息代理,如 RabbitMQ 或 ActiveMQ。

java
// 示例:配置 RabbitMQ 作为消息代理
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app");
// 使用 RabbitMQ 作为消息代理
registry.enableStompBrokerRelay("/topic", "/queue")
.setRelayHost("localhost") // RabbitMQ 服务器地址
.setRelayPort(61613) // STOMP 插件端口
.setClientLogin("guest") // 用户名
.setClientPasscode("guest") // 密码
.setSystemLogin("guest") // 系统连接用户名
.setSystemPasscode("guest"); // 系统连接密码
// .setUserDestinationBroadcast("/topic/unresolved-user-destination") // 处理无法解析用户目标的广播
// .setUserRegistryBroadcast("/topic/user-registry"); // 处理用户注册表变化的广播
}

需要添加相应的依赖 (spring-boot-starter-amqpspring-boot-starter-activemq) 并确保外部代理已安装和运行。

四、 创建消息处理器 (服务端 Controller)

创建一个 Controller 类来处理来自客户端的消息。

“`java
package com.example.websocketdemo.controller;

import com.example.websocketdemo.model.ChatMessage; // 假设的消息模型
import com.example.websocketdemo.model.Greeting; // 假设的响应模型
import com.example.websocketdemo.model.HelloMessage; // 假设的请求模型
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.stereotype.Controller;
import org.springframework.web.util.HtmlUtils;

import java.security.Principal;

@Controller
public class GreetingController {

private static final Logger logger = LoggerFactory.getLogger(GreetingController.class);

/**
 * 处理发送到 "/app/hello" 的消息。
 * 接收一个 HelloMessage 对象作为负载。
 * 处理后,将返回的 Greeting 对象发送到 "/topic/greetings" 目的地。
 * 所有订阅了 "/topic/greetings" 的客户端都会收到此消息。
 *
 * @param message 包含客户端发送数据的对象 (JSON 会自动反序列化)
 * @return 发送给订阅者的响应对象 (会自动序列化为 JSON)
 * @throws Exception 模拟处理延迟
 */
@MessageMapping("/hello") // 映射客户端发送目的地 "/app/hello"
@SendTo("/topic/greetings") // 指定处理结果发送的目的地
public Greeting greeting(HelloMessage message) throws Exception {
    logger.info("Received hello message: {}", message.getName());
    // 模拟处理延迟
    Thread.sleep(1000);
    // HtmlUtils.htmlEscape 防止 XSS 攻击
    return new Greeting("Hello, " + HtmlUtils.htmlEscape(message.getName()) + "!");
}

/**
 * 处理发送到 "/app/chat.sendMessage" 的消息 (模拟聊天消息)。
 * 将收到的消息广播给所有订阅了 "/topic/publicChat" 的用户。
 *
 * @param chatMessage 聊天消息对象
 * @return 将原消息广播出去
 */
@MessageMapping("/chat.sendMessage")
@SendTo("/topic/publicChat")
public ChatMessage sendMessage(@Payload ChatMessage chatMessage) {
    logger.info("Received chat message: {}", chatMessage.getContent());
    // 可以进行消息存储、过滤等操作
    return chatMessage; // 直接将消息广播出去
}

/**
 * 处理发送到 "/app/chat.addUser" 的消息 (模拟用户加入)。
 * 当用户加入时,将消息广播到公共聊天室,并设置消息头中的用户信息。
 *
 * @param chatMessage 包含用户名的消息
 * @param headerAccessor 用于访问和修改消息头,特别是 session 属性
 * @return 广播的用户加入消息
 */
@MessageMapping("/chat.addUser")
@SendTo("/topic/publicChat")
public ChatMessage addUser(@Payload ChatMessage chatMessage, SimpMessageHeaderAccessor headerAccessor) {
    logger.info("User added: {}", chatMessage.getSender());
    // 将用户名添加到 WebSocket session 属性中,方便后续识别用户
    if (headerAccessor.getSessionAttributes() != null) {
        headerAccessor.getSessionAttributes().put("username", chatMessage.getSender());
    }
    return chatMessage;
}

/**
 * 处理发送到 "/app/private.message" 的消息,并只回复给发送者。
 * 使用 @SendToUser 将消息发送到特定用户的私有队列。
 * 需要 Spring Security 或其他机制来识别用户 (Principal)。
 *
 * @param message 负载内容
 * @param principal 代表当前已认证的用户
 * @return 发送给发送者的私有消息
 */
@MessageMapping("/private.message")
@SendToUser("/queue/reply") // 结果将发送到 "/user/{username}/queue/reply"
public Greeting sendPrivateMessage(@Payload String message, Principal principal) {
    if (principal == null) {
        logger.warn("Received private message without authenticated user.");
        // 可以抛出异常或返回错误信息
        return new Greeting("Error: User not authenticated.");
    }
    logger.info("Received private message from {}: {}", principal.getName(), message);
    return new Greeting("Private reply to " + principal.getName() + ": You sent '" + HtmlUtils.htmlEscape(message) + "'");
}

}

// — 假设的消息模型类 —
// (通常放在 model 包下)

// import lombok.AllArgsConstructor;
// import lombok.Data;
// import lombok.NoArgsConstructor;

// @Data
// @NoArgsConstructor
// @AllArgsConstructor
// class HelloMessage {
// private String name;
// }

// @Data
// @NoArgsConstructor
// @AllArgsConstructor
// class Greeting {
// private String content;
// }

// @Data
// @NoArgsConstructor
// @AllArgsConstructor
// class ChatMessage {
// private MessageType type;
// private String content;
// private String sender;
// public enum MessageType { CHAT, JOIN, LEAVE }
// }
“`

注解详解:

  • @Controller: 标记此类为 Spring MVC 控制器,同时适用于处理 WebSocket 消息。
  • @MessageMapping("/path"):
    • 将方法映射到指定的 STOMP 目的地。
    • 客户端向 /app/path (根据 setApplicationDestinationPrefixes 配置) 发送的消息将由此方法处理。
    • 方法的参数可以是被 @Payload 注解的负载内容(Spring 会尝试根据 Content-Type 反序列化,通常是 JSON)、SimpMessageHeaderAccessor(访问消息头和 Session 属性)、Principal(如果集成了 Spring Security,表示当前用户)等。
  • @SendTo("/destination"):
    • 指定方法返回值发送到的 STOMP 目的地。
    • 消息将被发送到消息代理,然后由代理广播给所有订阅了 /destination 的客户端。
    • 返回值会被自动序列化(通常为 JSON)。
  • @SendToUser("/destination"):
    • 将方法返回值发送给当前消息的发送者。
    • 目的地通常是私有队列,如 /queue/reply
    • Spring 会自动将目标地址转换为用户特定的地址,例如 /user/{username}/queue/reply (其中 {username} 来自 Principal.getName() 或其他用户识别机制)。
    • 这需要用户身份认证机制(如 Spring Security)的支持。
  • @Payload: (可选) 明确指定哪个方法参数是消息的负载 (body)。如果只有一个符合条件的参数,可以省略。
  • SimpMessageHeaderAccessor: 提供对 STOMP 消息头的访问,例如获取 Session ID、Session 属性、原始消息等。可以通过 headerAccessor.getSessionAttributes().put("key", value) 来存储与特定 WebSocket Session 相关的数据。
  • Principal: 如果集成了 Spring Security 并且用户已认证,此参数会包含当前用户的信息。

五、 客户端实现 (JavaScript + SockJS + Stomp.js)

前端需要使用 JavaScript 库来连接 WebSocket 服务器、发送和接收 STOMP 消息。常用的组合是 SockJS 和 Stomp.js。

1. 引入库

在 HTML 页面中引入必要的 JavaScript 库。如果使用了 WebJars,可以这样引入:

“`html




Spring WebSocket Demo



Seems your browser doesn’t support Javascript! Websocket relies on Javascript being enabled.

Enter Your Name


Say Hello

Public Chat








“`

2. 编写 JavaScript (app.js)

“`javascript
// app.js

let stompClient = null;
let username = null;

// — DOM Ready —
$(document).ready(function() {
$(“#connect”).click(connect);
$(“#disconnect”).click(disconnect);
$(“#sendHello”).click(sendName);
$(“#sendMessage”).click(sendChatMessage);
$(“#message”).keypress(function(event) {
if (event.which === 13) { // Enter key pressed
sendChatMessage();
}
});
$(“#name”).keypress(function(event) {
if (event.which === 13) { // Enter key pressed
connect();
}
});
});

// — Connection Logic —
function connect() {
username = $(“#name”).val().trim();
if (username) {
// 创建 SockJS 连接
const socket = new SockJS(‘/ws’); // 连接到配置的 STOMP 端点
// 基于 SockJS 连接创建 STOMP 客户端
stompClient = Stomp.over(socket);

    console.log("Attempting to connect...");

    // 连接到 STOMP 代理
    stompClient.connect(
        {}, // Headers (例如可以传递认证 token)
        function (frame) { // 连接成功回调
            setConnected(true);
            console.log('Connected: ' + frame);

            // *** 订阅公共主题 ***
            // 订阅 "/topic/greetings",用于接收问候消息
            stompClient.subscribe('/topic/greetings', function (greeting) {
                showGreeting(JSON.parse(greeting.body).content);
            });

            // 订阅 "/topic/publicChat",用于接收公共聊天消息
            stompClient.subscribe('/topic/publicChat', function (message) {
                const chatMessage = JSON.parse(message.body);
                showChatMessage(chatMessage);
            });

            // (可选) 订阅用户私有队列,用于接收 @SendToUser 消息
            // 需要用户已认证,并且服务端配置了用户目标前缀
            // stompClient.subscribe('/user/queue/reply', function (reply) {
            //    console.log("Received private reply: ", JSON.parse(reply.body));
            //    showGreeting("Private: " + JSON.parse(reply.body).content);
            // });

            // 连接成功后,发送 "用户加入" 消息
            stompClient.send("/app/chat.addUser",
                {},
                JSON.stringify({ sender: username, type: 'JOIN' })
            );
        },
        function (error) { // 连接失败回调
            console.error('Connection error: ' + error);
            setConnected(false);
            alert('Could not connect to WebSocket server. Please check console.');
            // 可以尝试重连等错误处理
            // reconnect();
        }
    );
} else {
    alert("Please enter your name.");
}

}

function disconnect() {
if (stompClient !== null) {
// 发送离开消息 (如果需要)
stompClient.send(“/app/chat.sendMessage”, {}, JSON.stringify({ sender: username, type: ‘LEAVE’, content: username + ‘ left!’ }));

    stompClient.disconnect(function() {
        console.log("Disconnected");
        setConnected(false);
    });
    stompClient = null; // 清理 Stomp Client 实例
}

}

// — Sending Messages —
function sendName() {
if (stompClient && stompClient.connected) {
const name = $(“#name”).val();
console.log(“Sending name: ” + name);
// 发送消息到 “/app/hello”
stompClient.send(“/app/hello”, {}, JSON.stringify({‘name’: name}));
} else {
console.log(“Cannot send, not connected.”);
}
}

function sendChatMessage() {
const messageContent = $(“#message”).val().trim();
if (messageContent && stompClient && stompClient.connected) {
const chatMessage = {
sender: username,
content: messageContent,
type: ‘CHAT’
};
console.log(“Sending chat message: “, chatMessage);
// 发送消息到 “/app/chat.sendMessage”
stompClient.send(“/app/chat.sendMessage”, {}, JSON.stringify(chatMessage));
$(“#message”).val(”); // 清空输入框
} else if (!stompClient || !stompClient.connected) {
console.log(“Cannot send message, not connected.”);
}
}

// — Updating UI —
function setConnected(connected) {
$(“#connect”).prop(“disabled”, connected);
$(“#disconnect”).prop(“disabled”, !connected);
$(“#name”).prop(“disabled”, connected);
if (connected) {
$(“#login-container”).addClass(‘d-none’);
$(“#main-content”).removeClass(‘d-none’);
$(“#greetings”).html(“”); // 清空问候区域
$(“#chatbox”).html(“”); // 清空聊天区域
} else {
$(“#login-container”).removeClass(‘d-none’);
$(“#main-content”).addClass(‘d-none’);
}
}

function showGreeting(message) {
$(“#greetings”).append(“

” + message + “

“);
}

function showChatMessage(message) {
let messageElement = $(‘

‘);
let textElement = $(‘‘);

if (message.type === 'JOIN') {
    messageElement.addClass('event-message');
    textElement.text(message.sender + ' joined!');
} else if (message.type === 'LEAVE') {
     messageElement.addClass('event-message');
     textElement.text(message.sender + ' left!');
} else { // CHAT
    let senderElement = $('<strong></strong>').text(message.sender + ': ');
    textElement.text(message.content);
    messageElement.append(senderElement);
}

messageElement.append(textElement);
$("#chatbox").append(messageElement);

// 滚动到底部
const chatbox = document.getElementById('chatbox');
chatbox.scrollTop = chatbox.scrollHeight;

}

// — (可选) 重连逻辑 —
// function reconnect() {
// console.log(“Attempting to reconnect in 5 seconds…”);
// setTimeout(connect, 5000);
// }
“`

JavaScript 代码详解:

  1. 连接 (connect function):
    • 获取用户名。
    • new SockJS('/ws'): 创建一个 SockJS 连接实例,指向服务器配置的 STOMP 端点 (/ws)。SockJS 会自动处理协议协商和回退。
    • Stomp.over(socket): 在 SockJS 连接上层创建一个 STOMP 客户端实例。
    • stompClient.connect(headers, connectCallback, errorCallback): 尝试建立 STOMP 连接。
      • headers: 可以包含自定义头,例如用于身份验证的 token。
      • connectCallback: 连接成功时调用。在此回调中,通常会执行订阅操作。
      • errorCallback: 连接失败时调用。
    • stompClient.subscribe(destination, callback): 订阅 STOMP 目的地。
      • destination: 要订阅的主题或队列 (例如 /topic/greetings, /topic/publicChat, /user/queue/reply)。
      • callback: 当收到来自该目的地的消息时调用的函数。消息内容在 message.body 中,通常是 JSON 字符串,需要 JSON.parse() 解析。
    • 连接成功后,发送一个 “JOIN” 消息通知其他用户。
  2. 断开连接 (disconnect function):
    • 可选地发送一个 “LEAVE” 消息。
    • stompClient.disconnect(callback): 关闭 STOMP 连接和底层的 WebSocket/SockJS 连接。
    • 清理 stompClient 变量。
  3. 发送消息 (sendName, sendChatMessage functions):
    • stompClient.send(destination, headers, body): 向指定的 STOMP 目的地发送消息。
      • destination: 目标地址,通常以应用前缀开头 (例如 /app/hello, /app/chat.sendMessage)。
      • headers: STOMP 消息头 (通常为空对象 {} 即可)。
      • body: 消息体,通常是 JSON 字符串 (JSON.stringify(payload))。
  4. UI 更新 (setConnected, showGreeting, showChatMessage functions):
    • 根据连接状态更新界面元素(按钮、输入框的禁用/启用,显示/隐藏)。
    • 将收到的消息动态地添加到 HTML 页面中。

六、 服务器主动推送消息

有时,服务器需要在没有客户端请求的情况下主动向客户端推送消息,例如:系统通知、数据更新、定时任务结果等。这可以通过注入 SimpMessagingTemplate 实现。

“`java
package com.example.websocketdemo.service;

import com.example.websocketdemo.model.Greeting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;
import java.util.Date;

@Service
public class NotificationService {

private static final Logger logger = LoggerFactory.getLogger(NotificationService.class);
private final SimpMessagingTemplate messagingTemplate;

@Autowired
public NotificationService(SimpMessagingTemplate messagingTemplate) {
    this.messagingTemplate = messagingTemplate;
}

// 定时任务,每 10 秒向 "/topic/greetings" 推送一次时间戳
@Scheduled(fixedRate = 10000) // 需要在启动类添加 @EnableScheduling
public void sendPeriodicMessages() {
    String time = new SimpleDateFormat("HH:mm:ss").format(new Date());
    Greeting greeting = new Greeting("Periodic check: Time is now " + time);
    logger.info("Sending periodic message to /topic/greetings: {}", greeting.getContent());
    // 向所有订阅了 "/topic/greetings" 的客户端发送消息
    messagingTemplate.convertAndSend("/topic/greetings", greeting);
}

/**
 * 向特定用户发送消息。
 *
 * @param username 目标用户的用户名 (需要与 Principal.getName() 匹配)
 * @param message  要发送的消息内容
 */
public void sendPrivateMessage(String username, String message) {
    Greeting greeting = new Greeting("Private message for " + username + ": " + message);
    logger.info("Sending private message to user {}: {}", username, greeting.getContent());
    // Spring 会自动将目标地址转换为 "/user/{username}/queue/notifications"
    messagingTemplate.convertAndSendToUser(username, "/queue/notifications", greeting);
    // 客户端需要订阅 "/user/queue/notifications" 才能收到
}

/**
 * 发送广播消息 (与 @SendTo 效果类似,但可以在任何 Service 或 Component 中调用)
 * @param message 消息内容
 */
 public void sendBroadcastMessage(String message) {
    Greeting greeting = new Greeting("Broadcast: " + message);
    logger.info("Sending broadcast message to /topic/publicChat: {}", greeting.getContent());
    messagingTemplate.convertAndSend("/topic/publicChat", greeting);
 }

}
“`

要点:

  • 注入 SimpMessagingTemplate: Spring 会自动配置并注入这个模板类,它是发送 STOMP 消息的核心工具。
  • messagingTemplate.convertAndSend(destination, payload): 将 payload 对象转换为消息(默认使用 Jackson JSON 序列化)并发送到指定的 destination(例如 /topic/...)。所有订阅该目的地的客户端都会收到。
  • messagingTemplate.convertAndSendToUser(username, destination, payload): 将消息发送给特定的用户。
    • username: 用户的唯一标识符,通常是登录名(需要与 Spring Security 的 Principal.getName() 或其他用户识别机制一致)。
    • destination: 相对于用户的基础目标地址(例如 /queue/notifications)。Spring 会自动将其解析为用户的完整私有队列地址(如 /user/username/queue/notifications)。
    • 客户端需要订阅相应的用户目标地址(例如 /user/queue/notifications)。
  • @Scheduled: (示例) 演示了如何在定时任务中推送消息。需要在 Spring Boot 启动类上添加 @EnableScheduling 注解。

七、 安全性考量 (Spring Security 集成)

在生产环境中,保护 WebSocket 端点至关重要。Spring Security 可以与 Spring WebSocket 无缝集成。

主要步骤:

  1. 添加 Spring Security 依赖:
    xml
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-security</artifactId>
    </dependency>
  2. 配置 Spring Security: 创建一个继承 WebSecurityConfigurerAdapter (旧版) 或使用 SecurityFilterChain Bean (推荐) 的配置类。
  3. 保护 HTTP 端点: 配置 HTTP 安全性,确保用户在访问 WebSocket 连接端点 (/ws/**) 之前需要进行身份验证(例如通过登录表单、JWT Token 等)。
  4. 配置 WebSocket 安全性:
    • WebSocketMessageBrokerConfigurer 同级的配置类或专门的 WebSocket 安全配置类中,继承 AbstractSecurityWebSocketMessageBrokerConfigurer
    • 重写 configureInbound(MessageSecurityMetadataSourceRegistry messages) 方法来保护 STOMP 消息目的地。

示例 WebSocket 安全配置:

“`java
package com.example.websocketdemo.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.security.config.annotation.web.messaging.MessageSecurityMetadataSourceRegistry;
import org.springframework.security.config.annotation.web.socket.AbstractSecurityWebSocketMessageBrokerConfigurer;

@Configuration
public class WebSocketSecurityConfig extends AbstractSecurityWebSocketMessageBrokerConfigurer {

@Override
protected void configureInbound(MessageSecurityMetadataSourceRegistry messages) {
    messages
        // 所有发往 "/app/**" 的消息都需要认证 (除了特定路径)
        .simpMessageDestMatchers("/app/chat.addUser", "/app/hello").permitAll() // 允许匿名访问加入和问候接口
        .simpMessageDestMatchers("/app/**").authenticated() // 其他 /app/** 需要认证
        // 发往 "/user/**" 或 "/topic/private.*" 的消息需要认证
        .simpMessageDestMatchers("/user/**", "/topic/private.*").authenticated()
        // 订阅 "/topic/publicChat", "/topic/greetings" 允许匿名
        .simpSubscribeDestMatchers("/topic/publicChat", "/topic/greetings").permitAll()
        // 其他订阅需要认证 (例如私有主题)
        .simpSubscribeDestMatchers("/topic/**", "/queue/**", "/user/**").authenticated()
        // CONNECT 类型的消息需要认证 (防止未登录用户建立连接,除非你有匿名连接需求)
        // 注意: 如果你的 SockJS HTTP 端点 (/ws/**) 已经受保护,这里可能不需要对 CONNECT 强制认证
        // .simpTypeMatchers(SimpMessageType.CONNECT).authenticated()
        // MESSAGE 和 SUBSCRIBE 类型的默认拒绝,除非上面明确允许
        .simpTypeMatchers(SimpMessageType.MESSAGE, SimpMessageType.SUBSCRIBE).denyAll()
        // 其他类型的消息 (如 DISCONNECT) 允许
        .anyMessage().permitAll(); // 或者 .authenticated() 根据需要调整
}

/**
 * 禁用 CSRF 保护 for WebSocket。
 * STOMP over WebSocket 通常不需要 CSRF,因为浏览器不会自动发送凭证。
 * 如果你使用 Cookie 认证并且客户端不是浏览器,或者有其他特殊情况,可能需要开启并处理 CSRF Token。
 */
@Override
protected boolean sameOriginDisabled() {
    return true;
}

}
“`

安全配置说明:

  • configureInbound: 配置入站消息(客户端发往服务器)的安全性。
  • simpMessageDestMatchers: 匹配客户端 SEND 消息的目的地 (/app/...)。
  • simpSubscribeDestMatchers: 匹配客户端 SUBSCRIBE 消息的目的地 (/topic/..., /queue/..., /user/...)。
  • simpTypeMatchers: 匹配 STOMP 消息类型 (CONNECT, MESSAGE, SUBSCRIBE, UNSUBSCRIBE, DISCONNECT)。
  • .permitAll(): 允许所有访问。
  • .authenticated(): 要求用户已认证。
  • .denyAll(): 拒绝所有访问。
  • sameOriginDisabled(): 返回 true 通常会禁用 WebSocket 的 CSRF 保护。对于基于 Token 的认证(如 JWT),这是常见的做法。如果使用基于 Session/Cookie 的认证,并且希望防止跨站 WebSocket 劫持,可能需要启用 CSRF 并让客户端在连接时传递 CSRF Token。

传递认证信息:

  • 基于 Session/Cookie: 如果你的 Web 应用使用标准的 Spring Security Session 管理,WebSocket 连接握手时会自动携带 Cookie,Spring Security 会识别认证状态。
  • 基于 Token (如 JWT):
    • 方案一 (Query Param): 客户端在连接 SockJS/WebSocket URL 时将 Token 作为查询参数传递 (不推荐,Token 可能暴露在日志中)。
    • 方案二 (STOMP CONNECT Headers): 客户端在调用 stompClient.connect(headers, ...) 时,将 Token 放在 headers 对象中,例如 {'Authorization': 'Bearer ' + jwtToken}。服务器端需要配置一个 ChannelInterceptor 来拦截 CONNECT 消息,验证 Header 中的 Token,并将认证信息设置到 SimpMessageHeaderAccessor 或 SecurityContext 中。

八、 高级主题与最佳实践

  1. 错误处理:
    • @MessageMapping 方法中使用 try-catch 或全局异常处理器 (@ControllerAdvice + @MessageExceptionHandler)。
    • 配置用户目标错误队列 (registry.setUserDestinationPrefix),当向特定用户发送消息失败时,错误可以路由到这里。
    • 实现 StompSubProtocolErrorHandler 来自定义 STOMP 错误帧的发送。
  2. 心跳 (Heartbeats):
    • WebSocket 规范本身没有定义应用层的心跳,但 STOMP 协议支持。心跳用于检测连接是否仍然活跃,防止空闲连接被代理或防火墙断开。
    • configureMessageBroker 中配置:
      “`java
      @Override
      public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
      // 配置消息大小限制等
      // registration.setMessageSizeLimit(512 * 1024);
      // registration.setSendBufferSizeLimit(1024 * 1024);
      // registration.setSendTimeLimit(20000);
      }

      @Override
      public void configureMessageBroker(MessageBrokerRegistry registry) {
      ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
      taskScheduler.setPoolSize(1);
      taskScheduler.setThreadNamePrefix(“wss-heartbeat-thread-“);
      taskScheduler.initialize();

      registry.enableSimpleBroker("/topic", "/queue")
              .setHeartbeatValue(new long[]{10000, 10000}) // [server-send, server-receive] in ms
              .setTaskScheduler(taskScheduler); // 提供任务调度器用于心跳
      
      registry.setApplicationDestinationPrefixes("/app");
      

      }
      ``
      * 客户端 Stomp.js 也需要配置心跳:
      stompClient.heartbeat.outgoing = 10000; stompClient.heartbeat.incoming = 10000;
      3. **用户会话管理:**
      * 使用
      SimpMessageHeaderAccessor.getSessionAttributes()存储与特定 WebSocket 会话相关的信息(如用户名、状态等)。
      * 监听
      SessionConnectEvent,SessionConnectedEvent,SessionSubscribeEvent,SessionUnsubscribeEvent,SessionDisconnectEvent等应用事件,来跟踪用户连接状态和订阅情况。
      4. **扩展性:**
      * **无状态应用层:** 尽量避免在 WebSocket Controller 或 Service 中存储大量用户状态,依赖 Session 属性或外部存储(如 Redis)。
      * **使用外部消息代理:** 对于需要水平扩展应用实例的场景,必须使用外部消息代理(RabbitMQ, ActiveMQ, Redis Pub/Sub 等),以便不同实例上的客户端可以相互通信或接收广播。简单内存代理无法跨实例工作。
      * **负载均衡:** WebSocket 连接是长连接,需要配置支持 WebSocket 的负载均衡器(如 Nginx, HAProxy),并通常需要启用 "sticky sessions"(会话保持),确保同一客户端的后续请求(包括 SockJS 的轮询请求)路由到同一个应用实例。如果使用外部消息代理,则不一定需要 sticky sessions。
      5. **测试:**
      * 使用 Spring Boot 的
      @SpringBootTest配合WebSocketStompClient(来自spring-messaging) 编写集成测试,模拟客户端连接、发送和接收消息。
      * 单元测试
      @MessageMapping` 方法可以像测试普通 Spring Bean 一样进行。

九、 总结

Spring WebSocket 和 STOMP 为 Java 开发者构建实时、交互式 Web 应用提供了强大而便捷的工具集。通过 @EnableWebSocketMessageBrokerWebSocketMessageBrokerConfigurer 进行核心配置,使用 @MessageMapping@SendTo@SendToUser 处理消息流,结合 SimpMessagingTemplate 实现服务器主动推送,并利用 Spring Security 保障通信安全,我们可以相对轻松地实现聊天室、实时数据看板、协作工具等复杂功能。

理解 WebSocket 和 STOMP 的基本原理,掌握 Spring 的配置选项(特别是端点、消息代理、应用前缀),熟悉客户端库(如 SockJS 和 Stomp.js)的用法,并考虑安全性、错误处理、扩展性等实际问题,是成功运用 Spring WebSocket 的关键。

虽然本文篇幅较长,力求详尽,但 WebSocket 和消息传递领域仍有许多更深入的细节和高级用法值得探索。希望本文能为你提供一个坚实的起点,助你驾驭 Spring WebSocket,构建出色的实时应用。


发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部