目录
- 为什么需要多线程?
threading模块核心概念Thread类:创建和管理线程Lock(锁):解决资源竞争问题RLock(可重入锁):特殊的锁Semaphore(信号量):控制资源访问数量Event(事件):线程间通信Queue(队列):线程安全的任务队列
- 一个完整的示例:多线程下载器
- 多线程的优缺点与适用场景
- 重要陷阱:GIL (全局解释器锁)
为什么需要多线程?
想象一下你正在用电脑同时做三件事:

- 听音乐
- 写文档
- 下载文件
你的 CPU 核心只有一个,但它可以在这些任务之间快速切换,让你感觉它们在“进行,这就是并发。
在编程中,多线程就是让一个程序能够同时执行多个任务,这对于I/O 密集型任务尤其有效,比如网络请求、文件读写、数据库操作等,在这些任务中,程序大部分时间都在等待外部响应,此时切换到其他线程执行,可以大大提高 CPU 的利用率。
多线程的核心目的是:提高程序的执行效率和响应速度。
threading 模块核心概念
threading 模块是 Python 内置的用于多线程编程的模块,我们先来看最核心的 Thread 类。

Thread 类:创建和管理线程
创建线程有两种主要方式:
继承 Thread 类 (面向对象)
import threading
import time
class MyThread(threading.Thread):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
# 线程要执行的代码放在这里
print(f"线程 {self.name} 开始运行")
time.sleep(2) # 模拟耗时操作
print(f"线程 {self.name} 运行结束")
# 创建线程实例
thread1 = MyThread("线程-A")
thread2 = MyThread("线程-B")
# 启动线程 (这会调用 run 方法)
thread1.start()
thread2.start()
# 等待所有线程执行完毕 (主线程会在这里等待)
thread1.join()
thread2.join()
print("所有线程都执行完毕,主程序结束")
传入函数 (更简洁)
import threading
import time
def worker_task(name):
print(f"线程 {name} 开始运行")
time.sleep(2)
print(f"线程 {name} 运行结束")
# 创建线程实例,传入目标函数和参数
# 注意:参数必须是一个元组,如果只有一个参数,后面要加逗号
thread1 = threading.Thread(target=worker_task, args=("线程-C",))
thread2 = threading.Thread(target=worker_task, args=("线程-D",))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print("所有线程都执行完毕,主程序结束")
常用 Thread 方法:
start(): 启动线程,调用run()方法。run(): 线程的执行体,可以被重写。join(timeout=None): 阻塞当前线程,直到被调用的线程执行完毕。timeout是可选的超时时间。is_alive(): 检查线程是否还在运行。
Lock (锁):解决资源竞争问题
当多个线程同时读写同一个共享资源(比如一个变量、一个文件)时,会发生竞态条件,导致数据不一致,锁是用来保护共享资源,确保同一时间只有一个线程能访问它。
经典例子:银行账户取款
import threading
balance = 1000 # 共享资源
def withdraw(amount):
global balance
# 检查余额是否足够
if balance >= amount:
# 模拟网络延迟,让问题更容易发生
threading.Event().wait(0.001)
balance -= amount
print(f"取款 {amount} 元,剩余 {balance} 元")
else:
print(f"余额不足,无法取款 {amount} 元")
threads = []
for _ in range(10):
t = threading.Thread(target=withdraw, args=(100,))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"最终余额: {balance}") # 预期是 900,但结果可能小于 900
使用 Lock 修复:
import threading
balance = 1000
lock = threading.Lock() # 创建一个锁
def withdraw_with_lock(amount):
global balance
# 在访问共享资源前,先获取锁
# 如果锁被其他线程持有,这里会阻塞,直到锁被释放
with lock: # 'with' 语句会自动处理 acquire() 和 release()
if balance >= amount:
threading.Event().wait(0.001)
balance -= amount
print(f"取款 {amount} 元,剩余 {balance} 元")
else:
print(f"余额不足,无法取款 {amount} 元")
# ... 后面的代码与上面相同 ...
threads = []
for _ in range(10):
t = threading.Thread(target=withdraw_with_lock, args=(100,))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"最终余额: {balance}") # 现在结果一定是 900
RLock (可重入锁)
RLock (Reentrant Lock) 是一种特殊的锁,同一个线程可以多次获取它,而不会造成死锁,这在递归函数中非常有用。
import threading
lock = threading.RLock()
def recursive_function(depth):
with lock:
print(f"递归深度: {depth}")
if depth > 0:
recursive_function(depth - 1)
t = threading.Thread(target=recursive_function, args=(5,))
t.start()
t.join()
# 使用普通 Lock 会在这里死锁,因为第一次 'with' 获取了锁,
# 递归调用时,同一个线程再次尝试获取同一个锁,被阻塞。
Semaphore (信号量)
信号量控制可以同时访问某个资源的线程数量,它就像一个有 N 个座位的休息室,只有 N 个人能同时进入,其他人必须在外面等待。
示例:模拟连接池
import threading
import time
# 限制最多3个线程同时访问
semaphore = threading.Semaphore(3)
def worker(worker_id):
print(f"线程 {worker_id} 尝试获取资源...")
with semaphore:
print(f"线程 {worker_id} 已获取资源,开始工作...")
time.sleep(2) # 模拟工作
print(f"线程 {worker_id} 释放资源,工作结束")
threads = []
for i in range(10):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
Event (事件)
Event 是一个简单的线程间通信机制,一个线程可以发出一个“事件信号”,其他等待这个信号的线程会被唤醒。
event.set(): 设置事件,表示事件发生。event.clear(): 清除事件,表示事件未发生。event.wait(): 阻塞线程,直到事件被设置。
示例:生产者-消费者模型
import threading
import time
event = threading.Event()
def producer():
print("生产者:准备生产数据...")
time.sleep(3)
print("生产者:数据已准备好!")
event.set() # 通知消费者数据准备好了
def consumer():
print("消费者:等待数据...")
event.wait() # 阻塞,直到收到通知
print("消费者:收到数据,开始处理!")
p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start()
c.start()
p.join()
c.join()
Queue (队列)
queue.Queue 是线程安全的队列,是生产者-消费者模型的最佳实践,它内部已经处理了锁,你不需要手动加锁。
put(item): 向队列中添加一个元素,如果队列满则阻塞。get(): 从队列中获取一个元素,如果队列空则阻塞。task_done(): 表示一个任务已完成。join(): 阻塞,直到队列中的所有任务都被task_done()标记。
示例:多线程任务处理
import threading
import queue
import time
# 创建一个队列
task_queue = queue.Queue()
def worker():
while True:
try:
# 从队列获取任务,设置超时以避免永久阻塞
task = task_queue.get(timeout=1)
print(f"线程 {threading.current_thread().name} 正在处理任务: {task}")
time.sleep(1) # 模拟处理时间
print(f"线程 {threading.current_thread().name} 完成任务: {task}")
task_queue.task_done() # 标记任务完成
except queue.Empty:
print(f"线程 {threading.current_thread().name} 队列为空,退出...")
break
# 创建并启动工作线程
worker_threads = []
for i in range(3):
t = threading.Thread(target=worker, name=f"Worker-{i}")
t.start()
worker_threads.append(t)
# 主线程(生产者)向队列中添加任务
for i in range(10):
task_queue.put(f"任务-{i}")
# 等待所有任务被处理
task_queue.join()
print("所有任务已完成!")
# 通知工作线程退出
for t in worker_threads:
t.join()
print("主程序结束")
一个完整的示例:多线程下载器
这个示例将展示如何使用 threading 和 queue 来并行下载多个 URL。
import threading
import queue
import requests
import time
# 1. 准备任务
urls = [
"http://example.com",
"http://example.org",
"http://example.net",
"https://www.python.org",
"https://github.com"
]
# 2. 创建线程安全的任务队列
download_queue = queue.Queue()
for url in urls:
download_queue.put(url)
# 3. 定义下载函数(消费者)
def download_worker():
while True:
try:
url = download_queue.get_nowait() # 不阻塞,如果队列为空则抛出 Empty 异常
print(f"线程 {threading.current_thread().name} 开始下载: {url}")
try:
response = requests.get(url, timeout=5)
filename = url.split("/")[-1] or "index.html"
with open(filename, "w", encoding="utf-8") as f:
f.write(response.text)
print(f"线程 {threading.current_thread().name} 下载成功: {filename}")
except Exception as e:
print(f"线程 {threading.current_thread().name} 下载失败 {url}: {e}")
finally:
download_queue.task_done()
except queue.Empty:
print(f"线程 {threading.current_thread().name} 所有任务下载完毕,退出。")
break
# 4. 创建并启动工作线程
NUM_WORKERS = 3
threads = []
for i in range(NUM_WORKERS):
t = threading.Thread(target=download_worker, name=f"Downloader-{i}")
t.start()
threads.append(t)
# 5. 等待所有任务被处理
download_queue.join()
# 6. 等待所有线程结束
for t in threads:
t.join()
print("所有下载任务完成!")
多线程的优缺点与适用场景
优点
- 提高 I/O 密集型任务效率:在等待 I/O(网络、磁盘)时,CPU 可以切换到其他线程工作,避免资源浪费。
- 提高用户界面响应性:在桌面应用中,耗时操作放在后台线程执行,可以防止界面卡死。
- 简化并发模型:相比于复杂的
multiprocessing,多线程创建和销毁的开销更小,通信也更简单(共享内存)。
缺点
- GIL 限制:这是 Python 多线程最大的瓶颈,下面会详细讲。
- 资源竞争:需要手动使用锁等同步机制,增加了编程的复杂性,容易出错(死锁、活锁)。
- 内存占用:每个线程都有自己独立的栈空间,线程数量过多会消耗大量内存。
适用场景
- 网络爬虫:同时发起多个 HTTP 请求。
- Web 服务器:同时处理多个客户端连接。
- 文件批量处理:同时读写多个文件。
- GUI 应用:在后台执行耗时计算,保持界面流畅。
不适用场景:
- CPU 密集型任务:如科学计算、视频编解码,由于 GIL 的存在,多线程无法利用多核 CPU 的优势,此时应使用
multiprocessing模块。
重要陷阱:GIL (全局解释器锁)
GIL 是什么? GIL 是 CPython 解释器(标准的 Python 实现)的一个互斥锁,它确保在任何时刻,只有一个线程可以执行 Python 的字节码。
GIL 的影响是什么? 这意味着即使在多核 CPU 的机器上,Python 的多线程也无法实现真正的并行执行,对于 CPU 密集型任务,使用多线程并不能带来性能提升,反而因为线程切换的开销而可能更慢。
GIL 不影响 I/O 密集型任务吗?
不影响,因为在执行 I/O 操作时(如 requests.get() 或 time.sleep()),线程会释放 GIL,让其他线程可以运行,这正是为什么多线程对 I/O 密集型任务有效。
如何绕过 GIL?
- 使用
multiprocessing模块:它创建多个独立的进程,每个进程有自己的 Python 解释器和内存空间,可以真正地并行执行,绕过 GIL,但进程间通信比线程间通信复杂得多。 - 使用其他 Python 实现:如 Jython 或 IronPython,它们没有 GIL。
- 使用 C 扩展:将性能瓶颈部分用 C 语言编写,并通过 Python 的 C API 调用。
| 特性 | 描述 |
|---|---|
| 核心用途 | 实现并发,尤其擅长处理I/O 密集型任务。 |
| 核心类 | Thread 是基础,Lock, RLock, Semaphore, Event, Queue 是用于同步和通信的工具。 |
Queue |
是实现生产者-消费者模型的推荐方式,因为它内置了线程安全机制。 |
with lock |
是使用锁的标准、安全的方式,能确保锁一定会被释放。 |
| 最大陷阱 | GIL (全局解释器锁) 限制了 Python 多线程在 CPU 密集型任务上的并行能力。 |
| 适用场景 | 网络请求、文件读写、GUI 应用等。 |
| 不适用场景 | 科学计算、大规模数据处理等 CPU 密集型任务。 |
掌握 threading 模块是成为一名合格的 Python 开发者的必经之路,理解其工作原理、同步机制以及 GIL 的影响,能帮助你写出更高效、更健壮的并发程序。
