杰瑞科技汇

Python进程间通信有哪些常用方式?

在 Python 中,由于 全局解释器锁 的存在,多线程并不能真正实现并行计算,以充分利用多核 CPU,对于 CPU 密集型任务,我们通常使用多进程,进程拥有独立的内存空间,不像线程那样共享内存,所以当多个进程需要协作时,就必须通过特定的机制进行通信和数据交换。

Python进程间通信有哪些常用方式?-图1
(图片来源网络,侵删)

下面我将从 简单到复杂,介绍 Python 中几种主要的进程通信方式,并附上代码示例。

核心概念:multiprocessing 模块

Python 提供了强大的 multiprocessing 模块,它是实现多进程和进程通信的标准库,我们所有的示例都将围绕这个模块展开。


队列

队列是最常用、最基础的进程通信方式之一,它遵循 先进先出 的原则,一个进程可以把数据放入队列,另一个进程从队列中取出数据。multiprocessing.Queue 是线程和进程安全的,非常适合在生产者和消费者模型中使用。

工作原理: multiprocessing.Queue 底层使用管道 和锁机制来实现,数据被序列化后存放在管道中,锁确保了同一时间只有一个进程能访问管道,从而避免了数据竞争。

Python进程间通信有哪些常用方式?-图2
(图片来源网络,侵删)

适用场景:

  • 生产者-消费者模型。
  • 需要在多个进程间传递简单数据结构(如数字、字符串、列表、字典等)。
  • 任务分发和结果收集。

示例代码:

import multiprocessing
import time
import random
# 消费者进程
def consumer(queue):
    """从队列中获取任务并处理"""
    print(f"消费者 {multiprocessing.current_process().name} 启动")
    while True:
        # 从队列中获取数据,block=True 表示会阻塞等待
        item = queue.get()
        if item is None:  # 收到结束信号
            print(f"消费者 {multiprocessing.current_process().name} 收到结束信号,退出。")
            break
        print(f"消费者 {multiprocessing.current_process().name} 正在处理: {item}")
        time.sleep(random.random())  # 模拟处理耗时
    print(f"消费者 {multiprocessing.current_process().name} 结束")
# 生产者进程
def producer(queue, count):
    """生成任务并放入队列"""
    print(f"生产者 {multiprocessing.current_process().name} 启动")
    for i in range(count):
        item = f"任务-{i}"
        print(f"生产者 {multiprocessing.current_process().name} 放入: {item}")
        queue.put(item)
        time.sleep(random.random()) # 模拟生产耗时
    # 放入结束信号,通知消费者所有任务已完成
    print(f"生产者 {multiprocessing.current_process().name} 放入结束信号")
    queue.put(None)
if __name__ == "__main__":
    # 创建一个队列
    task_queue = multiprocessing.Queue()
    # 创建并启动消费者进程
    consumer_p = multiprocessing.Process(target=consumer, args=(task_queue,))
    consumer_p.start()
    # 创建并启动生产者进程
    producer_p = multiprocessing.Process(target=producer, args=(task_queue, 5))
    producer_p.start()
    # 等待生产者完成
    producer_p.join()
    print("生产者已结束")
    # 等待消费者完成(消费者在收到None后会自行退出)
    consumer_p.join()
    print("消费者已结束")
    print("所有进程执行完毕")

管道

管道也是一种进程通信方式,它提供的是一个双向的通信通道,与队列相比,管道更底层,功能也更基础。

工作原理: multiprocessing.Pipe() 返回一个连接对象对 (conn1, conn2),每个连接对象都有 send()recv() 方法。conn1 发送的数据可以被 conn2 接收,反之亦然。

Python进程间通信有哪些常用方式?-图3
(图片来源网络,侵删)

注意事项:

  • 管道是 双向 的,但通信是 半双工 的,即同一时间只能有一个方向的数据传输。
  • recv() 方法在管道为空时被调用,它会 阻塞,直到有数据到达。
  • close() 一个连接端,另一端在 recv() 时会收到 EOFError 异常。

