杰瑞科技汇

Windows下Python多进程如何高效实现?

Windows 和 Linux/Unix 在多进程方面有一个关键区别,这直接影响了 Python 的 multiprocessing 模块在它们上的行为。

Windows下Python多进程如何高效实现?-图1
(图片来源网络,侵删)

核心概念:Windows 上的进程创建

在 Windows 系统上,创建新进程(子进程)的唯一方式是重新执行父进程的 Python 脚本,父进程会启动一个全新的 Python 解释器来运行这个脚本。

为了区分父进程和子进程,multiprocessing 模块使用了一个特殊的变量 multiprocessing.current_process(),它会返回一个 Process 对象,这个对象有一个 pid (进程ID) 属性,更重要的是,它有一个 name 属性。

  • 父进程:它的 name 通常是 MainProcess
  • 子进程:它的 name 通常是 Process-XProcess-1, Process-2)。

multiprocessing 模块通过检查这个 name 来决定是否要创建新的子进程。nameMainProcess,它就执行创建子进程的逻辑;否则,它就认为自己是子进程,并开始执行目标函数。


multiprocessing 模块入门

multiprocessing 模块是 Python 标准库的一部分,提供了与 threading 模块类似的 API,但用于进程。

Windows下Python多进程如何高效实现?-图2
(图片来源网络,侵删)

基本步骤

  1. 导入模块from multiprocessing import Process
  2. 定义目标函数:这个函数是子进程要执行的代码。
  3. 创建进程对象Process(target=目标函数, args=(参数1, 参数2, ...))
  4. 启动进程进程对象.start()
  5. 等待进程结束(可选但推荐)进程对象.join()

示例代码:hello_process.py

import os
import time
from multiprocessing import Process
def worker_function(process_name, duration):
    """子进程要执行的函数"""
    pid = os.getpid()
    print(f"[{process_name}] 进程 (PID: {pid}) 开始工作...")
    time.sleep(duration)
    print(f"[{process_name}] 进程 (PID: {pid}) 工作完成,耗时 {duration} 秒。")
if __name__ == '__main__':
    print(f"主进程 (PID: {os.getpid()}) 启动。")
    # 创建两个子进程
    p1 = Process(target=worker_function, args=('Worker-1', 2))
    p2 = Process(target=worker_function, args=('Worker-2', 3))
    # 启动子进程
    p1.start()
    p2.start()
    print("主进程已启动两个子进程。")
    # 等待子进程执行完毕
    # 如果没有 join(),主进程会继续执行,甚至可能在子进程结束前就退出了。
    p1.join()
    p2.join()
    print("主进程结束,所有子进程均已退出。")

如何运行

将上述代码保存为 hello_process.py,然后在命令行中运行:

python hello_process.py

预期输出:

主进程 启动。
主进程已启动两个子进程。
[Worker-1] 进程 (PID: 12348) 开始工作...
[Worker-2] 进程 (PID: 12352) 开始工作...
[Worker-1] 进程 (PID: 12348) 工作完成,耗时 2 秒。
[Worker-2] 进程 (PID: 12352) 工作完成,耗时 3 秒。
主进程结束,所有子进程均已退出。

注意:PID (进程ID) 每次运行都会不同。


进程间通信

由于每个进程有自己独立的内存空间,它们不能直接共享变量。multiprocessing 提供了多种机制来实现进程间通信。

a) Queue (队列)

Queue 是一个先进先出的数据结构,是进程间安全传递数据的常用方式。

示例代码:process_queue.py

from multiprocessing import Process, Queue
def put_data(q, data):
    """向队列中放入数据"""
    for item in data:
        print(f"放入数据: {item}")
        q.put(item)
        # time.sleep(0.5) # 可以取消注释来观察更清晰
def get_data(q):
    """从队列中取出数据"""
    while True:
        try:
            # 设置超时,避免无限等待
            item = q.get(timeout=1)
            print(f"取出数据: {item}")
        except Exception:
            # 如果超时且队列为空,则退出循环
            print("队列已空,消费者退出。")
            break
if __name__ == '__main__':
    # 创建一个Queue对象
    q = Queue()
    data_to_put = ['apple', 'banana', 'cherry']
    # 创建生产者和消费者进程
    producer = Process(target=put_data, args=(q, data_to_put))
    consumer = Process(target=get_data, args=(q,))
    producer.start()
    consumer.start()
    producer.join()  # 等待生产者完成
    q.close()       # 关闭队列,不再向其中放入数据
    consumer.join() # 等待消费者完成
    print("主进程结束。")

b) Pipe (管道)

Pipe 创建一对连接对象,代表管道的两端,进程可以通过这些连接对象发送和接收数据。

示例代码:process_pipe.py

from multiprocessing import Process, Pipe
def sender(conn, data):
    """发送数据"""
    for item in data:
        print(f"发送: {item}")
        conn.send(item)
    print("发送完毕,关闭连接。")
    conn.close()
def receiver(conn):
    """接收数据"""
    while True:
        try:
            item = conn.recv()
            print(f"接收: {item}")
        except EOFError:
            # 当另一端关闭连接时,recv() 会抛出 EOFError
            print("接收端检测到连接已关闭。")
            break
    conn.close()
if __name__ == '__main__':
    # 创建一对连接对象
    parent_conn, child_conn = Pipe()
    data_to_send = [1, 2, 3, 4, 5]
    # 创建子进程,只使用 child_conn
    p = Process(target=sender, args=(child_conn, data_to_send))
    p.start()
    # 父进程使用 parent_conn 接收数据
    receiver(parent_conn)
    p.join()
    print("主进程结束。")

