杰瑞科技汇

Python WebSocket中文怎么用?

目录

  1. 什么是 WebSocket? (核心概念)
  2. 为什么需要 WebSocket? (与 HTTP 的对比)
  3. Python WebSocket 生态 (主流库介绍)
  4. 实战示例:构建一个简单的聊天室
    • 服务端
    • 客户端
  5. 进阶概念与最佳实践
    • 异步处理
    • 认证与安全
    • 生产环境部署

什么是 WebSocket?

WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。

Python WebSocket中文怎么用?-图1
(图片来源网络,侵删)
  • 全双工:意味着客户端和服务器可以同时、独立地发送和接收数据,就像一个双向的通话电话,而不是像发短信(HTTP)那样,一方发完另一方才能发。
  • 持久连接:与 HTTP 的“请求-响应”模式不同,WebSocket 连接一旦建立,就会保持打开状态,直到客户端或服务器主动关闭它,这使得服务器可以主动向客户端推送数据,而无需客户端先发起请求。

WebSocket 连接的建立过程通常被称为“握手”(Handshake),这个过程是基于 HTTP 协议的升级(Upgrade)机制来完成的。


为什么需要 WebSocket?

传统的网页交互模式主要是 HTTP,它是一种请求-响应模式。

  • 轮询:客户端每隔一段时间就向服务器发送一次请求,询问是否有新数据,这种方式效率低下,会产生大量不必要的请求,并且延迟高。
  • 长轮询:客户端发送一个请求,服务器会保持这个连接打开,直到有新数据可发送或超时,这比轮询好一些,但连接频繁地建立和关闭,依然比较浪费资源。

WebSocket 的优势:

  • 实时性:服务器可以随时向客户端推送数据,实现真正的实时通信(如聊天室、实时股票行情、在线游戏)。
  • 高效性:只需要一次 HTTP 握手,之后就在同一个连接上传输数据,避免了 HTTP 请求头带来的额外开销,通信效率高。
  • 节省带宽:由于是持久连接,减少了重复建立连接的开销。

适用场景:

Python WebSocket中文怎么用?-图2
(图片来源网络,侵删)
  • 即时通讯应用(微信、QQ网页版)
  • 在线协作工具(多人同时编辑文档)
  • 实时数据展示(股票、体育赛事比分)
  • 在线游戏和多人对战
  • 通知和提醒系统

Python WebSocket 生态

Python 有几个非常流行的 WebSocket 库,适用于不同的场景。

库名 类型 特点 适用场景
websockets 异步 最流行、功能强大,基于 asyncio,是 Python 异步生态的标配。 构建高性能的异步 WebSocket 服务端和客户端。
Django Channels 框架集成 将 WebSocket 集成到 Django 框架中,使得 Django 可以处理异步请求。 基于 Django 的全功能 Web 应用,需要 WebSocket 支持(如聊天、通知)。
Socket.IO 客户端/服务端 不仅仅是 WebSocket,它是一个库,提供了自动降级机制(WebSocket 不可用,会自动使用轮询等)。 需要跨浏览器、高兼容性,并且希望有额外功能(如房间、自动重连)的项目。

本文将重点介绍 websockets,因为它最纯粹、最常用,并且能让你深入理解 WebSocket 的工作原理。


实战示例:构建一个简单的聊天室

我们将使用 websockets 库来创建一个简单的命令行聊天室,一个用户(服务端)发送消息,所有连接的客户端都能收到。

第 1 步:安装 websockets

打开你的终端或命令行,运行以下命令:

Python WebSocket中文怎么用?-图3
(图片来源网络,侵删)
pip install websockets

第 2 步:创建服务端

服务端负责接收新的客户端连接,并将一个客户端发送的消息广播给所有其他已连接的客户端。

创建一个名为 server.py 的文件:

