目录
- 为什么需要多线程? - 理解多线程的动机
- Python 的多线程:GIL 的限制 - 理解 Python 多线程的核心
threading模块核心组件 - 线程、锁、事件等- 实践:从创建线程到同步控制 - 代码示例与详解
- 高级主题 - 线程池、生产者-消费者模式等
- 最佳实践与注意事项 - 如何写出健壮的多线程代码
- 何时使用多线程?
为什么需要多线程?
想象一个场景:你正在用浏览器下载一个大文件,同时还在听音乐,并且在后台运行着一个杀毒软件扫描。

- 单线程(单道程序):你必须等文件下载完,才能听音乐,才能运行杀毒软件,这显然效率极低。
- 多线程:操作系统将 CPU 的时间片分配给不同的任务,你的下载程序、音乐播放器和杀毒软件各自作为一个“线程”运行,当一个线程(如下载)因等待网络响应而暂时闲置时,CPU 可以立即切换到另一个线程(如音乐播放)去执行,从而宏观上实现了“进行多个任务。
在编程中,引入多线程主要有两个目的:
- I/O 密集型任务:程序的大部分时间都在等待 I/O 操作完成,如网络请求、文件读写、数据库查询等,在等待期间,CPU 是空闲的,使用多线程,当一个线程在等待 I/O 时,其他线程可以继续使用 CPU,极大地提高了程序的响应速度和吞吐量。
- 并行计算(CPU 密集型任务):程序需要进行大量的计算,理论上,多线程可以利用多核 CPU 的优势,让不同的核心同时处理不同的计算任务。但在 Python 中,这一点会受到 GIL 的限制,我们稍后会详细解释。
Python 的多线程:GIL 的限制
这是理解 Python 多线程最关键的一点。
GIL (Global Interpreter Lock) 全局解释器锁 是 CPython 解释器(Python 官方实现)所特有的一把互斥锁。
- 它的作用:在任何时刻,一个 Python 进程中只允许一个线程在执行 Python 字节码。
- 它的目的:为了保证 Python 对象的内存管理是线程安全的,因为 CPython 的内存管理不是线程安全的,如果没有 GIL,多个线程同时操作一个对象可能会导致内存崩溃。
GIL 带来的直接影响:

