杰瑞科技汇

Python多线程Socket通讯如何高效处理并发连接?

核心思想

  1. 主线程(服务器线程):这个线程不直接处理客户端的通信,它的唯一职责是监听一个特定的端口,等待新的客户端连接,一旦有新的客户端连接,它就接受这个连接,并创建一个新的工作线程来专门处理这个客户端的所有后续通信。
  2. 工作线程(客户端处理线程):每个工作线程负责与一个特定的客户端进行一对一的通信,它会接收客户端发来的数据,处理,然后发送回响应,当客户端断开连接时,这个工作线程也随之结束。

这种模型的好处是,主线程可以立即返回去监听新的连接,而不会因为处理某个客户端的耗时操作(如等待输入)而阻塞,服务器可以同时为多个客户端服务。

Python多线程Socket通讯如何高效处理并发连接?-图1
(图片来源网络,侵删)

完整代码示例

下面是一个完整的、可运行的例子,包含了服务器端和客户端。

服务器端代码 (server.py)

import socket
import threading
# 定义服务器的地址和端口
HOST = '127.0.0.1'  # 本地回环地址,表示服务器运行在本机
PORT = 65432        # 选择一个未被占用的端口
# 存储所有客户端连接的列表,方便广播消息
client_threads = []
client_sockets = []
def handle_client(client_socket, client_address):
    """
    处理单个客户端连接的函数,每个客户端连接都会启动一个此函数的线程
    """
    print(f"[新连接] {client_address} 已连接。")
    try:
        while True:
            # 接收客户端发来的数据
            # recv(1024) 表示每次最多接收 1024 字节
            # 如果客户端断开连接,recv() 会返回空字节 b''
            data = client_socket.recv(1024)
            if not data:
                # 如果没有数据,说明客户端已断开连接
                print(f"[客户端断开] {client_address} 断开了连接。")
                break
            # 将接收到的字节串解码为字符串
            message = data.decode('utf-8')
            print(f"[来自 {client_address}] 的消息: {message}")
            # 向客户端发送响应
            response = f"服务器已收到你的消息: {message}"
            client_socket.sendall(response.encode('utf-8'))
    except ConnectionResetError:
        print(f"[客户端异常断开] {client_address} 强制关闭了连接。")
    finally:
        # 确保在循环结束后关闭客户端套接字
        client_socket.close()
        # 从列表中移除该客户端
        if client_socket in client_sockets:
            client_sockets.remove(client_socket)
        print(f"[连接关闭] 与 {client_address} 的连接已关闭。")
def start_server():
    """
    启动服务器的主函数
    """
    # 创建一个 TCP 套接字 (socket.AF_INET 表示 IPv4, socket.SOCK_STREAM 表示 TCP)
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 设置 SO_REUSEADDR 选项,允许地址在端口被占用后立即重用
    # 这在服务器快速重启时很有用
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        # 绑定套接字到指定的地址和端口
        server_socket.bind((HOST, PORT))
        # 开始监听连接,参数 5 表示等待连接的最大队列数
        server_socket.listen(5)
        print(f"服务器正在监听 {HOST}:{PORT}...")
        while True:
            # 阻塞式地等待接受新的客户端连接
            # accept() 返回一个新的套接字对象(用于与该客户端通信)和客户端的地址
            client_socket, client_address = server_socket.accept()
            # 将新连接的客户端套接字添加到列表中
            client_sockets.append(client_socket)
            # 创建一个新的线程来处理这个客户端
            # target=handle_client 指定线程要执行的函数
            # args=(client_socket, client_address) 传递给函数的参数
            client_thread = threading.Thread(target=handle_client, args=(client_socket, client_address))
            # 设置 daemon=True,这样当主线程结束时,所有子线程也会自动结束
            client_thread.daemon = True
            # 启动线程
            client_thread.start()
            # 将线程对象也存储起来,虽然在这个简单例子中没用到,但在更复杂的场景下(如广播)会很有用
            client_threads.append(client_thread)
            print(f"[活动连接数] {threading.active_count() - 1}") # 减1是因为减去主线程
    except KeyboardInterrupt:
        print("\n[服务器关闭] 用户中断了服务器。")
    finally:
        # 关闭服务器套接字
        print("正在关闭服务器套接字...")
        server_socket.close()
        # 关闭所有客户端套接字
        for sock in client_sockets:
            sock.close()
        print("服务器已完全关闭。")
if __name__ == "__main__":
    start_server()

客户端代码 (client.py)

import socket
import threading
HOST = '127.0.0.1'  # 服务器地址
PORT = 65432        # 服务器端口
def receive_messages(client_socket):
    """
    一个独立的线程,用于持续接收来自服务器的消息
    """
    try:
        while True:
            data = client_socket.recv(1024)
            if not data:
                # 如果服务器关闭连接,recv() 会返回空字节
                print("\n[服务器已断开连接]")
                break
            print(f"\n[服务器]: {data.decode('utf-8')}")
    except ConnectionResetError:
        print("\n[服务器异常断开连接]")
    finally:
        client_socket.close()
