杰瑞科技汇

Python threading详解,如何高效实现多线程?

目录

  1. 为什么需要多线程? - 理解多线程的动机
  2. Python 的多线程:GIL 的限制 - 理解 Python 多线程的核心
  3. threading 模块核心组件 - 线程、锁、事件等
  4. 实践:从创建线程到同步控制 - 代码示例与详解
  5. 高级主题 - 线程池、生产者-消费者模式等
  6. 最佳实践与注意事项 - 如何写出健壮的多线程代码
  7. 何时使用多线程?

为什么需要多线程?

想象一个场景:你正在用浏览器下载一个大文件,同时还在听音乐,并且在后台运行着一个杀毒软件扫描。

Python threading详解,如何高效实现多线程?-图1
(图片来源网络,侵删)
  • 单线程(单道程序):你必须等文件下载完,才能听音乐,才能运行杀毒软件,这显然效率极低。
  • 多线程:操作系统将 CPU 的时间片分配给不同的任务,你的下载程序、音乐播放器和杀毒软件各自作为一个“线程”运行,当一个线程(如下载)因等待网络响应而暂时闲置时,CPU 可以立即切换到另一个线程(如音乐播放)去执行,从而宏观上实现了“进行多个任务。

在编程中,引入多线程主要有两个目的:

  • I/O 密集型任务:程序的大部分时间都在等待 I/O 操作完成,如网络请求、文件读写、数据库查询等,在等待期间,CPU 是空闲的,使用多线程,当一个线程在等待 I/O 时,其他线程可以继续使用 CPU,极大地提高了程序的响应速度和吞吐量。
  • 并行计算(CPU 密集型任务):程序需要进行大量的计算,理论上,多线程可以利用多核 CPU 的优势,让不同的核心同时处理不同的计算任务。但在 Python 中,这一点会受到 GIL 的限制,我们稍后会详细解释。

Python 的多线程:GIL 的限制

这是理解 Python 多线程最关键的一点。

GIL (Global Interpreter Lock) 全局解释器锁 是 CPython 解释器(Python 官方实现)所特有的一把互斥锁。

  • 它的作用:在任何时刻,一个 Python 进程中只允许一个线程在执行 Python 字节码。
  • 它的目的:为了保证 Python 对象的内存管理是线程安全的,因为 CPython 的内存管理不是线程安全的,如果没有 GIL,多个线程同时操作一个对象可能会导致内存崩溃。

GIL 带来的直接影响:

Python threading详解,如何高效实现多线程?-图2
(图片来源网络,侵删)
  1. 对于 CPU 密集型任务:多线程并不能实现真正的并行,因为同一时间只有一个线程在运行,它只是在多个线程之间快速切换(上下文切换),这种切换本身还有开销,所以对于纯计算任务,多线程的性能甚至可能比单线程更差,这时,应该使用 multiprocessing 模块来创建多个进程,每个进程有自己的 Python 解释器和 GIL,可以真正地在多核上并行。

  2. 对于 I/O 密集型任务:多线程依然非常有效!当一个线程因为 I/O 操作(如 requests.get()time.sleep())而释放 GIL 并进入等待状态时,Python 解释器会立即切换到另一个就绪的线程去执行,这使得在等待 I/O 的间隙,CPU 可以被充分利用。

一句话总结 GIL

Python 的多线程,适合 I/O 密集型任务,不适合 CPU 密集型任务。

Python threading详解,如何高效实现多线程?-图3
(图片来源网络,侵删)

threading 模块核心组件

threading 模块是 Python 中进行多线程编程的标准库,我们来逐一了解其核心类和函数。

1 Thread 类 - 线程的基石

Thread 是用来创建和管理线程的类。

常用构造函数参数:

  • target: 一个可调用的对象(函数或方法),线程启动后会执行这个函数。
  • args: 一个元组,传递给 target 函数的位置参数。
  • kwargs: 一个字典,传递给 target 函数的关键字参数。
  • name: 线程的名称,默认是 Thread-N 的形式。

常用方法:

  • start(): 启动线程,调用此方法后,线程进入“就绪”状态,等待操作系统调度。
  • join([timeout]): 阻塞当前线程,直到被调用的线程执行完毕,可以设置超时时间。
  • run(): 线程的活动方法,默认情况下,它会调用 target 函数,通常我们不需要重写它。
  • is_alive(): 检查线程是否还在运行。