适用场景:

  • 两个进程之间直接、简单的双向通信。
  • 不需要队列的先进先出特性时。

示例代码:

import multiprocessing
def worker(conn):
    """子进程,从管道接收数据并发送回去"""
    print("子进程: 等待接收数据...")
    # 从管道接收数据
    msg = conn.recv()
    print(f"子进程: 收到 '{msg}'")
    # 处理数据并发送回去
    processed_msg = msg.upper()
    print(f"子进程: 发送 '{processed_msg}'")
    conn.send(processed_msg)
    conn.close() # 关闭连接
if __name__ == "__main__":
    # 创建一个管道
    parent_conn, child_conn = multiprocessing.Pipe()
    # 创建子进程,并将管道的一端(child_conn)传给它
    p = multiprocessing.Process(target=worker, args=(child_conn,))
    p.start()
    # 父进程通过管道的另一端(parent_conn)发送数据
    print("父进程: 发送 'hello'")
    parent_conn.send("hello")
    # 父进程接收子进程处理后的数据
    print("父进程: 等待接收数据...")
    reply_msg = parent_conn.recv()
    print(f"父进程: 收到 '{reply_msg}'")
    p.join()
    print("通信结束")

Manager

Manager 提供了一种更高级的进程通信方式,它允许你在不同进程间共享 Python 的复杂对象,如列表、字典、命名空间、信号量、条件变量等。

工作原理: Manager 会创建一个服务器进程,这个进程负责管理所有共享对象,其他进程通过代理来访问这些共享对象,当一个进程修改共享对象时,它会将操作请求发送给管理器服务器,由服务器来实际修改对象,然后将结果返回给请求的进程。

优点:

  • 可以共享非常复杂的数据结构,非常灵活。

缺点:

  • 性能开销巨大,所有对共享对象的操作都需要进程间通信,比直接使用队列或管道慢得多。
  • 容易成为性能瓶颈。

适用场景:

  • 需要在多个进程间共享复杂的数据集合(如一个大列表或一个大字典)。
  • 对性能要求不高的场景。

示例代码:

import multiprocessing
def worker(shared_list, shared_dict):
    """子进程,修改共享的列表和字典"""
    print(f"子进程 ID: {multiprocessing.current_process().pid}")
    # 修改共享列表
    shared_list.append(multiprocessing.current_process().pid)
    print(f"子进程: 共享列表内容: {shared_list}")
    # 修改共享字典
    shared_dict[multiprocessing.current_process().pid] = "worker_value"
    print(f"子进程: 共享字典内容: {shared_dict}")
if __name__ == "__main__":
    # 创建一个 Manager 对象
    with multiprocessing.Manager() as manager:
        # 创建共享的列表和字典
        shared_list = manager.list()
        shared_dict = manager.dict()
        # 创建并启动多个子进程
        processes = []
        for i in range(3):
            p = multiprocessing.Process(target=worker, args=(shared_list, shared_dict))
            processes.append(p)
            p.start()
        # 等待所有子进程完成
        for p in processes:
            p.join()
        # 在主进程中查看最终结果
        print("\n主进程 ID:", multiprocessing.current_process().pid)
        print("最终共享列表:", list(shared_list)) # manager.list 需要转换
        print("最终共享字典:", dict(shared_dict)) # manager.dict 需要转换

共享内存

共享内存是最高效的进程通信方式,它允许多个进程直接读写同一块物理内存区域,无需进行数据拷贝。

工作原理: multiprocessing 模块通过 ValueArray 来提供共享内存功能。

  • multiprocessing.Value(typecode, value): 创建一个共享的变量。
  • multiprocessing.Array(typecode, sequence): 创建一个共享的数组。

