threading 是 Python 的标准库,用于实现多线程编程,它是对底层 _thread 模块的一个高级封装,提供了更友好、更强大的功能。

为什么需要多线程?(核心概念)
在理解 threading 之前,首先要明白它的核心目的:提高程序的并发处理能力。
想象一下你在餐厅点餐:
- 单线程:你一个人点菜,然后去后厨等,等菜做好了再回来吃,在这个过程中,你什么也做不了,只能干等,这就是“阻塞”。
- 多线程:你点完菜后,服务员让你先坐下,然后去为其他客人服务,你的“点菜”和“等待”是同时进行的,你可以同时思考、喝点水,而不仅仅是干等,这就是“并发”。
在编程中,多线程特别适合处理 I/O 密集型 任务,
- 网络请求(等待服务器响应)
- 文件读写(等待磁盘数据)
- 数据库查询
在这些任务中,大部分时间程序都在等待外部资源,CPU 处于空闲状态,多线程可以让一个线程在等待时,切换到另一个线程执行计算任务,从而充分利用 CPU 时间,大大提高程序的执行效率。

注意:对于 CPU 密集型 任务(如大量数学计算、图像处理),由于 Python 的 GIL (Global Interpreter Lock) 全局解释器锁的存在,多线程并不能真正实现并行计算(同一时间只有一个线程在运行),反而可能因为线程切换的开销而降低性能,对于 CPU 密集型任务,应该使用
multiprocessing模块。
threading 模块的核心组件
threading 模块提供了几个关键的类和函数。
1 Thread 类:创建线程
这是最核心的类,用于创建和控制线程。
基本用法:
- 定义一个函数:这个函数就是线程要执行的任务。
- 创建
Thread对象:将函数作为target参数传入。 - 启动线程:调用
start()方法,这会让线程开始执行target函数。 - 等待线程结束(可选):调用
join()方法,主线程会阻塞,直到该线程执行完毕。
示例代码:
import threading
import time
def worker_task():
"""这是线程要执行的任务函数"""
thread_name = threading.current_thread().name
print(f"线程 '{thread_name}' 开始工作...")
time.sleep(2) # 模拟一个耗时操作,比如I/O等待
print(f"线程 '{thread_name}' 工作完成!")
# --- 主程序 ---
if __name__ == "__main__":
print("主线程开始运行")
# 创建一个 Thread 对象
# target: 指定线程要执行的函数
# name: 给线程起一个名字(可选)
t1 = threading.Thread(target=worker_task, name="Worker-1")
# 启动线程
t1.start()
print("主线程继续执行,不会等待子线程...")
# 等待线程 t1 执行完毕
# 如果不加 join(),主程序会继续往下执行,然后直接退出,导致子线程可能还没执行完就被强制终止了。
t1.join()
print("主线程结束")
输出结果:
主线程开始运行
线程 'Worker-1' 开始工作...
主线程继续执行,不会等待子线程...
线程 'Worker-1' 工作完成! # 注意:这一行在2秒后打印
主线程结束
2 Lock (锁):解决资源竞争问题
当多个线程同时读写同一个共享资源(比如一个全局变量)时,会发生“竞态条件”,导致数据不一致。
示例:不加锁的线程安全问题
import threading
# 共享资源
g_counter = 0
def increment_counter():
global g_counter
for _ in range(100000):
g_counter += 1 # 这不是一个原子操作,包含读取、修改、写回三步
threads = []
for i in range(5):
t = threading.Thread(target=increment_counter)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"最终计数器的值应该是: 5 * 100000 = 500000")
print(f"实际计数器的值是: {g_counter}") # 结果几乎总是小于500000
如何解决?使用 Lock。
Lock 提供了两种状态:锁定 和 未锁定。
acquire():尝试获取锁,如果锁已被其他线程获取,则当前线程会阻塞,直到锁被释放。release():释放锁。
将需要原子操作(不可分割)的代码块放在 acquire() 和 release() 之间。
示例:使用 Lock 修复线程安全问题
import threading
g_counter = 0
# 创建一个锁对象
lock = threading.Lock()
def increment_counter_safe():
global g_counter
for _ in range(100000):
# 获取锁
lock.acquire()
try:
g_counter += 1
finally:
# 确保锁一定会被释放,即使发生异常
lock.release()
# 或者更简洁的 with 语句 (推荐)
def increment_counter_with_with():
global g_counter
for _ in range(100000):
with lock: # with 语句会在代码块执行完毕后自动释放锁
g_counter += 1
# 使用 with 语句的版本
threads = []
for i in range(5):
t = threading.Thread(target=increment_counter_with_with)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"最终计数器的值是: {g_counter}") # 输出: 500000
3 RLock (可重入锁)
RLock 和 Lock 类似,但它可以被同一个线程多次获取,这在递归函数中非常有用。
import threading
lock = threading.RLock()
def recursive_function(level):
with lock:
print(f"递归层级: {level}")
if level > 0:
recursive_function(level - 1)
recursive_function(3)
# 不会死锁,因为同一个线程可以多次获取 RLock
4 Semaphore (信号量)
Semaphore 用于控制同时访问某个资源的线程数量,它像一个“停车场”,只允许固定数量的车(线程)进入。
acquire():减少内部计数器,如果计数器为0,则阻塞。release():增加内部计数器。
示例:模拟一个只有3个位置的数据库连接池
import threading
import time
# 最多允许3个线程同时访问
semaphore = threading.Semaphore(3)
def access_database(user_id):
with semaphore:
print(f"用户 {user_id} 正在访问数据库...")
time.sleep(2) # 模拟数据库操作耗时
print(f"用户 {user_id} 已释放数据库连接。")
threads = []
for i in range(10):
t = threading.Thread(target=access_database, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
5 Event (事件)
Event 提供了一种简单的线程间通信机制,一个线程可以发出一个信号,其他线程等待这个信号。
set():设置事件,将内部标志位设为 True。clear():清除事件,将内部标志位设为 False。wait():等待事件,如果标志位为 False,则线程阻塞;如果为 True,则立即返回。
示例:生产者-消费者模型
import threading
import time
import random
event = threading.Event()
def producer():
print("生产者:正在准备数据...")
time.sleep(random.randint(1, 3))
print("生产者:数据准备完毕!")
event.set() # 通知消费者数据准备好了
def consumer():
print("消费者:等待数据...")
event.wait() # 阻塞,直到生产者调用 event.set()
print("消费者:收到数据,开始处理!")
p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start()
c.start()
p.join()
c.join()
6 Queue (队列)
Queue 是线程间通信的最佳实践,它是一个线程安全的队列,可以安全地在多个线程之间传递数据,完美地解决了生产者和消费者问题。
put(item):将数据放入队列,如果队列已满,则阻塞。get():从队列中取出数据,如果队列为空,则阻塞。
示例:使用 Queue 的生产者-消费者
import threading
import time
import queue
import random
# 创建一个线程安全的队列
q = queue.Queue()
def producer():
for i in range(5):
item = f"产品-{i}"
print(f"生产者:生产了 {item}")
q.put(item)
time.sleep(random.random())
print("生产者:生产任务完成!")
def consumer():
while True:
try:
# 设置超时,避免消费者永远阻塞
item = q.get(timeout=2)
print(f"消费者:消费了 {item}")
time.sleep(random.random())
q.task_done() # 告诉队列,这个任务已经处理完毕
except queue.Empty:
print("消费者:队列已空,退出。")
break
p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start()
p.join() # 等待生产者生产完所有产品
# 等待队列中的所有任务被处理完毕
q.join()
print("主线程:所有产品已被消费。")
# 消费者线程可能在等待,这里让它退出
# 在实际应用中,可能需要一个更优雅的退出机制
最佳实践和注意事项
- 避免共享状态:这是多线程编程的第一原则,尽量让每个线程操作自己的数据,通过
Queue等方式交换数据,而不是直接共享可变变量。 - 使用
with语句管理锁:with lock:比lock.acquire()+lock.release()更安全,因为它能确保锁一定会被释放,即使在代码块中发生了异常。 - 警惕 GIL:牢记 GIL 的存在,对于 CPU 密集型任务,多线程不是解决方案,应使用
multiprocessing模块来利用多核 CPU。 - 合理设计线程数量:不要创建成百上千的线程,线程的创建和切换都有开销,对于 I/O 密集型任务,线程数可以设置得比 CPU 核心数多一些(
2 * CPU核心数),可以使用os.cpu_count()获取 CPU 核心数。 - 使用
join()控制流程:当主线程需要等待所有子任务完成时,使用join()是一个好习惯,它可以防止主线程在子任务完成前就退出。 - 注意线程的生命周期:线程一旦启动,就不能再启动一次,如果需要重复执行任务,可以考虑创建新的线程或使用线程池。
进阶:ThreadPoolExecutor (线程池)
从 Python 3.2 开始,concurrent.futures 模块提供了 ThreadPoolExecutor,这是更现代、更高级的线程管理方式。
优点:
- 自动管理线程:你不需要手动创建和销毁线程。
- 任务提交:可以像提交普通函数一样提交任务。
- 获取结果:可以方便地获取任务的返回值。
- 优雅关闭:提供了
shutdown()方法,可以优雅地关闭线程池,等待所有任务完成。
示例:
from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
print(f"任务 {n} 开始执行...")
time.sleep(1)
return n * n
# 创建一个最大线程数为4的线程池
with ThreadPoolExecutor(max_workers=4) as executor:
# 提交多个任务
futures = [executor.submit(task, i) for i in range(8)]
# 可以等待所有任务完成
# executor.shutdown(wait=True) # with 语句会自动执行这一步
# 获取每个任务的结果
for future in futures:
try:
result = future.result()
print(f"任务结果: {result}")
except Exception as e:
print(f"任务出错: {e}")
print("所有任务已完成")
ThreadPoolExecutor 是现代 Python 多线程编程的推荐方式,它封装了底层的复杂性,提供了更简洁的 API。
| 组件 | 用途 | 核心方法 |
|---|---|---|
Thread |
创建和管理线程 | start(), join() |
Lock |
保护共享资源,防止竞态条件 | acquire(), release() (或 with 语句) |
RLock |
可重入锁,用于递归函数 | 同 Lock |
Semaphore |
控制同时访问资源的线程数 | acquire(), release() |
Event |
线程间简单通信,发信号 | set(), clear(), wait() |
Queue |
线程间安全地传递数据 | put(), get(), task_done(), join() |
ThreadPoolExecutor |
管理线程池,高级抽象 | submit(), map(), shutdown() |
希望这份详细的指南能帮助你全面理解 Python 的 threading 模块!
