杰瑞科技汇

Python如何实现WebSocket心跳机制?

什么是 WebSocket 心跳?

WebSocket 连接建立后,可能会因为网络问题(如 NAT 超时、防火墙切断等)导致连接实际上已经断开,但双方却不知情,为了解决这个问题,我们引入了“心跳”机制。

Python如何实现WebSocket心跳机制?-图1
(图片来源网络,侵删)

心跳机制的本质是:客户端和服务器之间定期发送简单的“ping/pong”消息,以确认对方是否在线,并保持连接的活跃状态。

  • 客户端 -> 服务器: 发送一个 ping 帧。
  • 服务器 -> 客户端: 收到 ping 后,回复一个 pong 帧。
  • 服务器 -> 客户端: 也可以主动发送 ping,客户端回复 pong
  • 超时处理: 如果一方在一定时间内(30 秒)没有收到对方的 pongping,就认为连接已经断开,并采取相应措施(如重连、关闭连接等)。

实现方案

在 Python 中,最常用的 WebSocket 库是 websockets,它本身内置了对 RFC 6455 标准中定义的 Ping/Pong 帧的支持,这使得实现心跳变得非常简单。

我们将从最简单的客户端心跳开始,然后逐步完善,并实现一个支持心跳的服务器。

客户端心跳(使用 websockets 库)

这是最核心的场景,客户端需要定期发送 ping,并处理服务器的 pong 响应,客户端也需要能处理服务器发来的 ping 并回复 pong

Python如何实现WebSocket心跳机制?-图2
(图片来源网络,侵删)

websockets 库的 connect 函数提供了一个 ping_interval 参数,可以自动完成大部分工作。

安装库

pip install websockets

实现代码

下面的客户端代码会:

  1. 每 20 秒自动向服务器发送一个 ping 帧。
  2. 30 秒内没有收到服务器的 pong 响应,会抛出 websockets.exceptions.ConnectionClosed 异常。
  3. 它也能正确响应服务器主动发来的 ping
import asyncio
import websockets
import logging
# 设置日志,方便查看连接状态
logging.basicConfig(level=logging.INFO)
async def heartbeat_client(uri):
    """
    带有心跳机制的 WebSocket 客户端
    :param uri: WebSocket 服务器的 URI
    """
    try:
        # ping_interval: 每 20 秒发送一次 ping
        # ping_timeout: 等待 pong 响应的超时时间为 30 秒
        # close_timeout: 关闭连接的超时时间
        async with websockets.connect(
            uri,
            ping_interval=20,  # 心跳间隔,单位:秒
            ping_timeout=30,    # 心跳超时,单位:秒
            close_timeout=2     # 关闭连接的超时时间
        ) as websocket:
            logging.info(f"已连接到服务器: {uri}")
            # 启动一个任务来接收服务器的消息
            receive_task = asyncio.create_task(receive_messages(websocket))
            # 启动一个任务来发送消息
            send_task = asyncio.create_task(send_user_input(websocket))
            # 等待接收或发送任务完成
            done, pending = await asyncio.wait(
                [receive_task, send_task],
                return_when=asyncio.FIRST_COMPLETED
            )
            # 如果一个任务完成,取消另一个任务
            for task in pending:
                task.cancel()
    except websockets.exceptions.ConnectionClosed as e:
        logging.error(f"连接已关闭: {e}")
        # 在这里可以触发重连逻辑
        await asyncio.sleep(5) # 等待5秒后重连
        # asyncio.create_task(heartbeat_client(uri)) # 取消注释以实现自动重连
    except Exception as e:
        logging.error(f"发生错误: {e}")
async def receive_messages(websocket):
    """接收来自服务器的消息"""
    try:
        async for message in websocket:
            logging.info(f"收到消息: {message}")
    except websockets.exceptions.ConnectionClosed:
        logging.info("接收消息时连接已关闭。")
async def send_user_input(websocket):
    """从用户输入获取并发送消息"""
    while True:
        try:
            message = await asyncio.to_thread(input, "请输入要发送的消息 (或 'exit' 退出): ")
            if message.lower() == 'exit':
                break
            await websocket.send(message)
        except websockets.exceptions.ConnectionClosed:
            logging.info("发送消息时连接已关闭。")
            break
if __name__ == "__main__":
    # 替换为你的 WebSocket 服务器地址
    WEBSOCKET_URI = "ws://localhost:8765"
    asyncio.run(heartbeat_client(WEBSOCKET_URI))

