Python WebSocket 新手指南 – wiki基地


Python WebSocket 新手指南:构建实时交互应用的基石

在现代 Web 应用中,实时交互已成为一种核心需求。无论是聊天室、在线游戏、协作编辑,还是实时数据展示(如股票行情、物联网监控),传统的 HTTP 请求/响应模式都显得力不从心。这时,WebSocket 技术应运而生,它提供了一种在客户端和服务器之间进行全双工、低延迟通信的能力。

本篇文章将带你深入了解 WebSocket,并重点讲解如何使用 Python 构建 WebSocket 应用。无论你是想为你的 Web 应用添加实时功能,还是对网络编程中的持久连接感兴趣,这篇指南都将为你提供坚实的基础。

文章目录

  1. 什么是 WebSocket?为什么需要它?
    • HTTP 的局限性
    • WebSocket 的工作原理
    • WebSocket 的优势
    • WebSocket 的应用场景
  2. WebSocket vs. HTTP:深入对比
    • 连接方式
    • 通信模式
    • 开销
    • 适用场景
  3. Python 中的 WebSocket 库
    • 主流库介绍 (websockets, python-socketio, aiohttp)
    • 选择合适的库
  4. 使用 websockets 构建基础 WebSocket 应用
    • 安装 websockets
    • 理解 asyncio
    • 构建一个简单的 WebSocket 服务器 (Echo Server)
      • 代码实现与解析
      • 运行服务器
    • 构建一个简单的 WebSocket 客户端
      • 代码实现与解析
      • 运行客户端
  5. 深入 websockets:处理多连接与消息广播
    • 如何管理多个连接
    • 实现一个简单的广播服务器
      • 代码实现与解析
      • 处理客户端断开连接
  6. 使用 python-socketio 构建高级 WebSocket 应用
    • 什么是 Socket.IO?
    • 安装 python-socketio
    • 构建一个简单的 Socket.IO 服务器
      • 基于事件的通信
      • 代码实现与解析
    • 构建一个简单的 Socket.IO 客户端
      • 代码实现与解析
  7. WebSocket 应用的常见问题与最佳实践
    • 错误处理与连接断开
    • 心跳机制 (Ping/Pong)
    • 安全性考虑 (SSL/TLS, Origin 校验)
    • 性能与扩展性 (消息队列)
    • 与现有 Web 框架集成
  8. 总结与进一步学习

1. 什么是 WebSocket?为什么需要它?

HTTP 的局限性

在我们深入 WebSocket 之前,先回顾一下传统的 HTTP 协议。HTTP 是一个基于请求/响应模式的协议。客户端(通常是浏览器)发起一个请求,服务器处理请求并返回一个响应。这个过程是单向的,并且在每次请求/响应完成后,连接通常会被关闭(尽管 HTTP/1.1 引入了持久连接,可以在一定时间内复用,但通信模式仍是请求/响应)。

这种模式对于获取网页、提交表单等场景非常有效。但对于需要服务器主动向客户端推送数据的实时应用来说,HTTP 就显得笨拙了。为了模拟服务器推送,开发者们不得不采用一些折衷方案:

  • 轮询 (Polling): 客户端每隔固定时间就向服务器发送请求,询问是否有新数据。这会导致大量无效请求,浪费带宽和服务器资源,并且数据的新鲜度取决于轮询间隔。
  • 长轮询 (Long Polling): 客户端发送请求后,服务器会 holding 住连接,直到有新数据或者超时才返回响应。客户端收到响应后再立即发起新的请求。这比普通轮询效率高,但仍然是基于请求/响应,并且服务器需要维护大量半开放的连接,资源消耗较大。
  • 流 (Streaming): 服务器发送响应头部后,保持连接打开,并持续向客户端发送数据。这可以实现单向推送,但客户端难以向服务器发送数据,且实现复杂。

这些技术都是对 HTTP 请求/响应模式的“滥用”,无法实现真正的双向、低延迟的实时通信。

WebSocket 的工作原理

WebSocket (RFC 6455) 协议旨在解决 HTTP 在实时通信方面的不足。它提供了一个全双工 (Full-Duplex) 的通信通道。这意味着数据可以在同一时间双向传输,客户端和服务器都可以主动发送消息。

WebSocket 连接的建立过程通常是这样的:

  1. 客户端通过 HTTP 发起一个特殊的请求,称为 WebSocket 握手 (Handshake)。这个请求包含一些特殊的头部(如 Upgrade: websocket, Connection: Upgrade, Sec-WebSocket-Key 等),表明客户端希望将当前连接升级到 WebSocket 协议。
  2. 服务器收到握手请求后,如果支持 WebSocket 协议并同意升级,会返回一个特殊的 HTTP 响应(状态码 101 Switching Protocols),表示协议切换成功。响应中也会包含一些特殊的头部(如 Sec-WebSocket-Accept)。
  3. 握手成功后,底层 TCP 连接将不再使用 HTTP 协议进行通信,而是切换到 WebSocket 协议。此时,客户端和服务器之间建立了一个持久的、双向的连接。
  4. 在这个持久连接上,双方可以随时互相发送数据帧 (data frames)。数据帧可以是文本数据 (Text Frame) 或二进制数据 (Binary Frame)。WebSocket 协议负责处理帧的封装、掩码 (masking,客户端到服务器的数据需要掩码,增强安全性) 等细节。
  5. 连接可以由客户端或服务器主动关闭,或者由于网络问题意外断开。双方会发送特定的控制帧 (control frames,如 Close Frame) 来管理连接状态。

核心概念: 握手(Upgrade)、协议切换(101)、持久连接、全双工通信、数据帧(文本/二进制)、控制帧(心跳、关闭)。

WebSocket 的优势