c) Manager (管理器)

Manager 提供了一种更高级的共享内存方式,它创建一个服务器进程来管理共享对象(如列表、字典、命名空间等),其他进程可以通过代理访问这些共享对象,这种方式比 QueuePipe 更灵活,但性能开销也更大。

示例代码:process_manager.py

from multiprocessing import Process, Manager
def worker(shared_list, shared_dict, item):
    """修改共享数据"""
    # 修改共享列表
    shared_list.append(item)
    # 修改共享字典
    shared_dict[item] = f"value_for_{item}"
    print(f"进程 {os.getpid()} 修改了数据。")
if __name__ == '__main__':
    import os
    # 创建一个Manager对象
    with Manager() as manager:
        # 创建共享列表和字典
        shared_list = manager.list()
        shared_dict = manager.dict()
        processes = []
        for i in range(5):
            p = Process(target=worker, args=(shared_list, shared_dict, f'item-{i}'))
            processes.append(p)
            p.start()
        # 等待所有进程完成
        for p in processes:
            p.join()
        # 打印最终结果
        print("共享列表内容:", list(shared_list))
        print("共享字典内容:", dict(shared_dict))
    print("Manager 上下文管理器已退出,共享对象被销毁。")

进程池 (Pool)

当你需要创建大量子进程来执行相同的任务时,手动创建和管理每个 Process 对象会很繁琐。multiprocessing.Pool 可以解决这个问题,它为你管理一个进程池。

Pool 提供了方便的方法来分发任务,最常用的是 map()apply_async()

map() 方法

map() 的行为类似于内置的 map() 函数,它会阻塞直到所有任务完成。

示例代码:process_pool_map.py

from multiprocessing import Pool
import time
def square(n):
    """计算一个数的平方"""
    time.sleep(0.5) # 模拟耗时操作
    pid = os.getpid()
    result = n * n
    print(f"进程 {pid} 计算了 {n}^2 = {result}")
    return result
if __name__ == '__main__':
    import os
    numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    # 创建一个包含4个工作进程的进程池
    with Pool(processes=4) as pool:
        print("开始使用进程池计算...")
        # map 会阻塞,直到所有结果都返回
        results = pool.map(square, numbers)
    print("\n所有计算完成!")
    print("最终结果列表:", results)

apply_async() 方法

apply_async() 是非阻塞的,它会立即返回一个 AsyncResult 对象,你可以稍后通过这个对象获取结果,这对于需要并发执行不同任务并收集结果的场景非常有用。

示例代码:process_pool_async.py

from multiprocessing import Pool
import time
def square(n):
    """计算一个数的平方"""
    time.sleep(0.5)
    pid = os.getpid()
    result = n * n
    print(f"进程 {pid} 计算了 {n}^2 = {result}")
    return result
if __name__ == '__main__':
    import os
    numbers = [1, 2, 3, 4, 5]
    with Pool(processes=3) as pool:
        print("开始异步提交任务...")
        # 提交所有任务,并立即得到一个AsyncResult对象的列表
        results_async = [pool.apply_async(square, (num,)) for num in numbers]
        # 主进程可以继续做其他事情...
        print("任务已提交,主进程可以执行其他工作。")
        time.sleep(1)
        # 从AsyncResult对象中获取结果
        print("\n开始获取结果...")
        results = []
        for r in results_async:
            # r.get() 会阻塞,直到对应任务完成
            results.append(r.get())
    print("\n所有计算完成!")
    print("最终结果列表:", results)

if __name__ == '__main__': 的绝对重要性

这是在 Windows 上使用 multiprocessing 最最最重要的一点。

为什么必须要有?

如前所述,Windows 上的子进程是通过重新执行父脚本来创建的,如果把这个保护条件去掉,会发生以下情况:

  1. 父进程启动,执行 if __name__ == '__main__': 下的代码,创建子进程。
  2. 子进程启动,重新执行整个脚本。
  3. 子进程再次执行 if __name__ == '__main__': 下的代码,再次创建新的子进程
  4. 这个新创建的子进程又会再次执行脚本,创建更多的子进程...
  5. 这个过程会无限递归下去,导致进程数量爆炸式增长,最终耗尽系统资源,程序崩溃。

if __name__ == '__main__': 的作用是判断这个脚本是否是“主程序入口”,当脚本被直接运行时,__name__ 的值是 '__main__';当脚本被其他模块导入时,__name__ 的值是模块名,这样就确保了创建子进程的代码只在父进程中执行一次。


总结与最佳实践

  1. 始终使用 if __name__ == '__main__'::这是 Windows 多进程编程的铁律,可以防止无限递归创建进程。
  2. 选择合适的通信方式
    • 简单的数据交换:用 Queue
    • 两点之间的双向通信:用 Pipe
    • 需要共享复杂对象(如列表、字典):用 Manager(但要考虑性能开销)。
  3. 对于批量任务,优先使用 Pool:它能简化任务管理和结果收集,比手动创建多个 Process 对象更高效。
  4. 使用 join():在主进程中调用 join() 可以确保主进程在所有子任务完成后再退出,这对于需要等待结果的程序至关重要。
  5. 注意资源释放:使用 with 语句(上下文管理器)来管理 PoolManager,可以确保它们被正确关闭和清理,避免资源泄漏。
分享:
扫描分享到社交APP
上一篇
下一篇