杰瑞科技汇

Python threading为何无效?

下面我将详细解释为什么会出现这种情况,并提供解决方案和最佳实践。

Python threading为何无效?-图1
(图片来源网络,侵删)

核心原因:Python 的 GIL (Global Interpreter Lock)

理解 threading 为何“无效”,关键在于理解 GIL

  1. 什么是 GIL? GIL 是 CPython 解释器(也就是我们通常说的 Python)的一个互斥锁,它确保在任何时刻,只有一个线程在执行 Python 字节码

  2. GIL 的作用与影响

    • 作用:GIL 使得 CPython 的内存管理更加简单和高效,它避免了多线程同时操作内存导致的数据竞争问题,因此在很多情况下简化了 Python 的多线程编程模型。
    • 影响(你遇到的问题):由于 GIL 的存在,即使是多核 CPU,Python 的多线程也无法真正实现并行(Parallelism)执行多个 CPU 密集型任务,它们只能并发(Concurrency)执行,即多个线程快速地在 GIL 上切换,看起来像是在同时运行,但实际上任何一个瞬间只有一个线程在运行。

打个比方: GIL 就像一个唯一的、共享的厨房,每个线程是一个厨师

Python threading为何无效?-图2
(图片来源网络,侵删)
  • CPU 密集型任务:一个厨师需要长时间使用厨房(比如做一顿复杂的菜),他做完之前,其他厨师只能等着。
  • I/O 密集型任务:一个厨师把菜放进烤箱(等待 I/O),这时他可以离开厨房,让另一个厨师进来切菜,等烤箱好了,第一个厨师再回来。

为什么你的 threading 看起来“无效”?

根据你的任务是 CPU 密集型还是 I/O 密集型,threading 的效果会截然不同。

CPU 密集型任务(计算密集)

这是 threading “无效”最主要的原因。

例子:大量数学计算、图像处理、数据分析、机器学习中的模型训练等。

为什么无效? 假设你的任务是计算 1 到 100000000 的所有质数,你创建 4 个线程,希望 4 个 CPU 核心能同时计算。

Python threading为何无效?-图3
(图片来源网络,侵删)
  • 没有 GIL:理想情况下,任务被分成 4 份,每个核心负责一份,总时间约为原来的 1/4。
  • 有 GIL:虽然有 4 个线程,但 GIL 只允许一个线程运行一小段时间(5ms),然后切换到下一个线程,这会产生大量的线程切换开销(保存和恢复线程状态),线程切换本身需要消耗 CPU 时间,但没有产生任何有用的计算,总时间可能比单线程还要长。

对于 CPU 密集型任务,Python 的 threading 不仅不能提升性能,反而可能因线程切换开销而降低性能。

I/O 密集型任务(等待密集)

这是 threading正确使用场景,效果非常明显。

例子:网络请求、文件读写、数据库查询、爬虫等。

为什么有效? 这类任务的特点是大部分时间都在等待外部响应。

  • 单线程:程序发起一个网络请求,然后阻塞(什么都不做)直到收到响应,在这等待的几秒钟里,CPU 是空闲的。
  • 多线程:当线程 A 发起网络请求并开始等待时,GIL 会被释放,线程 B 可以立即获得 GIL,继续执行其他任务(比如发起另一个请求或处理已收到的数据),当线程 A 的响应到达时,它会再次获得 GIL,处理数据。

threading 让 CPU 在一个线程等待 I/O 时,可以去为其他线程服务,极大地提高了 CPU 的利用率和程序的总体效率。


如何解决“无效”问题?

根据你的任务类型,选择正确的工具。

如果你的任务是 CPU 密集型

解决方案:使用 multiprocessing 模块

multiprocessing 会创建多个独立的进程,每个进程有自己的 Python 解释器和内存空间,因此它们不受 GIL 的限制,可以真正地在多核 CPU 上并行运行。

threading vs multiprocessing 对比

特性 threading (线程) multiprocessing (进程)
核心机制 共享内存,有 GIL 独立内存,无 GIL
并行能力 并发,不能并行执行 CPU 密集型任务 并行,可以充分利用多核 CPU
资源开销 小,创建和切换成本低 大,每个进程都有独立的内存空间
通信方式 共享变量(需加锁) 队列、管道、共享内存等
适用场景 I/O 密集型任务 CPU 密集型任务

代码示例:CPU 密集型任务

import time
import multiprocessing
def cpu_task(n):
    """一个模拟 CPU 密集型任务的函数"""
    count = 0
    while count < n:
        count += 1
    return count