相比于模拟的实时技术,WebSocket 具有显著优势:

  • 真正的全双工通信: 服务器和客户端可以同时发送和接收数据,无需等待对方。
  • 低延迟: 一旦连接建立,数据传输无需重复进行 HTTP 请求/响应的开销,消息可以更快速地送达。
  • 较低的开销: 握手完成后,后续数据传输使用的帧协议头部非常小,相比 HTTP 头部显著减少,节省了带宽。
  • 持久连接: 连接在握手后保持打开状态,避免了频繁建立/关闭连接的开销。

WebSocket 的应用场景

WebSocket 非常适合以下需要实时交互的应用:

  • 即时聊天应用: 消息的发送和接收都是实时的。
  • 在线游戏: 实时同步玩家状态、游戏事件。
  • 实时股票/数据行情: 服务器主动推送最新的数据。
  • 实时协作工具: 多人同时编辑文档、白板等。
  • 通知系统: 服务器向客户端推送新通知。
  • 物联网 (IoT) 监控: 设备状态的实时上报与命令的实时下发。
  • 实时地理位置服务: 共享和更新位置信息。

2. WebSocket vs. HTTP:深入对比

为了更好地理解 WebSocket 的定位,我们将其与 HTTP 进行更详细的对比:

特性 HTTP WebSocket
通信模式 请求/响应 (Request/Response) 全双工 (Full-Duplex)
连接方式 短连接 (除非使用 HTTP/1.1 持久连接) 持久连接 (在整个会话期间保持开放)
初始化 客户端总是发起请求 客户端发起握手请求,服务器同意后升级协议
开销 每个请求/响应都有完整的头部,开销较大 握手后开销小,数据传输使用小型帧头部
数据流 单向 (请求 -> 响应) 双向 (客户端和服务器都可以主动发送)
协议 应用层协议,基于 TCP 应用层协议,基于 TCP,通过 HTTP 握手建立
URL 方案 http://, https:// ws://, wss://
适用场景 静态资源获取、表单提交、API 调用等 实时数据推送、聊天、游戏、协作等需要低延迟双向通信的场景

总结来说,HTTP 适合“一次性”的数据交互,而 WebSocket 适合需要持续、双向、低延迟通信的场景。WebSocket 并不是取代 HTTP,而是对其的补充,它们常常在同一个应用中协同工作,例如使用 HTTP 获取网页内容,然后通过 WebSocket 建立连接进行实时通信。

3. Python 中的 WebSocket 库

Python 生态系统中有几个流行的库可以用来构建 WebSocket 应用。选择哪个库取决于你的具体需求、对异步编程的熟悉程度以及是否需要与现有的 Web 框架集成。

以下是几个主流的库:

  1. websockets:

    • 这是一个基于 asyncio 的 WebSocket 库。
    • 它提供了 WebSocket 协议的底层实现,非常适合需要精细控制 WebSocket 连接的场景。
    • 支持 WebSocket 协议的全部特性,包括文本/二进制帧、控制帧、SSL/TLS 等。
    • 特点: 纯异步、高性能、符合标准、提供较低级别的 API。
    • 适用场景: 需要构建高性能、可扩展的 WebSocket 服务器或客户端,对 asyncio 比较熟悉,或者不需要 Socket.IO 等高级特性。
  2. python-socketio:

    • 这是一个实现了 Socket.IO 协议的 Python 库。Socket.IO 是一个建立在 WebSocket 之上,提供更多高级特性(如自动重连、事件广播、命名空间、房间等)的库,它还包含一个降级机制,在 WebSocket 不可用时可以使用长轮询等方式模拟实时通信。
    • python-socketio 可以作为一个独立的服务器运行,也可以轻松地与流行的 Python Web 框架(如 Flask, Django, FastAPI, Starlette, aiohttp)集成。
    • 特点: 提供高级功能、易于集成、自带降级机制、基于事件。
    • 适用场景: 需要 Socket.IO 提供的额外功能(如房间、命名空间),希望与现有 Web 框架紧密集成,或者客户端主要使用 Socket.IO 库(通常是 JavaScript 客户端库)。
  3. aiohttp:

    • aiohttp 是一个用于 asyncio 的异步 HTTP 客户端/服务器框架。
    • 它也内置了对 WebSocket 服务器和客户端的支持。
    • 如果你已经在使用 aiohttp 构建 Web 应用,那么使用它内置的 WebSocket 功能会非常方便。
    • 特点: 全功能的异步 HTTP 框架,包含 WebSocket 支持,与 aiohttp 生态系统集成。
    • 适用场景: 已经或计划使用 aiohttp 作为主要的 Web 框架,希望在一个框架内处理 HTTP 和 WebSocket。

对于新手来说,websockets 是一个很好的起点,因为它专注于 WebSocket 协议本身,代码相对简洁,能帮助你理解 WebSocket 的基本工作方式,同时也是基于现代的 asyncio 异步编程模式。如果你后续需要更高级的功能或与特定前端库(如 Socket.IO.js)配合,再考虑 python-socketioaiohttp 则适合作为完整异步 Web 框架的一部分来学习。

本指南将主要使用 websockets 库来演示基础概念和实现,因为它最直接地体现了 WebSocket 的协议特性。我们也会简要介绍 python-socketio 的用法,展示高级抽象的便利性。

4. 使用 websockets 构建基础 WebSocket 应用

websockets 库是基于 Python 的 asyncio 异步框架构建的。因此,在使用 websockets 之前,你需要对 asyncio 的基本概念有所了解,例如协程 (async def), await 关键字, 事件循环 (event loop) 等。简单来说,asyncio 允许你在单线程中以协作式的方式处理多个并发任务(I/O 密集型任务,如网络通信),而不会阻塞整个程序。

