杰瑞科技汇

Python线程threading如何高效使用?

目录

  1. 为什么需要多线程?
  2. threading 模块核心概念
    • Thread 类:创建和管理线程
    • Lock (锁):解决资源竞争问题
    • RLock (可重入锁):特殊的锁
    • Semaphore (信号量):控制资源访问数量
    • Event (事件):线程间通信
    • Queue (队列):线程安全的任务队列
  3. 一个完整的示例:多线程下载器
  4. 多线程的优缺点与适用场景
  5. 重要陷阱:GIL (全局解释器锁)

为什么需要多线程?

想象一下你正在用电脑同时做三件事:

Python线程threading如何高效使用?-图1
(图片来源网络,侵删)
  • 听音乐
  • 写文档
  • 下载文件

你的 CPU 核心只有一个,但它可以在这些任务之间快速切换,让你感觉它们在“进行,这就是并发

在编程中,多线程就是让一个程序能够同时执行多个任务,这对于I/O 密集型任务尤其有效,比如网络请求、文件读写、数据库操作等,在这些任务中,程序大部分时间都在等待外部响应,此时切换到其他线程执行,可以大大提高 CPU 的利用率。

多线程的核心目的是:提高程序的执行效率和响应速度。


threading 模块核心概念

threading 模块是 Python 内置的用于多线程编程的模块,我们先来看最核心的 Thread 类。

Python线程threading如何高效使用?-图2
(图片来源网络,侵删)

Thread 类:创建和管理线程

创建线程有两种主要方式:

继承 Thread 类 (面向对象)