-
对于 CPU 密集型任务:多线程并不能实现真正的并行,因为同一时间只有一个线程在运行,它只是在多个线程之间快速切换(上下文切换),这种切换本身还有开销,所以对于纯计算任务,多线程的性能甚至可能比单线程更差,这时,应该使用
multiprocessing模块来创建多个进程,每个进程有自己的 Python 解释器和 GIL,可以真正地在多核上并行。 -
对于 I/O 密集型任务:多线程依然非常有效!当一个线程因为 I/O 操作(如
requests.get()或time.sleep())而释放 GIL 并进入等待状态时,Python 解释器会立即切换到另一个就绪的线程去执行,这使得在等待 I/O 的间隙,CPU 可以被充分利用。
一句话总结 GIL:
Python 的多线程,适合 I/O 密集型任务,不适合 CPU 密集型任务。
(图片来源网络,侵删)
threading 模块核心组件
threading 模块是 Python 中进行多线程编程的标准库,我们来逐一了解其核心类和函数。
1 Thread 类 - 线程的基石
Thread 是用来创建和管理线程的类。
常用构造函数参数:
target: 一个可调用的对象(函数或方法),线程启动后会执行这个函数。args: 一个元组,传递给target函数的位置参数。kwargs: 一个字典,传递给target函数的关键字参数。name: 线程的名称,默认是Thread-N的形式。
常用方法:
start(): 启动线程,调用此方法后,线程进入“就绪”状态,等待操作系统调度。join([timeout]): 阻塞当前线程,直到被调用的线程执行完毕,可以设置超时时间。run(): 线程的活动方法,默认情况下,它会调用target函数,通常我们不需要重写它。is_alive(): 检查线程是否还在运行。
2 Lock (锁) - 解决数据竞争
当多个线程同时读写同一个共享资源(如一个全局变量)时,会导致数据竞争,最终得到不可预期的结果。
锁的工作原理:
acquire():尝试获取锁,如果锁未被其他线程持有,则获取成功,并立即将锁标记为“已锁定”,如果锁已被其他线程持有,则当前线程会阻塞,直到锁被释放。release():释放锁,将锁标记为“未锁定”,并唤醒一个正在等待获取该锁的线程。
重要原则:任何被锁保护的代码块,都必须在 try...finally 结构中使用,以确保即使在代码块内部发生异常,锁也一定会被释放,否则,可能会导致死锁。
# 错误示范:func() 抛出异常,锁将不会被释放
lock.acquire()
func() # func() 出错
lock.release() # 这行代码不会执行
# 正确示范
lock.acquire()
try:
# 临界区 - 需要被保护的共享资源操作
shared_data += 1
finally:
lock.release() # 保证锁一定会被释放
3 RLock (可重入锁)
RLock 是 Lock 的一个变种,它允许同一个线程多次获取同一个锁,而不会导致自己被阻塞。
- 适用场景:当一个函数已经持有一个锁,但它又需要调用另一个也需要同一个锁的函数时。
- 内部机制:
RLock内部维护一个“持有计数器”,同一个线程每获取一次锁,计数器加 1;每释放一次,计数器减 1,只有当计数器降为 0 时,锁才会真正被释放给其他线程。
4 Semaphore (信号量)
信号量是一个更通用的同步原语,它控制同时访问某个特定资源的线程数量。
__init__(value): 初始化一个信号量,value是初始允许的并发数量。acquire(): 尝试获取信号量,如果当前信号量的值大于 0,则减 1 并继续执行;如果为 0,则阻塞,直到有其他线程释放信号量。release(): 释放信号量,将内部计数器加 1。
经典应用场景:限制数据库连接池的大小、限制同时下载文件的数量等。
5 Event (事件)
Event 提供了一种简单的线程间通信机制,一个线程可以通知另一个线程某个事件已经发生。
set(): 设置事件,将内部标志位设为 True。clear(): 清除事件,将内部标志位设为 False。is_set(): 检查事件标志位是否为 True。wait(): 阻塞线程,直到事件的标志位被设置为 True。
比喻:就像一个红绿灯,一个线程是绿灯(set()),其他等待的线程就可以通过(wait() 返回),如果变红灯(clear()),等待的线程就会再次阻塞。
6 Queue (队列)
queue.Queue 是线程安全的队列,是生产者-消费者模式的完美实现,它内部已经处理了所有的锁逻辑,让线程间的数据交换变得非常简单和安全。
put(item, block=True, timeout=None): 将元素放入队列,如果队列已满,block=True时会阻塞。get(block=True, timeout=None): 从队列中取出一个元素,如果队列为空,block=True时会阻塞。task_done(): 消费者从队列中获取一个任务并完成后调用,用于通知队列该任务已处理。join(): 阻塞调用线程,直到队列中的所有任务都被处理完毕(即每个put的任务都被对应的task_done调用过)。
实践:从创建线程到同步控制
1 基础:创建和启动线程
import threading
import time
def worker(num):
"""线程要执行的函数"""
print(f"Worker {num} is starting...")
time.sleep(2) # 模拟 I/O 操作
print(f"Worker {num} has finished.")
if __name__ == "__main__":
threads = []
# 创建并启动 5 个线程
for i in range(5):
# 创建线程对象
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start() # 启动线程
# 主线程等待所有子线程完成
for t in threads:
t.join()
print("All threads have completed.")
运行结果分析:
你会发现 "Worker ... is starting..." 几乎是立即打印出来的,然后程序会等待 2 秒,才打印所有 "Worker ... has finished.",这证明了 5 个线程几乎是并发启动的,并且主线程通过 join() 等待它们全部结束。
2 进阶:使用锁保护共享数据
下面是一个经典的银行账户取款例子,不加锁会导致错误。
import threading
# 共享资源
balance = 1000
lock = threading.Lock()
def withdraw(amount):
global balance
# 模拟多个线程同时检查余额
# 如果不加锁,这里可能会发生竞态条件
if balance >= amount:
# 在这里加入延迟,大大增加竞态条件发生的概率
# 因为线程可能在检查后、修改前被切换出去
# time.sleep(0.0001)
print(f"Thread {threading.current_thread().name} is withdrawing {amount}.")
balance -= amount
print(f"Withdrawal successful. New balance: {balance}")
else:
print(f"Thread {threading.current_thread().name} failed to withdraw. Insufficient funds. Balance: {balance}")
if __name__ == "__main__":
# 创建两个线程,尝试同时取款 800
t1 = threading.Thread(target=withdraw, args=(800,), name="Thread-1")
t2 = threading.Thread(target=withdraw, args=(800,), name="Thread-2")
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Final balance: {balance}")
不加锁的可能结果(竞态条件):
Thread-1检查balance(1000) >= 800,条件为真。- 操作系统切换到
Thread-2,Thread-2也检查balance(1000) >= 800,条件也为真。 Thread-2执行balance -= 800,balance变为 200。- 操作系统切换回
Thread-1,Thread-1也执行balance -= 800,balance变为 -600。 - 最终余额为 -600,这是错误的结果。
使用锁修复:
只需在 if 判断和 balance -= amount 的外部加上 with 语句即可。with 语句是 try...finally 的优雅写法,能保证锁的自动获取和释放。
def withdraw_with_lock(amount):
global balance
with lock: # 自动 acquire() 和 release()
if balance >= amount:
print(f"Thread {threading.current_thread().name} is withdrawing {amount}.")
# time.sleep(0.0001) # 即使有延迟,结果也是正确的
balance -= amount
print(f"Withdrawal successful. New balance: {balance}")
else:
print(f"Thread {threading.current_thread().name} failed to withdraw. Insufficient funds. Balance: {balance}")
修复后的结果:
无论 Thread-1 和 Thread-2 谁先获取锁,另一个线程都必须等待,只有当第一个线程完成修改并释放锁后,第二个线程才能进入,最终结果一定是 balance 为 200 或 0,绝不会出现负数。
高级主题
1 线程池 (concurrent.futures.ThreadPoolExecutor)
手动创建和管理大量线程(成百上千)会非常复杂,并且会消耗大量资源,线程池应运而生。
- 优点:
- 重用线程:避免了频繁创建和销毁线程的开销。
- 控制并发数:可以限制同时运行的线程数量,防止系统资源耗尽。
- 简化编程:提供了更高级、更简洁的 API。
使用 ThreadPoolExecutor:
from concurrent.futures import ThreadPoolExecutor
import time
def task(name):
print(f"Task {name} is starting.")
time.sleep(2)
print(f"Task {name} is finished.")
return f"Result of {name}"
if __name__ == "__main__":
# 创建一个最大线程数为 3 的线程池
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交 5 个任务到线程池
futures = [executor.submit(task, i) for i in range(5)]
print("All tasks have been submitted.")
# 可以获取每个任务的结果
for future in futures:
# result() 方法会阻塞,直到任务完成
print(future.result())
print("All tasks in the pool are done.")
执行流程:
- 前 3 个任务立即被线程池中的线程执行。
- 第 4 和第 5 个任务进入任务队列,等待线程空闲。
- 当某个任务(如 Task 0)完成后,线程池中的空闲线程会从队列中取出 Task 4 并执行。
- 所有任务按顺序完成,
with块确保所有线程都被正确清理。
2 生产者-消费者模式
这是多线程/多进程编程中最经典、最常用的模式之一,它使用一个队列来解耦“生产数据”的线程和“消费数据”的线程。
角色:
- 生产者:创建数据,并将数据放入队列。
- 消费者:从队列中取出数据,并对数据进行处理。
- 队列:缓冲区,平衡生产者和消费者的速度差异。
示例: 一个线程负责从网站上下载图片(生产者),多个线程负责将图片保存到本地(消费者)。
import threading
import queue
import time
import random
# 生产者函数
def producer(q, url_list):
for url in url_list:
print(f"Producer: Producing item from {url}")
time.sleep(random.uniform(0.5, 1.5)) # 模拟下载耗时
q.put(url) # 将下载好的URL放入队列
print("Producer: Finished producing all items.")
# 放入一个特殊的哨兵值,通知消费者生产结束
for _ in range(3): # 假设有3个消费者
q.put(None)
# 消费者函数
def consumer(q, consumer_id):
while True:
item = q.get() # 从队列中获取任务
if item is None: # 收到哨兵值,退出循环
print(f"Consumer {consumer_id}: Received sentinel, exiting.")
q.task_done() # 对哨兵值也调用 task_done
break
print(f"Consumer {consumer_id}: Processing item {item}")
time.sleep(random.uniform(0.5, 1)) # 模拟处理耗时
print(f"Consumer {consumer_id}: Finished processing {item}")
q.task_done() # 通知队列,该任务已完成
if __name__ == "__main__":
q = queue.Queue()
urls = ["url1", "url2", "url3", "url4", "url5", "url6", "url7", "url8"]
# 创建生产者线程
producer_thread = threading.Thread(target=producer, args=(q, urls))
# 创建消费者线程
consumers = []
for i in range(3):
c = threading.Thread(target=consumer, args=(q, i))
consumers.append(c)
# 启动所有线程
producer_thread.start()
for c in consumers:
c.start()
# 主线程等待生产者完成
producer_thread.join()
# 主线程等待队列中所有任务被处理完毕
q.join()
print("All tasks have been processed by consumers.")
# 不需要手动 join 消费者,因为它们在收到哨兵值后会自己退出
for c in consumers:
c.join()
print("Program finished.")
最佳实践与注意事项
- 避免共享状态:这是最重要的原则,尽量让每个线程处理自己的数据,减少线程间的通信,如果必须共享,请使用
Queue或其他同步原语(如Lock)来保护。 - 谨慎使用
join():join()会阻塞调用线程,在循环中对所有线程调用join()是常见的,但要确保理解其阻塞行为,在线程池中,with语句会自动处理等待。 - 死锁:避免多个线程以不同的顺序获取多个锁,线程 A 获取锁 1 后等待锁 2,而线程 B 获取锁 2 后等待锁 1,两者将永远等待下去,一个简单的原则是:总是以相同的顺序获取多个锁。
- 资源泄漏:确保所有获取的资源(如文件、锁)都能被正确释放。
with语句是你的好朋友。 - 注意
daemon线程:守护线程会在主线程结束时立即退出,无论它是否执行完毕,它们通常用于执行后台任务,如监控。不要在守护线程中执行任何需要清理资源的操作,因为它们可能会被突然终止。t = threading.Thread(target=my_func) t.daemon = True # 设置为守护线程 t.start() # 主线程结束时,t 会立即终止
- 优先使用
ThreadPoolExecutor:对于大多数多线程任务,使用线程池比手动创建和管理线程更简单、更高效。
何时使用多线程?
| 场景 | 推荐方案 | 理由 |
|---|---|---|
| I/O 密集型任务 | threading / ThreadPoolExecutor |
线程在等待 I/O 时会释放 GIL,其他线程可以运行,有效利用 CPU 时间。 |
| CPU 密集型任务 | multiprocessing / ProcessPoolExecutor |
绕过 GIL 限制,利用多核 CPU 实现真正的并行计算。 |
| 需要高并发、简单网络服务 | asyncio (异步 I/O) |
单线程+事件循环模型,避免了线程切换的开销,能处理成千上万的并发连接,比多线程更轻量。 |
| 需要与 C/C++ 库交互 | multiprocessing |
C/C++ 库能释放 GIL,multiprocessing 可以获得更好的并行性能。 |
希望这篇详解能帮助你全面理解 Python 的 threading 模块!理解 GIL 是关键,并根据你的任务类型选择最合适的并发模型。