安装 websockets

首先,确保你安装了 websockets 库。打开终端或命令行,运行以下命令:

bash
pip install websockets

理解 asyncio

websockets 的操作(如发送、接收消息)都是异步的。这意味着当你调用 await websocket.recv() 等方法时,如果当前没有消息可接收,程序不会停在那里干等,而是会暂停当前协程的执行,让出控制权给事件循环,事件循环可以去执行其他已经准备好的协程(例如处理另一个客户端的连接)。当有新消息到达时,事件循环会恢复之前暂停的协程,继续执行。

所有 WebSocket 相关的操作都需要在 async def 定义的协程函数中,并且需要使用 await 关键字来等待异步操作完成。程序的入口点通常需要一个 asyncio.run() 函数来启动事件循环并运行顶层的协程。

构建一个简单的 WebSocket 服务器 (Echo Server)

我们先来创建一个最简单的 WebSocket 服务器:一个回显服务器 (Echo Server)。它接收客户端发送的任何消息,然后将同一个消息原样发回给客户端。

创建一个名为 server.py 的文件,并输入以下代码:

“`python

server.py

import asyncio
import websockets

WebSocket连接处理函数

当一个新的客户端连接建立时,asyncio事件循环会为此连接创建一个新的协程,并执行这个函数

websocket 参数代表当前的连接对象

path 参数代表客户端连接的路径(比如 ws://localhost:8765/some/path,path就是/some/path)

async def echo_handler(websocket, path):
print(f”客户端连接已建立 from {websocket.remote_address}, path: {path}”)

try:
    # 无限循环,持续接收客户端消息
    async for message in websocket:
        print(f"收到消息 from {websocket.remote_address}: {message}")

        # 将收到的消息发回给客户端
        await websocket.send(f"服务器收到:{message}")
        print(f"发送消息给 {websocket.remote_address}: 服务器收到:{message}")

except websockets.exceptions.ConnectionClosedOK:
    # 客户端正常关闭连接
    print(f"客户端 {websocket.remote_address} 已正常断开连接")
except websockets.exceptions.ConnectionClosedError as e:
    # 客户端异常关闭连接
    print(f"客户端 {websocket.remote_address} 连接异常断开: {e}")
except Exception as e:
    # 其他未知异常
    print(f"处理客户端 {websocket.remote_address} 时发生错误: {e}")
finally:
    # 无论正常还是异常,连接关闭后执行清理操作(在这个简单例子中无需特别清理)
    print(f"客户端 {websocket.remote_address} 连接处理结束")

启动WebSocket服务器的主函数

async def main():
# websockets.serve() 函数用于创建一个WebSocket服务器
# 第一个参数是连接处理协程函数
# host 是服务器绑定的地址,None 或 ‘localhost’ 绑定到本地所有可用接口
# port 是服务器监听的端口
# start_server 返回一个 server 对象,需要在协程中 await
server = await websockets.serve(echo_handler, “localhost”, 8765)

print("WebSocket服务器已启动,监听在 ws://localhost:8765")

# server.wait_closed() 会一直阻塞,直到服务器关闭
await server.wait_closed()
print("WebSocket服务器已关闭")

运行主函数,启动asyncio事件循环

asyncio.run() 是Python 3.7+ 推荐的运行顶层async函数的入口点

它会自动管理事件循环的创建和关闭

if name == “main“:
try:
asyncio.run(main())
except KeyboardInterrupt:
print(“\n服务器通过Ctrl+C中断”)
“`

代码解析:

  1. import asyncioimport websockets: 导入所需的库。
  2. async def echo_handler(websocket, path):: 这是一个协程函数,它将作为每个新连接的处理程序。当一个客户端通过 WebSocket 连接到服务器时,websockets 库会自动创建一个新的协程来运行这个函数。
    • websocket: 这是表示当前 WebSocket 连接的对象。你可以通过它发送 (await websocket.send(message)) 和接收 (await websocket.recv()) 数据。
    • path: 这是客户端连接时请求的路径(例如 /, /chat, /data 等)。在我们的简单例子中,它总是 /
  3. print(...): 打印一些日志信息,方便观察服务器状态。
  4. async for message in websocket:: 这是一个异步迭代器。它会持续等待并接收来自客户端的新消息。每当收到一条消息,循环体就会执行一次,message 变量将包含收到的消息内容(通常是字符串,但也可以是字节)。
  5. await websocket.send(...): 使用 await 关键字发送消息回客户端。这是异步操作,程序会等待消息发送完成。
  6. try...except...finally: 这是标准的 Python 异常处理。在这里,我们捕获 websockets.exceptions.ConnectionClosedOK (客户端正常关闭) 和 websockets.exceptions.ConnectionClosedError (客户端异常断开),以及其他可能的异常,以便在连接断开时进行相应的处理和日志记录。使用 async for 循环接收消息时,当连接关闭,循环会自动结束,然后进入 finally 块(如果正常关闭则会先进入 ConnectionClosedOK 块再进入 finally)。
  7. async def main():: 这是我们程序的异步入口点。
  8. server = await websockets.serve(echo_handler, "localhost", 8765): 这行代码创建并启动 WebSocket 服务器。
    • echo_handler: 指定处理每个新连接的协程函数。
    • "localhost": 服务器将绑定到本地地址。
    • 8765: 服务器将监听 8765 端口。
    • await: serve 函数是一个异步操作,需要等待服务器启动完成。它返回一个服务器对象。
  9. print(...): 提示服务器已启动。
  10. await server.wait_closed(): 这是一个异步操作,它会暂停 main 协程的执行,直到服务器被关闭(例如通过 server.close() 调用或接收到中断信号)。这使得 main 函数保持运行,服务器持续监听连接。
  11. if __name__ == "__main__":: 确保只有当脚本直接运行时才执行 main 函数。
  12. asyncio.run(main()): 这是启动 asyncio 事件循环并运行 main 协程的标准方法(Python 3.7+)。它会负责创建事件循环,运行 main 直到完成,然后关闭事件循环。
  13. except KeyboardInterrupt: 捕获用户按下 Ctrl+C 时的中断信号,友好地退出程序。