import threading
import time
class MyThread(threading.Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name
    def run(self):
        # 线程要执行的代码放在这里
        print(f"线程 {self.name} 开始运行")
        time.sleep(2)  # 模拟耗时操作
        print(f"线程 {self.name} 运行结束")
# 创建线程实例
thread1 = MyThread("线程-A")
thread2 = MyThread("线程-B")
# 启动线程 (这会调用 run 方法)
thread1.start()
thread2.start()
# 等待所有线程执行完毕 (主线程会在这里等待)
thread1.join()
thread2.join()
print("所有线程都执行完毕,主程序结束")

传入函数 (更简洁)

import threading
import time
def worker_task(name):
    print(f"线程 {name} 开始运行")
    time.sleep(2)
    print(f"线程 {name} 运行结束")
# 创建线程实例,传入目标函数和参数
# 注意:参数必须是一个元组,如果只有一个参数,后面要加逗号
thread1 = threading.Thread(target=worker_task, args=("线程-C",))
thread2 = threading.Thread(target=worker_task, args=("线程-D",))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print("所有线程都执行完毕,主程序结束")

常用 Thread 方法:

  • start(): 启动线程,调用 run() 方法。
  • run(): 线程的执行体,可以被重写。
  • join(timeout=None): 阻塞当前线程,直到被调用的线程执行完毕。timeout 是可选的超时时间。
  • is_alive(): 检查线程是否还在运行。

Lock (锁):解决资源竞争问题

当多个线程同时读写同一个共享资源(比如一个变量、一个文件)时,会发生竞态条件,导致数据不一致,锁是用来保护共享资源,确保同一时间只有一个线程能访问它。

经典例子:银行账户取款

import threading
balance = 1000  # 共享资源
def withdraw(amount):
    global balance
    # 检查余额是否足够
    if balance >= amount:
        # 模拟网络延迟,让问题更容易发生
        threading.Event().wait(0.001) 
        balance -= amount
        print(f"取款 {amount} 元,剩余 {balance} 元")
    else:
        print(f"余额不足,无法取款 {amount} 元")
threads = []
for _ in range(10):
    t = threading.Thread(target=withdraw, args=(100,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print(f"最终余额: {balance}") # 预期是 900,但结果可能小于 900

使用 Lock 修复:

import threading
balance = 1000
lock = threading.Lock() # 创建一个锁
def withdraw_with_lock(amount):
    global balance
    # 在访问共享资源前,先获取锁
    # 如果锁被其他线程持有,这里会阻塞,直到锁被释放
    with lock: # 'with' 语句会自动处理 acquire() 和 release()
        if balance >= amount:
            threading.Event().wait(0.001)
            balance -= amount
            print(f"取款 {amount} 元,剩余 {balance} 元")
        else:
            print(f"余额不足,无法取款 {amount} 元")
# ... 后面的代码与上面相同 ...
threads = []
for _ in range(10):
    t = threading.Thread(target=withdraw_with_lock, args=(100,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print(f"最终余额: {balance}") # 现在结果一定是 900

RLock (可重入锁)

RLock (Reentrant Lock) 是一种特殊的锁,同一个线程可以多次获取它,而不会造成死锁,这在递归函数中非常有用。

import threading
lock = threading.RLock()
def recursive_function(depth):
    with lock:
        print(f"递归深度: {depth}")
        if depth > 0:
            recursive_function(depth - 1)
t = threading.Thread(target=recursive_function, args=(5,))
t.start()
t.join()
# 使用普通 Lock 会在这里死锁,因为第一次 'with' 获取了锁,
# 递归调用时,同一个线程再次尝试获取同一个锁,被阻塞。

Semaphore (信号量)

信号量控制可以同时访问某个资源的线程数量,它就像一个有 N 个座位的休息室,只有 N 个人能同时进入,其他人必须在外面等待。

示例:模拟连接池

import threading
import time
# 限制最多3个线程同时访问
semaphore = threading.Semaphore(3)
def worker(worker_id):
    print(f"线程 {worker_id} 尝试获取资源...")
    with semaphore:
        print(f"线程 {worker_id} 已获取资源,开始工作...")
        time.sleep(2) # 模拟工作
    print(f"线程 {worker_id} 释放资源,工作结束")
threads = []
for i in range(10):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()

Event (事件)

Event 是一个简单的线程间通信机制,一个线程可以发出一个“事件信号”,其他等待这个信号的线程会被唤醒。

  • event.set(): 设置事件,表示事件发生。
  • event.clear(): 清除事件,表示事件未发生。
  • event.wait(): 阻塞线程,直到事件被设置。

示例:生产者-消费者模型

import threading
import time
event = threading.Event()
def producer():
    print("生产者:准备生产数据...")
    time.sleep(3)
    print("生产者:数据已准备好!")
    event.set() # 通知消费者数据准备好了
def consumer():
    print("消费者:等待数据...")
    event.wait() # 阻塞,直到收到通知
    print("消费者:收到数据,开始处理!")
p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start()
c.start()
p.join()
c.join()

Queue (队列)

queue.Queue 是线程安全的队列,是生产者-消费者模型的最佳实践,它内部已经处理了锁,你不需要手动加锁。

  • put(item): 向队列中添加一个元素,如果队列满则阻塞。
  • get(): 从队列中获取一个元素,如果队列空则阻塞。
  • task_done(): 表示一个任务已完成。
  • join(): 阻塞,直到队列中的所有任务都被 task_done() 标记。

示例:多线程任务处理

import threading
import queue
import time
# 创建一个队列
task_queue = queue.Queue()
def worker():
    while True:
        try:
            # 从队列获取任务,设置超时以避免永久阻塞
            task = task_queue.get(timeout=1)
            print(f"线程 {threading.current_thread().name} 正在处理任务: {task}")
            time.sleep(1) # 模拟处理时间
            print(f"线程 {threading.current_thread().name} 完成任务: {task}")
            task_queue.task_done() # 标记任务完成
        except queue.Empty:
            print(f"线程 {threading.current_thread().name} 队列为空,退出...")
            break
# 创建并启动工作线程
worker_threads = []
for i in range(3):
    t = threading.Thread(target=worker, name=f"Worker-{i}")
    t.start()
    worker_threads.append(t)
# 主线程(生产者)向队列中添加任务
for i in range(10):
    task_queue.put(f"任务-{i}")
# 等待所有任务被处理
task_queue.join()
print("所有任务已完成!")
# 通知工作线程退出
for t in worker_threads:
    t.join()
print("主程序结束")

一个完整的示例:多线程下载器

这个示例将展示如何使用 threadingqueue 来并行下载多个 URL。

import threading
import queue
import requests
import time
# 1. 准备任务
urls = [
    "http://example.com",
    "http://example.org",
    "http://example.net",
    "https://www.python.org",
    "https://github.com"
]
# 2. 创建线程安全的任务队列
download_queue = queue.Queue()
for url in urls:
    download_queue.put(url)
# 3. 定义下载函数(消费者)
def download_worker():
    while True:
        try:
            url = download_queue.get_nowait() # 不阻塞,如果队列为空则抛出 Empty 异常
            print(f"线程 {threading.current_thread().name} 开始下载: {url}")
            try:
                response = requests.get(url, timeout=5)
                filename = url.split("/")[-1] or "index.html"
                with open(filename, "w", encoding="utf-8") as f:
                    f.write(response.text)
                print(f"线程 {threading.current_thread().name} 下载成功: {filename}")
            except Exception as e:
                print(f"线程 {threading.current_thread().name} 下载失败 {url}: {e}")
            finally:
                download_queue.task_done()
        except queue.Empty:
            print(f"线程 {threading.current_thread().name} 所有任务下载完毕,退出。")
            break
# 4. 创建并启动工作线程
NUM_WORKERS = 3
threads = []
for i in range(NUM_WORKERS):
    t = threading.Thread(target=download_worker, name=f"Downloader-{i}")
    t.start()
    threads.append(t)
# 5. 等待所有任务被处理
download_queue.join()
# 6. 等待所有线程结束
for t in threads:
    t.join()
print("所有下载任务完成!")

多线程的优缺点与适用场景

优点

  1. 提高 I/O 密集型任务效率:在等待 I/O(网络、磁盘)时,CPU 可以切换到其他线程工作,避免资源浪费。
  2. 提高用户界面响应性:在桌面应用中,耗时操作放在后台线程执行,可以防止界面卡死。
  3. 简化并发模型:相比于复杂的 multiprocessing,多线程创建和销毁的开销更小,通信也更简单(共享内存)。

缺点

  1. GIL 限制:这是 Python 多线程最大的瓶颈,下面会详细讲。
  2. 资源竞争:需要手动使用锁等同步机制,增加了编程的复杂性,容易出错(死锁、活锁)。
  3. 内存占用:每个线程都有自己独立的栈空间,线程数量过多会消耗大量内存。

适用场景

  • 网络爬虫:同时发起多个 HTTP 请求。
  • Web 服务器:同时处理多个客户端连接。
  • 文件批量处理:同时读写多个文件。
  • GUI 应用:在后台执行耗时计算,保持界面流畅。

不适用场景

  • CPU 密集型任务:如科学计算、视频编解码,由于 GIL 的存在,多线程无法利用多核 CPU 的优势,此时应使用 multiprocessing 模块。

重要陷阱:GIL (全局解释器锁)

GIL 是什么? GIL 是 CPython 解释器(标准的 Python 实现)的一个互斥锁,它确保在任何时刻,只有一个线程可以执行 Python 的字节码。

GIL 的影响是什么? 这意味着即使在多核 CPU 的机器上,Python 的多线程也无法实现真正的并行执行,对于 CPU 密集型任务,使用多线程并不能带来性能提升,反而因为线程切换的开销而可能更慢。

GIL 不影响 I/O 密集型任务吗? 不影响,因为在执行 I/O 操作时(如 requests.get()time.sleep()),线程会释放 GIL,让其他线程可以运行,这正是为什么多线程对 I/O 密集型任务有效。

如何绕过 GIL?

  1. 使用 multiprocessing 模块:它创建多个独立的进程,每个进程有自己的 Python 解释器和内存空间,可以真正地并行执行,绕过 GIL,但进程间通信比线程间通信复杂得多。
  2. 使用其他 Python 实现:如 Jython 或 IronPython,它们没有 GIL。
  3. 使用 C 扩展:将性能瓶颈部分用 C 语言编写,并通过 Python 的 C API 调用。

特性 描述
核心用途 实现并发,尤其擅长处理I/O 密集型任务。
核心类 Thread 是基础,Lock, RLock, Semaphore, Event, Queue 是用于同步和通信的工具。
Queue 是实现生产者-消费者模型的推荐方式,因为它内置了线程安全机制。
with lock 是使用锁的标准、安全的方式,能确保锁一定会被释放。
最大陷阱 GIL (全局解释器锁) 限制了 Python 多线程在 CPU 密集型任务上的并行能力。
适用场景 网络请求、文件读写、GUI 应用等。
不适用场景 科学计算、大规模数据处理等 CPU 密集型任务。

掌握 threading 模块是成为一名合格的 Python 开发者的必经之路,理解其工作原理、同步机制以及 GIL 的影响,能帮助你写出更高效、更健壮的并发程序。

分享:
扫描分享到社交APP
上一篇
下一篇