2 Lock (锁) - 解决数据竞争

当多个线程同时读写同一个共享资源(如一个全局变量)时,会导致数据竞争,最终得到不可预期的结果。

锁的工作原理

  • acquire():尝试获取锁,如果锁未被其他线程持有,则获取成功,并立即将锁标记为“已锁定”,如果锁已被其他线程持有,则当前线程会阻塞,直到锁被释放。
  • release():释放锁,将锁标记为“未锁定”,并唤醒一个正在等待获取该锁的线程。

重要原则:任何被锁保护的代码块,都必须在 try...finally 结构中使用,以确保即使在代码块内部发生异常,锁也一定会被释放,否则,可能会导致死锁

# 错误示范:func() 抛出异常,锁将不会被释放
lock.acquire()
func() # func() 出错
lock.release() # 这行代码不会执行
# 正确示范
lock.acquire()
try:
    # 临界区 - 需要被保护的共享资源操作
    shared_data += 1
finally:
    lock.release() # 保证锁一定会被释放

3 RLock (可重入锁)

RLockLock 的一个变种,它允许同一个线程多次获取同一个锁,而不会导致自己被阻塞。

  • 适用场景:当一个函数已经持有一个锁,但它又需要调用另一个也需要同一个锁的函数时。
  • 内部机制RLock 内部维护一个“持有计数器”,同一个线程每获取一次锁,计数器加 1;每释放一次,计数器减 1,只有当计数器降为 0 时,锁才会真正被释放给其他线程。

4 Semaphore (信号量)

信号量是一个更通用的同步原语,它控制同时访问某个特定资源的线程数量。

  • __init__(value): 初始化一个信号量,value 是初始允许的并发数量。
  • acquire(): 尝试获取信号量,如果当前信号量的值大于 0,则减 1 并继续执行;如果为 0,则阻塞,直到有其他线程释放信号量。
  • release(): 释放信号量,将内部计数器加 1。

经典应用场景:限制数据库连接池的大小、限制同时下载文件的数量等。

5 Event (事件)

Event 提供了一种简单的线程间通信机制,一个线程可以通知另一个线程某个事件已经发生。

  • set(): 设置事件,将内部标志位设为 True。
  • clear(): 清除事件,将内部标志位设为 False。
  • is_set(): 检查事件标志位是否为 True。
  • wait(): 阻塞线程,直到事件的标志位被设置为 True。

比喻:就像一个红绿灯,一个线程是绿灯(set()),其他等待的线程就可以通过(wait() 返回),如果变红灯(clear()),等待的线程就会再次阻塞。

6 Queue (队列)

queue.Queue 是线程安全的队列,是生产者-消费者模式的完美实现,它内部已经处理了所有的锁逻辑,让线程间的数据交换变得非常简单和安全。

  • put(item, block=True, timeout=None): 将元素放入队列,如果队列已满,block=True 时会阻塞。
  • get(block=True, timeout=None): 从队列中取出一个元素,如果队列为空,block=True 时会阻塞。
  • task_done(): 消费者从队列中获取一个任务并完成后调用,用于通知队列该任务已处理。
  • join(): 阻塞调用线程,直到队列中的所有任务都被处理完毕(即每个 put 的任务都被对应的 task_done 调用过)。

实践:从创建线程到同步控制

1 基础:创建和启动线程

import threading
import time
def worker(num):
    """线程要执行的函数"""
    print(f"Worker {num} is starting...")
    time.sleep(2) # 模拟 I/O 操作
    print(f"Worker {num} has finished.")
if __name__ == "__main__":
    threads = []
    # 创建并启动 5 个线程
    for i in range(5):
        # 创建线程对象
        t = threading.Thread(target=worker, args=(i,))
        threads.append(t)
        t.start() # 启动线程
    # 主线程等待所有子线程完成
    for t in threads:
        t.join()
    print("All threads have completed.")

运行结果分析: 你会发现 "Worker ... is starting..." 几乎是立即打印出来的,然后程序会等待 2 秒,才打印所有 "Worker ... has finished.",这证明了 5 个线程几乎是并发启动的,并且主线程通过 join() 等待它们全部结束。