运行服务器

保存 server.py 文件,然后在终端中运行:

bash
python server.py

你应该会看到输出:

WebSocket服务器已启动,监听在 ws://localhost:8765

服务器现在正在等待客户端连接。

构建一个简单的 WebSocket 客户端

现在,我们来编写一个简单的客户端,连接到我们刚刚启动的服务器,发送消息并接收回显。

创建一个名为 client.py 的文件,并输入以下代码:

“`python

client.py

import asyncio
import websockets

WebSocket客户端协程函数

async def hello_client():
# 定义服务器的WebSocket URI
uri = “ws://localhost:8765”

# 使用 websockets.connect() 建立连接
# 这是一个异步上下文管理器,连接会在进入时建立,退出时关闭
async with websockets.connect(uri) as websocket:
    print(f"已连接到服务器: {uri}")

    # 发送一条消息给服务器
    message_to_send = "Hello, WebSocket Server!"
    await websocket.send(message_to_send)
    print(f"客户端发送: {message_to_send}")

    # 接收服务器的回显消息
    response = await websocket.recv()
    print(f"客户端收到: {response}")

    # 可以发送多条消息
    message_to_send_2 = "How are you?"
    await websocket.send(message_to_send_2)
    print(f"客户端发送: {message_to_send_2}")

    response_2 = await websocket.recv()
    print(f"客户端收到: {response_2}")

运行客户端协程

async def main():
await hello_client()

启动asyncio事件循环运行main函数

if name == “main“:
print(“正在启动WebSocket客户端…”)
asyncio.run(main())
print(“客户端已结束”)
“`

代码解析:

  1. import asyncioimport websockets: 导入所需库。
  2. async def hello_client():: 客户端协程函数。
  3. uri = "ws://localhost:8765": 定义要连接的服务器地址。ws:// 表示普通的 WebSocket 连接,如果服务器使用 SSL/TLS,则需要使用 wss://
  4. async with websockets.connect(uri) as websocket:: 这是一个异步上下文管理器。它负责建立与指定 URI 的 WebSocket 连接。
    • 进入 with 块时,await websockets.connect(uri) 会执行,建立连接,并将连接对象赋值给 websocket 变量。
    • 退出 with 块时(无论是正常退出还是发生异常),连接会自动关闭。这是一种管理资源的好方法。
  5. await websocket.send(message_to_send): 向服务器发送一条消息。
  6. await websocket.recv(): 等待并接收服务器发送过来的消息。如果服务器没有发送消息,这里会一直阻塞(但不会占用 CPU,因为是异步等待),直到收到消息或连接关闭。
  7. async def main(): 客户端的异步入口点。
  8. asyncio.run(main()): 启动事件循环并运行客户端。

运行客户端

在服务器 server.py 正在运行的终端之外,打开另一个终端,运行客户端脚本:

bash
python client.py

你应该会看到客户端输出:

正在启动WebSocket客户端...
已连接到服务器: ws://localhost:8765
客户端发送: Hello, WebSocket Server!
客户端收到: 服务器收到:Hello, WebSocket Server!
客户端发送: How are you?
客户端收到: 服务器收到:How are you?
客户端已结束

同时,查看运行服务器的终端,你应该看到服务器记录了连接建立、接收消息、发送消息以及连接断开的日志:

WebSocket服务器已启动,监听在 ws://localhost:8765
客户端连接已建立 from 127.0.0.1:xxxxx, path: / # xxxxx 是随机端口号
收到消息 from 127.0.0.1:xxxxx: Hello, WebSocket Server!
发送消息给 127.0.0.1:xxxxx: 服务器收到:Hello, WebSocket Server!
收到消息 from 127.0.0.1:xxxxx: How are you?
发送消息给 127.0.0.1:xxxxx: 服务器收到:服务器收到:How are you?
客户端 127.0.0.1:xxxxx 已正常断开连接
客户端 127.0.0.1:xxxxx 连接处理结束

恭喜!你已经成功构建并运行了一个简单的 WebSocket 服务器和客户端,实现了双向通信。

5. 深入 websockets:处理多连接与消息广播

一个 WebSocket 服务器通常需要同时处理来自多个客户端的连接,并且可能需要向所有或部分连接的客户端广播消息。websockets 库结合 asyncio 可以很好地处理这种情况。

如何管理多个连接

websockets 中,每个客户端连接都由一个独立的协程来处理(也就是我们在 websockets.serve() 中指定的那个处理函数)。为了能够向所有连接发送消息,我们需要在服务器端维护一个所有当前活跃连接的集合。Python 的 set 数据结构非常适合用来存储这些连接对象。

实现一个简单的广播服务器

我们将修改之前的 Echo Server,使其能够接收消息,并将收到的消息广播给所有当前在线的客户端。

修改 server.py 文件:

