在 Windows 上使用 multiprocessing 与在 Linux/macOS 上有一些关键区别,理解这些区别是成功实现多进程的关键。

核心概念:为什么 Windows 上的多进程不同?
在 Linux/macOS 等类 Unix 系统上,fork() 系统调用会创建一个子进程,这个子进程是父进程的一个完整副本,它继承了父进程的整个内存空间、文件描述符等,这使得在子进程中导入模块、创建变量等操作变得非常自然。
Windows 没有原生的 fork() 系统调用,它使用 spawn 方法来创建进程。spawn 的工作方式是:
- 启动一个全新的 Python 解释器(子进程)。
- 父进程会导入
multiprocessing模块,并寻找由if __name__ == '__main__':保护的可执行代码块。 - 父进程将这个代码块中的必要信息(比如目标函数、参数)传递给子进程。
- 子进程启动后,重新执行整个 Python 脚本。
- 当子进程执行到
if __name__ == '__main__':时,它只会执行块内的代码,也就是创建并启动子任务。
这个机制带来了两个直接后果:
- 必须用
if __name__ == '__main__':保护主程序入口:这是最重要的规则,否则,在子进程被创建时,它会无限递归地创建新的子进程,导致程序崩溃。 - 子进程是“干净”的启动:子进程不会继承父进程的全局变量或已导入的模块(除非它们是
multiprocessing需要序列化并传递的),所有必要的导入和数据都必须在if __name__ == '__main__':块内部或由被调用的函数内部完成。
主要模块和类
multiprocessing 模块提供了多种方式来创建和管理进程。

multiprocessing.Process
这是最基本的类,用于创建和控制一个单独的进程。
示例:
import multiprocessing
import time
import os
# worker 函数,将在子进程中运行
def worker(name):
"""子进程要执行的函数"""
pid = os.getpid()
print(f"子进程 {name} (PID: {pid}) 正在运行...")
time.sleep(2) # 模拟耗时任务
print(f"子进程 {name} (PID: {pid}) 已完成。")
if __name__ == '__main__':
print(f"主进程 (PID: {os.getpid()}) 正在启动子进程...")
# 创建一个 Process 对象
# target 是子进程要执行的函数
# args 是传递给 target 函数的参数元组
p = multiprocessing.Process(target=worker, args=('一号',))
# 启动子进程
p.start()
# 主进程继续执行自己的任务
print("主进程正在做其他事情...")
time.sleep(1)
# 等待子进程执行完毕 (非常重要!)
# 如果没有 join(),主进程可能会在子进程完成前就退出
p.join()
print("主进程已结束。")
运行结果:
主进程 (PID: 1234) 正在启动子进程...
主进程正在做其他事情...
子进程 一号 (PID: 5678) 正在运行...
子进程 一号 (PID: 5678) 已完成。
主进程已结束。
multiprocessing.Pool
Pool 是一个进程池,当你需要并行执行大量相似的任务时,它非常有用,它可以自动管理进程的创建、销毁和任务分配。