def start_client():
    """
    启动客户端的主函数
    """
    # 创建客户端套接字
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # 连接到服务器
        client_socket.connect((HOST, PORT))
        print(f"已连接到服务器 {HOST}:{PORT}")
        # 创建一个线程来专门接收服务器的消息
        receive_thread = threading.Thread(target=receive_messages, args=(client_socket,))
        receive_thread.daemon = True  # 设置为守护线程
        receive_thread.start()
        # 主线程用于发送用户输入的消息
        while True:
            message = input("请输入要发送的消息 (输入 'exit' 退出): ")
            if message.lower() == 'exit':
                break
            client_socket.sendall(message.encode('utf-8'))
    except ConnectionRefusedError:
        print("连接失败:服务器未启动或拒绝连接。")
    finally:
        print("正在关闭客户端...")
        client_socket.close()
        print("客户端已关闭。")
if __name__ == "__main__":
    start_client()

如何运行

  1. 启动服务器: 在你的终端或命令行中,运行服务器脚本:

    python server.py

    你会看到输出:

    服务器正在监听 127.0.0.1:65432...
  2. 启动客户端: 打开另一个新的终端,运行客户端脚本:

    Python多线程Socket通讯如何高效处理并发连接?-图2
    (图片来源网络,侵删)
    python client.py

    你会看到输出:

    已连接到服务器 127.0.0.1:65432
    请输入要发送的消息 (输入 'exit' 退出):
  3. 测试多客户端: 再打开第三个终端,再次运行 python client.py,现在你有两个客户端同时连接到服务器。

  4. 通信测试

    • 在第一个客户端的输入框中输入 你好,服务器!,然后按回车。
    • 你会在第一个客户端的终端上看到服务器的响应,同时在服务器的终端上会打印出接收到的消息。
    • 在第二个客户端的输入框中输入 这是第二个客户端,然后按回车。
    • 你会在第二个客户端上看到响应,同时在服务器的终端上也会打印出这条消息。

    你会看到服务器终端的活动连接数在增加,并且能同时处理来自两个客户端的消息,互不干扰。

    Python多线程Socket通讯如何高效处理并发连接?-图3
    (图片来源网络,侵删)
  5. 关闭

    • 在客户端输入 exit 并回车,客户端会关闭连接。
    • 在服务器端,你会看到相应的连接关闭提示。
    • 在服务器终端按下 Ctrl+C,可以优雅地关闭整个服务器。

关键点解析

  1. socket.socket():创建一个套接字对象,是网络通信的基础。
  2. bind():将套接字与一个网络地址(IP和端口)绑定。
  3. listen():使套接字进入监听状态,准备接受客户端连接。
  4. accept()阻塞函数,等待并接受一个客户端连接,成功后,返回一个新的 client_socket(用于和这个特定客户端通信)和 client_address
  5. recv(size):从连接的另一端接收数据,它是阻塞的,如果没有数据到达,程序会停在这里,如果连接关闭,它会返回空字节 b''
  6. sendall(data):发送数据,它会确保所有数据都被发送出去,如果发生错误,则会抛出异常。
  7. threading.Thread(target=..., args=...):创建一个新线程。
    • target:指定线程启动后要执行的函数。
    • args:一个元组,包含了传递给 target 函数的参数。
  8. thread.daemon = True:将线程设置为守护线程,这意味着当主程序(主线程)结束时,这个线程也会被强制终止,这对于后台任务(如我们这里接收消息)非常有用,确保程序可以正常退出。
  9. thread.start():启动线程,使其开始执行 target 函数。

潜在问题与改进

这个简单的示例展示了多线程的基本原理,但在生产环境中还需要考虑更多:

  • 资源耗尽:如果恶意或大量客户端同时连接,服务器可能会创建过多线程,导致资源耗尽(内存、CPU),解决方案是使用线程池来限制并发线程的数量。
  • 线程安全:如果多个线程需要同时修改共享资源(比如一个全局列表),可能会引发竞态条件,在 Python 中,可以使用 threading.Lock 来确保对共享资源的访问是原子性的。
  • 广播功能:当前服务器无法向所有客户端广播消息,要实现这一点,可以在服务器端维护一个所有 client_socket 的列表,当需要广播时,遍历这个列表,对每个 socket 调用 sendall()
  • 更高级的并发模型:对于更高性能的场景,可以考虑使用 asyncio(异步I/O)或 multiprocessing(多进程),它们能更好地处理大规模并发。asyncio 是目前 Python 中处理高并发 I/O 密集型任务的主流方案。
分享:
扫描分享到社交APP
上一篇
下一篇