# server.py
import asyncio
import websockets
import json
# 用来存储所有已连接的客户端
connected_clients = set()
async def handler(websocket, path):
    """
    为每个新的 WebSocket 连接创建一个处理器
    """
    print(f"新的客户端连接: {websocket.remote_address}")
    # 将新连接的客户端添加到集合中
    connected_clients.add(websocket)
    try:
        # 持续监听来自这个客户端的消息
        async for message in websocket:
            print(f"收到来自 {websocket.remote_address} 的消息: {message}")
            # 将消息广播给所有其他客户端
            await broadcast(message, exclude=websocket)
    except websockets.exceptions.ConnectionClosed as e:
        print(f"客户端 {websocket.remote_address} 断开连接: {e}")
    finally:
        # 无论是否发生异常,最终都要将客户端从集合中移除
        connected_clients.remove(websocket)
        print(f"客户端 {websocket.remote_address} 已移除")
async def broadcast(message, exclude=None):
    """
    将消息发送给所有连接的客户端,除了指定的那个
    """
    # 使用 asyncio.gather 并发地发送消息给所有客户端
    # 如果某个客户端发送失败,不会影响其他客户端
    await asyncio.gather(
        *[client.send(message) for client in connected_clients if client != exclude],
        return_exceptions=True
    )
async def main():
    # 启动 WebSocket 服务器,监听所有网络接口的 8765 端口
    async with websockets.serve(handler, "0.0.0.0", 8765):
        print("WebSocket 服务器已启动,监听 ws://0.0.0.0:8765")
        # 保持服务器运行
        await asyncio.Future()  # run forever
if __name__ == "__main__":
    asyncio.run(main())

代码解释:

  1. connected_clients = set():一个集合,用来保存所有当前连接的客户端对象。
  2. async def handler(websocket, path):这是每个连接进入时的回调函数。websocket 对象代表了与一个客户端的连接。
  3. connected_clients.add(websocket):新客户端连接时,将其加入集合。
  4. async for message in websocket::这是一个异步循环,会一直阻塞,直到从该客户端收到一条新消息。
  5. await broadcast(message, exclude=websocket):调用广播函数,将收到的消息发给其他人。
  6. try...finally...:确保即使客户端异常断开,也能从 connected_clients 中移除,避免内存泄漏。
  7. async def broadcast(...):遍历所有客户端,并发地发送消息。asyncio.gather 是实现并发的关键。
  8. async with websockets.serve(handler, "0.0.0.0", 8765)::启动 WebSocket 服务器。
  9. asyncio.run(main()):运行异步主程序。

第 3 步:创建客户端

客户端负责连接服务器,监听服务器发来的消息,并允许用户从键盘输入消息发送给服务器。

创建一个名为 client.py 的文件:

# client.py
import asyncio
import websockets
import json
async def client_handler():
    # WebSocket 服务器的地址
    uri = "ws://localhost:8765"
    try:
        # 连接到服务器
        async with websockets.connect(uri) as websocket:
            print(f"已连接到服务器: {uri}")
            # 创建一个任务来接收消息
            receive_task = asyncio.create_task(receiver(websocket))
            # 主循环:从键盘读取并发送消息
            while True:
                message = input("请输入消息 (输入 'exit' 退出): ")
                if message.lower() == 'exit':
                    break
                await websocket.send(message)
            # 取消接收任务,确保程序能正常退出
            receive_task.cancel()
    except ConnectionRefusedError:
        print("无法连接到服务器,请确保服务器正在运行。")
    except websockets.exceptions.ConnectionClosed:
        print("与服务器的连接已关闭。")
async def receiver(websocket):
    """
    一个独立的任务,用于接收来自服务器的消息
    """
    try:
        async for message in websocket:
            print(f"\n[收到消息]: {message}")
            # 重新打印提示符,模拟一个友好的命令行界面
            print("请输入消息 (输入 'exit' 退出): ", end="", flush=True)
    except websockets.exceptions.ConnectionClosed:
        print("\n与服务器的连接已关闭。")
if __name__ == "__main__":
    asyncio.run(client_handler())

