Python WebSocket 新手指南:构建实时交互应用的基石
在现代 Web 应用中,实时交互已成为一种核心需求。无论是聊天室、在线游戏、协作编辑,还是实时数据展示(如股票行情、物联网监控),传统的 HTTP 请求/响应模式都显得力不从心。这时,WebSocket 技术应运而生,它提供了一种在客户端和服务器之间进行全双工、低延迟通信的能力。
本篇文章将带你深入了解 WebSocket,并重点讲解如何使用 Python 构建 WebSocket 应用。无论你是想为你的 Web 应用添加实时功能,还是对网络编程中的持久连接感兴趣,这篇指南都将为你提供坚实的基础。
文章目录
- 什么是 WebSocket?为什么需要它?
- HTTP 的局限性
- WebSocket 的工作原理
- WebSocket 的优势
- WebSocket 的应用场景
- WebSocket vs. HTTP:深入对比
- 连接方式
- 通信模式
- 开销
- 适用场景
- Python 中的 WebSocket 库
- 主流库介绍 (
websockets
,python-socketio
,aiohttp
) - 选择合适的库
- 主流库介绍 (
- 使用
websockets
构建基础 WebSocket 应用- 安装
websockets
- 理解
asyncio
- 构建一个简单的 WebSocket 服务器 (Echo Server)
- 代码实现与解析
- 运行服务器
- 构建一个简单的 WebSocket 客户端
- 代码实现与解析
- 运行客户端
- 安装
- 深入
websockets
:处理多连接与消息广播- 如何管理多个连接
- 实现一个简单的广播服务器
- 代码实现与解析
- 处理客户端断开连接
- 使用
python-socketio
构建高级 WebSocket 应用- 什么是 Socket.IO?
- 安装
python-socketio
- 构建一个简单的 Socket.IO 服务器
- 基于事件的通信
- 代码实现与解析
- 构建一个简单的 Socket.IO 客户端
- 代码实现与解析
- WebSocket 应用的常见问题与最佳实践
- 错误处理与连接断开
- 心跳机制 (Ping/Pong)
- 安全性考虑 (SSL/TLS, Origin 校验)
- 性能与扩展性 (消息队列)
- 与现有 Web 框架集成
- 总结与进一步学习
1. 什么是 WebSocket?为什么需要它?
HTTP 的局限性
在我们深入 WebSocket 之前,先回顾一下传统的 HTTP 协议。HTTP 是一个基于请求/响应模式的协议。客户端(通常是浏览器)发起一个请求,服务器处理请求并返回一个响应。这个过程是单向的,并且在每次请求/响应完成后,连接通常会被关闭(尽管 HTTP/1.1 引入了持久连接,可以在一定时间内复用,但通信模式仍是请求/响应)。
这种模式对于获取网页、提交表单等场景非常有效。但对于需要服务器主动向客户端推送数据的实时应用来说,HTTP 就显得笨拙了。为了模拟服务器推送,开发者们不得不采用一些折衷方案:
- 轮询 (Polling): 客户端每隔固定时间就向服务器发送请求,询问是否有新数据。这会导致大量无效请求,浪费带宽和服务器资源,并且数据的新鲜度取决于轮询间隔。
- 长轮询 (Long Polling): 客户端发送请求后,服务器会 holding 住连接,直到有新数据或者超时才返回响应。客户端收到响应后再立即发起新的请求。这比普通轮询效率高,但仍然是基于请求/响应,并且服务器需要维护大量半开放的连接,资源消耗较大。
- 流 (Streaming): 服务器发送响应头部后,保持连接打开,并持续向客户端发送数据。这可以实现单向推送,但客户端难以向服务器发送数据,且实现复杂。
这些技术都是对 HTTP 请求/响应模式的“滥用”,无法实现真正的双向、低延迟的实时通信。
WebSocket 的工作原理
WebSocket (RFC 6455) 协议旨在解决 HTTP 在实时通信方面的不足。它提供了一个全双工 (Full-Duplex) 的通信通道。这意味着数据可以在同一时间双向传输,客户端和服务器都可以主动发送消息。
WebSocket 连接的建立过程通常是这样的:
- 客户端通过 HTTP 发起一个特殊的请求,称为 WebSocket 握手 (Handshake)。这个请求包含一些特殊的头部(如
Upgrade: websocket
,Connection: Upgrade
,Sec-WebSocket-Key
等),表明客户端希望将当前连接升级到 WebSocket 协议。 - 服务器收到握手请求后,如果支持 WebSocket 协议并同意升级,会返回一个特殊的 HTTP 响应(状态码 101 Switching Protocols),表示协议切换成功。响应中也会包含一些特殊的头部(如
Sec-WebSocket-Accept
)。 - 握手成功后,底层 TCP 连接将不再使用 HTTP 协议进行通信,而是切换到 WebSocket 协议。此时,客户端和服务器之间建立了一个持久的、双向的连接。
- 在这个持久连接上,双方可以随时互相发送数据帧 (data frames)。数据帧可以是文本数据 (Text Frame) 或二进制数据 (Binary Frame)。WebSocket 协议负责处理帧的封装、掩码 (masking,客户端到服务器的数据需要掩码,增强安全性) 等细节。
- 连接可以由客户端或服务器主动关闭,或者由于网络问题意外断开。双方会发送特定的控制帧 (control frames,如 Close Frame) 来管理连接状态。
核心概念: 握手(Upgrade)、协议切换(101)、持久连接、全双工通信、数据帧(文本/二进制)、控制帧(心跳、关闭)。
WebSocket 的优势
相比于模拟的实时技术,WebSocket 具有显著优势:
- 真正的全双工通信: 服务器和客户端可以同时发送和接收数据,无需等待对方。
- 低延迟: 一旦连接建立,数据传输无需重复进行 HTTP 请求/响应的开销,消息可以更快速地送达。
- 较低的开销: 握手完成后,后续数据传输使用的帧协议头部非常小,相比 HTTP 头部显著减少,节省了带宽。
- 持久连接: 连接在握手后保持打开状态,避免了频繁建立/关闭连接的开销。
WebSocket 的应用场景
WebSocket 非常适合以下需要实时交互的应用:
- 即时聊天应用: 消息的发送和接收都是实时的。
- 在线游戏: 实时同步玩家状态、游戏事件。
- 实时股票/数据行情: 服务器主动推送最新的数据。
- 实时协作工具: 多人同时编辑文档、白板等。
- 通知系统: 服务器向客户端推送新通知。
- 物联网 (IoT) 监控: 设备状态的实时上报与命令的实时下发。
- 实时地理位置服务: 共享和更新位置信息。
2. WebSocket vs. HTTP:深入对比
为了更好地理解 WebSocket 的定位,我们将其与 HTTP 进行更详细的对比:
特性 | HTTP | WebSocket |
---|---|---|
通信模式 | 请求/响应 (Request/Response) | 全双工 (Full-Duplex) |
连接方式 | 短连接 (除非使用 HTTP/1.1 持久连接) | 持久连接 (在整个会话期间保持开放) |
初始化 | 客户端总是发起请求 | 客户端发起握手请求,服务器同意后升级协议 |
开销 | 每个请求/响应都有完整的头部,开销较大 | 握手后开销小,数据传输使用小型帧头部 |
数据流 | 单向 (请求 -> 响应) | 双向 (客户端和服务器都可以主动发送) |
协议 | 应用层协议,基于 TCP | 应用层协议,基于 TCP,通过 HTTP 握手建立 |
URL 方案 | http:// , https:// |
ws:// , wss:// |
适用场景 | 静态资源获取、表单提交、API 调用等 | 实时数据推送、聊天、游戏、协作等需要低延迟双向通信的场景 |
总结来说,HTTP 适合“一次性”的数据交互,而 WebSocket 适合需要持续、双向、低延迟通信的场景。WebSocket 并不是取代 HTTP,而是对其的补充,它们常常在同一个应用中协同工作,例如使用 HTTP 获取网页内容,然后通过 WebSocket 建立连接进行实时通信。
3. Python 中的 WebSocket 库
Python 生态系统中有几个流行的库可以用来构建 WebSocket 应用。选择哪个库取决于你的具体需求、对异步编程的熟悉程度以及是否需要与现有的 Web 框架集成。
以下是几个主流的库:
-
websockets
:- 这是一个基于
asyncio
的 WebSocket 库。 - 它提供了 WebSocket 协议的底层实现,非常适合需要精细控制 WebSocket 连接的场景。
- 支持 WebSocket 协议的全部特性,包括文本/二进制帧、控制帧、SSL/TLS 等。
- 特点: 纯异步、高性能、符合标准、提供较低级别的 API。
- 适用场景: 需要构建高性能、可扩展的 WebSocket 服务器或客户端,对
asyncio
比较熟悉,或者不需要 Socket.IO 等高级特性。
- 这是一个基于
-
python-socketio
:- 这是一个实现了 Socket.IO 协议的 Python 库。Socket.IO 是一个建立在 WebSocket 之上,提供更多高级特性(如自动重连、事件广播、命名空间、房间等)的库,它还包含一个降级机制,在 WebSocket 不可用时可以使用长轮询等方式模拟实时通信。
python-socketio
可以作为一个独立的服务器运行,也可以轻松地与流行的 Python Web 框架(如 Flask, Django, FastAPI, Starlette, aiohttp)集成。- 特点: 提供高级功能、易于集成、自带降级机制、基于事件。
- 适用场景: 需要 Socket.IO 提供的额外功能(如房间、命名空间),希望与现有 Web 框架紧密集成,或者客户端主要使用 Socket.IO 库(通常是 JavaScript 客户端库)。
-
aiohttp
:aiohttp
是一个用于asyncio
的异步 HTTP 客户端/服务器框架。- 它也内置了对 WebSocket 服务器和客户端的支持。
- 如果你已经在使用
aiohttp
构建 Web 应用,那么使用它内置的 WebSocket 功能会非常方便。 - 特点: 全功能的异步 HTTP 框架,包含 WebSocket 支持,与
aiohttp
生态系统集成。 - 适用场景: 已经或计划使用
aiohttp
作为主要的 Web 框架,希望在一个框架内处理 HTTP 和 WebSocket。
对于新手来说,websockets
是一个很好的起点,因为它专注于 WebSocket 协议本身,代码相对简洁,能帮助你理解 WebSocket 的基本工作方式,同时也是基于现代的 asyncio
异步编程模式。如果你后续需要更高级的功能或与特定前端库(如 Socket.IO.js)配合,再考虑 python-socketio
。aiohttp
则适合作为完整异步 Web 框架的一部分来学习。
本指南将主要使用 websockets
库来演示基础概念和实现,因为它最直接地体现了 WebSocket 的协议特性。我们也会简要介绍 python-socketio
的用法,展示高级抽象的便利性。
4. 使用 websockets
构建基础 WebSocket 应用
websockets
库是基于 Python 的 asyncio
异步框架构建的。因此,在使用 websockets
之前,你需要对 asyncio
的基本概念有所了解,例如协程 (async def
), await
关键字, 事件循环 (event loop) 等。简单来说,asyncio
允许你在单线程中以协作式的方式处理多个并发任务(I/O 密集型任务,如网络通信),而不会阻塞整个程序。
安装 websockets
首先,确保你安装了 websockets
库。打开终端或命令行,运行以下命令:
bash
pip install websockets
理解 asyncio
websockets
的操作(如发送、接收消息)都是异步的。这意味着当你调用 await websocket.recv()
等方法时,如果当前没有消息可接收,程序不会停在那里干等,而是会暂停当前协程的执行,让出控制权给事件循环,事件循环可以去执行其他已经准备好的协程(例如处理另一个客户端的连接)。当有新消息到达时,事件循环会恢复之前暂停的协程,继续执行。
所有 WebSocket 相关的操作都需要在 async def
定义的协程函数中,并且需要使用 await
关键字来等待异步操作完成。程序的入口点通常需要一个 asyncio.run()
函数来启动事件循环并运行顶层的协程。
构建一个简单的 WebSocket 服务器 (Echo Server)
我们先来创建一个最简单的 WebSocket 服务器:一个回显服务器 (Echo Server)。它接收客户端发送的任何消息,然后将同一个消息原样发回给客户端。
创建一个名为 server.py
的文件,并输入以下代码:
“`python
server.py
import asyncio
import websockets
WebSocket连接处理函数
当一个新的客户端连接建立时,asyncio事件循环会为此连接创建一个新的协程,并执行这个函数
websocket 参数代表当前的连接对象
path 参数代表客户端连接的路径(比如 ws://localhost:8765/some/path,path就是/some/path)
async def echo_handler(websocket, path):
print(f”客户端连接已建立 from {websocket.remote_address}, path: {path}”)
try:
# 无限循环,持续接收客户端消息
async for message in websocket:
print(f"收到消息 from {websocket.remote_address}: {message}")
# 将收到的消息发回给客户端
await websocket.send(f"服务器收到:{message}")
print(f"发送消息给 {websocket.remote_address}: 服务器收到:{message}")
except websockets.exceptions.ConnectionClosedOK:
# 客户端正常关闭连接
print(f"客户端 {websocket.remote_address} 已正常断开连接")
except websockets.exceptions.ConnectionClosedError as e:
# 客户端异常关闭连接
print(f"客户端 {websocket.remote_address} 连接异常断开: {e}")
except Exception as e:
# 其他未知异常
print(f"处理客户端 {websocket.remote_address} 时发生错误: {e}")
finally:
# 无论正常还是异常,连接关闭后执行清理操作(在这个简单例子中无需特别清理)
print(f"客户端 {websocket.remote_address} 连接处理结束")
启动WebSocket服务器的主函数
async def main():
# websockets.serve() 函数用于创建一个WebSocket服务器
# 第一个参数是连接处理协程函数
# host 是服务器绑定的地址,None 或 ‘localhost’ 绑定到本地所有可用接口
# port 是服务器监听的端口
# start_server 返回一个 server 对象,需要在协程中 await
server = await websockets.serve(echo_handler, “localhost”, 8765)
print("WebSocket服务器已启动,监听在 ws://localhost:8765")
# server.wait_closed() 会一直阻塞,直到服务器关闭
await server.wait_closed()
print("WebSocket服务器已关闭")
运行主函数,启动asyncio事件循环
asyncio.run() 是Python 3.7+ 推荐的运行顶层async函数的入口点
它会自动管理事件循环的创建和关闭
if name == “main“:
try:
asyncio.run(main())
except KeyboardInterrupt:
print(“\n服务器通过Ctrl+C中断”)
“`
代码解析:
import asyncio
和import websockets
: 导入所需的库。async def echo_handler(websocket, path):
: 这是一个协程函数,它将作为每个新连接的处理程序。当一个客户端通过 WebSocket 连接到服务器时,websockets
库会自动创建一个新的协程来运行这个函数。websocket
: 这是表示当前 WebSocket 连接的对象。你可以通过它发送 (await websocket.send(message)
) 和接收 (await websocket.recv()
) 数据。path
: 这是客户端连接时请求的路径(例如/
,/chat
,/data
等)。在我们的简单例子中,它总是/
。
print(...)
: 打印一些日志信息,方便观察服务器状态。async for message in websocket:
: 这是一个异步迭代器。它会持续等待并接收来自客户端的新消息。每当收到一条消息,循环体就会执行一次,message
变量将包含收到的消息内容(通常是字符串,但也可以是字节)。await websocket.send(...)
: 使用await
关键字发送消息回客户端。这是异步操作,程序会等待消息发送完成。try...except...finally
: 这是标准的 Python 异常处理。在这里,我们捕获websockets.exceptions.ConnectionClosedOK
(客户端正常关闭) 和websockets.exceptions.ConnectionClosedError
(客户端异常断开),以及其他可能的异常,以便在连接断开时进行相应的处理和日志记录。使用async for
循环接收消息时,当连接关闭,循环会自动结束,然后进入finally
块(如果正常关闭则会先进入ConnectionClosedOK
块再进入finally
)。async def main():
: 这是我们程序的异步入口点。server = await websockets.serve(echo_handler, "localhost", 8765)
: 这行代码创建并启动 WebSocket 服务器。echo_handler
: 指定处理每个新连接的协程函数。"localhost"
: 服务器将绑定到本地地址。8765
: 服务器将监听 8765 端口。await
:serve
函数是一个异步操作,需要等待服务器启动完成。它返回一个服务器对象。
print(...)
: 提示服务器已启动。await server.wait_closed()
: 这是一个异步操作,它会暂停main
协程的执行,直到服务器被关闭(例如通过server.close()
调用或接收到中断信号)。这使得main
函数保持运行,服务器持续监听连接。if __name__ == "__main__":
: 确保只有当脚本直接运行时才执行main
函数。asyncio.run(main())
: 这是启动asyncio
事件循环并运行main
协程的标准方法(Python 3.7+)。它会负责创建事件循环,运行main
直到完成,然后关闭事件循环。except KeyboardInterrupt
: 捕获用户按下Ctrl+C
时的中断信号,友好地退出程序。
运行服务器
保存 server.py
文件,然后在终端中运行:
bash
python server.py
你应该会看到输出:
WebSocket服务器已启动,监听在 ws://localhost:8765
服务器现在正在等待客户端连接。
构建一个简单的 WebSocket 客户端
现在,我们来编写一个简单的客户端,连接到我们刚刚启动的服务器,发送消息并接收回显。
创建一个名为 client.py
的文件,并输入以下代码:
“`python
client.py
import asyncio
import websockets
WebSocket客户端协程函数
async def hello_client():
# 定义服务器的WebSocket URI
uri = “ws://localhost:8765”
# 使用 websockets.connect() 建立连接
# 这是一个异步上下文管理器,连接会在进入时建立,退出时关闭
async with websockets.connect(uri) as websocket:
print(f"已连接到服务器: {uri}")
# 发送一条消息给服务器
message_to_send = "Hello, WebSocket Server!"
await websocket.send(message_to_send)
print(f"客户端发送: {message_to_send}")
# 接收服务器的回显消息
response = await websocket.recv()
print(f"客户端收到: {response}")
# 可以发送多条消息
message_to_send_2 = "How are you?"
await websocket.send(message_to_send_2)
print(f"客户端发送: {message_to_send_2}")
response_2 = await websocket.recv()
print(f"客户端收到: {response_2}")
运行客户端协程
async def main():
await hello_client()
启动asyncio事件循环运行main函数
if name == “main“:
print(“正在启动WebSocket客户端…”)
asyncio.run(main())
print(“客户端已结束”)
“`
代码解析:
import asyncio
和import websockets
: 导入所需库。async def hello_client():
: 客户端协程函数。uri = "ws://localhost:8765"
: 定义要连接的服务器地址。ws://
表示普通的 WebSocket 连接,如果服务器使用 SSL/TLS,则需要使用wss://
。async with websockets.connect(uri) as websocket:
: 这是一个异步上下文管理器。它负责建立与指定 URI 的 WebSocket 连接。- 进入
with
块时,await websockets.connect(uri)
会执行,建立连接,并将连接对象赋值给websocket
变量。 - 退出
with
块时(无论是正常退出还是发生异常),连接会自动关闭。这是一种管理资源的好方法。
- 进入
await websocket.send(message_to_send)
: 向服务器发送一条消息。await websocket.recv()
: 等待并接收服务器发送过来的消息。如果服务器没有发送消息,这里会一直阻塞(但不会占用 CPU,因为是异步等待),直到收到消息或连接关闭。async def main()
: 客户端的异步入口点。asyncio.run(main())
: 启动事件循环并运行客户端。
运行客户端
在服务器 server.py
正在运行的终端之外,打开另一个终端,运行客户端脚本:
bash
python client.py
你应该会看到客户端输出:
正在启动WebSocket客户端...
已连接到服务器: ws://localhost:8765
客户端发送: Hello, WebSocket Server!
客户端收到: 服务器收到:Hello, WebSocket Server!
客户端发送: How are you?
客户端收到: 服务器收到:How are you?
客户端已结束
同时,查看运行服务器的终端,你应该看到服务器记录了连接建立、接收消息、发送消息以及连接断开的日志:
WebSocket服务器已启动,监听在 ws://localhost:8765
客户端连接已建立 from 127.0.0.1:xxxxx, path: / # xxxxx 是随机端口号
收到消息 from 127.0.0.1:xxxxx: Hello, WebSocket Server!
发送消息给 127.0.0.1:xxxxx: 服务器收到:Hello, WebSocket Server!
收到消息 from 127.0.0.1:xxxxx: How are you?
发送消息给 127.0.0.1:xxxxx: 服务器收到:服务器收到:How are you?
客户端 127.0.0.1:xxxxx 已正常断开连接
客户端 127.0.0.1:xxxxx 连接处理结束
恭喜!你已经成功构建并运行了一个简单的 WebSocket 服务器和客户端,实现了双向通信。
5. 深入 websockets
:处理多连接与消息广播
一个 WebSocket 服务器通常需要同时处理来自多个客户端的连接,并且可能需要向所有或部分连接的客户端广播消息。websockets
库结合 asyncio
可以很好地处理这种情况。
如何管理多个连接
在 websockets
中,每个客户端连接都由一个独立的协程来处理(也就是我们在 websockets.serve()
中指定的那个处理函数)。为了能够向所有连接发送消息,我们需要在服务器端维护一个所有当前活跃连接的集合。Python 的 set
数据结构非常适合用来存储这些连接对象。
实现一个简单的广播服务器
我们将修改之前的 Echo Server,使其能够接收消息,并将收到的消息广播给所有当前在线的客户端。
修改 server.py
文件:
“`python
server_broadcast.py
import asyncio
import websockets
import json # 通常实时消息是JSON格式
创建一个全局集合,用于存放所有活跃的WebSocket连接
这是一个 set,存储的是 websocket 对象
CONNECTED_USERS = set()
WebSocket连接处理函数
async def handler(websocket, path):
print(f”客户端连接已建立 from {websocket.remote_address}, path: {path}”)
# 将新的连接添加到集合中
CONNECTED_USERS.add(websocket)
print(f"当前连接数: {len(CONNECTED_USERS)}")
try:
# 通知所有客户端有新用户加入 (可选)
# await broadcast(f"用户 {websocket.remote_address} 已加入") # 可以添加用户名等信息
# 无限循环,持续接收客户端消息
async for message in websocket:
print(f"收到消息 from {websocket.remote_address}: {message}")
# 收到消息后,广播给所有连接的客户端
await broadcast(f"[{websocket.remote_address}] 说: {message}") # 广播消息格式
except websockets.exceptions.ConnectionClosedOK:
print(f"客户端 {websocket.remote_address} 已正常断开连接")
except websockets.exceptions.ConnectionClosedError as e:
print(f"客户端 {websocket.remote_address} 连接异常断开: {e}")
except Exception as e:
print(f"处理客户端 {websocket.remote_address} 时发生错误: {e}")
finally:
# 无论正常还是异常,连接关闭后将其从集合中移除
print(f"客户端 {websocket.remote_address} 连接处理结束")
CONNECTED_USERS.remove(websocket)
print(f"当前连接数: {len(CONNECTED_USERS)}")
# 通知所有客户端有用户离开 (可选)
# await broadcast(f"用户 {websocket.remote_address} 已离开")
广播消息给所有连接的客户端
async def broadcast(message):
# 确保有客户端在线,且要发送的消息不是空
if CONNECTED_USERS and message:
# 创建一个需要发送的任务列表
# 使用 asyncio.gather 同时发送,可以提高效率
# 注意:发送时可能会遇到连接已经断开的情况,需要处理 exceptions.ConnectionClosed
send_tasks = []
for user_websocket in CONNECTED_USERS:
# 使用 asyncio.ensure_future 或 asyncio.create_task (Python 3.7+)
# 将发送操作包装成一个任务,避免一个慢速连接阻塞广播给其他连接
# 同时处理发送可能引起的 ConnectionClosed 异常
task = asyncio.create_task(safe_send(user_websocket, message))
send_tasks.append(task)
# 等待所有发送任务完成 (或者直到出现异常,取决于 return_exceptions)
# return_exceptions=True 可以在任务失败时不中断 gather,而是返回异常对象
await asyncio.gather(*send_tasks, return_exceptions=True)
# 可以选择检查返回的异常列表,处理失败的发送任务
async def safe_send(websocket, message):
“””尝试安全地发送消息,处理连接关闭异常”””
try:
await websocket.send(message)
except websockets.exceptions.ConnectionClosed:
# 此时连接已经关闭,移除它会在 handler 的 finally 块中进行
print(f”发送消息到已关闭的连接 {websocket.remote_address} 失败”)
pass # 忽略发送失败,等待连接处理协程自己清理
启动WebSocket服务器的主函数
async def main():
server = await websockets.serve(handler, “localhost”, 8765)
print(“WebSocket广播服务器已启动,监听在 ws://localhost:8765”)
await server.wait_closed()
print(“WebSocket广播服务器已关闭”)
运行主函数
if name == “main“:
try:
asyncio.run(main())
except KeyboardInterrupt:
print(“\n服务器通过Ctrl+C中断”)
“`
代码解析:
CONNECTED_USERS = set()
: 在函数外部定义一个全局集合,用于存储所有连接的websocket
对象。- 在
handler
函数中:CONNECTED_USERS.add(websocket)
: 当建立新连接时,将当前的websocket
对象添加到集合中。CONNECTED_USERS.remove(websocket)
: 在finally
块中,无论连接如何断开,都将其从集合中移除。await broadcast(...)
: 在收到客户端消息后,不再是简单回显,而是调用broadcast
函数将消息发送给所有连接。
async def broadcast(message):
: 这是一个新的异步函数,负责将消息发送给集合中的所有连接。if CONNECTED_USERS and message:
: 简单的检查,避免空集合或空消息。send_tasks = []
: 创建一个列表来存放即将创建的异步发送任务。for user_websocket in CONNECTED_USERS:
: 遍历所有连接的websocket
对象。task = asyncio.create_task(safe_send(user_websocket, message))
: 对于每个连接,我们创建一个新的asyncio
任务来执行safe_send
函数。asyncio.create_task()
(或 Python 3.6 及以前的asyncio.ensure_future()
) 会将协程包装成一个任务,并安排它在事件循环中运行。这样做的好处是,如果一个客户端的网络非常慢,发送消息给它需要很长时间,这不会阻塞向其他客户端发送消息的任务。每个发送操作都在自己的任务中异步进行。await asyncio.gather(*send_tasks, return_exceptions=True)
: 使用asyncio.gather
并发地运行所有发送任务。*send_tasks
将任务列表解包作为参数传递给gather
。return_exceptions=True
表示即使某个任务(发送给某个客户端的操作)失败(例如因为连接意外断开),gather
也不会中断,而是会继续尝试发送给其他客户端,并将失败的异常作为结果返回。
async def safe_send(websocket, message):
: 这个辅助函数包装了websocket.send()
调用,并添加了try...except websockets.exceptions.ConnectionClosed
来优雅地处理在尝试发送时发现连接已经关闭的情况。这样可以防止广播过程中因某个连接断开而导致整个broadcast
协程崩溃。
运行广播服务器和多个客户端
- 运行修改后的
server_broadcast.py
:
bash
python server_broadcast.py
服务器将启动并等待连接。 - 打开多个终端窗口,在每个窗口中运行客户端脚本
client.py
(可以使用我们之前编写的那个简单的客户端,虽然它只会发送两条消息就退出,但足以测试广播):
bash
python client.py
每当一个客户端连接时,服务器会打印出连接信息,并显示当前连接数。
当一个客户端发送消息时,所有连接的客户端都会收到服务器转发的这条消息,并且服务器端的日志也会显示广播的过程。
通过这个例子,你学会了如何在 websockets
中管理多个连接,并使用 asyncio.create_task
和 asyncio.gather
来并发地向多个客户端发送消息,实现了消息广播功能。这是构建聊天室、实时通知等应用的基础。
6. 使用 python-socketio
构建高级 WebSocket 应用
尽管 websockets
提供了强大的底层功能,但在构建更复杂的实时应用时,你可能会需要 Socket.IO 提供的额外特性,例如:
- 事件驱动: 使用具名的事件来发送和接收消息,而不是简单的文本或二进制数据。
- 自动重连: 客户端断开后会自动尝试重新连接。
- ACKs (确认回执): 可以请求客户端或服务器在收到消息后发送确认回执。
- 广播和房间 (Rooms): 更方便地向特定组或所有连接发送消息。
- 命名空间 (Namespaces): 将不同功能的实时通信隔离在不同的通道中。
- 降级机制: 在 WebSocket 不可用时,自动回退到长轮询等其他传输方式,提供更广泛的兼容性。
python-socketio
库实现了 Socket.IO 协议的服务器和客户端部分。
什么是 Socket.IO?
Socket.IO 实际上由两部分组成:
- Server-side library (e.g.,
python-socketio
): 运行在服务器端。 - Client-side library (e.g.,
socket.io.js
for browsers,socket.io-client
for Node.js/Python/Mobile): 运行在客户端。
这两部分需要配合使用,因为 Socket.IO 协议在 WebSocket 协议之上增加了一层自己的帧格式和管理逻辑。使用 python-socketio
服务器通常需要搭配 Socket.IO 客户端库,而不是标准的 WebSocket 客户端。
安装 python-socketio
安装 python-socketio
:
bash
pip install python-socketio
请注意,python-socketio
需要一个底层的 ASGI 或 WSGI 服务器来运行。它默认安装了 python-engineio
,这个库处理 HTTP 长轮询和 WebSocket 的实际传输。运行独立的 Socket.IO 服务器还需要一个 ASGI 服务器,例如 uvicorn
或 eventlet
(WSGI)。我们这里使用 uvicorn
,因为它基于 asyncio
,与我们之前的 websockets
例子风格一致。
bash
pip install uvicorn
构建一个简单的 Socket.IO 服务器
Socket.IO 是事件驱动的。客户端和服务器不是发送任意消息,而是触发(emit)一个带有名称的事件,并可以携带数据。另一方监听(on)这个事件并执行相应的处理函数。
创建一个名为 sio_server.py
的文件:
“`python
sio_server.py
import socketio
import uvicorn
import asyncio
创建一个Socket.IO服务器实例
async_mode=’asgi’ 表示使用ASGI模式,可以与uvicorn等ASGI服务器配合
sio = socketio.AsyncServer(cors_allowed_origins=’*’, async_mode=’asgi’) # 允许所有源的跨域请求
创建一个ASGI应用
Socket.IO服务器本身可以作为一个ASGI应用运行
app = socketio.ASGIApp(sio)
定义事件处理函数
使用 @sio.event 装饰器注册事件处理器
事件名称与函数名相同,例如 ‘connect’ 事件会触发 connect 函数
@sio.event
async def connect(sid, environ):
“””客户端连接成功时触发”””
# sid (session ID) 是每个客户端连接的唯一标识符
print(f”客户端连接成功: {sid}”)
# 可以选择向刚刚连接的客户端发送一条欢迎消息
# await sio.emit(‘message’, {‘data’: f’欢迎您, {sid}!’}, room=sid)
@sio.event
async def disconnect(sid):
“””客户端断开连接时触发”””
print(f”客户端 {sid} 已断开连接”)
监听自定义事件
客户端可以触发名为 ‘my_message’ 的事件
@sio.on(‘my_message’)
async def handle_my_message(sid, data):
“””处理客户端发送的 ‘my_message’ 事件”””
print(f”收到来自 {sid} 的 ‘my_message’ 事件,数据: {data}”)
# 收到消息后,可以向发送者回复
# await sio.emit('response', {'data': '消息已收到'}, room=sid)
# 也可以向所有客户端广播消息
await sio.emit('broadcast_message', {'sender': sid, 'data': data})
print(f"广播 'broadcast_message' 事件给所有客户端")
运行服务器
uvicorn 需要一个ASGI应用对象
host 和 port 指定监听地址和端口
if name == “main“:
print(“Socket.IO服务器已启动,监听在 http://localhost:8000″) # 注意:Socket.IO 通常监听HTTP端口进行握手
# 实际的WebSocket连接会在该端口的特定路径下建立
uvicorn.run(app, host=”localhost”, port=8000)
要使用 uvicorn 运行这个脚本,确保文件名为 sio_server.py,然后在命令行运行:
uvicorn sio_server:app –reload # –reload 可以在代码修改后自动重启服务器 (开发时方便)
“`
代码解析:
import socketio
和import uvicorn
: 导入所需库。sio = socketio.AsyncServer(...)
: 创建一个异步 Socket.IO 服务器实例。cors_allowed_origins='*'
是为了允许任何域的网页连接到这个服务器(开发时方便,生产环境应限制)。async_mode='asgi'
指定使用 ASGI 模式。app = socketio.ASGIApp(sio)
: 将 Socket.IO 服务器包装成一个 ASGI 应用。uvicorn
就是运行 ASGI 应用的。@sio.event
: 这个装饰器用来注册 Socket.IO 的内置事件处理器,如connect
(客户端连接成功) 和disconnect
(客户端断开连接)。这些函数名必须与事件名一致。sid
: Session ID,是 Socket.IO 为每个连接生成的唯一标识符。environ
: 包含连接的环境信息,类似于 WSGI 或 ASGI 的环境字典。
@sio.on('my_message')
: 这个装饰器用来注册监听自定义事件的处理器。当客户端触发名为'my_message'
的事件时,handle_my_message
函数就会被调用。sid
: 触发事件的客户端的 Session ID。data
: 客户端随事件发送的数据。Socket.IO 通常处理 JSON 可序列化的数据结构(字典、列表、基本类型)。
await sio.emit('event_name', data, room=...)
: 这是 Socket.IO 服务器发送事件的方法。'event_name'
: 要触发的事件名称。data
: 要发送的数据。room=sid
: 指定只发送给 SID 对应的客户端。- 不指定
room
或使用broadcast=True
(旧版本用法,新版本直接调用emit
) 默认是广播给所有客户端。
uvicorn.run(app, host="localhost", port=8000)
: 运行 ASGI 应用app
。服务器将在http://localhost:8000
监听。
构建一个简单的 Socket.IO 客户端
为了连接到 Socket.IO 服务器,我们需要一个 Socket.IO 客户端库。这里我们仍然使用 python-socketio
提供的客户端部分。
创建一个名为 sio_client.py
的文件:
“`python
sio_client.py
import socketio
import asyncio
创建一个异步Socket.IO客户端实例
sio = socketio.AsyncClient()
定义事件处理函数
@sio.event
async def connect():
“””客户端连接成功时触发”””
print(“成功连接到服务器!”)
# 连接成功后,可以立即发送一条消息
await sio.emit(‘my_message’, {‘data’: ‘Hello from Python client!’})
print(“客户端发送 ‘my_message’ 事件”)
@sio.event
async def disconnect():
“””客户端断开连接时触发”””
print(“与服务器断开连接”)
监听服务器广播的自定义事件
@sio.on(‘broadcast_message’)
async def handle_broadcast_message(data):
“””处理服务器广播的 ‘broadcast_message’ 事件”””
print(f”收到广播消息: {data}”)
客户端主函数
async def main():
print(“正在尝试连接Socket.IO服务器…”)
# 连接服务器
# url 参数指定服务器地址
await sio.connect(‘http://localhost:8000’)
# 连接后,客户端通常会保持运行,直到断开
# 我们可以让主程序等待,或者执行其他任务
# 这里我们让它等待连接断开
await sio.wait()
print("客户端等待连接结束")
运行主函数
if name == “main“:
try:
asyncio.run(main())
except KeyboardInterrupt:
print(“\n客户端通过Ctrl+C中断”)
“`
代码解析:
import socketio
和import asyncio
: 导入所需库。sio = socketio.AsyncClient()
: 创建一个异步 Socket.IO 客户端实例。@sio.event
: 注册客户端内置事件,如connect
和disconnect
。@sio.on('broadcast_message')
: 注册监听来自服务器的自定义事件'broadcast_message'
。await sio.emit('event_name', data)
: 客户端向服务器触发一个事件。await sio.connect('http://localhost:8000')
: 尝试连接 Socket.IO 服务器。注意这里使用的是 HTTP 地址,Socket.IO 内部会通过 HTTP 握手升级到 WebSocket(或其他传输方式)。await sio.wait()
: 让主协程等待,直到客户端与服务器断开连接。如果没有这一行,main
函数会立即结束,导致客户端程序退出。
运行 Socket.IO 应用
- 运行 Socket.IO 服务器 (
sio_server.py
):
bash
uvicorn sio_server:app --reload
服务器会启动并提示监听端口。 - 运行 Socket.IO 客户端 (
sio_client.py
):
bash
python sio_client.py
客户端会尝试连接,连接成功后会发送一条消息,并等待广播消息。
运行多个客户端实例,当你在一个客户端发送消息(通过修改代码或交互式环境),其他客户端都会收到广播消息。
Socket.IO 通过事件和高级功能简化了许多实时应用场景的开发,特别是与前端 Socket.IO.js 库配合时非常方便。
7. WebSocket 应用的常见问题与最佳实践
构建健壮的 WebSocket 应用需要考虑一些常见问题和最佳实践:
错误处理与连接断开
网络连接是不稳定的。客户端或服务器随时可能因为各种原因断开连接(网络问题、程序崩溃、用户关闭浏览器等)。
- 服务器端:
- 在处理客户端消息的循环 (
async for message in websocket:
) 外部使用try...except websockets.exceptions.ConnectionClosed
或更广泛的Exception
来捕获连接断开或处理过程中出现的错误。 - 在
finally
块中进行清理工作,例如将断开的连接从活跃连接列表中移除(如我们在广播服务器示例中所示)。 - 对于使用
send
发送消息时可能发生的连接关闭,也需要捕获websockets.exceptions.ConnectionClosed
异常,避免单个发送失败影响其他连接。
- 在处理客户端消息的循环 (
- 客户端端:
- 使用异步上下文管理器
async with websockets.connect(...)
可以确保连接在退出块时被关闭。 - 在接收消息循环 (
async for message in websocket:
) 或await websocket.recv()
时处理连接断开异常。 - 考虑实现自动重连逻辑(Socket.IO 内置此功能,使用
websockets
则需要手动实现)。
- 使用异步上下文管理器
心跳机制 (Ping/Pong)
长时间没有数据传输的连接可能会被中间网络设备(如防火墙、代理)关闭,而服务器和客户端可能都不会收到正常的关闭通知,导致“僵尸连接”。为了防止这种情况,WebSocket 协议内置了心跳机制:
- Ping Frame: 一方发送 Ping 帧。
- Pong Frame: 另一方收到 Ping 帧后,必须尽快回复 Pong 帧。
websockets
和 python-socketio
等库通常会自动处理 Ping/Pong 帧,以保持连接活跃。你可以配置 Ping 发送的间隔和超时时间。确保启用了心跳机制,特别是对于长时间不活跃的连接。
安全性考虑 (SSL/TLS, Origin 校验)
- 加密传输: 始终使用
wss://
而不是ws://
进行生产环境的 WebSocket 连接。wss
使用 SSL/TLS 对数据进行加密,防止中间人攻击。这意味着你的服务器需要配置 SSL 证书。在websockets.serve()
中,可以通过ssl=
参数提供 SSL 上下文。 - Origin 校验: WebSocket 握手请求中包含
Origin
头部,指示请求来源的域名。服务器应该校验Origin
头部,只接受来自允许域名的连接,拒绝来自恶意网站的连接,防止跨站 WebSocket 劫持 (Cross-Site WebSocket Hijacking)。在websockets.serve()
中,可以通过create_protocol
参数提供一个自定义的协议工厂,并在其中进行 Origin 校验。Socket.IO 的AsyncServer
构造函数提供了cors_allowed_origins
参数来配置允许的来源。 - 输入验证: 不要信任来自客户端的任何数据。对接收到的所有消息进行严格的验证和清理,防止注入攻击或恶意数据破坏服务器。
- 身份验证与授权: 对于需要用户身份的应用,应在 WebSocket 握手成功后进行身份验证(例如发送一个认证 Token),并根据用户权限进行授权,限制用户可以执行的操作(如加入特定房间、发送敏感消息)。
性能与扩展性 (消息队列)
随着应用规模增长,单个 WebSocket 服务器可能无法处理大量并发连接或高吞吐量的消息。
- 单服务器性能: Python 的 GIL (全局解释器锁) 会限制 CPU 密集型任务的并行性,但 WebSocket 应用主要是 I/O 密集型的(等待网络数据)。
asyncio
在单进程内通过协作式多任务提高了 I/O 密集型应用的并发能力。使用asyncio.create_task
并发处理多个客户端或广播任务是提高性能的关键。 - 多进程/多服务器: 当单台服务器的资源(CPU、内存、网络)成为瓶颈时,你需要部署多个 WebSocket 服务器实例。
- 消息队列 (Message Queue): 在多服务器架构中,不同服务器实例之间需要共享信息(例如用户在线状态、广播消息)。使用消息队列(如 Redis Pub/Sub, RabbitMQ, Kafka)是常见的解决方案。当一个服务器实例收到消息需要广播时,它将消息发布到消息队列的一个频道。其他所有连接到同一个频道的服务器实例都能收到这条消息,然后各自向其连接的客户端广播。这使得你可以水平扩展 WebSocket 服务器层。
与现有 Web 框架集成
在实际应用中,WebSocket 应用常常是现有 Web 应用的一部分。你可能已经使用了 Flask, Django, FastAPI 等框架处理 HTTP 请求(如用户登录、数据查询)。
python-socketio
提供了与主流 Web 框架(Flask, Django, FastAPI, Starlette, aiohttp 等)的集成,让你可以在同一个应用中方便地同时提供 HTTP 和 Socket.IO 服务。例如,Flask-SocketIO
是一个流行的 Flask 扩展。websockets
是一个独立的库,但你可以通过一些方式将其与 Web 框架结合,例如在同一个asyncio
事件循环中运行 Web 框架和websockets
服务器,或者在 Web 框架的某个路由中建立 WebSocket 连接(这通常需要 Web 框架本身支持 ASGI 或类似的异步特性,如 FastAPI 原生支持 WebSocket)。
8. 总结与进一步学习
通过本指南,你已经了解了:
- WebSocket 的基本概念、工作原理和优势,以及它如何解决 HTTP 在实时通信中的局限性。
- WebSocket 连接是如何通过 HTTP 握手建立的。
- Python 中用于构建 WebSocket 应用的主流库:
websockets
和python-socketio
。 - 如何使用
websockets
库构建一个简单的 Echo 服务器和客户端,并理解asyncio
在其中的作用。 - 如何使用
websockets
处理多个客户端连接和实现消息广播。 - 如何使用
python-socketio
库构建一个基于事件的高级实时应用。 - 构建健壮的 WebSocket 应用需要考虑的常见问题和最佳实践,包括错误处理、安全性、性能和集成。
WebSocket 是构建现代实时交互应用的关键技术。掌握 Python 中的 WebSocket 编程将为你打开新的大门。
进一步学习方向:
- 深入学习
asyncio
框架,理解事件循环、任务、Future 等概念。 - 探索
websockets
库的更多高级功能,如子协议支持、压缩、详细的控制帧处理等。 - 深入学习
python-socketio
和 Socket.IO 协议,掌握房间、命名空间、确认回执等功能,并学习如何将其与 Flask/Django/FastAPI 等框架集成。 - 学习如何使用消息队列(如 Redis Pub/Sub)来扩展你的 WebSocket 应用,使其能够处理更多连接和消息。
- 研究 WebSocket 的安全性,特别是 SSL/TLS 配置和 Origin 校验。
- 尝试构建一个真实的实时应用项目,如一个简单的多人聊天室或一个实时数据仪表盘,将所学知识应用于实践。
实时通信是一个广阔而有趣的应用领域,Python 提供了强大的工具来帮助你实现这些功能。祝你在 WebSocket 的学习旅程中取得成功!