杰瑞科技汇

python mythread 模块

threading 是 Python 的标准库,用于实现多线程编程,它是对底层 _thread 模块的一个高级封装,提供了更友好、更强大的功能。

python mythread 模块-图1
(图片来源网络,侵删)

为什么需要多线程?(核心概念)

在理解 threading 之前,首先要明白它的核心目的:提高程序的并发处理能力

想象一下你在餐厅点餐:

  • 单线程:你一个人点菜,然后去后厨等,等菜做好了再回来吃,在这个过程中,你什么也做不了,只能干等,这就是“阻塞”。
  • 多线程:你点完菜后,服务员让你先坐下,然后去为其他客人服务,你的“点菜”和“等待”是同时进行的,你可以同时思考、喝点水,而不仅仅是干等,这就是“并发”。

在编程中,多线程特别适合处理 I/O 密集型 任务,

  • 网络请求(等待服务器响应)
  • 文件读写(等待磁盘数据)
  • 数据库查询

在这些任务中,大部分时间程序都在等待外部资源,CPU 处于空闲状态,多线程可以让一个线程在等待时,切换到另一个线程执行计算任务,从而充分利用 CPU 时间,大大提高程序的执行效率。

python mythread 模块-图2
(图片来源网络,侵删)

注意:对于 CPU 密集型 任务(如大量数学计算、图像处理),由于 Python 的 GIL (Global Interpreter Lock) 全局解释器锁的存在,多线程并不能真正实现并行计算(同一时间只有一个线程在运行),反而可能因为线程切换的开销而降低性能,对于 CPU 密集型任务,应该使用 multiprocessing 模块。


threading 模块的核心组件

threading 模块提供了几个关键的类和函数。

1 Thread 类:创建线程

这是最核心的类,用于创建和控制线程。

基本用法:

  1. 定义一个函数:这个函数就是线程要执行的任务。
  2. 创建 Thread 对象:将函数作为 target 参数传入。
  3. 启动线程:调用 start() 方法,这会让线程开始执行 target 函数。
  4. 等待线程结束(可选):调用 join() 方法,主线程会阻塞,直到该线程执行完毕。

示例代码:

import threading
import time
def worker_task():
    """这是线程要执行的任务函数"""
    thread_name = threading.current_thread().name
    print(f"线程 '{thread_name}' 开始工作...")
    time.sleep(2)  # 模拟一个耗时操作,比如I/O等待
    print(f"线程 '{thread_name}' 工作完成!")
# --- 主程序 ---
if __name__ == "__main__":
    print("主线程开始运行")
    # 创建一个 Thread 对象
    # target: 指定线程要执行的函数
    # name: 给线程起一个名字(可选)
    t1 = threading.Thread(target=worker_task, name="Worker-1")
    # 启动线程
    t1.start()
    print("主线程继续执行,不会等待子线程...")
    # 等待线程 t1 执行完毕
    # 如果不加 join(),主程序会继续往下执行,然后直接退出,导致子线程可能还没执行完就被强制终止了。
    t1.join()
    print("主线程结束")

输出结果:

主线程开始运行
线程 'Worker-1' 开始工作...
主线程继续执行,不会等待子线程...
线程 'Worker-1' 工作完成!  # 注意:这一行在2秒后打印
主线程结束

2 Lock (锁):解决资源竞争问题

当多个线程同时读写同一个共享资源(比如一个全局变量)时,会发生“竞态条件”,导致数据不一致。

示例:不加锁的线程安全问题

import threading
# 共享资源
g_counter = 0
def increment_counter():
    global g_counter
    for _ in range(100000):
        g_counter += 1  # 这不是一个原子操作,包含读取、修改、写回三步
threads = []
for i in range(5):
    t = threading.Thread(target=increment_counter)
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print(f"最终计数器的值应该是: 5 * 100000 = 500000")
print(f"实际计数器的值是: {g_counter}") # 结果几乎总是小于500000

如何解决?使用 Lock

Lock 提供了两种状态:锁定 和 未锁定。

  • acquire():尝试获取锁,如果锁已被其他线程获取,则当前线程会阻塞,直到锁被释放。
  • release():释放锁。

将需要原子操作(不可分割)的代码块放在 acquire()release() 之间。

示例:使用 Lock 修复线程安全问题

import threading
g_counter = 0
# 创建一个锁对象
lock = threading.Lock()
def increment_counter_safe():
    global g_counter
    for _ in range(100000):
        # 获取锁
        lock.acquire()
        try:
            g_counter += 1
        finally:
            # 确保锁一定会被释放,即使发生异常
            lock.release()
# 或者更简洁的 with 语句 (推荐)
def increment_counter_with_with():
    global g_counter
    for _ in range(100000):
        with lock: # with 语句会在代码块执行完毕后自动释放锁
            g_counter += 1
# 使用 with 语句的版本
threads = []
for i in range(5):
    t = threading.Thread(target=increment_counter_with_with)
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print(f"最终计数器的值是: {g_counter}") # 输出: 500000

3 RLock (可重入锁)

RLockLock 类似,但它可以被同一个线程多次获取,这在递归函数中非常有用。

import threading
lock = threading.RLock()
def recursive_function(level):
    with lock:
        print(f"递归层级: {level}")
        if level > 0:
            recursive_function(level - 1)
recursive_function(3)
# 不会死锁,因为同一个线程可以多次获取 RLock

4 Semaphore (信号量)

Semaphore 用于控制同时访问某个资源的线程数量,它像一个“停车场”,只允许固定数量的车(线程)进入。

  • acquire():减少内部计数器,如果计数器为0,则阻塞。
  • release():增加内部计数器。

