下面我将从简单到高级,介绍几种最主流和高效的方法,并解释它们的适用场景。

场景设定
假设我们有两个需要被调用的 Python 脚本:
worker.py- 这是一个模拟耗时任务的脚本。main.py- 这是主脚本,负责调用worker.py。
worker.py 的内容:
# worker.py
import time
import random
import sys
# 从命令行参数接收任务ID和运行时间
task_id = sys.argv[1]
duration = int(sys.argv[2])
print(f"[任务 {task_id}] 开始,预计运行 {duration} 秒...")
time.sleep(duration)
print(f"[任务 {task_id}] 已完成!")
使用 subprocess 模块(最灵活、最通用)
subprocess 是 Python 的标准库,用于创建子进程,它功能强大,可以让你像在命令行中一样运行外部命令,并获取其输出、返回码等。
串行调用(顺序执行)
这是最简单的方式,主脚本会等待每个子脚本完全执行完毕后再继续。

# main_sequential.py
import subprocess
import time
start_time = time.time()
# 定义要运行的任务列表
tasks = [
("python", "worker.py", "A", "3"),
("python", "worker.py", "B", "5"),
("python", "worker.py", "C", "2"),
]
print("--- 开始串行执行 ---")
for task in tasks:
# subprocess.run 会阻塞,直到子进程结束
print(f"正在启动任务: {' '.join(task)}")
subprocess.run(task)
end_time = time.time()
print(f"\n--- 所有任务串行执行完毕,总耗时: {end_time - start_time:.2f} 秒 ---")
运行结果分析: 总耗时大约是 3 + 5 + 2 = 10 秒,任务 A、B、C 按顺序一个接一个地执行。
并行调用(并发执行)
为了实现并行,我们需要在后台启动每个任务,并让主程序继续执行,而不是等待它们。subprocess.Popen 是 subprocess.run 的非阻塞版本,非常适合这个场景。
# main_parallel_subprocess.py
import subprocess
import time
start_time = time.time()
tasks = [
("python", "worker.py", "A", "3"),
("python", "worker.py", "B", "5"),
("python", "worker.py", "C", "2"),
]
print("--- 开始并行执行 ---")
processes = []
# 启动所有子进程
for task in tasks:
print(f"在后台启动任务: {' '.join(task)}")
# Popen是非阻塞的,它会立即返回
p = subprocess.Popen(task)
processes.append(p)
# 等待所有子进程完成
for p in processes:
p.wait() # 阻塞,直到对应的进程结束
end_time = time.time()
print(f"\n--- 所有任务并行执行完毕,总耗时: {end_time - start_time:.2f} 秒 ---")
运行结果分析:
总耗时大约是 5 秒(因为最长的任务是 B,耗时 5 秒),任务 A 和 C 会在 B 运行期间同时进行。p.wait() 确保主脚本在所有子任务都结束后才退出。
使用 multiprocessing 模块(Pythonic 的并行方式)
如果你所有的逻辑都在一个 Python 项目中,并且需要共享内存或进行复杂的进程间通信,multiprocessing 是更 Pythonic 的选择,它专门为在多核 CPU 上并行计算而设计。