2 进阶:使用锁保护共享数据

下面是一个经典的银行账户取款例子,不加锁会导致错误。

import threading
# 共享资源
balance = 1000
lock = threading.Lock()
def withdraw(amount):
    global balance
    # 模拟多个线程同时检查余额
    # 如果不加锁,这里可能会发生竞态条件
    if balance >= amount:
        # 在这里加入延迟,大大增加竞态条件发生的概率
        # 因为线程可能在检查后、修改前被切换出去
        # time.sleep(0.0001) 
        print(f"Thread {threading.current_thread().name} is withdrawing {amount}.")
        balance -= amount
        print(f"Withdrawal successful. New balance: {balance}")
    else:
        print(f"Thread {threading.current_thread().name} failed to withdraw. Insufficient funds. Balance: {balance}")
if __name__ == "__main__":
    # 创建两个线程,尝试同时取款 800
    t1 = threading.Thread(target=withdraw, args=(800,), name="Thread-1")
    t2 = threading.Thread(target=withdraw, args=(800,), name="Thread-2")
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(f"Final balance: {balance}")

不加锁的可能结果(竞态条件)

  1. Thread-1 检查 balance (1000) >= 800,条件为真。
  2. 操作系统切换到 Thread-2Thread-2 也检查 balance (1000) >= 800,条件也为真。
  3. Thread-2 执行 balance -= 800balance 变为 200。
  4. 操作系统切换回 Thread-1Thread-1 也执行 balance -= 800balance 变为 -600。
  5. 最终余额为 -600,这是错误的结果。

使用锁修复: 只需在 if 判断和 balance -= amount 的外部加上 with 语句即可。with 语句是 try...finally 的优雅写法,能保证锁的自动获取和释放。

def withdraw_with_lock(amount):
    global balance
    with lock: # 自动 acquire() 和 release()
        if balance >= amount:
            print(f"Thread {threading.current_thread().name} is withdrawing {amount}.")
            # time.sleep(0.0001) # 即使有延迟,结果也是正确的
            balance -= amount
            print(f"Withdrawal successful. New balance: {balance}")
        else:
            print(f"Thread {threading.current_thread().name} failed to withdraw. Insufficient funds. Balance: {balance}")

修复后的结果: 无论 Thread-1Thread-2 谁先获取锁,另一个线程都必须等待,只有当第一个线程完成修改并释放锁后,第二个线程才能进入,最终结果一定是 balance 为 200 或 0,绝不会出现负数。


高级主题

1 线程池 (concurrent.futures.ThreadPoolExecutor)

手动创建和管理大量线程(成百上千)会非常复杂,并且会消耗大量资源,线程池应运而生。

  • 优点
    • 重用线程:避免了频繁创建和销毁线程的开销。
    • 控制并发数:可以限制同时运行的线程数量,防止系统资源耗尽。
    • 简化编程:提供了更高级、更简洁的 API。

使用 ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor
import time
def task(name):
    print(f"Task {name} is starting.")
    time.sleep(2)
    print(f"Task {name} is finished.")
    return f"Result of {name}"
if __name__ == "__main__":
    # 创建一个最大线程数为 3 的线程池
    with ThreadPoolExecutor(max_workers=3) as executor:
        # 提交 5 个任务到线程池
        futures = [executor.submit(task, i) for i in range(5)]
        print("All tasks have been submitted.")
        # 可以获取每个任务的结果
        for future in futures:
            # result() 方法会阻塞,直到任务完成
            print(future.result())
    print("All tasks in the pool are done.")

执行流程

  1. 前 3 个任务立即被线程池中的线程执行。
  2. 第 4 和第 5 个任务进入任务队列,等待线程空闲。
  3. 当某个任务(如 Task 0)完成后,线程池中的空闲线程会从队列中取出 Task 4 并执行。
  4. 所有任务按顺序完成,with 块确保所有线程都被正确清理。

2 生产者-消费者模式

这是多线程/多进程编程中最经典、最常用的模式之一,它使用一个队列来解耦“生产数据”的线程和“消费数据”的线程。

角色

  • 生产者:创建数据,并将数据放入队列。
  • 消费者:从队列中取出数据,并对数据进行处理。
  • 队列:缓冲区,平衡生产者和消费者的速度差异。