代码解析:

  • websockets.connect(uri, ping_interval=20, ping_timeout=30): 这是实现客户端心跳的关键。
    • ping_interval=20: 客户端会每隔 20 秒自动向服务器发送一个 ping 帧。
    • ping_timeout=30: 客户端发送 ping 后,30 秒内没有收到服务器的 pong 响应,客户端会主动关闭连接,并抛出 ConnectionClosed 异常。
  • async with ... as websocket: 这是 websockets 库推荐的连接管理方式,可以确保连接在使用后被正确关闭。
  • receive_tasksend_task: 我们将接收和发送操作放在两个独立的异步任务中,这样客户端既能保持心跳,又能同时收发业务数据,互不阻塞。

服务器端心跳(使用 websockets 库)

服务器端也需要有心跳机制来检测客户端是否存活。websockets 库的服务器端同样支持自动的 Pong 响应。

服务器代码

服务器会:

  1. 自动响应客户端发来的 ping(这是 websockets 库的默认行为)。
  2. 主动向客户端发送 ping 来检测客户端是否存活。
  3. 如果客户端在一定时间内没有响应 pong,服务器会认为客户端已断开并关闭连接。
import asyncio
import websockets
import logging
import json
# 设置日志
logging.basicConfig(level=logging.INFO)
# 存储所有连接的客户端
connected_clients = set()
async def handler(websocket, path):
    """
    处理每个 WebSocket 连接
    """
    client_address = websocket.remote_address
    logging.info(f"新客户端连接: {client_address}")
    connected_clients.add(websocket)
    try:
        # 启动一个任务来定期向客户端发送 ping
        ping_task = asyncio.create_task(periodic_ping(websocket))
        # 启动一个任务来接收客户端消息
        receive_task = asyncio.create_task(receive_from_client(websocket, client_address))
        # 等待任一任务完成(连接关闭或出错)
        done, pending = await asyncio.wait(
            [ping_task, receive_task],
            return_when=asyncio.FIRST_COMPLETED
        )
        # 清理任务
        for task in pending:
            task.cancel()
    except websockets.exceptions.ConnectionClosed as e:
        logging.info(f"客户端 {client_address} 断开连接: {e}")
    finally:
        connected_clients.remove(websocket)
        logging.info(f"客户端 {client_address} 已从列表中移除。")
async def periodic_ping(websocket):
    """
    定期向客户端发送 ping
    """
    while websocket.open:
        try:
            # await websocket.ping() 发送 ping 并等待 pong
            # 如果在 ping_timeout 时间内没有收到 pong,会抛出 ConnectionClosed
            await websocket.ping()
            logging.info(f"已向 {websocket.remote_address} 发送 ping")
            await asyncio.sleep(20)  # 每 20 秒 ping 一次
        except websockets.exceptions.ConnectionClosed:
            logging.info(f"客户端 {websocket.remote_address} 未响应 pong,连接已断开。")
            break # 退出循环,任务结束
async def receive_from_client(websocket, client_address):
    """
    接收来自客户端的消息
    """
    try:
        async for message in websocket:
            logging.info(f"从 {client_address} 收到消息: {message}")
            # 这里可以添加业务逻辑,例如将消息广播给所有客户端
            # await broadcast(message)
    except websockets.exceptions.ConnectionClosed:
        logging.info(f"接收消息时,客户端 {client_address} 连接已关闭。")
async def broadcast(message):
    """广播消息给所有连接的客户端"""
    if connected_clients:
        await asyncio.gather(
            *[client.send(message) for client in connected_clients],
            return_exceptions=True
        )
if __name__ == "__main__":
    start_server = websockets.serve(handler, "localhost", 8765)
    logging.info("WebSocket 服务器启动在 ws://localhost:8765")
    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()

代码解析:

  • websockets.serve(handler, ...): 启动 WebSocket 服务器。
  • periodic_ping(websocket): 这是一个自定义的异步任务,它会定期调用 await websocket.ping()
    • await websocket.ping() 不仅是发送 ping,它还会等待 pong 响应。
    • 如果在服务器配置的 ping_timeout(默认 20 秒)内没有收到 pongawait websocket.ping() 会抛出 ConnectionClosed 异常,从而让我们知道客户端已经失联。
  • handler 函数中的 asyncio.wait: 它会等待 ping_taskreceive_task 中任何一个完成,如果客户端失联,ping_task 会先失败,然后整个 handler 协程会优雅地退出,finally 块会确保客户端从 connected_clients 集合中移除。

手动实现心跳(更灵活)

有时候你可能需要更精细的控制,比如自定义心跳消息内容(而不仅仅是空帧),这时可以手动实现。

手动心跳客户端

