杰瑞科技汇

Python如何同时调用多个Python进程?

下面我将从简单到高级,介绍几种最主流和高效的方法,并解释它们的适用场景。

Python如何同时调用多个Python进程?-图1
(图片来源网络,侵删)

场景设定

假设我们有两个需要被调用的 Python 脚本:

  1. worker.py - 这是一个模拟耗时任务的脚本。
  2. main.py - 这是主脚本,负责调用 worker.py

worker.py 的内容:

# worker.py
import time
import random
import sys
# 从命令行参数接收任务ID和运行时间
task_id = sys.argv[1]
duration = int(sys.argv[2])
print(f"[任务 {task_id}] 开始,预计运行 {duration} 秒...")
time.sleep(duration)
print(f"[任务 {task_id}] 已完成!")

使用 subprocess 模块(最灵活、最通用)

subprocess 是 Python 的标准库,用于创建子进程,它功能强大,可以让你像在命令行中一样运行外部命令,并获取其输出、返回码等。

串行调用(顺序执行)

这是最简单的方式,主脚本会等待每个子脚本完全执行完毕后再继续。

Python如何同时调用多个Python进程?-图2
(图片来源网络,侵删)
# main_sequential.py
import subprocess
import time
start_time = time.time()
# 定义要运行的任务列表
tasks = [
    ("python", "worker.py", "A", "3"),
    ("python", "worker.py", "B", "5"),
    ("python", "worker.py", "C", "2"),
]
print("--- 开始串行执行 ---")
for task in tasks:
    # subprocess.run 会阻塞,直到子进程结束
    print(f"正在启动任务: {' '.join(task)}")
    subprocess.run(task)
end_time = time.time()
print(f"\n--- 所有任务串行执行完毕,总耗时: {end_time - start_time:.2f} 秒 ---")

运行结果分析: 总耗时大约是 3 + 5 + 2 = 10 秒,任务 A、B、C 按顺序一个接一个地执行。

并行调用(并发执行)

为了实现并行,我们需要在后台启动每个任务,并让主程序继续执行,而不是等待它们。subprocess.Popensubprocess.run 的非阻塞版本,非常适合这个场景。

# main_parallel_subprocess.py
import subprocess
import time
start_time = time.time()
tasks = [
    ("python", "worker.py", "A", "3"),
    ("python", "worker.py", "B", "5"),
    ("python", "worker.py", "C", "2"),
]
print("--- 开始并行执行 ---")
processes = []
# 启动所有子进程
for task in tasks:
    print(f"在后台启动任务: {' '.join(task)}")
    # Popen是非阻塞的,它会立即返回
    p = subprocess.Popen(task)
    processes.append(p)
# 等待所有子进程完成
for p in processes:
    p.wait() # 阻塞,直到对应的进程结束
end_time = time.time()
print(f"\n--- 所有任务并行执行完毕,总耗时: {end_time - start_time:.2f} 秒 ---")

运行结果分析: 总耗时大约是 5 秒(因为最长的任务是 B,耗时 5 秒),任务 A 和 C 会在 B 运行期间同时进行。p.wait() 确保主脚本在所有子任务都结束后才退出。


使用 multiprocessing 模块(Pythonic 的并行方式)

如果你所有的逻辑都在一个 Python 项目中,并且需要共享内存或进行复杂的进程间通信,multiprocessing 是更 Pythonic 的选择,它专门为在多核 CPU 上并行计算而设计。

Python如何同时调用多个Python进程?-图3
(图片来源网络,侵删)

创建并运行进程池

multiprocessing.Pool 可以轻松地管理一个进程池,自动分配任务。

# main_multiprocessing.py
import multiprocessing
import time
import os
# 这个函数将在每个子进程中执行
def run_worker(task_args):
    """
    task_args 是一个元组,('A', '3')
    """
    task_id, duration_str = task_args
    duration = int(duration_str)
    # 每个进程都有自己的内存空间
    process_id = os.getpid()
    print(f"[进程 {process_id}] 任务 {task_id} 开始,预计运行 {duration} 秒...")
    time.sleep(duration)
    print(f"[进程 {process_id}] 任务 {task_id} 已完成!")
    return f"任务 {task_id} 的结果"
if __name__ == "__main__":
    # Windows 系统需要这个 if __name__ == "__main__" 保护
    # 防止子进程无限递归地重新导入主模块
    start_time = time.time()
    tasks = [
        ("A", "3"),
        ("B", "5"),
        ("C", "2"),
    ]
    print("--- 使用 multiprocessing.Pool 开始并行执行 ---")
    # 创建一个包含 3 个进程的池
    with multiprocessing.Pool(processes=3) as pool:
        # pool.map 会阻塞,直到所有任务完成
        # 它会自动将任务列表分配给进程池中的进程
        results = pool.map(run_worker, tasks)
    print("\n--- 所有任务执行完毕 ---")
    print("收集到的结果:", results)
    end_time = time.time()
    print(f"总耗时: {end_time - start_time:.2f} 秒")

