杰瑞科技汇

python socket 全双工

什么是全双工?

明确一下全双工的定义。

python socket 全双工-图1
(图片来源网络,侵删)
  • 全双工:指通信的双方可以同时进行数据的发送和接收,想象一下打电话,你和对方可以同时说话,也能同时听到对方的声音,这就是典型的全双工通信。

  • 与之对比

    • 半双工:通信双方可以收发数据,但不能同时进行,比如对讲机,一方说完“Over”后,另一方才能说。
    • 单工:数据只能在一个方向上传输,比如广播,电台发送,听众只能接收。

在 TCP 通信中,由于其全双工的特性,客户端和服务器都拥有独立的发送和接收通道,实现全双工的关键在于:在同一个程序中,同时运行发送和接收的逻辑,并且这两个逻辑不能互相阻塞。

实现全双工的核心挑战:阻塞

标准的 socket 操作是阻塞的。

python socket 全双工-图2
(图片来源网络,侵删)
  • conn.recv(1024): 会一直等待,直到收到至少 1 字节的数据,或者连接关闭,如果没有任何数据到达,程序会“卡住”无法执行后续的代码(比如发送数据)。
  • conn.send(data): 通常也会阻塞,直到所有数据都被成功发送到操作系统的发送缓冲区。

如果我们在一个线程或主线程中先调用 recv(),那么整个程序就会被阻塞,无法再发送数据,反之亦然,这就无法实现“收发。

解决方案:多线程

最直接、最常用的解决方案是使用多线程,一个线程专门负责接收数据,另一个线程专门负责发送数据,这样,两个线程可以并行运行,互不干扰,从而实现全双工通信。

我们将分别展示一个客户端和一个服务器的全双工实现。


实现示例 1:全双工客户端

这个客户端可以同时从键盘输入发送消息,并实时接收来自服务器的消息。

python socket 全双工-图3
(图片来源网络,侵删)
# client.py
import socket
import threading
def receive_messages(client_socket):
    """接收服务器消息的线程函数"""
    while True:
        try:
            # 接收服务器发送的数据
            message = client_socket.recv(1024).decode('utf-8')
            if not message:
                # 如果接收到的消息为空,说明服务器已关闭连接
                print("服务器已关闭连接。")
                break
            print(f"\n[服务器]: {message}")
        except ConnectionResetError:
            print("与服务器连接已断开。")
            break
        except Exception as e:
            print(f"发生错误: {e}")
            break
def send_messages(client_socket):
    """发送消息到服务器的线程函数"""
    while True:
        # 从键盘获取输入
        message = input("请输入要发送的消息 (输入 'exit' 退出): ")
        if message.lower() == 'exit':
            print("正在退出...")
            break
        try:
            # 发送消息到服务器
            client_socket.sendall(message.encode('utf-8'))
        except Exception as e:
            print(f"发送消息失败: {e}")
            break
def main():
    # 1. 创建 socket 对象
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 2. 连接服务器
    server_address = ('127.0.0.1', 12345)  # 替换为你的服务器IP和端口
    print(f"正在连接到服务器 {server_address}...")
    try:
        client_socket.connect(server_address)
        print("已连接到服务器!")
    except ConnectionRefusedError:
        print("连接被拒绝,请确保服务器正在运行。")
        return
    # 3. 创建并启动接收和发送线程
    receive_thread = threading.Thread(target=receive_messages, args=(client_socket,))
    send_thread = threading.Thread(target=send_messages, args=(client_socket))
    receive_thread.start()
    send_thread.start()
    # 4. 等待发送线程结束(即用户输入 'exit')
    send_thread.join()
    # 5. 关闭连接
    print("关闭连接。")
    client_socket.close()
if __name__ == "__main__":
    main()

实现示例 2:全双工服务器

这个服务器可以同时与多个客户端通信,对于每个连接的客户端,它会启动两个线程(一个接收,一个发送)来处理,从而实现与该客户端的全双工通信。