注意事项:

  • 必须使用锁!因为多个进程同时读写同一块内存会导致数据竞争和不可预期的结果。ValueArray 对象都内置了 get_lock() 方法来获取锁。
  • 通常用于存储简单的数值类型或字节数组,不适合复杂数据结构。

适用场景:

  • 对性能要求极高的场景。
  • 需要在进程间传递大量或频繁更新的简单数据(如计数器、状态标志等)。

示例代码:

import multiprocessing
import time
def worker(counter, lock):
    """子进程,竞争性地增加计数器"""
    for _ in range(100000):
        # 获取锁
        with lock:
            # 在锁的保护下进行原子操作
            counter.value += 1
    print(f"子进程 {multiprocessing.current_process().name} 完成,当前值: {counter.value}")
if __name__ == "__main__":
    # 创建一个共享的整型变量 'i' (初始值为0)
    # 'd' 表示双精度浮点数,'i' 表示有符号整数
    counter = multiprocessing.Value('i', 0)
    # 创建一个锁
    lock = multiprocessing.Lock()
    # 创建并启动多个子进程
    processes = []
    for i in range(4):
        p = multiprocessing.Process(target=worker, args=(counter, lock))
        processes.append(p)
        p.start()
    # 等待所有子进程完成
    for p in processes:
        p.join()
    # 打印最终结果
    print(f"所有进程执行完毕,最终计数值: {counter.value}")
    # 预期结果是 4 * 100000 = 400000

原语

原语不是用于传递数据,而是用于 进程同步,即控制多个进程的执行顺序,避免竞争条件,它们包括:

  • Lock (锁):确保同一时间只有一个进程能访问共享资源。
  • RLock (可重入锁):允许同一个进程多次获取锁(主要用于递归函数)。
  • Semaphore (信号量):控制同时访问某个资源的进程数量。
  • Event (事件):一个简单的进程间通信机制,一个进程可以设置事件,另一个进程等待事件被设置。
  • Condition (条件变量):允许线程/进程等待某个条件成立,并在条件满足时被通知。

这些原语通常与共享内存或队列结合使用,以确保数据操作的安全性。


总结与对比

通信方式 原理 优点 缺点 适用场景
Queue 基于管道和锁的FIFO队列 简单易用,线程/进程安全,适合生产者-消费者模型 非双向通信,有一定开销 最常用,任务分发、结果收集。
Pipe 双向通信通道 实现两个进程间的直接通信 功能基础,是半双工的,需要手动处理阻塞和关闭 两个进程间简单的双向通信。
Manager 通过管理器服务器进程代理共享对象 极其灵活,可共享任意Python对象 性能开销巨大,易成瓶颈 需要共享复杂对象(如大列表、字典),但对性能要求不高时。
Shared Memory 直接读写同一块物理内存 性能最高,无数据拷贝 必须手动加锁,只适合简单数据类型 对性能要求极高的场景,如共享计数器、状态标志。
Primitives (Lock, etc.) 同步控制机制 保证数据操作的原子性和正确性 不传递数据,只控制执行顺序 必须与共享内存或队列等配合使用,以保证数据安全。

如何选择?

  1. 首选 Queue:在绝大多数情况下,当你需要在进程间传递数据时,multiprocessing.Queue 是最简单、最安全、最合适的选择。
  2. 简单双向通信用 Pipe:如果只是两个进程需要你来我往地聊几句,Pipe 很直接。
  3. 共享复杂数据用 Manager:当你想把一个巨大的列表或字典在多个进程间共享时,Manager 是唯一的选择,但要清楚它的性能代价。
  4. 追求极致性能用 Shared Memory:在性能是唯一考量的场景(如高频更新的计数器),使用 Value/Array + Lock,但要小心处理同步问题。
  5. 同步永远是关键:任何时候当你修改共享数据时,都要考虑是否需要使用 Lock 或其他同步原语来保护它。
分享:
扫描分享到社交APP
上一篇
下一篇