Windows 和 Linux/Unix 在多进程方面有一个关键区别,这直接影响了 Python 的 multiprocessing 模块在它们上的行为。

核心概念:Windows 上的进程创建
在 Windows 系统上,创建新进程(子进程)的唯一方式是重新执行父进程的 Python 脚本,父进程会启动一个全新的 Python 解释器来运行这个脚本。
为了区分父进程和子进程,multiprocessing 模块使用了一个特殊的变量 multiprocessing.current_process(),它会返回一个 Process 对象,这个对象有一个 pid (进程ID) 属性,更重要的是,它有一个 name 属性。
- 父进程:它的
name通常是MainProcess。 - 子进程:它的
name通常是Process-X(Process-1,Process-2)。
multiprocessing 模块通过检查这个 name 来决定是否要创建新的子进程。name 是 MainProcess,它就执行创建子进程的逻辑;否则,它就认为自己是子进程,并开始执行目标函数。
multiprocessing 模块入门
multiprocessing 模块是 Python 标准库的一部分,提供了与 threading 模块类似的 API,但用于进程。

基本步骤
- 导入模块:
from multiprocessing import Process - 定义目标函数:这个函数是子进程要执行的代码。
- 创建进程对象:
Process(target=目标函数, args=(参数1, 参数2, ...)) - 启动进程:
进程对象.start() - 等待进程结束(可选但推荐):
进程对象.join()
示例代码:hello_process.py
import os
import time
from multiprocessing import Process
def worker_function(process_name, duration):
"""子进程要执行的函数"""
pid = os.getpid()
print(f"[{process_name}] 进程 (PID: {pid}) 开始工作...")
time.sleep(duration)
print(f"[{process_name}] 进程 (PID: {pid}) 工作完成,耗时 {duration} 秒。")
if __name__ == '__main__':
print(f"主进程 (PID: {os.getpid()}) 启动。")
# 创建两个子进程
p1 = Process(target=worker_function, args=('Worker-1', 2))
p2 = Process(target=worker_function, args=('Worker-2', 3))
# 启动子进程
p1.start()
p2.start()
print("主进程已启动两个子进程。")
# 等待子进程执行完毕
# 如果没有 join(),主进程会继续执行,甚至可能在子进程结束前就退出了。
p1.join()
p2.join()
print("主进程结束,所有子进程均已退出。")
如何运行
将上述代码保存为 hello_process.py,然后在命令行中运行:
python hello_process.py
预期输出:
主进程 启动。
主进程已启动两个子进程。
[Worker-1] 进程 (PID: 12348) 开始工作...
[Worker-2] 进程 (PID: 12352) 开始工作...
[Worker-1] 进程 (PID: 12348) 工作完成,耗时 2 秒。
[Worker-2] 进程 (PID: 12352) 工作完成,耗时 3 秒。
主进程结束,所有子进程均已退出。
注意:PID (进程ID) 每次运行都会不同。
进程间通信
由于每个进程有自己独立的内存空间,它们不能直接共享变量。multiprocessing 提供了多种机制来实现进程间通信。
a) Queue (队列)
Queue 是一个先进先出的数据结构,是进程间安全传递数据的常用方式。
示例代码:process_queue.py
from multiprocessing import Process, Queue
def put_data(q, data):
"""向队列中放入数据"""
for item in data:
print(f"放入数据: {item}")
q.put(item)
# time.sleep(0.5) # 可以取消注释来观察更清晰
def get_data(q):
"""从队列中取出数据"""
while True:
try:
# 设置超时,避免无限等待
item = q.get(timeout=1)
print(f"取出数据: {item}")
except Exception:
# 如果超时且队列为空,则退出循环
print("队列已空,消费者退出。")
break
if __name__ == '__main__':
# 创建一个Queue对象
q = Queue()
data_to_put = ['apple', 'banana', 'cherry']
# 创建生产者和消费者进程
producer = Process(target=put_data, args=(q, data_to_put))
consumer = Process(target=get_data, args=(q,))
producer.start()
consumer.start()
producer.join() # 等待生产者完成
q.close() # 关闭队列,不再向其中放入数据
consumer.join() # 等待消费者完成
print("主进程结束。")
b) Pipe (管道)
Pipe 创建一对连接对象,代表管道的两端,进程可以通过这些连接对象发送和接收数据。
示例代码:process_pipe.py
from multiprocessing import Process, Pipe
def sender(conn, data):
"""发送数据"""
for item in data:
print(f"发送: {item}")
conn.send(item)
print("发送完毕,关闭连接。")
conn.close()
def receiver(conn):
"""接收数据"""
while True:
try:
item = conn.recv()
print(f"接收: {item}")
except EOFError:
# 当另一端关闭连接时,recv() 会抛出 EOFError
print("接收端检测到连接已关闭。")
break
conn.close()
if __name__ == '__main__':
# 创建一对连接对象
parent_conn, child_conn = Pipe()
data_to_send = [1, 2, 3, 4, 5]
# 创建子进程,只使用 child_conn
p = Process(target=sender, args=(child_conn, data_to_send))
p.start()
# 父进程使用 parent_conn 接收数据
receiver(parent_conn)
p.join()
print("主进程结束。")
c) Manager (管理器)
Manager 提供了一种更高级的共享内存方式,它创建一个服务器进程来管理共享对象(如列表、字典、命名空间等),其他进程可以通过代理访问这些共享对象,这种方式比 Queue 和 Pipe 更灵活,但性能开销也更大。
示例代码:process_manager.py
from multiprocessing import Process, Manager
def worker(shared_list, shared_dict, item):
"""修改共享数据"""
# 修改共享列表
shared_list.append(item)
# 修改共享字典
shared_dict[item] = f"value_for_{item}"
print(f"进程 {os.getpid()} 修改了数据。")
if __name__ == '__main__':
import os
# 创建一个Manager对象
with Manager() as manager:
# 创建共享列表和字典
shared_list = manager.list()
shared_dict = manager.dict()
processes = []
for i in range(5):
p = Process(target=worker, args=(shared_list, shared_dict, f'item-{i}'))
processes.append(p)
p.start()
# 等待所有进程完成
for p in processes:
p.join()
# 打印最终结果
print("共享列表内容:", list(shared_list))
print("共享字典内容:", dict(shared_dict))
print("Manager 上下文管理器已退出,共享对象被销毁。")
进程池 (Pool)
当你需要创建大量子进程来执行相同的任务时,手动创建和管理每个 Process 对象会很繁琐。multiprocessing.Pool 可以解决这个问题,它为你管理一个进程池。
Pool 提供了方便的方法来分发任务,最常用的是 map() 和 apply_async()。
map() 方法
map() 的行为类似于内置的 map() 函数,它会阻塞直到所有任务完成。
示例代码:process_pool_map.py
from multiprocessing import Pool
import time
def square(n):
"""计算一个数的平方"""
time.sleep(0.5) # 模拟耗时操作
pid = os.getpid()
result = n * n
print(f"进程 {pid} 计算了 {n}^2 = {result}")
return result
if __name__ == '__main__':
import os
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# 创建一个包含4个工作进程的进程池
with Pool(processes=4) as pool:
print("开始使用进程池计算...")
# map 会阻塞,直到所有结果都返回
results = pool.map(square, numbers)
print("\n所有计算完成!")
print("最终结果列表:", results)
apply_async() 方法
apply_async() 是非阻塞的,它会立即返回一个 AsyncResult 对象,你可以稍后通过这个对象获取结果,这对于需要并发执行不同任务并收集结果的场景非常有用。
示例代码:process_pool_async.py
from multiprocessing import Pool
import time
def square(n):
"""计算一个数的平方"""
time.sleep(0.5)
pid = os.getpid()
result = n * n
print(f"进程 {pid} 计算了 {n}^2 = {result}")
return result
if __name__ == '__main__':
import os
numbers = [1, 2, 3, 4, 5]
with Pool(processes=3) as pool:
print("开始异步提交任务...")
# 提交所有任务,并立即得到一个AsyncResult对象的列表
results_async = [pool.apply_async(square, (num,)) for num in numbers]
# 主进程可以继续做其他事情...
print("任务已提交,主进程可以执行其他工作。")
time.sleep(1)
# 从AsyncResult对象中获取结果
print("\n开始获取结果...")
results = []
for r in results_async:
# r.get() 会阻塞,直到对应任务完成
results.append(r.get())
print("\n所有计算完成!")
print("最终结果列表:", results)
if __name__ == '__main__': 的绝对重要性
这是在 Windows 上使用 multiprocessing 最最最重要的一点。
为什么必须要有?
如前所述,Windows 上的子进程是通过重新执行父脚本来创建的,如果把这个保护条件去掉,会发生以下情况:
- 父进程启动,执行
if __name__ == '__main__':下的代码,创建子进程。 - 子进程启动,重新执行整个脚本。
- 子进程再次执行
if __name__ == '__main__':下的代码,再次创建新的子进程。 - 这个新创建的子进程又会再次执行脚本,创建更多的子进程...
- 这个过程会无限递归下去,导致进程数量爆炸式增长,最终耗尽系统资源,程序崩溃。
if __name__ == '__main__': 的作用是判断这个脚本是否是“主程序入口”,当脚本被直接运行时,__name__ 的值是 '__main__';当脚本被其他模块导入时,__name__ 的值是模块名,这样就确保了创建子进程的代码只在父进程中执行一次。
总结与最佳实践
- 始终使用
if __name__ == '__main__'::这是 Windows 多进程编程的铁律,可以防止无限递归创建进程。 - 选择合适的通信方式:
- 简单的数据交换:用
Queue。 - 两点之间的双向通信:用
Pipe。 - 需要共享复杂对象(如列表、字典):用
Manager(但要考虑性能开销)。
- 简单的数据交换:用
- 对于批量任务,优先使用
Pool:它能简化任务管理和结果收集,比手动创建多个Process对象更高效。 - 使用
join():在主进程中调用join()可以确保主进程在所有子任务完成后再退出,这对于需要等待结果的程序至关重要。 - 注意资源释放:使用
with语句(上下文管理器)来管理Pool和Manager,可以确保它们被正确关闭和清理,避免资源泄漏。