“`python

server_broadcast.py

import asyncio
import websockets
import json # 通常实时消息是JSON格式

创建一个全局集合,用于存放所有活跃的WebSocket连接

这是一个 set,存储的是 websocket 对象

CONNECTED_USERS = set()

WebSocket连接处理函数

async def handler(websocket, path):
print(f”客户端连接已建立 from {websocket.remote_address}, path: {path}”)

# 将新的连接添加到集合中
CONNECTED_USERS.add(websocket)
print(f"当前连接数: {len(CONNECTED_USERS)}")

try:
    # 通知所有客户端有新用户加入 (可选)
    # await broadcast(f"用户 {websocket.remote_address} 已加入") # 可以添加用户名等信息

    # 无限循环,持续接收客户端消息
    async for message in websocket:
        print(f"收到消息 from {websocket.remote_address}: {message}")

        # 收到消息后,广播给所有连接的客户端
        await broadcast(f"[{websocket.remote_address}] 说: {message}") # 广播消息格式

except websockets.exceptions.ConnectionClosedOK:
    print(f"客户端 {websocket.remote_address} 已正常断开连接")
except websockets.exceptions.ConnectionClosedError as e:
    print(f"客户端 {websocket.remote_address} 连接异常断开: {e}")
except Exception as e:
    print(f"处理客户端 {websocket.remote_address} 时发生错误: {e}")
finally:
    # 无论正常还是异常,连接关闭后将其从集合中移除
    print(f"客户端 {websocket.remote_address} 连接处理结束")
    CONNECTED_USERS.remove(websocket)
    print(f"当前连接数: {len(CONNECTED_USERS)}")
    # 通知所有客户端有用户离开 (可选)
    # await broadcast(f"用户 {websocket.remote_address} 已离开")

广播消息给所有连接的客户端

async def broadcast(message):
# 确保有客户端在线,且要发送的消息不是空
if CONNECTED_USERS and message:
# 创建一个需要发送的任务列表
# 使用 asyncio.gather 同时发送,可以提高效率
# 注意:发送时可能会遇到连接已经断开的情况,需要处理 exceptions.ConnectionClosed
send_tasks = []
for user_websocket in CONNECTED_USERS:
# 使用 asyncio.ensure_future 或 asyncio.create_task (Python 3.7+)
# 将发送操作包装成一个任务,避免一个慢速连接阻塞广播给其他连接
# 同时处理发送可能引起的 ConnectionClosed 异常
task = asyncio.create_task(safe_send(user_websocket, message))
send_tasks.append(task)

    # 等待所有发送任务完成 (或者直到出现异常,取决于 return_exceptions)
    # return_exceptions=True 可以在任务失败时不中断 gather,而是返回异常对象
    await asyncio.gather(*send_tasks, return_exceptions=True)
    # 可以选择检查返回的异常列表,处理失败的发送任务

async def safe_send(websocket, message):
“””尝试安全地发送消息,处理连接关闭异常”””
try:
await websocket.send(message)
except websockets.exceptions.ConnectionClosed:
# 此时连接已经关闭,移除它会在 handler 的 finally 块中进行
print(f”发送消息到已关闭的连接 {websocket.remote_address} 失败”)
pass # 忽略发送失败,等待连接处理协程自己清理

启动WebSocket服务器的主函数

async def main():
server = await websockets.serve(handler, “localhost”, 8765)
print(“WebSocket广播服务器已启动,监听在 ws://localhost:8765”)
await server.wait_closed()
print(“WebSocket广播服务器已关闭”)

运行主函数

if name == “main“:
try:
asyncio.run(main())
except KeyboardInterrupt:
print(“\n服务器通过Ctrl+C中断”)
“`

代码解析:

  1. CONNECTED_USERS = set(): 在函数外部定义一个全局集合,用于存储所有连接的 websocket 对象。
  2. handler 函数中:
    • CONNECTED_USERS.add(websocket): 当建立新连接时,将当前的 websocket 对象添加到集合中。
    • CONNECTED_USERS.remove(websocket): 在 finally 块中,无论连接如何断开,都将其从集合中移除。
    • await broadcast(...): 在收到客户端消息后,不再是简单回显,而是调用 broadcast 函数将消息发送给所有连接。
  3. async def broadcast(message):: 这是一个新的异步函数,负责将消息发送给集合中的所有连接。
    • if CONNECTED_USERS and message:: 简单的检查,避免空集合或空消息。
    • send_tasks = []: 创建一个列表来存放即将创建的异步发送任务。
    • for user_websocket in CONNECTED_USERS:: 遍历所有连接的 websocket 对象。
    • task = asyncio.create_task(safe_send(user_websocket, message)): 对于每个连接,我们创建一个新的 asyncio 任务来执行 safe_send 函数。asyncio.create_task() (或 Python 3.6 及以前的 asyncio.ensure_future()) 会将协程包装成一个任务,并安排它在事件循环中运行。这样做的好处是,如果一个客户端的网络非常慢,发送消息给它需要很长时间,这不会阻塞向其他客户端发送消息的任务。每个发送操作都在自己的任务中异步进行。
    • await asyncio.gather(*send_tasks, return_exceptions=True): 使用 asyncio.gather 并发地运行所有发送任务。*send_tasks 将任务列表解包作为参数传递给 gatherreturn_exceptions=True 表示即使某个任务(发送给某个客户端的操作)失败(例如因为连接意外断开),gather 也不会中断,而是会继续尝试发送给其他客户端,并将失败的异常作为结果返回。
  4. async def safe_send(websocket, message):: 这个辅助函数包装了 websocket.send() 调用,并添加了 try...except websockets.exceptions.ConnectionClosed 来优雅地处理在尝试发送时发现连接已经关闭的情况。这样可以防止广播过程中因某个连接断开而导致整个 broadcast 协程崩溃。

运行广播服务器和多个客户端

  1. 运行修改后的 server_broadcast.py
    bash
    python server_broadcast.py

    服务器将启动并等待连接。
  2. 打开多个终端窗口,在每个窗口中运行客户端脚本 client.py(可以使用我们之前编写的那个简单的客户端,虽然它只会发送两条消息就退出,但足以测试广播):
    bash
    python client.py

    每当一个客户端连接时,服务器会打印出连接信息,并显示当前连接数。
    当一个客户端发送消息时,所有连接的客户端都会收到服务器转发的这条消息,并且服务器端的日志也会显示广播的过程。

