Spring WebSocket 深入详解:构建实时通信应用的利器
在现代 Web 应用中,实时通信扮演着越来越重要的角色,从在线聊天、实时数据更新、协作编辑到金融交易、游戏状态同步,都离不开高效、低延迟的双向通信机制。传统的 HTTP 请求-响应模式在这种场景下显得力不从心,因为它本质上是客户端驱动的、无状态的短连接。为了克服这些限制,WebSocket 应运而生。
WebSocket 协议(RFC 6455)提供了一种在单个 TCP 连接上进行全双工通信的机制。它允许服务器主动向客户端推送数据,也允许客户端随时向服务器发送数据,极大地降低了通信延迟和网络开销。
Spring 框架,作为 Java 生态系统中最流行的应用开发框架之一,对 WebSocket 提供了强大而灵活的支持。通过 spring-websocket
和 spring-messaging
模块,开发者可以轻松地在 Spring 应用中集成和管理 WebSocket 连接,并利用 STOMP(Simple Text Oriented Messaging Protocol)等高级协议简化消息处理。
本文将深入探讨如何使用 Spring Boot(基于 Spring 框架)来实现 WebSocket 通信,涵盖从基本概念、环境搭建、核心配置、消息处理到客户端交互的方方面面,旨在为你提供一个全面而详细的实践指南。
一、 WebSocket 与 STOMP 基础
1. WebSocket 协议
- 全双工通信: 客户端和服务器可以在建立连接后,随时相互发送数据,无需等待对方响应。
- 持久连接: 一旦建立,WebSocket 连接会保持打开状态,直到显式关闭或网络中断,避免了 HTTP 频繁建立和断开连接的开销。
- 低延迟: 数据传输无需携带冗余的 HTTP 头信息,减少了网络负载,提高了传输效率。
- 协议升级: WebSocket 连接始于一个标准的 HTTP/S 请求(包含
Upgrade: websocket
和Connection: Upgrade
头),服务器同意后,协议切换到 WebSocket。
2. 为何需要 STOMP?
WebSocket 本身定义的是一种底层的双向通信协议,它只规范了数据帧(Frame)的格式和传输机制,但并未规定消息内容的具体格式或语义。这意味着开发者需要自行设计消息协议,例如使用 JSON 或自定义格式,并处理消息的路由、订阅、确认等逻辑。
STOMP(Simple Text Oriented Messaging Protocol)是一种基于文本的消息传递协议,它借鉴了传统消息队列(如 JMS、AMQP)的概念,定义了如 CONNECT
、SUBSCRIBE
、SEND
、MESSAGE
、UNSUBSCRIBE
、DISCONNECT
等命令(Command)和消息头(Header),使得 WebSocket 通信更具结构化和语义化。
Spring WebSocket 对 STOMP 的支持优势:
- 简化开发: 提供了类似
@MessageMapping
、@SendTo
等注解,将 WebSocket 消息处理映射到 Controller 方法,如同处理 HTTP 请求一样方便。 - 消息代理 (Message Broker): Spring 支持内置的简单内存消息代理,也支持集成外部功能更强大的消息中间件(如 RabbitMQ, ActiveMQ),实现消息的发布/订阅模式、点对点通信,以及更好的扩展性和可靠性。
- 用户目标 (User Destinations): 能够方便地向特定用户发送消息,而无需开发者手动管理用户与 WebSocket Session 的映射。
- SockJS 回退: 对于不支持 WebSocket 的旧浏览器或受网络限制的环境,Spring 可以无缝集成 SockJS,提供 WebSocket 的模拟实现(如 XHR streaming, XHR polling 等),确保应用的兼容性。
二、 环境搭建与项目初始化
我们将使用 Spring Boot 来快速搭建项目。
1. 创建 Spring Boot 项目
可以通过 Spring Initializr (start.spring.io) 或 IDE(如 IntelliJ IDEA, Eclipse)创建一个新的 Spring Boot 项目。
2. 添加依赖
在 pom.xml
(Maven) 或 build.gradle
(Gradle) 文件中添加 WebSocket 相关的依赖:
Maven (pom.xml
):
“`xml
<!-- Spring Boot WebSocket Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- (可选) 前端库依赖,方便演示 -->
<!-- WebJars for SockJS client -->
<dependency>
<groupId>org.webjars</groupId>
<artifactId>sockjs-client</artifactId>
<version>1.5.1</version> <!-- 使用合适的版本 -->
</dependency>
<!-- WebJars for STOMP over WebSocket client -->
<dependency>
<groupId>org.webjars</groupId>
<artifactId>stomp-websocket</artifactId>
<version>2.3.4</version> <!-- 使用合适的版本 -->
</dependency>
<!-- (可选) WebJars for Bootstrap and jQuery for UI -->
<dependency>
<groupId>org.webjars</groupId>
<artifactId>bootstrap</artifactId>
<version>5.2.3</version> <!-- 使用合适的版本 -->
</dependency>
<dependency>
<groupId>org.webjars</groupId>
<artifactId>jquery</artifactId>
<version>3.6.4</version> <!-- 使用合适的版本 -->
</dependency>
<!-- (可选) Lombok for reducing boilerplate code -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Spring Boot Test Starter (用于测试) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
“`
Gradle (build.gradle
):
“`gradle
dependencies {
implementation ‘org.springframework.boot:spring-boot-starter-web’
implementation ‘org.springframework.boot:spring-boot-starter-websocket’
// (可选) 前端库依赖
implementation 'org.webjars:sockjs-client:1.5.1' // 使用合适的版本
implementation 'org.webjars:stomp-websocket:2.3.4' // 使用合适的版本
implementation 'org.webjars:bootstrap:5.2.3' // 使用合适的版本
implementation 'org.webjars:jquery:3.6.4' // 使用合适的版本
// (可选) Lombok
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
“`
确保刷新项目依赖。
三、 配置 WebSocket (服务端)
核心配置在于创建一个实现 WebSocketMessageBrokerConfigurer
接口的配置类。
1. 创建配置类
“`java
package com.example.websocketdemo.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration
@EnableWebSocketMessageBroker // 启用 WebSocket 消息代理
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
/**
* 注册 STOMP 端点 (Endpoints),供客户端连接。
* @param registry StompEndpointRegistry
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 注册一个名为 "/ws" 的 STOMP 端点。
// 这是客户端尝试建立 WebSocket 连接的 HTTP URL。
// withSockJS() 启用 SockJS 回退选项,以便在浏览器不支持 WebSocket 时使用替代传输方式。
registry.addEndpoint("/ws")
.setAllowedOrigins("*") // 允许所有来源的连接 (生产环境应配置具体来源)
.withSockJS();
}
/**
* 配置消息代理 (Message Broker)。
* @param registry MessageBrokerRegistry
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// 配置一个或多个用于处理 "简单" 消息代理的前缀。
// 这里配置了 "/topic" 和 "/queue" 两个前缀。
// 发送到这些前缀的目的地的消息将由内存中的简单代理处理,并广播给订阅了相应目的地的客户端。
// "/topic" 通常用于发布/订阅模式 (一对多)
// "/queue" 通常用于点对点消息 (一对一,常与用户目标结合使用)
registry.enableSimpleBroker("/topic", "/queue");
// 配置应用程序目标的前缀。
// 客户端发送的消息,如果目的地前缀是 "/app",则这些消息将被路由到 @MessageMapping 注解的方法进行处理。
// 例如,客户端发送消息到 "/app/hello",消息将路由到带有 @MessageMapping("/hello") 的控制器方法。
registry.setApplicationDestinationPrefixes("/app");
// (可选) 配置用户目标的前缀。
// 当使用 @SendToUser 或 SimpMessagingTemplate.convertAndSendToUser 时,
// Spring 会将目标地址修改为包含用户唯一标识和此前缀的形式 (例如 "/user/{sessionId}/queue/private")
// 默认是 "/user"。
// registry.setUserDestinationPrefix("/user");
}
}
“`
配置详解:
@Configuration
: 表明这是一个 Spring 配置类。@EnableWebSocketMessageBroker
: 启用 WebSocket 消息处理功能,并配置一个基于消息代理的 STOMP 支持。registerStompEndpoints(StompEndpointRegistry registry)
:registry.addEndpoint("/ws")
: 定义了客户端连接 WebSocket 服务器的入口点。客户端将通过http://yourhost:port/ws
(或https://...
) 发起握手请求。.setAllowedOrigins("*")
: 设置允许跨域连接的来源。在生产环境中,应将其限制为你的前端应用的实际域。.withSockJS()
: 启用 SockJS 支持。SockJS 是一个 JavaScript 库,它提供了一个类似 WebSocket 的对象,并在必要时(如浏览器不支持 WebSocket 或网络代理阻止)智能地选择最佳的可用传输方式(如 WebSocket, XHR-Streaming, XHR-Polling 等)。这极大地提高了应用的兼容性。
configureMessageBroker(MessageBrokerRegistry registry)
:registry.enableSimpleBroker("/topic", "/queue")
: 配置了一个简单的内存消息代理。所有目的地以/topic
或/queue
开头的消息,将由这个内置代理处理。它会将消息路由到所有订阅了相应主题的客户端。/topic
: 通常用于实现广播或群发消息。/queue
: 通常用于实现点对点消息,尤其是结合用户目标时。
registry.setApplicationDestinationPrefixes("/app")
: 指定了应用处理消息的目标前缀。当客户端发送 STOMP 消息时,如果其destination
以/app
开头(例如/app/chat
),则该消息会被路由到 Spring 应用中带有@MessageMapping
注解的方法进行处理。registry.setUserDestinationPrefix("/user")
: (可选) 配置用户特定消息的目标前缀。当使用@SendToUser
或SimpMessagingTemplate.convertAndSendToUser
发送消息给特定用户时,Spring 会自动将目标地址转换为类似/user/{username}/queue/private
的形式,确保只有该用户能收到。默认前缀是/user
。
外部消息代理 (可选):
对于需要更高可用性、可扩展性和持久化消息的应用,可以配置外部消息代理,如 RabbitMQ 或 ActiveMQ。
java
// 示例:配置 RabbitMQ 作为消息代理
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app");
// 使用 RabbitMQ 作为消息代理
registry.enableStompBrokerRelay("/topic", "/queue")
.setRelayHost("localhost") // RabbitMQ 服务器地址
.setRelayPort(61613) // STOMP 插件端口
.setClientLogin("guest") // 用户名
.setClientPasscode("guest") // 密码
.setSystemLogin("guest") // 系统连接用户名
.setSystemPasscode("guest"); // 系统连接密码
// .setUserDestinationBroadcast("/topic/unresolved-user-destination") // 处理无法解析用户目标的广播
// .setUserRegistryBroadcast("/topic/user-registry"); // 处理用户注册表变化的广播
}
需要添加相应的依赖 (spring-boot-starter-amqp
或 spring-boot-starter-activemq
) 并确保外部代理已安装和运行。
四、 创建消息处理器 (服务端 Controller)
创建一个 Controller 类来处理来自客户端的消息。
“`java
package com.example.websocketdemo.controller;
import com.example.websocketdemo.model.ChatMessage; // 假设的消息模型
import com.example.websocketdemo.model.Greeting; // 假设的响应模型
import com.example.websocketdemo.model.HelloMessage; // 假设的请求模型
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.stereotype.Controller;
import org.springframework.web.util.HtmlUtils;
import java.security.Principal;
@Controller
public class GreetingController {
private static final Logger logger = LoggerFactory.getLogger(GreetingController.class);
/**
* 处理发送到 "/app/hello" 的消息。
* 接收一个 HelloMessage 对象作为负载。
* 处理后,将返回的 Greeting 对象发送到 "/topic/greetings" 目的地。
* 所有订阅了 "/topic/greetings" 的客户端都会收到此消息。
*
* @param message 包含客户端发送数据的对象 (JSON 会自动反序列化)
* @return 发送给订阅者的响应对象 (会自动序列化为 JSON)
* @throws Exception 模拟处理延迟
*/
@MessageMapping("/hello") // 映射客户端发送目的地 "/app/hello"
@SendTo("/topic/greetings") // 指定处理结果发送的目的地
public Greeting greeting(HelloMessage message) throws Exception {
logger.info("Received hello message: {}", message.getName());
// 模拟处理延迟
Thread.sleep(1000);
// HtmlUtils.htmlEscape 防止 XSS 攻击
return new Greeting("Hello, " + HtmlUtils.htmlEscape(message.getName()) + "!");
}
/**
* 处理发送到 "/app/chat.sendMessage" 的消息 (模拟聊天消息)。
* 将收到的消息广播给所有订阅了 "/topic/publicChat" 的用户。
*
* @param chatMessage 聊天消息对象
* @return 将原消息广播出去
*/
@MessageMapping("/chat.sendMessage")
@SendTo("/topic/publicChat")
public ChatMessage sendMessage(@Payload ChatMessage chatMessage) {
logger.info("Received chat message: {}", chatMessage.getContent());
// 可以进行消息存储、过滤等操作
return chatMessage; // 直接将消息广播出去
}
/**
* 处理发送到 "/app/chat.addUser" 的消息 (模拟用户加入)。
* 当用户加入时,将消息广播到公共聊天室,并设置消息头中的用户信息。
*
* @param chatMessage 包含用户名的消息
* @param headerAccessor 用于访问和修改消息头,特别是 session 属性
* @return 广播的用户加入消息
*/
@MessageMapping("/chat.addUser")
@SendTo("/topic/publicChat")
public ChatMessage addUser(@Payload ChatMessage chatMessage, SimpMessageHeaderAccessor headerAccessor) {
logger.info("User added: {}", chatMessage.getSender());
// 将用户名添加到 WebSocket session 属性中,方便后续识别用户
if (headerAccessor.getSessionAttributes() != null) {
headerAccessor.getSessionAttributes().put("username", chatMessage.getSender());
}
return chatMessage;
}
/**
* 处理发送到 "/app/private.message" 的消息,并只回复给发送者。
* 使用 @SendToUser 将消息发送到特定用户的私有队列。
* 需要 Spring Security 或其他机制来识别用户 (Principal)。
*
* @param message 负载内容
* @param principal 代表当前已认证的用户
* @return 发送给发送者的私有消息
*/
@MessageMapping("/private.message")
@SendToUser("/queue/reply") // 结果将发送到 "/user/{username}/queue/reply"
public Greeting sendPrivateMessage(@Payload String message, Principal principal) {
if (principal == null) {
logger.warn("Received private message without authenticated user.");
// 可以抛出异常或返回错误信息
return new Greeting("Error: User not authenticated.");
}
logger.info("Received private message from {}: {}", principal.getName(), message);
return new Greeting("Private reply to " + principal.getName() + ": You sent '" + HtmlUtils.htmlEscape(message) + "'");
}
}
// — 假设的消息模型类 —
// (通常放在 model 包下)
// import lombok.AllArgsConstructor;
// import lombok.Data;
// import lombok.NoArgsConstructor;
// @Data
// @NoArgsConstructor
// @AllArgsConstructor
// class HelloMessage {
// private String name;
// }
// @Data
// @NoArgsConstructor
// @AllArgsConstructor
// class Greeting {
// private String content;
// }
// @Data
// @NoArgsConstructor
// @AllArgsConstructor
// class ChatMessage {
// private MessageType type;
// private String content;
// private String sender;
// public enum MessageType { CHAT, JOIN, LEAVE }
// }
“`
注解详解:
@Controller
: 标记此类为 Spring MVC 控制器,同时适用于处理 WebSocket 消息。@MessageMapping("/path")
:- 将方法映射到指定的 STOMP 目的地。
- 客户端向
/app/path
(根据setApplicationDestinationPrefixes
配置) 发送的消息将由此方法处理。 - 方法的参数可以是被
@Payload
注解的负载内容(Spring 会尝试根据 Content-Type 反序列化,通常是 JSON)、SimpMessageHeaderAccessor
(访问消息头和 Session 属性)、Principal
(如果集成了 Spring Security,表示当前用户)等。
@SendTo("/destination")
:- 指定方法返回值发送到的 STOMP 目的地。
- 消息将被发送到消息代理,然后由代理广播给所有订阅了
/destination
的客户端。 - 返回值会被自动序列化(通常为 JSON)。
@SendToUser("/destination")
:- 将方法返回值发送给当前消息的发送者。
- 目的地通常是私有队列,如
/queue/reply
。 - Spring 会自动将目标地址转换为用户特定的地址,例如
/user/{username}/queue/reply
(其中{username}
来自Principal.getName()
或其他用户识别机制)。 - 这需要用户身份认证机制(如 Spring Security)的支持。
@Payload
: (可选) 明确指定哪个方法参数是消息的负载 (body)。如果只有一个符合条件的参数,可以省略。SimpMessageHeaderAccessor
: 提供对 STOMP 消息头的访问,例如获取 Session ID、Session 属性、原始消息等。可以通过headerAccessor.getSessionAttributes().put("key", value)
来存储与特定 WebSocket Session 相关的数据。Principal
: 如果集成了 Spring Security 并且用户已认证,此参数会包含当前用户的信息。
五、 客户端实现 (JavaScript + SockJS + Stomp.js)
前端需要使用 JavaScript 库来连接 WebSocket 服务器、发送和接收 STOMP 消息。常用的组合是 SockJS 和 Stomp.js。
1. 引入库
在 HTML 页面中引入必要的 JavaScript 库。如果使用了 WebJars,可以这样引入:
“`html
Enter Your Name
Say Hello
Public Chat
“`
2. 编写 JavaScript (app.js)
“`javascript
// app.js
let stompClient = null;
let username = null;
// — DOM Ready —
$(document).ready(function() {
$(“#connect”).click(connect);
$(“#disconnect”).click(disconnect);
$(“#sendHello”).click(sendName);
$(“#sendMessage”).click(sendChatMessage);
$(“#message”).keypress(function(event) {
if (event.which === 13) { // Enter key pressed
sendChatMessage();
}
});
$(“#name”).keypress(function(event) {
if (event.which === 13) { // Enter key pressed
connect();
}
});
});
// — Connection Logic —
function connect() {
username = $(“#name”).val().trim();
if (username) {
// 创建 SockJS 连接
const socket = new SockJS(‘/ws’); // 连接到配置的 STOMP 端点
// 基于 SockJS 连接创建 STOMP 客户端
stompClient = Stomp.over(socket);
console.log("Attempting to connect...");
// 连接到 STOMP 代理
stompClient.connect(
{}, // Headers (例如可以传递认证 token)
function (frame) { // 连接成功回调
setConnected(true);
console.log('Connected: ' + frame);
// *** 订阅公共主题 ***
// 订阅 "/topic/greetings",用于接收问候消息
stompClient.subscribe('/topic/greetings', function (greeting) {
showGreeting(JSON.parse(greeting.body).content);
});
// 订阅 "/topic/publicChat",用于接收公共聊天消息
stompClient.subscribe('/topic/publicChat', function (message) {
const chatMessage = JSON.parse(message.body);
showChatMessage(chatMessage);
});
// (可选) 订阅用户私有队列,用于接收 @SendToUser 消息
// 需要用户已认证,并且服务端配置了用户目标前缀
// stompClient.subscribe('/user/queue/reply', function (reply) {
// console.log("Received private reply: ", JSON.parse(reply.body));
// showGreeting("Private: " + JSON.parse(reply.body).content);
// });
// 连接成功后,发送 "用户加入" 消息
stompClient.send("/app/chat.addUser",
{},
JSON.stringify({ sender: username, type: 'JOIN' })
);
},
function (error) { // 连接失败回调
console.error('Connection error: ' + error);
setConnected(false);
alert('Could not connect to WebSocket server. Please check console.');
// 可以尝试重连等错误处理
// reconnect();
}
);
} else {
alert("Please enter your name.");
}
}
function disconnect() {
if (stompClient !== null) {
// 发送离开消息 (如果需要)
stompClient.send(“/app/chat.sendMessage”, {}, JSON.stringify({ sender: username, type: ‘LEAVE’, content: username + ‘ left!’ }));
stompClient.disconnect(function() {
console.log("Disconnected");
setConnected(false);
});
stompClient = null; // 清理 Stomp Client 实例
}
}
// — Sending Messages —
function sendName() {
if (stompClient && stompClient.connected) {
const name = $(“#name”).val();
console.log(“Sending name: ” + name);
// 发送消息到 “/app/hello”
stompClient.send(“/app/hello”, {}, JSON.stringify({‘name’: name}));
} else {
console.log(“Cannot send, not connected.”);
}
}
function sendChatMessage() {
const messageContent = $(“#message”).val().trim();
if (messageContent && stompClient && stompClient.connected) {
const chatMessage = {
sender: username,
content: messageContent,
type: ‘CHAT’
};
console.log(“Sending chat message: “, chatMessage);
// 发送消息到 “/app/chat.sendMessage”
stompClient.send(“/app/chat.sendMessage”, {}, JSON.stringify(chatMessage));
$(“#message”).val(”); // 清空输入框
} else if (!stompClient || !stompClient.connected) {
console.log(“Cannot send message, not connected.”);
}
}
// — Updating UI —
function setConnected(connected) {
$(“#connect”).prop(“disabled”, connected);
$(“#disconnect”).prop(“disabled”, !connected);
$(“#name”).prop(“disabled”, connected);
if (connected) {
$(“#login-container”).addClass(‘d-none’);
$(“#main-content”).removeClass(‘d-none’);
$(“#greetings”).html(“”); // 清空问候区域
$(“#chatbox”).html(“”); // 清空聊天区域
} else {
$(“#login-container”).removeClass(‘d-none’);
$(“#main-content”).addClass(‘d-none’);
}
}
function showGreeting(message) {
$(“#greetings”).append(“
” + message + “
“);
}
function showChatMessage(message) {
let messageElement = $(‘
‘);
let textElement = $(‘‘);
if (message.type === 'JOIN') {
messageElement.addClass('event-message');
textElement.text(message.sender + ' joined!');
} else if (message.type === 'LEAVE') {
messageElement.addClass('event-message');
textElement.text(message.sender + ' left!');
} else { // CHAT
let senderElement = $('<strong></strong>').text(message.sender + ': ');
textElement.text(message.content);
messageElement.append(senderElement);
}
messageElement.append(textElement);
$("#chatbox").append(messageElement);
// 滚动到底部
const chatbox = document.getElementById('chatbox');
chatbox.scrollTop = chatbox.scrollHeight;
}
// — (可选) 重连逻辑 —
// function reconnect() {
// console.log(“Attempting to reconnect in 5 seconds…”);
// setTimeout(connect, 5000);
// }
“`
JavaScript 代码详解:
- 连接 (
connect
function):- 获取用户名。
new SockJS('/ws')
: 创建一个 SockJS 连接实例,指向服务器配置的 STOMP 端点 (/ws
)。SockJS 会自动处理协议协商和回退。Stomp.over(socket)
: 在 SockJS 连接上层创建一个 STOMP 客户端实例。stompClient.connect(headers, connectCallback, errorCallback)
: 尝试建立 STOMP 连接。headers
: 可以包含自定义头,例如用于身份验证的 token。connectCallback
: 连接成功时调用。在此回调中,通常会执行订阅操作。errorCallback
: 连接失败时调用。
stompClient.subscribe(destination, callback)
: 订阅 STOMP 目的地。destination
: 要订阅的主题或队列 (例如/topic/greetings
,/topic/publicChat
,/user/queue/reply
)。callback
: 当收到来自该目的地的消息时调用的函数。消息内容在message.body
中,通常是 JSON 字符串,需要JSON.parse()
解析。
- 连接成功后,发送一个 “JOIN” 消息通知其他用户。
- 断开连接 (
disconnect
function):- 可选地发送一个 “LEAVE” 消息。
stompClient.disconnect(callback)
: 关闭 STOMP 连接和底层的 WebSocket/SockJS 连接。- 清理
stompClient
变量。
- 发送消息 (
sendName
,sendChatMessage
functions):stompClient.send(destination, headers, body)
: 向指定的 STOMP 目的地发送消息。destination
: 目标地址,通常以应用前缀开头 (例如/app/hello
,/app/chat.sendMessage
)。headers
: STOMP 消息头 (通常为空对象{}
即可)。body
: 消息体,通常是 JSON 字符串 (JSON.stringify(payload)
)。
- UI 更新 (
setConnected
,showGreeting
,showChatMessage
functions):- 根据连接状态更新界面元素(按钮、输入框的禁用/启用,显示/隐藏)。
- 将收到的消息动态地添加到 HTML 页面中。
六、 服务器主动推送消息
有时,服务器需要在没有客户端请求的情况下主动向客户端推送消息,例如:系统通知、数据更新、定时任务结果等。这可以通过注入 SimpMessagingTemplate
实现。
“`java
package com.example.websocketdemo.service;
import com.example.websocketdemo.model.Greeting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.Date;
@Service
public class NotificationService {
private static final Logger logger = LoggerFactory.getLogger(NotificationService.class);
private final SimpMessagingTemplate messagingTemplate;
@Autowired
public NotificationService(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
// 定时任务,每 10 秒向 "/topic/greetings" 推送一次时间戳
@Scheduled(fixedRate = 10000) // 需要在启动类添加 @EnableScheduling
public void sendPeriodicMessages() {
String time = new SimpleDateFormat("HH:mm:ss").format(new Date());
Greeting greeting = new Greeting("Periodic check: Time is now " + time);
logger.info("Sending periodic message to /topic/greetings: {}", greeting.getContent());
// 向所有订阅了 "/topic/greetings" 的客户端发送消息
messagingTemplate.convertAndSend("/topic/greetings", greeting);
}
/**
* 向特定用户发送消息。
*
* @param username 目标用户的用户名 (需要与 Principal.getName() 匹配)
* @param message 要发送的消息内容
*/
public void sendPrivateMessage(String username, String message) {
Greeting greeting = new Greeting("Private message for " + username + ": " + message);
logger.info("Sending private message to user {}: {}", username, greeting.getContent());
// Spring 会自动将目标地址转换为 "/user/{username}/queue/notifications"
messagingTemplate.convertAndSendToUser(username, "/queue/notifications", greeting);
// 客户端需要订阅 "/user/queue/notifications" 才能收到
}
/**
* 发送广播消息 (与 @SendTo 效果类似,但可以在任何 Service 或 Component 中调用)
* @param message 消息内容
*/
public void sendBroadcastMessage(String message) {
Greeting greeting = new Greeting("Broadcast: " + message);
logger.info("Sending broadcast message to /topic/publicChat: {}", greeting.getContent());
messagingTemplate.convertAndSend("/topic/publicChat", greeting);
}
}
“`
要点:
- 注入
SimpMessagingTemplate
: Spring 会自动配置并注入这个模板类,它是发送 STOMP 消息的核心工具。 messagingTemplate.convertAndSend(destination, payload)
: 将payload
对象转换为消息(默认使用 Jackson JSON 序列化)并发送到指定的destination
(例如/topic/...
)。所有订阅该目的地的客户端都会收到。messagingTemplate.convertAndSendToUser(username, destination, payload)
: 将消息发送给特定的用户。username
: 用户的唯一标识符,通常是登录名(需要与 Spring Security 的Principal.getName()
或其他用户识别机制一致)。destination
: 相对于用户的基础目标地址(例如/queue/notifications
)。Spring 会自动将其解析为用户的完整私有队列地址(如/user/username/queue/notifications
)。- 客户端需要订阅相应的用户目标地址(例如
/user/queue/notifications
)。
@Scheduled
: (示例) 演示了如何在定时任务中推送消息。需要在 Spring Boot 启动类上添加@EnableScheduling
注解。
七、 安全性考量 (Spring Security 集成)
在生产环境中,保护 WebSocket 端点至关重要。Spring Security 可以与 Spring WebSocket 无缝集成。
主要步骤:
- 添加 Spring Security 依赖:
xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency> - 配置 Spring Security: 创建一个继承
WebSecurityConfigurerAdapter
(旧版) 或使用SecurityFilterChain
Bean (推荐) 的配置类。 - 保护 HTTP 端点: 配置 HTTP 安全性,确保用户在访问 WebSocket 连接端点 (
/ws/**
) 之前需要进行身份验证(例如通过登录表单、JWT Token 等)。 - 配置 WebSocket 安全性:
- 在
WebSocketMessageBrokerConfigurer
同级的配置类或专门的 WebSocket 安全配置类中,继承AbstractSecurityWebSocketMessageBrokerConfigurer
。 - 重写
configureInbound(MessageSecurityMetadataSourceRegistry messages)
方法来保护 STOMP 消息目的地。
- 在
示例 WebSocket 安全配置:
“`java
package com.example.websocketdemo.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.security.config.annotation.web.messaging.MessageSecurityMetadataSourceRegistry;
import org.springframework.security.config.annotation.web.socket.AbstractSecurityWebSocketMessageBrokerConfigurer;
@Configuration
public class WebSocketSecurityConfig extends AbstractSecurityWebSocketMessageBrokerConfigurer {
@Override
protected void configureInbound(MessageSecurityMetadataSourceRegistry messages) {
messages
// 所有发往 "/app/**" 的消息都需要认证 (除了特定路径)
.simpMessageDestMatchers("/app/chat.addUser", "/app/hello").permitAll() // 允许匿名访问加入和问候接口
.simpMessageDestMatchers("/app/**").authenticated() // 其他 /app/** 需要认证
// 发往 "/user/**" 或 "/topic/private.*" 的消息需要认证
.simpMessageDestMatchers("/user/**", "/topic/private.*").authenticated()
// 订阅 "/topic/publicChat", "/topic/greetings" 允许匿名
.simpSubscribeDestMatchers("/topic/publicChat", "/topic/greetings").permitAll()
// 其他订阅需要认证 (例如私有主题)
.simpSubscribeDestMatchers("/topic/**", "/queue/**", "/user/**").authenticated()
// CONNECT 类型的消息需要认证 (防止未登录用户建立连接,除非你有匿名连接需求)
// 注意: 如果你的 SockJS HTTP 端点 (/ws/**) 已经受保护,这里可能不需要对 CONNECT 强制认证
// .simpTypeMatchers(SimpMessageType.CONNECT).authenticated()
// MESSAGE 和 SUBSCRIBE 类型的默认拒绝,除非上面明确允许
.simpTypeMatchers(SimpMessageType.MESSAGE, SimpMessageType.SUBSCRIBE).denyAll()
// 其他类型的消息 (如 DISCONNECT) 允许
.anyMessage().permitAll(); // 或者 .authenticated() 根据需要调整
}
/**
* 禁用 CSRF 保护 for WebSocket。
* STOMP over WebSocket 通常不需要 CSRF,因为浏览器不会自动发送凭证。
* 如果你使用 Cookie 认证并且客户端不是浏览器,或者有其他特殊情况,可能需要开启并处理 CSRF Token。
*/
@Override
protected boolean sameOriginDisabled() {
return true;
}
}
“`
安全配置说明:
configureInbound
: 配置入站消息(客户端发往服务器)的安全性。simpMessageDestMatchers
: 匹配客户端SEND
消息的目的地 (/app/...
)。simpSubscribeDestMatchers
: 匹配客户端SUBSCRIBE
消息的目的地 (/topic/...
,/queue/...
,/user/...
)。simpTypeMatchers
: 匹配 STOMP 消息类型 (CONNECT, MESSAGE, SUBSCRIBE, UNSUBSCRIBE, DISCONNECT)。.permitAll()
: 允许所有访问。.authenticated()
: 要求用户已认证。.denyAll()
: 拒绝所有访问。sameOriginDisabled()
: 返回true
通常会禁用 WebSocket 的 CSRF 保护。对于基于 Token 的认证(如 JWT),这是常见的做法。如果使用基于 Session/Cookie 的认证,并且希望防止跨站 WebSocket 劫持,可能需要启用 CSRF 并让客户端在连接时传递 CSRF Token。
传递认证信息:
- 基于 Session/Cookie: 如果你的 Web 应用使用标准的 Spring Security Session 管理,WebSocket 连接握手时会自动携带 Cookie,Spring Security 会识别认证状态。
- 基于 Token (如 JWT):
- 方案一 (Query Param): 客户端在连接 SockJS/WebSocket URL 时将 Token 作为查询参数传递 (不推荐,Token 可能暴露在日志中)。
- 方案二 (STOMP CONNECT Headers): 客户端在调用
stompClient.connect(headers, ...)
时,将 Token 放在headers
对象中,例如{'Authorization': 'Bearer ' + jwtToken}
。服务器端需要配置一个ChannelInterceptor
来拦截CONNECT
消息,验证 Header 中的 Token,并将认证信息设置到SimpMessageHeaderAccessor
或 SecurityContext 中。
八、 高级主题与最佳实践
- 错误处理:
- 在
@MessageMapping
方法中使用try-catch
或全局异常处理器 (@ControllerAdvice
+@MessageExceptionHandler
)。 - 配置用户目标错误队列 (
registry.setUserDestinationPrefix
),当向特定用户发送消息失败时,错误可以路由到这里。 - 实现
StompSubProtocolErrorHandler
来自定义 STOMP 错误帧的发送。
- 在
- 心跳 (Heartbeats):
- WebSocket 规范本身没有定义应用层的心跳,但 STOMP 协议支持。心跳用于检测连接是否仍然活跃,防止空闲连接被代理或防火墙断开。
-
在
configureMessageBroker
中配置:
“`java
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
// 配置消息大小限制等
// registration.setMessageSizeLimit(512 * 1024);
// registration.setSendBufferSizeLimit(1024 * 1024);
// registration.setSendTimeLimit(20000);
}@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(1);
taskScheduler.setThreadNamePrefix(“wss-heartbeat-thread-“);
taskScheduler.initialize();registry.enableSimpleBroker("/topic", "/queue") .setHeartbeatValue(new long[]{10000, 10000}) // [server-send, server-receive] in ms .setTaskScheduler(taskScheduler); // 提供任务调度器用于心跳 registry.setApplicationDestinationPrefixes("/app");
}
``
stompClient.heartbeat.outgoing = 10000; stompClient.heartbeat.incoming = 10000;
* 客户端 Stomp.js 也需要配置心跳:。
SimpMessageHeaderAccessor.getSessionAttributes()
3. **用户会话管理:**
* 使用存储与特定 WebSocket 会话相关的信息(如用户名、状态等)。
SessionConnectEvent
* 监听,
SessionConnectedEvent,
SessionSubscribeEvent,
SessionUnsubscribeEvent,
SessionDisconnectEvent等应用事件,来跟踪用户连接状态和订阅情况。
@SpringBootTest
4. **扩展性:**
* **无状态应用层:** 尽量避免在 WebSocket Controller 或 Service 中存储大量用户状态,依赖 Session 属性或外部存储(如 Redis)。
* **使用外部消息代理:** 对于需要水平扩展应用实例的场景,必须使用外部消息代理(RabbitMQ, ActiveMQ, Redis Pub/Sub 等),以便不同实例上的客户端可以相互通信或接收广播。简单内存代理无法跨实例工作。
* **负载均衡:** WebSocket 连接是长连接,需要配置支持 WebSocket 的负载均衡器(如 Nginx, HAProxy),并通常需要启用 "sticky sessions"(会话保持),确保同一客户端的后续请求(包括 SockJS 的轮询请求)路由到同一个应用实例。如果使用外部消息代理,则不一定需要 sticky sessions。
5. **测试:**
* 使用 Spring Boot 的配合
WebSocketStompClient(来自
spring-messaging) 编写集成测试,模拟客户端连接、发送和接收消息。
@MessageMapping` 方法可以像测试普通 Spring Bean 一样进行。
* 单元测试
九、 总结
Spring WebSocket 和 STOMP 为 Java 开发者构建实时、交互式 Web 应用提供了强大而便捷的工具集。通过 @EnableWebSocketMessageBroker
、WebSocketMessageBrokerConfigurer
进行核心配置,使用 @MessageMapping
、@SendTo
、@SendToUser
处理消息流,结合 SimpMessagingTemplate
实现服务器主动推送,并利用 Spring Security 保障通信安全,我们可以相对轻松地实现聊天室、实时数据看板、协作工具等复杂功能。
理解 WebSocket 和 STOMP 的基本原理,掌握 Spring 的配置选项(特别是端点、消息代理、应用前缀),熟悉客户端库(如 SockJS 和 Stomp.js)的用法,并考虑安全性、错误处理、扩展性等实际问题,是成功运用 Spring WebSocket 的关键。
虽然本文篇幅较长,力求详尽,但 WebSocket 和消息传递领域仍有许多更深入的细节和高级用法值得探索。希望本文能为你提供一个坚实的起点,助你驾驭 Spring WebSocket,构建出色的实时应用。