示例: 一个线程负责从网站上下载图片(生产者),多个线程负责将图片保存到本地(消费者)。

import threading
import queue
import time
import random
# 生产者函数
def producer(q, url_list):
    for url in url_list:
        print(f"Producer: Producing item from {url}")
        time.sleep(random.uniform(0.5, 1.5)) # 模拟下载耗时
        q.put(url) # 将下载好的URL放入队列
    print("Producer: Finished producing all items.")
    # 放入一个特殊的哨兵值,通知消费者生产结束
    for _ in range(3): # 假设有3个消费者
        q.put(None)
# 消费者函数
def consumer(q, consumer_id):
    while True:
        item = q.get() # 从队列中获取任务
        if item is None: # 收到哨兵值,退出循环
            print(f"Consumer {consumer_id}: Received sentinel, exiting.")
            q.task_done() # 对哨兵值也调用 task_done
            break
        print(f"Consumer {consumer_id}: Processing item {item}")
        time.sleep(random.uniform(0.5, 1)) # 模拟处理耗时
        print(f"Consumer {consumer_id}: Finished processing {item}")
        q.task_done() # 通知队列,该任务已完成
if __name__ == "__main__":
    q = queue.Queue()
    urls = ["url1", "url2", "url3", "url4", "url5", "url6", "url7", "url8"]
    # 创建生产者线程
    producer_thread = threading.Thread(target=producer, args=(q, urls))
    # 创建消费者线程
    consumers = []
    for i in range(3):
        c = threading.Thread(target=consumer, args=(q, i))
        consumers.append(c)
    # 启动所有线程
    producer_thread.start()
    for c in consumers:
        c.start()
    # 主线程等待生产者完成
    producer_thread.join()
    # 主线程等待队列中所有任务被处理完毕
    q.join()
    print("All tasks have been processed by consumers.")
    # 不需要手动 join 消费者,因为它们在收到哨兵值后会自己退出
    for c in consumers:
        c.join()
    print("Program finished.")

最佳实践与注意事项

  1. 避免共享状态:这是最重要的原则,尽量让每个线程处理自己的数据,减少线程间的通信,如果必须共享,请使用 Queue 或其他同步原语(如 Lock)来保护。
  2. 谨慎使用 join()join() 会阻塞调用线程,在循环中对所有线程调用 join() 是常见的,但要确保理解其阻塞行为,在线程池中,with 语句会自动处理等待。
  3. 死锁:避免多个线程以不同的顺序获取多个锁,线程 A 获取锁 1 后等待锁 2,而线程 B 获取锁 2 后等待锁 1,两者将永远等待下去,一个简单的原则是:总是以相同的顺序获取多个锁。
  4. 资源泄漏:确保所有获取的资源(如文件、锁)都能被正确释放。with 语句是你的好朋友。
  5. 注意 daemon 线程:守护线程会在主线程结束时立即退出,无论它是否执行完毕,它们通常用于执行后台任务,如监控。不要在守护线程中执行任何需要清理资源的操作,因为它们可能会被突然终止。
    t = threading.Thread(target=my_func)
    t.daemon = True # 设置为守护线程
    t.start()
    # 主线程结束时,t 会立即终止
  6. 优先使用 ThreadPoolExecutor:对于大多数多线程任务,使用线程池比手动创建和管理线程更简单、更高效。

何时使用多线程?

场景 推荐方案 理由
I/O 密集型任务 threading / ThreadPoolExecutor 线程在等待 I/O 时会释放 GIL,其他线程可以运行,有效利用 CPU 时间。
CPU 密集型任务 multiprocessing / ProcessPoolExecutor 绕过 GIL 限制,利用多核 CPU 实现真正的并行计算。
需要高并发、简单网络服务 asyncio (异步 I/O) 单线程+事件循环模型,避免了线程切换的开销,能处理成千上万的并发连接,比多线程更轻量。
需要与 C/C++ 库交互 multiprocessing C/C++ 库能释放 GIL,multiprocessing 可以获得更好的并行性能。

希望这篇详解能帮助你全面理解 Python 的 threading 模块!理解 GIL 是关键,并根据你的任务类型选择最合适的并发模型。

分享:
扫描分享到社交APP
上一篇
下一篇