import asyncio
import websockets
import json
import logging
logging.basicConfig(level=logging.INFO)
async def manual_heartbeat_client(uri):
    ping_interval = 20
    pong_timeout = 30
    last_pong_time = asyncio.get_event_loop().time()
    async with websockets.connect(uri) as websocket:
        logging.info("连接已建立")
        # 启动心跳任务
        heartbeat_task = asyncio.create_task(send_heartbeat(websocket, ping_interval))
        # 启动接收任务
        receive_task = asyncio.create_task(handle_responses(websocket, last_pong_time, pong_timeout))
        # 等待任务完成
        done, pending = await asyncio.wait(
            [heartbeat_task, receive_task],
            return_when=asyncio.FIRST_COMPLETED
        )
        for task in pending:
            task.cancel()
async def send_heartbeat(websocket, interval):
    """手动发送心跳消息"""
    while websocket.open:
        try:
            # 发送自定义的 JSON 心跳消息
            ping_message = {"type": "ping", "timestamp": asyncio.get_event_loop().time()}
            await websocket.send(json.dumps(ping_message))
            logging.info("已发送心跳 ping")
            await asyncio.sleep(interval)
        except websockets.exceptions.ConnectionClosed:
            logging.info("发送心跳时连接已关闭")
            break
async def handle_responses(websocket, last_pong_time, timeout):
    """处理所有服务器的响应,包括 pong 和业务消息"""
    while websocket.open:
        try:
            response = await asyncio.wait_for(websocket.recv(), timeout=timeout)
            current_time = asyncio.get_event_loop().time()
            last_pong_time = current_time # 更新最后收到响应的时间
            message = json.loads(response)
            if message.get("type") == "pong":
                logging.info(f"收到心跳 pong: {response}")
            else:
                logging.info(f"收到业务消息: {response}")
        except asyncio.TimeoutError:
            logging.error(f"在 {timeout} 秒内未收到服务器的响应,认为连接已断开。")
            break # 触发连接关闭
        except websockets.exceptions.ConnectionClosed:
            logging.info("连接已关闭")
            break
        except json.JSONDecodeError:
            logging.warning("收到非 JSON 格式消息")
if __name__ == "__main__":
    WEBSOCKET_URI = "ws://localhost:8765"
    # 注意:服务器需要能处理这种自定义格式的 ping/pong
    asyncio.run(manual_heartbeat_client(WEBSOCKET_URI))

配套的手动心跳服务器

服务器需要能识别自定义的 ping 消息并回复 pong

import asyncio
import websockets
import json
import logging
logging.basicConfig(level=logging.INFO)
async def manual_handler(websocket, path):
    logging.info(f"新客户端连接: {websocket.remote_address}")
    try:
        async for message in websocket:
            try:
                data = json.loads(message)
                if data.get("type") == "ping":
                    # 回复一个自定义的 pong 消息
                    pong_message = {"type": "pong", "timestamp": data.get("timestamp")}
                    await websocket.send(json.dumps(pong_message))
                    logging.info(f"已向 {websocket.remote_address} 回复 pong")
                else:
                    # 处理其他业务消息
                    logging.info(f"收到业务消息: {data}")
                    # await websocket.send(f"Echo: {data}")
            except json.JSONDecodeError:
                logging.warning(f"收到无效的 JSON 消息: {message}")
    except websockets.exceptions.ConnectionClosed:
        logging.info(f"客户端 {websocket.remote_address} 断开连接")
if __name__ == "__main__":
    start_server = websockets.serve(manual_handler, "localhost", 8765)
    logging.info("支持手动心跳的 WebSocket 服务器启动")
    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()

总结与建议

特性 自动心跳 (ping_interval) 手动心跳
实现难度 非常简单,库内置支持 较复杂,需要自己管理任务和超时
心跳帧 标准的 Ping/Pong 帧 自定义格式的消息(如 JSON)
灵活性 低,只能配置间隔和超时 ,可以完全自定义心跳逻辑和消息内容
适用场景 大多数标准应用,快速开发 需要特定心跳协议或复杂心跳逻辑的场景

给你的建议:

  1. 优先使用 websockets 库的自动心跳 (ping_interval),对于 99% 它足够健壮且实现简单。
  2. 只有当你的项目有特殊需求,比如心跳消息需要携带特定业务信息,或者心跳逻辑非常复杂时,才考虑手动实现
  3. 客户端心跳是必须的,因为它可以主动发现连接问题并触发重连,极大地提升了应用的健壮性。
  4. 服务器端的心跳(主动 ping 客户端)可以作为客户端心跳的补充,用于清理长时间失联的客户端连接,但通常不是必需的,因为客户端的 ping_timeout 最终也会导致连接关闭。
分享:
扫描分享到社交APP
上一篇
下一篇