代码解释:

  1. async with websockets.connect(uri) as websocket::尝试连接到指定的服务器 URI。
  2. asyncio.create_task(receiver(websocket)):将接收消息的逻辑放到一个独立的异步任务中,这样发送和接收可以同时进行,不会互相阻塞。
  3. while True::主循环,通过 input() 等待用户输入。
  4. await websocket.send(message):将用户输入的消息发送给服务器。
  5. receive_task:这个任务会一直运行,监听并打印来自服务器的任何广播消息。

第 4 步:运行聊天室

  1. 启动服务端: 打开一个终端,运行:

    python server.py

    你会看到输出:WebSocket 服务器已启动,监听 ws://0.0.0.0:8765

  2. 启动客户端: 打开另一个终端,运行:

    python client.py

    你会看到:已连接到服务器: ws://localhost:8765请输入消息 (输入 'exit' 退出):

  3. 测试

    • 在第一个客户端输入 你好,世界! 然后回车。
    • 在第二个客户端(再开一个终端运行 client.py)你应该会立即看到 [收到消息]: 你好,世界!
    • 反之亦然,恭喜,你成功创建了一个简单的 WebSocket 聊天室!

进阶概念与最佳实践

异步处理

WebSocket 本质上是 I/O 密集型操作,非常适合使用异步(asyncio)。websockets 库就是基于 asyncio 的。始终使用 async/await 语法来处理 WebSocket 连接、发送和接收消息,这样可以避免阻塞事件循环,实现高并发。

认证与安全

在生产环境中,直接开放 WebSocket 端口是危险的,你需要进行身份验证。

  • 握手阶段认证:在 WebSocket 握手(HTTP 升级请求)时,可以检查 HTTP 头部(如 CookieAuthorization)来验证用户身份。

    # 在 server.py 的 handler 函数中
    async def handler(websocket, path):
        # 从请求头中获取 token
        token = websocket.request_headers.get("Authorization")
        if not is_valid_token(token):
            # 如果无效,直接关闭连接
            await websocket.close(code=1008, reason="Authentication failed")
            return
        # 否则,继续处理...
  • 消息级认证:在每条消息中包含用户身份信息(如用户ID),由服务端验证,这种方式更灵活,但握手阶段仍然需要基础认证。

  • 使用 WSS (WebSocket Secure):在生产环境中,必须使用 wss://(WebSocket Secure),它相当于 HTTPS 版本的 WebSocket,通过 TLS/SSL 加密所有通信内容,防止数据被窃听。

生产环境部署

  • 选择 Web 服务器websockets 库本身是一个简单的库,适合开发和测试,在生产环境中,通常将它部署在更强大的 ASGI(Asynchronous Server Gateway Interface)服务器后面,

    • Daphne:Django Channels 的推荐服务器。
    • Uvicorn:一个非常流行和轻量级的 ASGI 服务器。
    • Hypercorn:另一个功能强大的 ASGI 服务器。
  • 部署示例 (使用 Uvicorn): 假设你的服务端代码在一个名为 my_app/server.py 的文件中,主函数名为 main

    # 安装 uvicorn
    pip install uvicorn
    # 运行服务器
    uvicorn my_app.server:main --host 0.0.0.0 --port 8765 --ssl-keyfile key.pem --ssl-certfile cert.pem

    使用 --ssl-* 参数可以启用 WSS。


  • WebSocket 是实现实时、双向通信的强大协议,是 HTTP 的完美补充。
  • Python websockets 是进行异步 WebSocket 开发的首选工具,它简洁、高效。
  • 核心模式:服务端通过一个 handler 函数管理连接,使用 async for 循环接收消息;客户端通过 async with connect() 建立连接,并发地处理发送和接收。
  • 生产实践:务必关注安全性(认证、WSS)和性能(使用 Uvicorn/Daphne 等专业服务器进行部署)。

希望这份详细的中文指南能帮助你顺利地在 Python 项目中使用 WebSocket!

分享:
扫描分享到社交APP
上一篇
下一篇