通过这个例子,你学会了如何在 websockets 中管理多个连接,并使用 asyncio.create_taskasyncio.gather 来并发地向多个客户端发送消息,实现了消息广播功能。这是构建聊天室、实时通知等应用的基础。

6. 使用 python-socketio 构建高级 WebSocket 应用

尽管 websockets 提供了强大的底层功能,但在构建更复杂的实时应用时,你可能会需要 Socket.IO 提供的额外特性,例如:

  • 事件驱动: 使用具名的事件来发送和接收消息,而不是简单的文本或二进制数据。
  • 自动重连: 客户端断开后会自动尝试重新连接。
  • ACKs (确认回执): 可以请求客户端或服务器在收到消息后发送确认回执。
  • 广播和房间 (Rooms): 更方便地向特定组或所有连接发送消息。
  • 命名空间 (Namespaces): 将不同功能的实时通信隔离在不同的通道中。
  • 降级机制: 在 WebSocket 不可用时,自动回退到长轮询等其他传输方式,提供更广泛的兼容性。

python-socketio 库实现了 Socket.IO 协议的服务器和客户端部分。

什么是 Socket.IO?

Socket.IO 实际上由两部分组成:

  1. Server-side library (e.g., python-socketio): 运行在服务器端。
  2. Client-side library (e.g., socket.io.js for browsers, socket.io-client for Node.js/Python/Mobile): 运行在客户端。

这两部分需要配合使用,因为 Socket.IO 协议在 WebSocket 协议之上增加了一层自己的帧格式和管理逻辑。使用 python-socketio 服务器通常需要搭配 Socket.IO 客户端库,而不是标准的 WebSocket 客户端。

安装 python-socketio

安装 python-socketio

bash
pip install python-socketio

请注意,python-socketio 需要一个底层的 ASGI 或 WSGI 服务器来运行。它默认安装了 python-engineio,这个库处理 HTTP 长轮询和 WebSocket 的实际传输。运行独立的 Socket.IO 服务器还需要一个 ASGI 服务器,例如 uvicorneventlet (WSGI)。我们这里使用 uvicorn,因为它基于 asyncio,与我们之前的 websockets 例子风格一致。

bash
pip install uvicorn

构建一个简单的 Socket.IO 服务器

Socket.IO 是事件驱动的。客户端和服务器不是发送任意消息,而是触发(emit)一个带有名称的事件,并可以携带数据。另一方监听(on)这个事件并执行相应的处理函数。

创建一个名为 sio_server.py 的文件:

“`python

sio_server.py

import socketio
import uvicorn
import asyncio

创建一个Socket.IO服务器实例

async_mode=’asgi’ 表示使用ASGI模式,可以与uvicorn等ASGI服务器配合

sio = socketio.AsyncServer(cors_allowed_origins=’*’, async_mode=’asgi’) # 允许所有源的跨域请求

创建一个ASGI应用

Socket.IO服务器本身可以作为一个ASGI应用运行

app = socketio.ASGIApp(sio)

定义事件处理函数

使用 @sio.event 装饰器注册事件处理器

事件名称与函数名相同,例如 ‘connect’ 事件会触发 connect 函数

@sio.event
async def connect(sid, environ):
“””客户端连接成功时触发”””
# sid (session ID) 是每个客户端连接的唯一标识符
print(f”客户端连接成功: {sid}”)
# 可以选择向刚刚连接的客户端发送一条欢迎消息
# await sio.emit(‘message’, {‘data’: f’欢迎您, {sid}!’}, room=sid)

@sio.event
async def disconnect(sid):
“””客户端断开连接时触发”””
print(f”客户端 {sid} 已断开连接”)

监听自定义事件

客户端可以触发名为 ‘my_message’ 的事件

@sio.on(‘my_message’)
async def handle_my_message(sid, data):
“””处理客户端发送的 ‘my_message’ 事件”””
print(f”收到来自 {sid} 的 ‘my_message’ 事件,数据: {data}”)

# 收到消息后,可以向发送者回复
# await sio.emit('response', {'data': '消息已收到'}, room=sid)

# 也可以向所有客户端广播消息
await sio.emit('broadcast_message', {'sender': sid, 'data': data})
print(f"广播 'broadcast_message' 事件给所有客户端")

运行服务器

uvicorn 需要一个ASGI应用对象

host 和 port 指定监听地址和端口

if name == “main“:
print(“Socket.IO服务器已启动,监听在 http://localhost:8000″) # 注意:Socket.IO 通常监听HTTP端口进行握手
# 实际的WebSocket连接会在该端口的特定路径下建立
uvicorn.run(app, host=”localhost”, port=8000)

要使用 uvicorn 运行这个脚本,确保文件名为 sio_server.py,然后在命令行运行:

uvicorn sio_server:app –reload # –reload 可以在代码修改后自动重启服务器 (开发时方便)

“`