Pool 提供了多种方法来分配任务:
pool.apply(func, args): 阻塞式,一个一个地执行任务,相当于串行。pool.apply_async(func, args): 非阻塞式,异步提交任务,会立即返回一个AsyncResult对象,你需要手动调用get()方法来获取结果。pool.map(func, iterable): 阻塞式,将iterable中的每个元素作为参数,并行执行func,并按原始顺序返回结果列表。pool.map_async(func, iterable): 非阻塞式的map,返回一个AsyncResult对象。
示例:使用 Pool 进行并行计算
import multiprocessing
import time
# 一个简单的计算函数
def square(n):
"""计算一个数的平方,并模拟一些计算耗时"""
time.sleep(0.1) # 模拟耗时操作
result = n * n
print(f"计算 {n} 的平方结果是 {result}")
return result
if __name__ == '__main__':
numbers = range(1, 11) # 1 到 10
print("使用进程池并行计算...")
# 创建一个包含 4 个进程的进程池
with multiprocessing.Pool(processes=4) as pool:
# 使用 map_async 提交任务
# 它会立即返回一个AsyncResult对象
result_objects = pool.map_async(square, numbers)
# 主进程可以做其他事情...
print("主进程在等待子任务完成...")
# 从 result_objects 中获取所有结果
# 这一步是阻塞的,会等待所有任务完成
results = result_objects.get()
print("\n所有任务完成!")
print(f"最终结果列表: {results}")
运行结果(顺序可能不同,但结果一致):
使用进程池并行计算...
主进程在等待子任务完成...
计算 1 的平方结果是 1
计算 2 的平方结果是 4
计算 3 的平方结果是 9
计算 4 的平方结果是 16
计算 5 的平方结果是 25
计算 6 的平方结果是 36
计算 7 的平方结果是 49
计算 8 的平方结果是 64
计算 9 的平方结果是 81
计算 10 的平方结果是 100
所有任务完成!
最终结果列表: [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
进程间通信
由于每个进程都有自己独立的内存空间,它们不能直接共享变量。multiprocessing 提供了几种机制来实现进程间通信。
Queue (队列)
Queue 是一个先进先出的数据结构,非常适合在不同进程之间安全地传递数据。
示例:生产者-消费者模型
import multiprocessing
import time
import random
# 生产者函数
def producer(queue, count):
for i in range(count):
item = random.randint(1, 100)
print(f"生产者: 放入项目 {item}")
queue.put(item)
time.sleep(0.5)
print("生产者: 放入一个哨兵值 (None) 来结束。")
queue.put(None) # 发送结束信号
# 消费者函数
def consumer(queue):
while True:
item = queue.get() # 阻塞,直到队列中有数据
if item is None: # 收到哨兵值,退出循环
print("消费者: 收到哨兵值,退出。")
break
print(f"消费者: 消费项目 {item}")
time.sleep(1) # 模拟处理时间
if __name__ == '__main__':
# 创建一个 Queue 对象
# 注意:Queue 对象必须在 if __name__ == '__main__': 内部创建
# 并通过参数传递给子进程
task_queue = multiprocessing.Queue()
# 创建并启动生产者和消费者进程
p1 = multiprocessing.Process(target=producer, args=(task_queue, 5))
p2 = multiprocessing.Process(target=consumer, args=(task_queue,))
p1.start()
p2.start()
# 等待两个进程都结束
p1.join()
p2.join()
print("主进程结束。")
Pipe (管道)
Pipe 创建一对连接对象,代表管道的两端,通常用于两个进程之间的双向通信。
示例:使用 Pipe
import multiprocessing
def worker(conn):
"""子进程,从管道一端接收数据并发送回去"""
print("子进程: 等待接收数据...")
msg = conn.recv()
print(f"子进程: 收到 '{msg}'")
response = f"你好,主进程!我已收到你的消息: {msg}"
conn.send(response)
print("子进程: 已发送响应。")
conn.close() # 关闭连接
if __name__ == '__main__':
# 创建一个管道,返回 (conn1, conn2)
parent_conn, child_conn = multiprocessing.Pipe()
p = multiprocessing.Process(target=worker, args=(child_conn,))
p.start()
# 主进程通过 parent_conn 发送和接收数据
parent_conn.send("你好,子进程!")
print("主进程: 已发送消息,等待响应...")
response = parent_conn.recv()
print(f"主进程: 收到响应 '{response}'")
p.join()
print("主进程结束。")
Manager (管理器)
Manager 提供了一种创建可在多个进程之间共享的列表、字典、Value、Array 等高级数据结构的方式,它通过一个服务器进程来管理这些共享对象,其他进程通过代理来访问,这种方式比 Queue 更灵活,但性能开销也更大。
示例:共享字典
import multiprocessing
# 工作函数,修改共享字典
def worker(shared_dict, key, value):
print(f"子进程 {multiprocessing.current_process().name} 正在修改字典...")
shared_dict[key] = value
print(f"子进程 {multiprocessing.current_process().name} 修改完成。")
if __name__ == '__main__':
# 创建一个 Manager 对象
with multiprocessing.Manager() as manager:
# 使用 manager 创建一个可被所有进程共享的字典
shared_dict = manager.dict()
processes = []
# 创建并启动多个进程,它们都操作同一个字典
for i in range(5):
p = multiprocessing.Process(target=worker, args=(shared_dict, f'key_{i}', f'value_{i}'))
processes.append(p)
p.start()
# 等待所有进程完成
for p in processes:
p.join()
# 所有进程完成后,打印最终的共享字典
print("\n所有子进程执行完毕,最终共享字典内容:")
print(shared_dict)
Windows 下的最佳实践和常见陷阱
-
if __name__ == '__main__':是铁律:再次强调,这是在 Windows 上使用multiprocessing的第一原则,忘记它,程序几乎必然会失败。 -
代码结构:将你的主逻辑(创建进程、启动任务)都放在
if __name__ == '__main__':块内,将需要在子进程中运行的函数(worker)定义在块外。 -
序列化问题:在 Windows 的
spawn模式下,传递给子进程的函数和参数必须是可“pickle”的(序列化),大部分 Python 对象都可以,但有一些例外,如 lambda 函数、文件句柄、某些类的实例等,如果遇到pickle错误,通常是因为你试图传递一个不可序列化的对象。 -
调试:多程序调试比单进程调试困难得多,如果子进程出错,你可能只会看到一些模糊的错误信息,建议在子进程函数中加入大量的
print语句来跟踪其执行流程。 -
性能考量:创建进程是有开销的,如果你只是执行大量非常小的任务,进程创建和销毁的时间可能会超过任务本身执行的时间,在这种情况下,
multiprocessing.Pool会自动重用进程,是更好的选择,对于任务量少但每个任务耗时很长的情况,Process更为直接。
替代方案:concurrent.futures.ProcessPoolExecutor
concurrent.futures 模块提供了一个更高级、更现代的接口来管理并行任务,它封装了 multiprocessing 的复杂性,使得代码更简洁。
示例:使用 ProcessPoolExecutor
import concurrent.futures
import time
def square(n):
time.sleep(0.1)
return n * n
if __name__ == '__main__':
numbers = range(1, 11)
print("使用 ProcessPoolExecutor 并行计算...")
# 使用 with 语句可以自动管理进程池的创建和销毁
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
# executor.map 会像内置的 map 一样工作,但它是并行执行的
results = list(executor.map(square, numbers))
print("\n所有任务完成!")
print(f"最终结果列表: {results}")
这个示例的代码量比使用 multiprocessing.Pool 更少,功能也类似,对于很多应用场景,ProcessPoolExecutor 是更推荐的选择。
| 特性 | 描述 | Windows 注意事项 |
|---|---|---|
| 创建进程方式 | spawn |
必须用 if __name__ == '__main__': 保护主入口 |
| 核心模块 | multiprocessing |
理解 Process 和 Pool 的区别 |
| 进程间通信 | Queue, Pipe, Manager |
Manager 提供灵活但低效的共享数据结构 |
| 现代接口 | concurrent.futures.ProcessPoolExecutor |
强烈推荐,代码更简洁,易于使用 |
在 Windows 上编写多进程 Python 程序,只要牢记 if __name__ == '__main__': 的规则,并根据任务类型选择合适的工具(Process、Pool 或 ProcessPoolExecutor),就可以顺利地利用多核 CPU 提升程序性能。
