什么是 WebSocket 心跳?
WebSocket 连接建立后,可能会因为网络问题(如 NAT 超时、防火墙切断等)导致连接实际上已经断开,但双方却不知情,为了解决这个问题,我们引入了“心跳”机制。

心跳机制的本质是:客户端和服务器之间定期发送简单的“ping/pong”消息,以确认对方是否在线,并保持连接的活跃状态。
- 客户端 -> 服务器: 发送一个
ping帧。 - 服务器 -> 客户端: 收到
ping后,回复一个pong帧。 - 服务器 -> 客户端: 也可以主动发送
ping,客户端回复pong。 - 超时处理: 如果一方在一定时间内(30 秒)没有收到对方的
pong或ping,就认为连接已经断开,并采取相应措施(如重连、关闭连接等)。
实现方案
在 Python 中,最常用的 WebSocket 库是 websockets,它本身内置了对 RFC 6455 标准中定义的 Ping/Pong 帧的支持,这使得实现心跳变得非常简单。
我们将从最简单的客户端心跳开始,然后逐步完善,并实现一个支持心跳的服务器。
客户端心跳(使用 websockets 库)
这是最核心的场景,客户端需要定期发送 ping,并处理服务器的 pong 响应,客户端也需要能处理服务器发来的 ping 并回复 pong。