示例:模拟一个只有3个位置的数据库连接池

import threading
import time
# 最多允许3个线程同时访问
semaphore = threading.Semaphore(3)
def access_database(user_id):
    with semaphore:
        print(f"用户 {user_id} 正在访问数据库...")
        time.sleep(2) # 模拟数据库操作耗时
        print(f"用户 {user_id} 已释放数据库连接。")
threads = []
for i in range(10):
    t = threading.Thread(target=access_database, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()

5 Event (事件)

Event 提供了一种简单的线程间通信机制,一个线程可以发出一个信号,其他线程等待这个信号。

  • set():设置事件,将内部标志位设为 True。
  • clear():清除事件,将内部标志位设为 False。
  • wait():等待事件,如果标志位为 False,则线程阻塞;如果为 True,则立即返回。

示例:生产者-消费者模型

import threading
import time
import random
event = threading.Event()
def producer():
    print("生产者:正在准备数据...")
    time.sleep(random.randint(1, 3))
    print("生产者:数据准备完毕!")
    event.set() # 通知消费者数据准备好了
def consumer():
    print("消费者:等待数据...")
    event.wait() # 阻塞,直到生产者调用 event.set()
    print("消费者:收到数据,开始处理!")
p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start()
c.start()
p.join()
c.join()

6 Queue (队列)

Queue 是线程间通信的最佳实践,它是一个线程安全的队列,可以安全地在多个线程之间传递数据,完美地解决了生产者和消费者问题。

  • put(item):将数据放入队列,如果队列已满,则阻塞。
  • get():从队列中取出数据,如果队列为空,则阻塞。

示例:使用 Queue 的生产者-消费者

import threading
import time
import queue
import random
# 创建一个线程安全的队列
q = queue.Queue()
def producer():
    for i in range(5):
        item = f"产品-{i}"
        print(f"生产者:生产了 {item}")
        q.put(item)
        time.sleep(random.random())
    print("生产者:生产任务完成!")
def consumer():
    while True:
        try:
            # 设置超时,避免消费者永远阻塞
            item = q.get(timeout=2)
            print(f"消费者:消费了 {item}")
            time.sleep(random.random())
            q.task_done() # 告诉队列,这个任务已经处理完毕
        except queue.Empty:
            print("消费者:队列已空,退出。")
            break
p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start()
p.join() # 等待生产者生产完所有产品
# 等待队列中的所有任务被处理完毕
q.join() 
print("主线程:所有产品已被消费。")
# 消费者线程可能在等待,这里让它退出
# 在实际应用中,可能需要一个更优雅的退出机制

最佳实践和注意事项

  1. 避免共享状态:这是多线程编程的第一原则,尽量让每个线程操作自己的数据,通过 Queue 等方式交换数据,而不是直接共享可变变量。
  2. 使用 with 语句管理锁with lock:lock.acquire() + lock.release() 更安全,因为它能确保锁一定会被释放,即使在代码块中发生了异常。
  3. 警惕 GIL:牢记 GIL 的存在,对于 CPU 密集型任务,多线程不是解决方案,应使用 multiprocessing 模块来利用多核 CPU。
  4. 合理设计线程数量:不要创建成百上千的线程,线程的创建和切换都有开销,对于 I/O 密集型任务,线程数可以设置得比 CPU 核心数多一些(2 * CPU核心数),可以使用 os.cpu_count() 获取 CPU 核心数。
  5. 使用 join() 控制流程:当主线程需要等待所有子任务完成时,使用 join() 是一个好习惯,它可以防止主线程在子任务完成前就退出。
  6. 注意线程的生命周期:线程一旦启动,就不能再启动一次,如果需要重复执行任务,可以考虑创建新的线程或使用线程池。

进阶:ThreadPoolExecutor (线程池)

从 Python 3.2 开始,concurrent.futures 模块提供了 ThreadPoolExecutor,这是更现代、更高级的线程管理方式。

优点:

  • 自动管理线程:你不需要手动创建和销毁线程。
  • 任务提交:可以像提交普通函数一样提交任务。
  • 获取结果:可以方便地获取任务的返回值。
  • 优雅关闭:提供了 shutdown() 方法,可以优雅地关闭线程池,等待所有任务完成。

示例:

from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
    print(f"任务 {n} 开始执行...")
    time.sleep(1)
    return n * n
# 创建一个最大线程数为4的线程池
with ThreadPoolExecutor(max_workers=4) as executor:
    # 提交多个任务
    futures = [executor.submit(task, i) for i in range(8)]
    # 可以等待所有任务完成
    # executor.shutdown(wait=True) # with 语句会自动执行这一步
    # 获取每个任务的结果
    for future in futures:
        try:
            result = future.result()
            print(f"任务结果: {result}")
        except Exception as e:
            print(f"任务出错: {e}")
print("所有任务已完成")

ThreadPoolExecutor 是现代 Python 多线程编程的推荐方式,它封装了底层的复杂性,提供了更简洁的 API。

组件 用途 核心方法
Thread 创建和管理线程 start(), join()
Lock 保护共享资源,防止竞态条件 acquire(), release() (或 with 语句)
RLock 可重入锁,用于递归函数 Lock
Semaphore 控制同时访问资源的线程数 acquire(), release()
Event 线程间简单通信,发信号 set(), clear(), wait()
Queue 线程间安全地传递数据 put(), get(), task_done(), join()
ThreadPoolExecutor 管理线程池,高级抽象 submit(), map(), shutdown()

希望这份详细的指南能帮助你全面理解 Python 的 threading 模块!

分享:
扫描分享到社交APP
上一篇
下一篇