Server-Sent Events (SSE) 入门指南:解锁单向实时数据流的艺术
在现代Web应用中,实时通信已成为不可或缺的一部分。无论是实时的股票行情、新闻推送、聊天通知、进度更新,还是物联网设备的数据流,用户都期望能够即时获取最新信息。为了实现这一目标,开发者们探索了多种技术方案,其中 Server-Sent Events (SSE) 以其独特的简洁性和高效性,在特定的应用场景下脱颖而出。
本指南将带你深入了解 Server-Sent Events (SSE),从其基本概念、工作原理,到详细的客户端与服务器端实现,再到最佳实践、限制以及与其他实时通信技术的比较,帮助你全面掌握这项技术。
第一章:SSE 是什么?—— 概念与背景
1.1 实时Web通信的演进
在讨论 SSE 之前,我们先回顾一下Web实时通信的演进历程:
- 传统HTTP短连接 (Polling):客户端定时向服务器发送请求,询问是否有新数据。这种方式简单,但效率低下,会产生大量不必要的请求,且实时性受限于轮询间隔。
- 长轮询 (Long Polling):客户端发送请求后,服务器会保持连接,直到有新数据或超时才响应。客户端收到响应后立即发起新的请求。这比短轮询效率更高,但仍然存在连接建立与关闭的开销,且服务器需要维护大量挂起的连接。
- WebSockets:一种在单个TCP连接上进行全双工通信的协议。一旦握手成功,客户端和服务器可以互相发送消息,效率极高,适用于需要频繁双向通信的场景(如在线聊天、多人游戏)。然而,WebSocket 的协议层级更低,实现相对复杂,且对于仅仅需要服务器向客户端单向推送数据的场景,可能显得有些“大材小用”。
- Server-Sent Events (SSE):本文的主角,专为服务器向客户端单向推送数据而设计。它基于标准HTTP协议,利用持久连接实现高效的实时数据流。
1.2 SSE 的核心思想
Server-Sent Events(服务器发送事件)是一种HTML5规范,允许服务器通过HTTP连接持续地向客户端推送数据。与 WebSockets 的双向通信不同,SSE 是单向的,数据流只从服务器流向客户端。
它的核心理念是:客户端发起一个普通的HTTP GET请求,服务器不会立即关闭连接,而是将 Content-Type 设置为 text/event-stream,然后通过这个持久化的HTTP连接,以特定的格式持续发送事件数据。浏览器内置的 EventSource API 会负责解析这些数据流,并将它们作为DOM事件传递给Web应用。
1.3 为什么选择 SSE?
在众多实时通信技术中,SSE 提供了以下独特的优势:
- 简洁性(Simplicity):SSE 基于标准 HTTP 协议,不需要像 WebSockets 那样升级协议。这意味着它更容易与现有的 HTTP 基础设施(如代理、防火墙)兼容。客户端使用
EventSource这个原生浏览器 API,非常直观。 - 自动重连(Automatic Reconnection):
EventSourceAPI 内置了自动重连机制。当连接中断(网络问题、服务器重启等)时,浏览器会自动尝试重新连接服务器,并可以利用Last-Event-ID机制确保消息不丢失或不重复。 - 事件ID(Event IDs):服务器可以为每个事件指定一个 ID。当客户端断线重连时,它会向服务器发送
Last-Event-ID头,告知服务器它最后收到的事件 ID,服务器可以据此从中断处恢复数据推送。 - 事件类型(Event Types):SSE 支持自定义事件类型。服务器可以发送不同类型的事件,客户端可以监听特定的事件类型,从而实现更灵活的事件处理逻辑。
- 防火墙友好(Firewall-Friendly):由于 SSE 使用标准的 HTTP 协议,它通常能够穿透防火墙和代理服务器,而不需要特殊的配置。
- 开销较低(Lower Overhead for Unidirectional):对于仅需服务器向客户端单向推送数据的场景,SSE 的协议开销比 WebSockets 更低,因为它不需要处理双向心跳或复杂的帧协议。
第二章:SSE 是如何工作的?—— 工作原理揭秘
SSE 的工作原理可以概括为以下几个步骤:
-
客户端发起连接:
客户端通过 JavaScript 的EventSourceAPI 创建一个到服务器的连接。这本质上是一个标准的 HTTP GET 请求。
javascript
const eventSource = new EventSource('/stream'); -
服务器响应与保持连接:
服务器收到请求后,不会像处理普通 HTTP 请求那样立即发送完整响应并关闭连接。相反,它会:- 设置响应头
Content-Type: text/event-stream。这是告诉浏览器,这个连接将用于 SSE 流。 - 设置
Cache-Control: no-cache和Connection: keep-alive,防止缓存并保持连接开放。 - 一旦发送了这些响应头,服务器会保持连接开放,以便后续可以持续向客户端发送数据。
- 设置响应头
-
服务器推送数据:
服务器会周期性地(或当有新数据时)将数据块写入到这个开放的 HTTP 连接中。每个数据块都遵循 SSE 的特定格式,并以两个换行符\n\n结尾,表示一个事件的结束。一个基本的事件格式如下:
data: 这是第一行数据\n
data: 这是第二行数据\n
event: customEventName\n
id: 123\n
retry: 5000\n
\n
*data::事件数据。可以有多行data:字段,它们会被连接成一个字符串。
*event::事件类型。客户端可以通过eventSource.addEventListener('customEventName', ...)监听特定类型的事件。如果未指定,默认为message事件。
*id::事件 ID。客户端会自动记住最后一个收到的 ID,并在重连时通过Last-Event-ID头发送给服务器。
*retry::重连时间间隔(毫秒)。客户端在断线后会等待这个时间再尝试重连。 -
客户端接收与处理事件:
浏览器内置的EventSourceAPI 负责持续监听这个连接。每当它检测到服务器发送的以\n\n结尾的数据块时,就会解析这个数据块,并将其作为一个事件触发。
客户端可以通过注册onopen、onmessage、onerror或自定义事件类型的监听器来处理这些事件。 -
自动重连机制:
如果连接由于任何原因中断(例如,网络错误、服务器关闭、超时),EventSource会:- 触发
onerror事件。 - 等待由服务器通过
retry字段指定的时间(或默认时间,通常为3秒)。 - 尝试重新建立连接。在重连请求中,浏览器会自动包含
Last-Event-ID请求头,值为上次收到的事件的id。
服务器可以利用这个Last-Event-ID来判断客户端是从哪里断开的,并从那个事件之后开始重新发送数据,从而实现无缝的数据流恢复。
- 触发
通过这种方式,SSE 提供了一个高效、稳定且易于实现的单向实时数据推送机制。
第三章:客户端实现:使用 EventSource API
在客户端,实现 SSE 非常简单,主要依赖于浏览器原生的 EventSource 接口。
3.1 创建 EventSource 对象
要开始接收服务器发送的事件,你只需创建一个 EventSource 实例,传入服务器端流的URL。
“`javascript
// index.html
SSE 实时数据
“`
3.2 EventSource 对象的属性和方法
EventSource.readyState:一个表示连接状态的数字:0 (CONNECTING):连接尚未建立,或者正在尝试重新连接。1 (OPEN):连接已打开,可以接收数据。2 (CLOSED):连接已关闭或无法打开。
EventSource.url:建立连接的 URL。EventSource.withCredentials:一个布尔值,表示是否发送凭据(如 cookies、HTTP 认证)。EventSource.close():关闭连接。这会阻止浏览器尝试重新连接。
3.3 跨域问题 (CORS)
如果 SSE 服务器与客户端不在同一个域(协议、域名、端口),你需要处理跨域资源共享(CORS)问题。服务器端需要设置适当的 CORS 头,例如:
Access-Control-Allow-Origin: *
或者指定具体的源:
Access-Control-Allow-Origin: http://your-client-domain.com
如果客户端需要发送凭据(如 cookies),还需要在 EventSource 构造函数中设置 withCredentials 为 true,并且服务器端 Access-Control-Allow-Origin 不能是 *,必须是具体域名:
javascript
const eventSource = new EventSource('/stream', { withCredentials: true });
服务器端:
Access-Control-Allow-Origin: http://your-client-domain.com
Access-Control-Allow-Credentials: true
第四章:服务器端实现:构建 SSE 数据流
服务器端实现 SSE 的核心在于设置正确的 HTTP 响应头,并持续向客户端写入格式化的事件数据。下面我们将用几种常见的后端语言和框架来演示。
4.1 通用原则
无论使用哪种后端技术,服务器端实现 SSE 都需要遵循以下几个通用原则:
- 设置
Content-Type响应头:必须是text/event-stream。 - 设置
Cache-Control响应头:通常设置为no-cache,防止代理服务器缓存事件流。 - 设置
Connection响应头:通常设置为keep-alive,指示客户端保持连接。 - 持续写入数据:服务器需要在一个循环中,或者通过事件驱动的方式,持续向客户端的响应流中写入数据。
- 数据格式:写入的数据必须遵循 SSE 规范,即
field: value\n,每个事件以\n\n结尾。 - 刷新缓冲区:每写入一个事件后,务必刷新(flush)服务器的输出缓冲区,确保数据及时发送到客户端。
4.2 Node.js 实现示例 (Express)
Node.js 是实现 SSE 的绝佳选择,因为其异步 I/O 和流式处理的特性与 SSE 天然契合。
“`javascript
// server.js
const express = require(‘express’);
const cors = require(‘cors’); // 用于处理跨域
const app = express();
const PORT = 3000;
app.use(cors()); // 允许所有来源的跨域请求,实际应用中应更严格配置
// 存储所有连接的客户端的响应对象
const clients = [];
// SSE 数据流接口
app.get(‘/stream’, (req, res) => {
// 1. 设置 SSE 响应头
res.writeHead(200, {
‘Content-Type’: ‘text/event-stream’,
‘Cache-Control’: ‘no-cache’,
‘Connection’: ‘keep-alive’,
‘X-Accel-Buffering’: ‘no’ // Nginx 等代理服务器可能需要此头来禁用缓冲
});
// 2. 将当前客户端的响应对象存储起来
clients.push(res);
console.log(`新客户端连接,当前连接数: ${clients.length}`);
// 3. 处理客户端断开连接的事件
req.on('close', () => {
const index = clients.indexOf(res);
if (index > -1) {
clients.splice(index, 1);
}
console.log(`客户端断开连接,当前连接数: ${clients.length}`);
});
// 4. 处理 Last-Event-ID,如果客户端重连
const lastEventId = req.headers['last-event-id'];
if (lastEventId) {
console.log(`客户端重连,上次收到的事件ID为: ${lastEventId}`);
// 实际应用中,你可以在这里根据 lastEventId 重新发送从该ID之后的数据
// 为了简化示例,我们这里不实现历史数据恢复
// res.write(`id: ${lastEventId}\n`); // 告知客户端以哪个ID继续,虽然是浏览器自动发送
// res.write(`data: 从 ID ${lastEventId} 之后恢复数据推送。\n\n`);
// res.flush();
}
// 5. 初始问候消息
const initialMessage = `data: 欢迎连接到 SSE 服务!当前时间: ${new Date().toLocaleTimeString()}\n\n`;
res.write(initialMessage);
res.flush(); // 刷新缓冲区,确保数据立即发送
});
// 模拟每2秒发送一次数据到所有连接的客户端
let eventId = 0;
setInterval(() => {
eventId++;
const currentTime = new Date().toLocaleTimeString();
// 默认 message 事件
const defaultMessage = `id: ${eventId}\ndata: 服务器时间更新: ${currentTime}\n\n`;
// 自定义 greeting 事件
const greetingMessage = `id: ${eventId}\nevent: greeting\ndata: Hello from server at ${currentTime}!\n\n`;
// 自定义 update 事件,包含 JSON 数据
const updatePayload = {
timestamp: Date.now(),
value: Math.random() * 100,
status: 'ok'
};
const updateMessage = `id: ${eventId}\nevent: update\ndata: ${JSON.stringify(updatePayload)}\n\n`;
// 服务器也可以指定重连时间
// const retryMessage = `retry: 10000\n`; // 客户端断线后10秒重连
clients.forEach(client => {
try {
client.write(defaultMessage);
client.write(greetingMessage);
client.write(updateMessage);
// client.write(retryMessage); // 重连时间可以每个事件都设置,或在初始时设置一次
client.flush(); // 确保数据立即发送
} catch (error) {
console.error('发送数据失败:', error.message);
// 可以在这里移除失效的客户端连接,但req.on('close')通常会处理
}
});
}, 2000); // 每2秒推送一次
app.get(‘/’, (req, res) => {
res.send(`
<!DOCTYPE html>
SSE 实时数据
<script>
const messagesDiv = document.getElementById('messages');
const eventSource = new EventSource('/stream');
eventSource.onopen = function(event) {
console.log('SSE 连接已打开', event);
messagesDiv.innerHTML += \`<p><strong>[系统]</strong> 连接服务器成功!</p>\`;
};
eventSource.onmessage = function(event) {
console.log('收到默认消息:', event.data);
const p = document.createElement('p');
p.textContent = \`[默认消息] ${event.data} (ID: ${event.lastEventId})\`;
messagesDiv.appendChild(p);
messagesDiv.scrollTop = messagesDiv.scrollHeight;
};
eventSource.addEventListener('greeting', function(event) {
console.log('收到自定义问候事件:', event.data);
const p = document.createElement('p');
p.style.color = 'blue';
p.textContent = \`[问候事件] ${event.data} (ID: ${event.lastEventId})\`;
messagesDiv.appendChild(p);
messagesDiv.scrollTop = messagesDiv.scrollHeight;
});
eventSource.addEventListener('update', function(event) {
console.log('收到自定义更新事件:', event.data);
const data = JSON.parse(event.data);
const p = document.createElement('p');
p.style.color = 'green';
p.textContent = \`[更新事件] 时间戳: ${data.timestamp}, 值: ${data.value.toFixed(2)}, 状态: ${data.status} (ID: ${event.lastEventId})\`;
messagesDiv.appendChild(p);
messagesDiv.scrollTop = messagesDiv.scrollHeight;
});
eventSource.onerror = function(error) {
console.error('SSE 连接发生错误:', error);
const p = document.createElement('p');
p.style.color = 'red';
if (eventSource.readyState === EventSource.CONNECTING) {
p.textContent = \`<strong>[系统]</strong> 连接中断,正在尝试重新连接...\`;
} else if (eventSource.readyState === EventSource.CLOSED) {
p.textContent = \`<strong>[系统]</strong> 连接已关闭。\`;
} else {
p.textContent = \`<strong>[系统]</strong> 发生未知错误。\`;
}
messagesDiv.appendChild(p);
messagesDiv.scrollTop = messagesDiv.scrollHeight;
};
</script>
</body>
</html>
`);
});
app.listen(PORT, () => {
console.log(SSE Server running on http://localhost:${PORT});
});
“`
4.3 Python 实现示例 (Flask)
使用 Flask 框架,可以通过 Response 对象和生成器实现 SSE。
“`python
app.py (Flask)
from flask import Flask, Response, request
import time
import json
import random
from datetime import datetime
app = Flask(name)
一个简单的客户端列表,用于模拟广播
clients = []
@app.route(‘/stream’)
def stream():
def event_stream():
client_id = random.randint(1000, 9999) # 简单模拟客户端ID
clients.append(client_id)
print(f”新客户端连接: {client_id}, 当前连接数: {len(clients)}”)
last_event_id = request.headers.get('Last-Event-ID')
if last_event_id:
print(f"客户端 {client_id} 重连,上次收到的事件ID: {last_event_id}")
# 在这里可以实现根据 last_event_id 恢复历史数据推送
yield f"id: {last_event_id}\n" \
f"data: 从上次断开处恢复数据推送。\n\n"
yield f"data: 欢迎连接到 Flask SSE 服务!当前时间: {datetime.now().strftime('%H:%M:%S')}\n\n"
try:
current_event_id = int(last_event_id) if last_event_id else 0
while True:
current_event_id += 1
current_time = datetime.now().strftime('%H:%M:%S')
# 默认 message 事件
yield f"id: {current_event_id}\n" \
f"data: 服务器时间更新: {current_time}\n\n"
# 自定义 greeting 事件
yield f"id: {current_event_id}\n" \
f"event: greeting\n" \
f"data: Hello from Flask at {current_time}!\n\n"
# 自定义 update 事件,包含 JSON 数据
update_payload = {
"timestamp": time.time(),
"value": random.uniform(0, 100),
"status": "active"
}
yield f"id: {current_event_id}\n" \
f"event: update\n" \
f"data: {json.dumps(update_payload)}\n\n"
# 可以设置重连时间,这里设置为5秒
# yield "retry: 5000\n"
time.sleep(2) # 每2秒发送一次数据
except GeneratorExit:
# 当客户端断开连接时,GeneratorExit 会被抛出
clients.remove(client_id)
print(f"客户端 {client_id} 断开连接,当前连接数: {len(clients)}")
except Exception as e:
print(f"客户端 {client_id} 发生错误: {e}")
if client_id in clients:
clients.remove(client_id)
response = Response(event_stream(), mimetype='text/event-stream')
response.headers["Cache-Control"] = "no-cache"
response.headers["Connection"] = "keep-alive"
response.headers["X-Accel-Buffering"] = "no" # 对于Nginx等代理,禁用缓冲
# 允许跨域
response.headers["Access-Control-Allow-Origin"] = "*"
return response
@app.route(‘/’)
def index():
return “””
<!DOCTYPE html>
SSE 实时数据 (Flask)
<script>
const messagesDiv = document.getElementById('messages');
const eventSource = new EventSource('/stream');
eventSource.onopen = function(event) {
console.log('SSE 连接已打开', event);
messagesDiv.innerHTML += `<p><strong>[系统]</strong> 连接服务器成功!</p>`;
};
eventSource.onmessage = function(event) {
console.log('收到默认消息:', event.data);
const p = document.createElement('p');
p.textContent = `[默认消息] ${event.data} (ID: ${event.lastEventId})`;
messagesDiv.appendChild(p);
messagesDiv.scrollTop = messagesDiv.scrollHeight;
};
eventSource.addEventListener('greeting', function(event) {
console.log('收到自定义问候事件:', event.data);
const p = document.createElement('p');
p.style.color = 'blue';
p.textContent = `[问候事件] ${event.data} (ID: ${event.lastEventId})`;
messagesDiv.appendChild(p);
messagesDiv.scrollTop = messagesDiv.scrollHeight;
});
eventSource.addEventListener('update', function(event) {
console.log('收到自定义更新事件:', event.data);
const data = JSON.parse(event.data);
const p = document.createElement('p');
p.style.color = 'green';
p.textContent = `[更新事件] 时间戳: ${data.timestamp}, 值: ${data.value.toFixed(2)}, 状态: ${data.status} (ID: ${event.lastEventId})`;
messagesDiv.appendChild(p);
messagesDiv.scrollTop = messagesDiv.scrollHeight;
});
eventSource.onerror = function(error) {
console.error('SSE 连接发生错误:', error);
const p = document.createElement('p');
p.style.color = 'red';
if (eventSource.readyState === EventSource.CONNECTING) {
p.textContent = `<strong>[系统]</strong> 连接中断,正在尝试重新连接...`;
} else if (eventSource.readyState === EventSource.CLOSED) {
p.textContent = `<strong>[系统]</strong> 连接已关闭。`;
} else {
p.textContent = `<strong>[系统]</strong> 发生未知错误。`;
}
messagesDiv.appendChild(p);
messagesDiv.scrollTop = messagesDiv.scrollHeight;
};
</script>
</body>
</html>
"""
if name == ‘main‘:
app.run(debug=True, port=5000)
“`
4.4 Java 实现示例 (Spring Boot)
Spring Boot 结合 Project Reactor(响应式编程)提供了非常优雅的 SSE 实现方式。
“`java
// SpringBootApplication.java
package com.example.sseserver;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import reactor.core.publisher.Flux;
import java.io.IOException;
import java.time.Duration;
import java.time.LocalTime;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
@SpringBootApplication
@RestController
public class SseServerApplication {
public static void main(String[] args) {
SpringApplication.run(SseServerApplication.class, args);
}
// --- 方式一: 使用 SseEmitter (更传统,适合单个连接控制) ---
private final CopyOnWriteArrayList<SseEmitter> emitters = new CopyOnWriteArrayList<>();
private final AtomicLong sseId = new AtomicLong(0);
@GetMapping("/stream-emitter")
public SseEmitter handleSseEmitter(@RequestHeader(name = "Last-Event-ID", required = false) String lastEventId) {
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); // 不设置超时,或设置一个很长的超时时间
System.out.println("New SseEmitter client connected. Last-Event-ID: " + lastEventId);
emitter.onCompletion(() -> {
emitters.remove(emitter);
System.out.println("SseEmitter client disconnected. Current count: " + emitters.size());
});
emitter.onTimeout(() -> {
System.out.println("SseEmitter client timed out. Removing.");
emitter.complete(); // 完成连接
emitters.remove(emitter);
});
emitter.onError((e) -> {
System.err.println("SseEmitter error: " + e.getMessage());
emitter.completeWithError(e); // 错误时完成连接
emitters.remove(emitter);
});
emitters.add(emitter);
// 模拟初始消息
try {
emitter.send(SseEmitter.event()
.id(String.valueOf(sseId.incrementAndGet()))
.name("initial")
.data("欢迎连接到 Spring Boot SSE (SseEmitter) 服务!当前时间: " + LocalTime.now()));
} catch (IOException e) {
System.err.println("Error sending initial message: " + e.getMessage());
}
return emitter;
}
// 模拟定时推送,广播给所有连接
// @Scheduled(fixedRate = 2000) // 需要在主应用类上启用 @EnableScheduling
public void sendEventsToEmitters() {
if (emitters.isEmpty()) {
return;
}
System.out.println("Sending events to " + emitters.size() + " SseEmitter clients.");
long currentSseId = sseId.incrementAndGet();
String currentTime = LocalTime.now().toString();
// 默认 message 事件
SseEmitter.Event defaultEvent = SseEmitter.event()
.id(String.valueOf(currentSseId))
.data("服务器时间更新: " + currentTime);
// 自定义 greeting 事件
SseEmitter.Event greetingEvent = SseEmitter.event()
.id(String.valueOf(currentSseId))
.name("greeting")
.data("Hello from Spring Boot (SseEmitter) at " + currentTime + "!");
// 自定义 update 事件 (JSON)
String jsonPayload = String.format("{\"timestamp\": %d, \"value\": %.2f, \"status\": \"active\"}",
System.currentTimeMillis(), Math.random() * 100);
SseEmitter.Event updateEvent = SseEmitter.event()
.id(String.valueOf(currentSseId))
.name("update")
.data(jsonPayload);
// 可以设置重连时间
// SseEmitter.Event retryEvent = SseEmitter.event().reconnectTime(5000L); // 5 seconds
for (SseEmitter emitter : emitters) {
try {
emitter.send(defaultEvent);
emitter.send(greetingEvent);
emitter.send(updateEvent);
// emitter.send(retryEvent);
} catch (IOException e) {
System.err.println("Error sending to emitter: " + e.getMessage());
emitter.completeWithError(e); // 标记为完成,后续会被移除
}
}
}
// --- 方式二: 使用 WebFlux Flux<ServerSentEvent> (响应式编程,更简洁优雅) ---
// SSE 流接口
@GetMapping(path = "/stream-flux", produces = org.springframework.http.MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<org.springframework.http.codec.ServerSentEvent<String>> handleSseFlux(
@RequestHeader(name = "Last-Event-ID", required = false) String lastEventId) {
System.out.println("New Flux SSE client connected. Last-Event-ID: " + lastEventId);
// 如果有 Last-Event-ID,可以从指定ID之后开始发送数据
// 这里只是打印,实际需要复杂的逻辑来检索历史数据
long startId = (lastEventId != null && !lastEventId.isEmpty()) ? Long.parseLong(lastEventId) + 1 : 0;
System.out.println("Starting stream from ID: " + startId);
return Flux.interval(Duration.ofSeconds(2)) // 每2秒生成一个事件
.map(sequence -> {
long currentSseId = sseId.incrementAndGet();
String currentTime = LocalTime.now().toString();
String jsonPayload = String.format("{\"timestamp\": %d, \"value\": %.2f, \"status\": \"active\"}",
System.currentTimeMillis(), Math.random() * 100);
// 创建一个复合事件,包含不同类型的事件
// 在 Flux 中,每个 ServerSentEvent 对象代表一个完整的事件块
// 默认会发送 `data: ...\n\n`
// 如果要发送多个 event/data 字段,需要发送多个 ServerSentEvent 对象,
// 或者将所有信息打包到一个JSON中,再通过 data 字段发送。
// 这里的示例将不同类型的事件分别发送。
return org.springframework.http.codec.ServerSentEvent.<String>builder()
.id(String.valueOf(currentSseId))
.event("message")
.data("服务器时间更新: " + currentTime)
.comment("这是一个默认消息") // 这是一个自定义注释,不会发送给客户端
.build();
})
.doOnCancel(() -> System.out.println("Flux SSE client disconnected."))
.doOnError(e -> System.err.println("Flux SSE error: " + e.getMessage()));
}
// 假设你有一个方法可以为 Flux / SseEmitter 发送自定义事件
// 例如,一个消息队列监听器收到新消息后,调用此方法
public void pushCustomEvent(String eventName, String data) {
long currentSseId = sseId.incrementAndGet();
// 对于 SseEmitter
SseEmitter.Event customEvent = SseEmitter.event()
.id(String.valueOf(currentSseId))
.name(eventName)
.data(data);
for (SseEmitter emitter : emitters) {
try {
emitter.send(customEvent);
} catch (IOException e) {
System.err.println("Error sending custom event to emitter: " + e.getMessage());
emitter.completeWithError(e);
}
}
// 对于 Flux,你需要一个专门的 FluxSink 或 FluxProcessor 来推送
// 例如:
// messageProcessor.tryEmitNext(ServerSentEvent.<String>builder().event(eventName).data(data).build());
}
// 为了简单起见,提供一个简单的HTML页面来测试
@GetMapping("/")
public String index() {
return """
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>SSE 客户端示例 (Spring Boot)</title>
</head>
<body>
<h1>SSE 实时数据 (Spring Boot)</h1>
<h2>SseEmitter Stream</h2>
<div id="messages-emitter" style="height: 200px; overflow-y: scroll; border: 1px solid #ccc; padding: 10px; margin-bottom: 20px;"></div>
<h2>Flux Stream</h2>
<div id="messages-flux" style="height: 200px; overflow-y: scroll; border: 1px solid #ccc; padding: 10px;"></div>
<script>
function setupSSE(endpoint, messageDivId) {
const messagesDiv = document.getElementById(messageDivId);
const eventSource = new EventSource(endpoint);
eventSource.onopen = function(event) {
console.log(`SSE 连接(${endpoint})已打开`, event);
messagesDiv.innerHTML += `<p><strong>[系统]</strong> 连接服务器成功!</p>`;
};
eventSource.onmessage = function(event) {
console.log(`收到默认消息(${endpoint}):`, event.data);
const p = document.createElement('p');
p.textContent = `[默认消息] ${event.data} (ID: ${event.lastEventId})`;
messagesDiv.appendChild(p);
messagesDiv.scrollTop = messagesDiv.scrollHeight;
};
eventSource.addEventListener('greeting', function(event) {
console.log(`收到自定义问候事件(${endpoint}):`, event.data);
const p = document.createElement('p');
p.style.color = 'blue';
p.textContent = `[问候事件] ${event.data} (ID: ${event.lastEventId})`;
messagesDiv.appendChild(p);
messagesDiv.scrollTop = messagesDiv.scrollHeight;
});
eventSource.addEventListener('update', function(event) {
console.log(`收到自定义更新事件(${endpoint}):`, event.data);
try {
const data = JSON.parse(event.data);
const p = document.createElement('p');
p.style.color = 'green';
p.textContent = `[更新事件] 时间戳: ${data.timestamp}, 值: ${data.value.toFixed(2)}, 状态: ${data.status} (ID: ${event.lastEventId})`;
messagesDiv.appendChild(p);
messagesDiv.scrollTop = messagesDiv.scrollHeight;
} catch (e) {
console.error("Error parsing JSON for update event:", e);
const p = document.createElement('p');
p.style.color = 'orange';
p.textContent = `[更新事件] 原始数据: ${event.data} (解析失败)`;
messagesDiv.appendChild(p);
messagesDiv.scrollTop = messagesDiv.scrollHeight;
}
});
eventSource.onerror = function(error) {
console.error(`SSE 连接(${endpoint})发生错误:`, error);
const p = document.createElement('p');
p.style.color = 'red';
if (eventSource.readyState === EventSource.CONNECTING) {
p.textContent = `<strong>[系统]</strong> 连接中断,正在尝试重新连接...`;
} else if (eventSource.readyState === EventSource.CLOSED) {
p.textContent = `<strong>[系统]</strong> 连接已关闭。`;
} else {
p.textContent = `<strong>[系统]</strong> 发生未知错误。`;
}
messagesDiv.appendChild(p);
messagesDiv.scrollTop = messagesDiv.scrollHeight;
};
}
// 设置 SseEmitter 和 Flux Stream 的 SSE
setupSSE('/stream-emitter', 'messages-emitter');
setupSSE('/stream-flux', 'messages-flux');
</script>
</body>
</html>
""";
}
}
“`
请注意,Spring Boot SseEmitter 需要在定时任务中手动循环所有 emitters 来推送数据,而 Flux<ServerSentEvent> 是基于响应式编程,更适合数据源本身就是流的场景。对于 Flux,如果你需要推送不同 event 类型的事件,每次 map 返回的 ServerSentEvent 应该包含不同的 event 字段。为了在 Flux 中模拟多种事件,你可能需要更复杂的 map 逻辑或组合多个 Flux。上面的 Flux 示例仅发送了 message 类型的事件。
第五章:SSE 的最佳实践与注意事项
- 管理服务器资源:SSE 连接是持久化的 HTTP 连接。虽然它们不像 WebSockets 那样需要频繁的心跳,但每个连接仍然会占用服务器资源。你需要考虑服务器能够同时处理多少个 SSE 连接,并进行相应的伸缩。对于高并发场景,可能需要使用负载均衡器和消息队列。
- 错误处理与重连:
- 客户端:
EventSource已经内置了自动重连。通过onerror可以监控连接状态。 - 服务器:当客户端断开连接时(例如,浏览器关闭、网络中断),服务器端也应该能够检测到并清理相应的资源(例如,从连接列表中移除响应对象)。大多数框架的
req.on('close')或emitter.onCompletion()会处理这一点。
- 客户端:
- Last-Event-ID 的利用:当客户端重连时,它会发送
Last-Event-ID请求头。服务器应该利用这个 ID 从中断点之后开始发送数据,以避免数据丢失或重复。这通常需要服务器端维护一个持久化的事件日志或队列。 - 数据格式:尽管 SSE 允许发送任何文本数据,但推荐使用 JSON 格式来传输结构化数据,方便客户端解析。记得在
data:字段中对 JSON 进行编码。 - 安全性:
- 认证与授权:SSE 端点也需要像其他 API 一样进行认证和授权。这通常在建立连接时通过 cookie、HTTP 头(如
Authorization)或 URL 参数(不推荐用于敏感信息)进行。 - CORS:如果 SSE 源与客户端页面不同域,务必正确配置 CORS 头。
- 认证与授权:SSE 端点也需要像其他 API 一样进行认证和授权。这通常在建立连接时通过 cookie、HTTP 头(如
- 代理和缓存:确保在服务器端设置
Cache-Control: no-cache和X-Accel-Buffering: no(对于 Nginx 等代理),防止中间代理服务器缓冲 SSE 流,导致数据延迟。 - 心跳机制(Keep-Alive):尽管 SSE 是持久连接,但为了防止一些代理或防火墙在长时间没有数据传输时主动断开连接,服务器可以周期性地发送空事件或注释行(如
:\n或data: \n\n)作为心跳包。这些空事件不会触发客户端的onmessage。
: heartbeat\n\n - 消息队列集成:对于复杂的实时应用,服务器往往不是直接产生 SSE 数据的,而是从消息队列(如 Kafka, RabbitMQ, Redis Pub/Sub)中消费数据。将 SSE 与消息队列集成,可以实现更 robust 和可扩展的实时数据架构。
第六章:SSE 的局限性
尽管 SSE 优点显著,但它并非万能药,存在一些固有的局限性:
- 单向通信:SSE 最主要的限制是它只能从服务器向客户端推送数据。如果你的应用需要客户端也能向服务器发送实时消息(如聊天应用),那么 WebSockets 是更合适的选择。
- 二进制数据支持不足:SSE 设计为文本传输,不支持原生二进制数据。虽然你可以通过 Base64 编码等方式传输二进制数据,但会增加数据大小和编解码开销。
- 浏览器连接数限制:根据浏览器和规范,
EventSource在同一时间对同一域名下的连接数量有限制,通常为 6 到 8 个。这在某些情况下可能会成为瓶颈,例如在同一个页面上需要同时连接多个独立的 SSE 流。 - Internet Explorer/Edge 兼容性:较旧的 Internet Explorer 浏览器不支持 SSE。Microsoft Edge 在早期版本也不支持,但现代基于 Chromium 的 Edge 浏览器已全面支持。对于需要支持旧版 IE 的场景,需要使用 Polling 或 Long Polling 作为备选方案,或者使用 polyfill。
- HTTP/1.1 性能瓶颈:尽管 SSE 基于 HTTP 协议,但它本质上是利用了 HTTP/1.1 的长连接特性。在 HTTP/1.1 中,同一个 TCP 连接在同一时间只能处理一个请求。这意味着一个 SSE 连接会占用一个 HTTP/1.1 连接。虽然 HTTP/2 解决了队头阻塞问题,但 SSE 仍然是单向的。
第七章:SSE 与其他实时通信技术的比较
7.1 SSE vs. Long Polling
- Long Polling:客户端发起请求,服务器保持连接直到有新数据或超时。收到响应后,客户端立即发起新请求。
- 优点:兼容所有浏览器,基于标准 HTTP。
- 缺点:每次数据传输都需要建立和关闭 HTTP 连接,开销较大;实时性受限于网络和服务器响应速度。
- SSE:客户端发起一个请求,服务器保持连接并持续推送数据。
- 优点:只需建立一次连接,开销小;内置自动重连和事件 ID;原生浏览器支持。
- 缺点:单向通信;部分旧浏览器不支持。
结论:对于服务器单向推送数据,SSE 明显优于 Long Polling。
7.2 SSE vs. WebSockets
- WebSockets:在单个 TCP 连接上实现全双工(双向)通信。协议升级后,数据帧开销极小。
- 优点:全双工,双向实时通信;支持二进制数据;极低延迟;更适合高频、双向互动。
- 缺点:协议独立于 HTTP,可能需要特殊的代理配置;相对 SSE 更复杂;对于仅需单向推送的场景,可能引入不必要的开销。
- SSE:基于标准 HTTP 的单向通信。
- 优点:基于 HTTP 协议,防火墙友好,易于部署和调试;内置自动重连和事件 ID;原生
EventSourceAPI 简单易用。 - 缺点:单向通信;不支持二进制数据;浏览器连接数限制。
- 优点:基于 HTTP 协议,防火墙友好,易于部署和调试;内置自动重连和事件 ID;原生
结论:
* 选择 SSE:如果你只需要从服务器向客户端单向推送数据,例如新闻订阅、股票行情、实时日志、进度条更新等,SSE 是更简单、更高效的选择。
* 选择 WebSockets:如果你需要客户端和服务器之间的双向实时通信,例如在线聊天、多人游戏、协同编辑等,WebSockets 是更强大的选择。
决策树
- 需要双向通信吗?
- 是 -> WebSockets
- 否 -> 继续
- 需要推送二进制数据吗?
- 是 -> WebSockets (或通过 Base64 编码在 SSE 中传输,但效率不高)
- 否 -> 继续
- 只需要单向服务器到客户端的文本数据推送,并且看重简单性和 HTTP 兼容性?
- 是 -> SSE
- 需要支持非常多的并发连接(超过6-8个/域名)到同一页面,或者需要支持IE浏览器?
- 是 -> 可能需要考虑 WebSockets (尽管有其复杂性),或者回退到 Long Polling (但效率低)。
总结
Server-Sent Events (SSE) 是一种强大而简洁的 Web 实时通信技术,专为服务器向客户端单向推送数据而设计。它利用标准的 HTTP 协议,提供了自动重连、事件 ID、事件类型等便捷特性,使得开发者能够轻松构建实时数据流应用。
从股票行情到新闻通知,从任务进度到物联网数据监控,SSE 在许多场景下都是比 WebSockets 更轻量、更易于集成的选择。虽然它有单向通信、浏览器连接数限制等局限性,但在正确的应用场景下,SSE 无疑是实现高效、优雅实时数据推送的利器。
通过本指南的详细介绍,相信你已经对 SSE 有了全面的认识,并能够自信地将其应用到你的下一个实时Web项目中。在选择实时通信技术时,请根据你的具体需求权衡 SSE、WebSockets 和其他方案的优劣,做出最适合你的架构决策。