# server.py
import socket
import threading
# 存储所有客户端连接的字典
# key: 客户端地址, value: 客户端socket
clients = {}
def handle_client(client_socket, client_address):
    """处理单个客户端连接的函数"""
    print(f"新客户端连接: {client_address}")
    clients[client_address] = client_socket
    def receive_messages():
        """接收客户端消息的线程函数"""
        while True:
            try:
                message = client_socket.recv(1024).decode('utf-8')
                if not message:
                    # 客户端断开连接
                    print(f"客户端 {client_address} 断开连接。")
                    break
                # 广播消息给所有其他客户端
                broadcast(f"{client_address}: {message}", client_address)
            except ConnectionResetError:
                print(f"客户端 {client_address} 异常断开。")
                break
            except Exception as e:
                print(f"处理客户端 {client_address} 时发生错误: {e}")
                break
        # 清理断开的客户端
        if client_address in clients:
            del clients[client_address]
        client_socket.close()
    def send_messages():
        """向客户端发送消息的线程函数(这里主要用于服务器主动发送)"""
        # 在这个简单的例子中,服务器不主动发送,但保留了这个结构
        # 你可以在这里添加服务器主动发送的逻辑
        pass
    # 为该客户端创建接收和发送线程
    receive_thread = threading.Thread(target=receive_messages)
    send_thread = threading.Thread(target=send_messages)
    receive_thread.start()
    send_thread.start()
    # 等待接收线程结束
    receive_thread.join()
    print(f"客户端 {client_address} 的处理线程已结束。")
def broadcast(message, sender_address):
    """向所有客户端广播消息,除了发送者自己"""
    for address, sock in clients.items():
        if address != sender_address:
            try:
                sock.sendall(message.encode('utf-8'))
            except Exception as e:
                print(f"向客户端 {address} 广播失败: {e}")
                # 如果发送失败,可能客户端已断开,可以在这里进行清理
                if address in clients:
                    del clients[address]
                    sock.close()
def main():
    # 1. 创建 socket 对象
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 允许地址重用
    # 2. 绑定地址和端口
    host = '0.0.0.0'  # 监听所有可用的网络接口
    port = 12345
    server_address = (host, port)
    server_socket.bind(server_address)
    # 3. 开始监听
    server_socket.listen(5)  # backlog 参数,表示可以挂起的连接数
    print(f"服务器正在监听 {server_address}...")
    try:
        while True:
            # 4. 接受新的客户端连接
            client_socket, client_address = server_socket.accept()
            # 5. 为每个新连接创建一个处理线程
            client_thread = threading.Thread(target=handle_client, args=(client_socket, client_address))
            client_thread.start()
    except KeyboardInterrupt:
        print("\n服务器正在关闭...")
    finally:
        # 6. 关闭服务器socket
        server_socket.close()
        print("服务器已关闭。")
if __name__ == "__main__":
    main()

如何运行

  1. 启动服务器: 在一个终端窗口中运行 python server.py,你会看到 "服务器正在监听..." 的提示。

  2. 启动客户端: 在另一个(或多个)新的终端窗口中运行 python client.py,你会看到 "已连接到服务器!" 的提示。

  3. 测试通信

    • 在一个客户端输入消息,按回车,你会看到服务器将这条消息广播给所有其他连接的客户端。
    • 每个客户端都可以随时输入新的消息,而不会影响接收其他客户端消息的能力。
    • 在一个客户端输入 exit 可以退出该客户端的连接。

总结与要点

  1. 核心思想:利用多线程(或多进程)将阻塞的 recv()send() 操作分离到不同的线程中执行,实现真正的并发和全双工。
  2. 客户端模型:通常一个客户端需要两个线程,一个负责从标准输入(键盘)读取并发送,另一个负责接收并打印来自服务器的消息。
  3. 服务器模型:服务器的主线程负责 accept() 新连接,每当有新连接,就创建一个新的处理线程,在处理线程内部,通常也需要两个子线程来分别处理与该客户端的收发(尤其是在服务器需要主动向客户端推送消息的场景下)。
  4. 线程安全:在服务器端,clients 字典被多个线程(每个客户端一个)共享,虽然在这个简单例子中只是读取,但在更复杂的应用中,如果多个线程要修改它(比如添加/删除客户端),就需要使用锁(threading.Lock)来避免竞态条件。
  5. 异常处理:网络连接是不稳定的,必须妥善处理 ConnectionResetErrorOSError 等异常,以防止程序意外崩溃,并在客户端断开时正确清理资源。

除了多线程,你还可以使用 asyncio(异步 I/O)来实现全双工,它在处理大量并发连接时性能通常更好,但编程模型(基于协程)比多线程更复杂,对于初学者和理解基本原理,多线程是最清晰、最容易上手的方法。

分享:
扫描分享到社交APP
上一篇
下一篇