websockets 库的 connect 函数提供了一个 ping_interval 参数,可以自动完成大部分工作。
安装库
pip install websockets
实现代码
下面的客户端代码会:
- 每 20 秒自动向服务器发送一个
ping帧。 - 30 秒内没有收到服务器的
pong响应,会抛出websockets.exceptions.ConnectionClosed异常。 - 它也能正确响应服务器主动发来的
ping。
import asyncio
import websockets
import logging
# 设置日志,方便查看连接状态
logging.basicConfig(level=logging.INFO)
async def heartbeat_client(uri):
"""
带有心跳机制的 WebSocket 客户端
:param uri: WebSocket 服务器的 URI
"""
try:
# ping_interval: 每 20 秒发送一次 ping
# ping_timeout: 等待 pong 响应的超时时间为 30 秒
# close_timeout: 关闭连接的超时时间
async with websockets.connect(
uri,
ping_interval=20, # 心跳间隔,单位:秒
ping_timeout=30, # 心跳超时,单位:秒
close_timeout=2 # 关闭连接的超时时间
) as websocket:
logging.info(f"已连接到服务器: {uri}")
# 启动一个任务来接收服务器的消息
receive_task = asyncio.create_task(receive_messages(websocket))
# 启动一个任务来发送消息
send_task = asyncio.create_task(send_user_input(websocket))
# 等待接收或发送任务完成
done, pending = await asyncio.wait(
[receive_task, send_task],
return_when=asyncio.FIRST_COMPLETED
)
# 如果一个任务完成,取消另一个任务
for task in pending:
task.cancel()
except websockets.exceptions.ConnectionClosed as e:
logging.error(f"连接已关闭: {e}")
# 在这里可以触发重连逻辑
await asyncio.sleep(5) # 等待5秒后重连
# asyncio.create_task(heartbeat_client(uri)) # 取消注释以实现自动重连
except Exception as e:
logging.error(f"发生错误: {e}")
async def receive_messages(websocket):
"""接收来自服务器的消息"""
try:
async for message in websocket:
logging.info(f"收到消息: {message}")
except websockets.exceptions.ConnectionClosed:
logging.info("接收消息时连接已关闭。")
async def send_user_input(websocket):
"""从用户输入获取并发送消息"""
while True:
try:
message = await asyncio.to_thread(input, "请输入要发送的消息 (或 'exit' 退出): ")
if message.lower() == 'exit':
break
await websocket.send(message)
except websockets.exceptions.ConnectionClosed:
logging.info("发送消息时连接已关闭。")
break
if __name__ == "__main__":
# 替换为你的 WebSocket 服务器地址
WEBSOCKET_URI = "ws://localhost:8765"
asyncio.run(heartbeat_client(WEBSOCKET_URI))
代码解析:
websockets.connect(uri, ping_interval=20, ping_timeout=30): 这是实现客户端心跳的关键。ping_interval=20: 客户端会每隔 20 秒自动向服务器发送一个ping帧。ping_timeout=30: 客户端发送ping后,30 秒内没有收到服务器的pong响应,客户端会主动关闭连接,并抛出ConnectionClosed异常。
async with ... as websocket: 这是websockets库推荐的连接管理方式,可以确保连接在使用后被正确关闭。receive_task和send_task: 我们将接收和发送操作放在两个独立的异步任务中,这样客户端既能保持心跳,又能同时收发业务数据,互不阻塞。
服务器端心跳(使用 websockets 库)
服务器端也需要有心跳机制来检测客户端是否存活。websockets 库的服务器端同样支持自动的 Pong 响应。
服务器代码
服务器会:
- 自动响应客户端发来的
ping(这是websockets库的默认行为)。 - 主动向客户端发送
ping来检测客户端是否存活。 - 如果客户端在一定时间内没有响应
pong,服务器会认为客户端已断开并关闭连接。
import asyncio
import websockets
import logging
import json
# 设置日志
logging.basicConfig(level=logging.INFO)
# 存储所有连接的客户端
connected_clients = set()
async def handler(websocket, path):
"""
处理每个 WebSocket 连接
"""
client_address = websocket.remote_address
logging.info(f"新客户端连接: {client_address}")
connected_clients.add(websocket)
try:
# 启动一个任务来定期向客户端发送 ping
ping_task = asyncio.create_task(periodic_ping(websocket))
# 启动一个任务来接收客户端消息
receive_task = asyncio.create_task(receive_from_client(websocket, client_address))
# 等待任一任务完成(连接关闭或出错)
done, pending = await asyncio.wait(
[ping_task, receive_task],
return_when=asyncio.FIRST_COMPLETED
)
# 清理任务
for task in pending:
task.cancel()
except websockets.exceptions.ConnectionClosed as e:
logging.info(f"客户端 {client_address} 断开连接: {e}")
finally:
connected_clients.remove(websocket)
logging.info(f"客户端 {client_address} 已从列表中移除。")
async def periodic_ping(websocket):
"""
定期向客户端发送 ping
"""
while websocket.open:
try:
# await websocket.ping() 发送 ping 并等待 pong
# 如果在 ping_timeout 时间内没有收到 pong,会抛出 ConnectionClosed
await websocket.ping()
logging.info(f"已向 {websocket.remote_address} 发送 ping")
await asyncio.sleep(20) # 每 20 秒 ping 一次
except websockets.exceptions.ConnectionClosed:
logging.info(f"客户端 {websocket.remote_address} 未响应 pong,连接已断开。")
break # 退出循环,任务结束
async def receive_from_client(websocket, client_address):
"""
接收来自客户端的消息
"""
try:
async for message in websocket:
logging.info(f"从 {client_address} 收到消息: {message}")
# 这里可以添加业务逻辑,例如将消息广播给所有客户端
# await broadcast(message)
except websockets.exceptions.ConnectionClosed:
logging.info(f"接收消息时,客户端 {client_address} 连接已关闭。")
async def broadcast(message):
"""广播消息给所有连接的客户端"""
if connected_clients:
await asyncio.gather(
*[client.send(message) for client in connected_clients],
return_exceptions=True
)
if __name__ == "__main__":
start_server = websockets.serve(handler, "localhost", 8765)
logging.info("WebSocket 服务器启动在 ws://localhost:8765")
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
代码解析:
websockets.serve(handler, ...): 启动 WebSocket 服务器。periodic_ping(websocket): 这是一个自定义的异步任务,它会定期调用await websocket.ping()。await websocket.ping()不仅是发送ping,它还会等待pong响应。- 如果在服务器配置的
ping_timeout(默认 20 秒)内没有收到pong,await websocket.ping()会抛出ConnectionClosed异常,从而让我们知道客户端已经失联。
handler函数中的asyncio.wait: 它会等待ping_task或receive_task中任何一个完成,如果客户端失联,ping_task会先失败,然后整个handler协程会优雅地退出,finally块会确保客户端从connected_clients集合中移除。
手动实现心跳(更灵活)
有时候你可能需要更精细的控制,比如自定义心跳消息内容(而不仅仅是空帧),这时可以手动实现。
手动心跳客户端
import asyncio
import websockets
import json
import logging
logging.basicConfig(level=logging.INFO)
async def manual_heartbeat_client(uri):
ping_interval = 20
pong_timeout = 30
last_pong_time = asyncio.get_event_loop().time()
async with websockets.connect(uri) as websocket:
logging.info("连接已建立")
# 启动心跳任务
heartbeat_task = asyncio.create_task(send_heartbeat(websocket, ping_interval))
# 启动接收任务
receive_task = asyncio.create_task(handle_responses(websocket, last_pong_time, pong_timeout))
# 等待任务完成
done, pending = await asyncio.wait(
[heartbeat_task, receive_task],
return_when=asyncio.FIRST_COMPLETED
)
for task in pending:
task.cancel()
async def send_heartbeat(websocket, interval):
"""手动发送心跳消息"""
while websocket.open:
try:
# 发送自定义的 JSON 心跳消息
ping_message = {"type": "ping", "timestamp": asyncio.get_event_loop().time()}
await websocket.send(json.dumps(ping_message))
logging.info("已发送心跳 ping")
await asyncio.sleep(interval)
except websockets.exceptions.ConnectionClosed:
logging.info("发送心跳时连接已关闭")
break
async def handle_responses(websocket, last_pong_time, timeout):
"""处理所有服务器的响应,包括 pong 和业务消息"""
while websocket.open:
try:
response = await asyncio.wait_for(websocket.recv(), timeout=timeout)
current_time = asyncio.get_event_loop().time()
last_pong_time = current_time # 更新最后收到响应的时间
message = json.loads(response)
if message.get("type") == "pong":
logging.info(f"收到心跳 pong: {response}")
else:
logging.info(f"收到业务消息: {response}")
except asyncio.TimeoutError:
logging.error(f"在 {timeout} 秒内未收到服务器的响应,认为连接已断开。")
break # 触发连接关闭
except websockets.exceptions.ConnectionClosed:
logging.info("连接已关闭")
break
except json.JSONDecodeError:
logging.warning("收到非 JSON 格式消息")
if __name__ == "__main__":
WEBSOCKET_URI = "ws://localhost:8765"
# 注意:服务器需要能处理这种自定义格式的 ping/pong
asyncio.run(manual_heartbeat_client(WEBSOCKET_URI))
配套的手动心跳服务器
服务器需要能识别自定义的 ping 消息并回复 pong。
import asyncio
import websockets
import json
import logging
logging.basicConfig(level=logging.INFO)
async def manual_handler(websocket, path):
logging.info(f"新客户端连接: {websocket.remote_address}")
try:
async for message in websocket:
try:
data = json.loads(message)
if data.get("type") == "ping":
# 回复一个自定义的 pong 消息
pong_message = {"type": "pong", "timestamp": data.get("timestamp")}
await websocket.send(json.dumps(pong_message))
logging.info(f"已向 {websocket.remote_address} 回复 pong")
else:
# 处理其他业务消息
logging.info(f"收到业务消息: {data}")
# await websocket.send(f"Echo: {data}")
except json.JSONDecodeError:
logging.warning(f"收到无效的 JSON 消息: {message}")
except websockets.exceptions.ConnectionClosed:
logging.info(f"客户端 {websocket.remote_address} 断开连接")
if __name__ == "__main__":
start_server = websockets.serve(manual_handler, "localhost", 8765)
logging.info("支持手动心跳的 WebSocket 服务器启动")
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
总结与建议
| 特性 | 自动心跳 (ping_interval) |
手动心跳 |
|---|---|---|
| 实现难度 | 非常简单,库内置支持 | 较复杂,需要自己管理任务和超时 |
| 心跳帧 | 标准的 Ping/Pong 帧 | 自定义格式的消息(如 JSON) |
| 灵活性 | 低,只能配置间隔和超时 | 高,可以完全自定义心跳逻辑和消息内容 |
| 适用场景 | 大多数标准应用,快速开发 | 需要特定心跳协议或复杂心跳逻辑的场景 |
给你的建议:
- 优先使用
websockets库的自动心跳 (ping_interval),对于 99% 它足够健壮且实现简单。 - 只有当你的项目有特殊需求,比如心跳消息需要携带特定业务信息,或者心跳逻辑非常复杂时,才考虑手动实现。
- 客户端心跳是必须的,因为它可以主动发现连接问题并触发重连,极大地提升了应用的健壮性。
- 服务器端的心跳(主动
ping客户端)可以作为客户端心跳的补充,用于清理长时间失联的客户端连接,但通常不是必需的,因为客户端的ping_timeout最终也会导致连接关闭。