if __name__ == '__main__':
    N = 50_000_000  # 一个较大的计算量
    # --- 单进程 ---
    start_time = time.time()
    cpu_task(N)
    print(f"单进程耗时: {time.time() - start_time:.2f} 秒")
    # --- 多进程 ---
    start_time = time.time()
    # 创建一个包含4个进程的池
    pool = multiprocessing.Pool(processes=4)
    # 将任务分配给进程池
    pool.map(cpu_task, [N] * 4)
    pool.close()
    pool.join()
    print(f"多进程 (4个) 耗时: {time.time() - start_time:.2f} 秒")

运行结果(取决于你的 CPU 核心数):

单进程耗时: 3.45 秒
多进程 (4个) 耗时: 1.12 秒  # 性能提升接近4倍

如果你的任务是 I/O 密集型

解决方案:继续使用 threading,但要正确使用

threading 在这里是正确的选择,但要避免一些常见的陷阱。

最佳实践:使用 concurrent.futures.ThreadPoolExecutor

这是 threading 模块的高级封装,使用起来更简单、更安全,能更好地管理线程的生命周期。

代码示例:I/O 密集型任务(模拟网络请求)

import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
def fetch_url(url):
    """模拟一个网络请求"""
    print(f"开始请求: {url}")
    try:
        # 使用 httpbin.org 的 /delay/1 接口,模拟1秒延迟
        response = requests.get(url, timeout=5)
        print(f"完成请求: {url}, 状态码: {response.status_code}")
        return response.status_code
    except requests.RequestException as e:
        print(f"请求失败: {url}, 错误: {e}")
        return None
if __name__ == '__main__':
    urls = [f"http://httpbin.org/delay/1" for _ in range(10)] # 10个需要1秒才能返回的请求
    # --- 单线程 ---
    start_time = time.time()
    for url in urls:
        fetch_url(url)
    print(f"单线程总耗时: {time.time() - start_time:.2f} 秒")
    # --- 多线程 ---
    start_time = time.time()
    # 创建一个包含5个线程的线程池
    with ThreadPoolExecutor(max_workers=5) as executor:
        # 提交所有任务
        future_to_url = {executor.submit(fetch_url, url): url for url in urls}
        # as_completed 会按完成的顺序返回 future 对象
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                result = future.result()
            except Exception as e:
                print(f"任务 {url} 产生异常: {e}")
    print(f"多线程 (5个) 总耗时: {time.time() - start_time:.2f} 秒")

运行结果:

开始请求: http://httpbin.org/delay/1
完成请求: http://httpbin.org/delay/1, 状态码: 200
开始请求: http://httpbin.org/delay/1
...
单线程总耗时: 10.05 秒  # 10个请求,每个1秒,串行执行
开始请求: http://httpbin.org/delay/1
开始请求: http://httpbin.org/delay/1
开始请求: http://httpbin.org/delay/1
开始请求: http://httpbin.org/delay/1
开始请求: http://httpbin.org/delay/1
... # 5个请求几乎同时开始
完成请求: http://httpbin.org/delay/1, 状态码: 200
...
多线程 (5个) 总耗时: 2.15 秒  # 总时间接近 10 / 5 = 2 秒,性能提升显著

总结与建议

  1. 诊断你的任务类型:首先判断你的程序是 CPU 密集型还是 I/O 密集型,这是决定使用 threading 还是 multiprocessing 的关键。

  2. CPU 密集型 -> 用 multiprocessing

    • 目标是绕过 GIL,实现真正的并行计算。
    • 使用 multiprocessing.PoolProcess 类。
    • 注意进程间通信的开销。
  3. I/O 密集型 -> 用 threading

    • 目标是利用等待时间,让 CPU 为其他线程服务。
    • 优先使用 concurrent.futures.ThreadPoolExecutor,它比手动管理 threading.Thread 更方便、更健壮。
    • 注意共享数据:如果多个线程需要读写同一个变量,必须使用 threading.Lock 来防止数据竞争。
  4. 更高级的方案

    • asyncio:对于 I/O 密集型任务,asyncio 是一个更现代、更轻量级的选择,它使用单线程和协程,避免了线程切换的开销,能处理成千上万的并发连接,但它的编程模型(基于 async/await)与多线程不同。
    • 混合使用:对于既包含 CPU 密集型任务又包含 I/O 密集型任务的复杂程序,可以结合使用 multiprocessingthreading,用多个进程来处理不同的 CPU 密集型任务,每个进程内部再用多个线程来处理 I/O 操作。

希望这个详细的解释能帮助你解决 threading 无效的问题!

分享:
扫描分享到社交APP
上一篇
下一篇