代码解析:

  1. import socketioimport uvicorn: 导入所需库。
  2. sio = socketio.AsyncServer(...): 创建一个异步 Socket.IO 服务器实例。cors_allowed_origins='*' 是为了允许任何域的网页连接到这个服务器(开发时方便,生产环境应限制)。async_mode='asgi' 指定使用 ASGI 模式。
  3. app = socketio.ASGIApp(sio): 将 Socket.IO 服务器包装成一个 ASGI 应用。uvicorn 就是运行 ASGI 应用的。
  4. @sio.event: 这个装饰器用来注册 Socket.IO 的内置事件处理器,如 connect (客户端连接成功) 和 disconnect (客户端断开连接)。这些函数名必须与事件名一致。
    • sid: Session ID,是 Socket.IO 为每个连接生成的唯一标识符。
    • environ: 包含连接的环境信息,类似于 WSGI 或 ASGI 的环境字典。
  5. @sio.on('my_message'): 这个装饰器用来注册监听自定义事件的处理器。当客户端触发名为 'my_message' 的事件时,handle_my_message 函数就会被调用。
    • sid: 触发事件的客户端的 Session ID。
    • data: 客户端随事件发送的数据。Socket.IO 通常处理 JSON 可序列化的数据结构(字典、列表、基本类型)。
  6. await sio.emit('event_name', data, room=...): 这是 Socket.IO 服务器发送事件的方法。
    • 'event_name': 要触发的事件名称。
    • data: 要发送的数据。
    • room=sid: 指定只发送给 SID 对应的客户端。
    • 不指定 room 或使用 broadcast=True (旧版本用法,新版本直接调用 emit) 默认是广播给所有客户端。
  7. uvicorn.run(app, host="localhost", port=8000): 运行 ASGI 应用 app。服务器将在 http://localhost:8000 监听。

构建一个简单的 Socket.IO 客户端

为了连接到 Socket.IO 服务器,我们需要一个 Socket.IO 客户端库。这里我们仍然使用 python-socketio 提供的客户端部分。

创建一个名为 sio_client.py 的文件:

“`python

sio_client.py

import socketio
import asyncio

创建一个异步Socket.IO客户端实例

sio = socketio.AsyncClient()

定义事件处理函数

@sio.event
async def connect():
“””客户端连接成功时触发”””
print(“成功连接到服务器!”)
# 连接成功后,可以立即发送一条消息
await sio.emit(‘my_message’, {‘data’: ‘Hello from Python client!’})
print(“客户端发送 ‘my_message’ 事件”)

@sio.event
async def disconnect():
“””客户端断开连接时触发”””
print(“与服务器断开连接”)

监听服务器广播的自定义事件

@sio.on(‘broadcast_message’)
async def handle_broadcast_message(data):
“””处理服务器广播的 ‘broadcast_message’ 事件”””
print(f”收到广播消息: {data}”)

客户端主函数

async def main():
print(“正在尝试连接Socket.IO服务器…”)
# 连接服务器
# url 参数指定服务器地址
await sio.connect(‘http://localhost:8000’)

# 连接后,客户端通常会保持运行,直到断开
# 我们可以让主程序等待,或者执行其他任务
# 这里我们让它等待连接断开
await sio.wait()
print("客户端等待连接结束")

运行主函数

if name == “main“:
try:
asyncio.run(main())
except KeyboardInterrupt:
print(“\n客户端通过Ctrl+C中断”)
“`

代码解析:

  1. import socketioimport asyncio: 导入所需库。
  2. sio = socketio.AsyncClient(): 创建一个异步 Socket.IO 客户端实例。
  3. @sio.event: 注册客户端内置事件,如 connectdisconnect
  4. @sio.on('broadcast_message'): 注册监听来自服务器的自定义事件 'broadcast_message'
  5. await sio.emit('event_name', data): 客户端向服务器触发一个事件。
  6. await sio.connect('http://localhost:8000'): 尝试连接 Socket.IO 服务器。注意这里使用的是 HTTP 地址,Socket.IO 内部会通过 HTTP 握手升级到 WebSocket(或其他传输方式)。
  7. await sio.wait(): 让主协程等待,直到客户端与服务器断开连接。如果没有这一行,main 函数会立即结束,导致客户端程序退出。

运行 Socket.IO 应用

  1. 运行 Socket.IO 服务器 (sio_server.py):
    bash
    uvicorn sio_server:app --reload

    服务器会启动并提示监听端口。
  2. 运行 Socket.IO 客户端 (sio_client.py):
    bash
    python sio_client.py

    客户端会尝试连接,连接成功后会发送一条消息,并等待广播消息。
    运行多个客户端实例,当你在一个客户端发送消息(通过修改代码或交互式环境),其他客户端都会收到广播消息。

Socket.IO 通过事件和高级功能简化了许多实时应用场景的开发,特别是与前端 Socket.IO.js 库配合时非常方便。

7. WebSocket 应用的常见问题与最佳实践

构建健壮的 WebSocket 应用需要考虑一些常见问题和最佳实践:

错误处理与连接断开

网络连接是不稳定的。客户端或服务器随时可能因为各种原因断开连接(网络问题、程序崩溃、用户关闭浏览器等)。

  • 服务器端:
    • 在处理客户端消息的循环 (async for message in websocket:) 外部使用 try...except websockets.exceptions.ConnectionClosed 或更广泛的 Exception 来捕获连接断开或处理过程中出现的错误。
    • finally 块中进行清理工作,例如将断开的连接从活跃连接列表中移除(如我们在广播服务器示例中所示)。
    • 对于使用 send 发送消息时可能发生的连接关闭,也需要捕获 websockets.exceptions.ConnectionClosed 异常,避免单个发送失败影响其他连接。
  • 客户端端:
    • 使用异步上下文管理器 async with websockets.connect(...) 可以确保连接在退出块时被关闭。
    • 在接收消息循环 (async for message in websocket:) 或 await websocket.recv() 时处理连接断开异常。
    • 考虑实现自动重连逻辑(Socket.IO 内置此功能,使用 websockets 则需要手动实现)。

心跳机制 (Ping/Pong)