运行结果分析: multiprocessing.Pool 会创建指定数量的进程(这里是3个),并将 tasks 列表中的任务分配给这些进程执行,总耗时同样由最长的任务决定(约5秒)。pool.map 会等待所有任务完成并收集它们返回的结果。


使用 asyncio 模块(I/O 密集型任务的最佳选择)

如果你的任务是I/O密集型的(网络请求、文件读写、数据库查询),asyncio 是最佳选择,它使用单线程和事件循环来实现并发,比创建多个进程或线程的开销要小得多。

注意: asyncio 不能加速 CPU 密集型计算(如 worker.py 中的 time.sleep 模拟的是 I/O 等待,所以可以用),如果你的 worker.py 里是大量的数学计算,asyncio 就不合适。

# main_asyncio.py
import asyncio
import time
# async/await 语法是 asyncio 的核心
async def run_worker_async(task_id, duration):
    print(f"[任务 {task_id}] 开始,预计运行 {duration} 秒...")
    # asyncio.sleep 是一个非阻塞的协程,它会让出控制权给事件循环
    await asyncio.sleep(duration)
    print(f"[任务 {task_id}] 已完成!")
    return f"任务 {task_id} 的结果"
async def main():
    start_time = time.time()
    tasks = [
        ("A", 3),
        ("B", 5),
        ("C", 2),
    ]
    print("--- 使用 asyncio 开始并发执行 ---")
    # 创建一个任务列表
    coroutines = [run_worker_async(task_id, dur) for task_id, dur in tasks]
    # asyncio.gather 会并发地运行所有协程,并等待它们全部完成
    results = await asyncio.gather(*coroutines)
    print("\n--- 所有任务执行完毕 ---")
    print("收集到的结果:", results)
    end_time = time.time()
    print(f"总耗时: {end_time - start_time:.2f} 秒")
# 运行主协程
if __name__ == "__main__":
    asyncio.run(main())

运行结果分析: asyncio 的运行结果和 multiprocessing 类似,总耗时约5秒,但底层机制完全不同:它在一个线程内通过事件循环快速切换任务,当一个任务在等待 asyncio.sleep(I/O操作)时,它会去执行其他任务,从而实现并发。


总结与如何选择

方法 适用场景 优点 缺点
subprocess - 调用外部程序、脚本、命令行工具
- 需要完全隔离的执行环境
- 任务之间不需要共享数据
- 最灵活,可以调用任何外部程序
- 与操作系统交互能力强
- 进程间通信较复杂(需通过文件、管道等)
- 启动新进程开销较大
multiprocessing - CPU 密集型任务(科学计算、数据处理)
- 任务逻辑复杂,用 Python 编写
- 需要在进程间共享数据(通过 Queue, Pipe, Manager
- Python 原生,易于使用
- 充分利用多核 CPU
- 进程间通信机制成熟
- 进程创建和销毁开销大
- 共享内存数据需要特殊处理
asyncio - I/O 密集型任务(网络请求、文件读写、数据库操作)
- 需要处理大量并发连接
- 轻量级,协程切换开销极小
- 单线程即可实现高并发
- 代码结构清晰(async/await
- 不适用于 CPU 密集型任务
- 需要重构为异步模型,传统阻塞代码不兼容
- 调试可能更复杂

简单决策流程:

  1. 我是不是在调用一个独立的 .py 文件,或者一个命令行工具(如 ping, ffmpeg)?

    • -> 使用 subprocess,这是最直接、最安全的方式。
    • -> 进入下一步。
  2. 我的任务是计算密集型(比如做大量数学运算)还是 I/O 密集型(比如发很多网络请求)?

    • 计算密集型 -> 使用 multiprocessing,它能真正利用你的多核 CPU。
    • I/O 密集型 -> 使用 asyncio,它在处理大量等待任务时效率更高,资源占用更少。
  3. 我的任务是否需要共享内存或复杂的数据结构?

    • -> multiprocessing 提供了更成熟的进程间通信工具(如 Manager)。
    • -> asyncio 的协程模型更简单,但如果任务本身是 CPU 密集的,还是首选 multiprocessing

对于你最初的问题“python同时调用多个python”,如果这些“python”是独立的脚本,subprocess.Popen 是最标准、最通用的答案,如果它们是同一个项目中的不同模块,并且任务是计算型的,multiprocessing.Pool 则是更优雅的选择。

分享:
扫描分享到社交APP
上一篇
下一篇