创建并运行进程池
multiprocessing.Pool 可以轻松地管理一个进程池,自动分配任务。
# main_multiprocessing.py
import multiprocessing
import time
import os
# 这个函数将在每个子进程中执行
def run_worker(task_args):
"""
task_args 是一个元组,('A', '3')
"""
task_id, duration_str = task_args
duration = int(duration_str)
# 每个进程都有自己的内存空间
process_id = os.getpid()
print(f"[进程 {process_id}] 任务 {task_id} 开始,预计运行 {duration} 秒...")
time.sleep(duration)
print(f"[进程 {process_id}] 任务 {task_id} 已完成!")
return f"任务 {task_id} 的结果"
if __name__ == "__main__":
# Windows 系统需要这个 if __name__ == "__main__" 保护
# 防止子进程无限递归地重新导入主模块
start_time = time.time()
tasks = [
("A", "3"),
("B", "5"),
("C", "2"),
]
print("--- 使用 multiprocessing.Pool 开始并行执行 ---")
# 创建一个包含 3 个进程的池
with multiprocessing.Pool(processes=3) as pool:
# pool.map 会阻塞,直到所有任务完成
# 它会自动将任务列表分配给进程池中的进程
results = pool.map(run_worker, tasks)
print("\n--- 所有任务执行完毕 ---")
print("收集到的结果:", results)
end_time = time.time()
print(f"总耗时: {end_time - start_time:.2f} 秒")
运行结果分析:
multiprocessing.Pool 会创建指定数量的进程(这里是3个),并将 tasks 列表中的任务分配给这些进程执行,总耗时同样由最长的任务决定(约5秒)。pool.map 会等待所有任务完成并收集它们返回的结果。
使用 asyncio 模块(I/O 密集型任务的最佳选择)
如果你的任务是I/O密集型的(网络请求、文件读写、数据库查询),asyncio 是最佳选择,它使用单线程和事件循环来实现并发,比创建多个进程或线程的开销要小得多。
注意: asyncio 不能加速 CPU 密集型计算(如 worker.py 中的 time.sleep 模拟的是 I/O 等待,所以可以用),如果你的 worker.py 里是大量的数学计算,asyncio 就不合适。
# main_asyncio.py
import asyncio
import time
# async/await 语法是 asyncio 的核心
async def run_worker_async(task_id, duration):
print(f"[任务 {task_id}] 开始,预计运行 {duration} 秒...")
# asyncio.sleep 是一个非阻塞的协程,它会让出控制权给事件循环
await asyncio.sleep(duration)
print(f"[任务 {task_id}] 已完成!")
return f"任务 {task_id} 的结果"
async def main():
start_time = time.time()
tasks = [
("A", 3),
("B", 5),
("C", 2),
]
print("--- 使用 asyncio 开始并发执行 ---")
# 创建一个任务列表
coroutines = [run_worker_async(task_id, dur) for task_id, dur in tasks]
# asyncio.gather 会并发地运行所有协程,并等待它们全部完成
results = await asyncio.gather(*coroutines)
print("\n--- 所有任务执行完毕 ---")
print("收集到的结果:", results)
end_time = time.time()
print(f"总耗时: {end_time - start_time:.2f} 秒")
# 运行主协程
if __name__ == "__main__":
asyncio.run(main())
运行结果分析:
asyncio 的运行结果和 multiprocessing 类似,总耗时约5秒,但底层机制完全不同:它在一个线程内通过事件循环快速切换任务,当一个任务在等待 asyncio.sleep(I/O操作)时,它会去执行其他任务,从而实现并发。
总结与如何选择
| 方法 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
subprocess |
- 调用外部程序、脚本、命令行工具 - 需要完全隔离的执行环境 - 任务之间不需要共享数据 |
- 最灵活,可以调用任何外部程序 - 与操作系统交互能力强 |
- 进程间通信较复杂(需通过文件、管道等) - 启动新进程开销较大 |
multiprocessing |
- CPU 密集型任务(科学计算、数据处理) - 任务逻辑复杂,用 Python 编写 - 需要在进程间共享数据(通过 Queue, Pipe, Manager) |
- Python 原生,易于使用 - 充分利用多核 CPU - 进程间通信机制成熟 |
- 进程创建和销毁开销大 - 共享内存数据需要特殊处理 |
asyncio |
- I/O 密集型任务(网络请求、文件读写、数据库操作) - 需要处理大量并发连接 |
- 轻量级,协程切换开销极小 - 单线程即可实现高并发 - 代码结构清晰( async/await) |
- 不适用于 CPU 密集型任务 - 需要重构为异步模型,传统阻塞代码不兼容 - 调试可能更复杂 |
简单决策流程:
-
我是不是在调用一个独立的
.py文件,或者一个命令行工具(如ping,ffmpeg)?- 是 -> 使用
subprocess,这是最直接、最安全的方式。 - 否 -> 进入下一步。
- 是 -> 使用
-
我的任务是计算密集型(比如做大量数学运算)还是 I/O 密集型(比如发很多网络请求)?
- 计算密集型 -> 使用
multiprocessing,它能真正利用你的多核 CPU。 - I/O 密集型 -> 使用
asyncio,它在处理大量等待任务时效率更高,资源占用更少。
- 计算密集型 -> 使用
-
我的任务是否需要共享内存或复杂的数据结构?
- 是 ->
multiprocessing提供了更成熟的进程间通信工具(如Manager)。 - 否 ->
asyncio的协程模型更简单,但如果任务本身是 CPU 密集的,还是首选multiprocessing。
- 是 ->
对于你最初的问题“python同时调用多个python”,如果这些“python”是独立的脚本,subprocess.Popen 是最标准、最通用的答案,如果它们是同一个项目中的不同模块,并且任务是计算型的,multiprocessing.Pool 则是更优雅的选择。