长时间没有数据传输的连接可能会被中间网络设备(如防火墙、代理)关闭,而服务器和客户端可能都不会收到正常的关闭通知,导致“僵尸连接”。为了防止这种情况,WebSocket 协议内置了心跳机制:

  • Ping Frame: 一方发送 Ping 帧。
  • Pong Frame: 另一方收到 Ping 帧后,必须尽快回复 Pong 帧。

websocketspython-socketio 等库通常会自动处理 Ping/Pong 帧,以保持连接活跃。你可以配置 Ping 发送的间隔和超时时间。确保启用了心跳机制,特别是对于长时间不活跃的连接。

安全性考虑 (SSL/TLS, Origin 校验)

  • 加密传输: 始终使用 wss:// 而不是 ws:// 进行生产环境的 WebSocket 连接。wss 使用 SSL/TLS 对数据进行加密,防止中间人攻击。这意味着你的服务器需要配置 SSL 证书。在 websockets.serve() 中,可以通过 ssl= 参数提供 SSL 上下文。
  • Origin 校验: WebSocket 握手请求中包含 Origin 头部,指示请求来源的域名。服务器应该校验 Origin 头部,只接受来自允许域名的连接,拒绝来自恶意网站的连接,防止跨站 WebSocket 劫持 (Cross-Site WebSocket Hijacking)。在 websockets.serve() 中,可以通过 create_protocol 参数提供一个自定义的协议工厂,并在其中进行 Origin 校验。Socket.IO 的 AsyncServer 构造函数提供了 cors_allowed_origins 参数来配置允许的来源。
  • 输入验证: 不要信任来自客户端的任何数据。对接收到的所有消息进行严格的验证和清理,防止注入攻击或恶意数据破坏服务器。
  • 身份验证与授权: 对于需要用户身份的应用,应在 WebSocket 握手成功后进行身份验证(例如发送一个认证 Token),并根据用户权限进行授权,限制用户可以执行的操作(如加入特定房间、发送敏感消息)。

性能与扩展性 (消息队列)

随着应用规模增长,单个 WebSocket 服务器可能无法处理大量并发连接或高吞吐量的消息。

  • 单服务器性能: Python 的 GIL (全局解释器锁) 会限制 CPU 密集型任务的并行性,但 WebSocket 应用主要是 I/O 密集型的(等待网络数据)。asyncio 在单进程内通过协作式多任务提高了 I/O 密集型应用的并发能力。使用 asyncio.create_task 并发处理多个客户端或广播任务是提高性能的关键。
  • 多进程/多服务器: 当单台服务器的资源(CPU、内存、网络)成为瓶颈时,你需要部署多个 WebSocket 服务器实例。
  • 消息队列 (Message Queue): 在多服务器架构中,不同服务器实例之间需要共享信息(例如用户在线状态、广播消息)。使用消息队列(如 Redis Pub/Sub, RabbitMQ, Kafka)是常见的解决方案。当一个服务器实例收到消息需要广播时,它将消息发布到消息队列的一个频道。其他所有连接到同一个频道的服务器实例都能收到这条消息,然后各自向其连接的客户端广播。这使得你可以水平扩展 WebSocket 服务器层。

与现有 Web 框架集成

在实际应用中,WebSocket 应用常常是现有 Web 应用的一部分。你可能已经使用了 Flask, Django, FastAPI 等框架处理 HTTP 请求(如用户登录、数据查询)。

  • python-socketio 提供了与主流 Web 框架(Flask, Django, FastAPI, Starlette, aiohttp 等)的集成,让你可以在同一个应用中方便地同时提供 HTTP 和 Socket.IO 服务。例如,Flask-SocketIO 是一个流行的 Flask 扩展。
  • websockets 是一个独立的库,但你可以通过一些方式将其与 Web 框架结合,例如在同一个 asyncio 事件循环中运行 Web 框架和 websockets 服务器,或者在 Web 框架的某个路由中建立 WebSocket 连接(这通常需要 Web 框架本身支持 ASGI 或类似的异步特性,如 FastAPI 原生支持 WebSocket)。

8. 总结与进一步学习

通过本指南,你已经了解了:

  • WebSocket 的基本概念、工作原理和优势,以及它如何解决 HTTP 在实时通信中的局限性。
  • WebSocket 连接是如何通过 HTTP 握手建立的。
  • Python 中用于构建 WebSocket 应用的主流库:websocketspython-socketio
  • 如何使用 websockets 库构建一个简单的 Echo 服务器和客户端,并理解 asyncio 在其中的作用。
  • 如何使用 websockets 处理多个客户端连接和实现消息广播。
  • 如何使用 python-socketio 库构建一个基于事件的高级实时应用。
  • 构建健壮的 WebSocket 应用需要考虑的常见问题和最佳实践,包括错误处理、安全性、性能和集成。

WebSocket 是构建现代实时交互应用的关键技术。掌握 Python 中的 WebSocket 编程将为你打开新的大门。

进一步学习方向:

  • 深入学习 asyncio 框架,理解事件循环、任务、Future 等概念。
  • 探索 websockets 库的更多高级功能,如子协议支持、压缩、详细的控制帧处理等。
  • 深入学习 python-socketio 和 Socket.IO 协议,掌握房间、命名空间、确认回执等功能,并学习如何将其与 Flask/Django/FastAPI 等框架集成。
  • 学习如何使用消息队列(如 Redis Pub/Sub)来扩展你的 WebSocket 应用,使其能够处理更多连接和消息。
  • 研究 WebSocket 的安全性,特别是 SSL/TLS 配置和 Origin 校验。
  • 尝试构建一个真实的实时应用项目,如一个简单的多人聊天室或一个实时数据仪表盘,将所学知识应用于实践。

实时通信是一个广阔而有趣的应用领域,Python 提供了强大的工具来帮助你实现这些功能。祝你在 WebSocket 的学习旅程中取得成功!

发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部