杰瑞科技汇

Python Selenium多线程如何高效执行?

为什么需要多线程?

在 Web 自动化测试或爬虫场景中,我们常常需要执行大量重复性任务,

Python Selenium多线程如何高效执行?-图1
(图片来源网络,侵删)
  • 对多个网站进行登录检查。
  • 对一个电商网站的不同商品进行价格监控。
  • 对一个列表中的 URL 进行数据抓取。

如果使用单线程,程序会一个接一个地执行这些任务,效率非常低下。多线程可以让你同时启动多个“任务流”(即线程),每个线程负责处理一个任务,从而极大地缩短总执行时间。

Python 中的多线程实现

Python 提供了 threading 模块来支持多线程,基本用法如下:

  1. 创建一个线程类:继承 threading.Thread,并重写其 run() 方法。run() 方法就是线程要执行的代码。
  2. 创建线程对象:实例化你的线程类。
  3. 启动线程:调用线程对象的 start() 方法,这会告诉 Python 解释器去执行该线程的 run() 方法。
  4. 等待线程结束 (可选):调用主线程的 join() 方法,可以等待子线程执行完毕后再继续执行主线程的后续代码。

Selenium 多线程的最佳实践

在 Selenium 中使用多线程时,有几个极其重要的注意事项:

WebDriver 实例不能跨线程共享

这是最关键的一点,Selenium 的 WebDriver 对象(如 ChromeDriver)是线程不安全的,你不能在一个线程中创建一个 driver,然后把它传递给另一个线程去使用,每个线程都必须独立创建自己的 WebDriver 实例。

Python Selenium多线程如何高效执行?-图2
(图片来源网络,侵删)

错误示范

import threading
from selenium import webdriver
# 全局变量,在多个线程间共享
driver = webdriver.Chrome()
def thread_task():
    try:
        driver.get("https://www.example.com")
        print(driver.title)
    except Exception as e:
        print(f"Error in thread: {e}")
if __name__ == '__main__':
    t1 = threading.Thread(target=thread_task)
    t2 = threading.Thread(target=thread_task)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    driver.quit()

这个例子几乎肯定会失败或导致不可预测的行为,因为两个线程会同时操作同一个 driver 对象。

正确的资源管理

每个线程创建的 WebDriver 实例,在该线程任务结束后,应该被正确关闭,否则,会留下大量无法回收的浏览器进程,导致系统资源耗尽。

一个优雅的做法是使用 try...finally 结构来确保 driver.quit() 一定会被执行。

Python Selenium多线程如何高效执行?-图3
(图片来源网络,侵删)

完整示例:多线程打开不同网站

下面是一个完整且正确的多线程 Selenium 示例,我们将创建多个线程,每个线程独立打开一个不同的网站,打印标题,然后安全地关闭浏览器。

准备工作

  1. 安装必要的库:
    pip install selenium
  2. 确保你已经下载了与你的 Chrome 浏览器版本匹配的 ChromeDriver,并将其路径添加到系统环境变量中,或者直接在代码中指定路径。

代码实现

import threading
import time
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.common.exceptions import WebDriverException
# --- 线程任务定义 ---
def visit_website(url, thread_name):
    """
    每个线程要执行的任务:打开指定网站,打印标题,然后关闭浏览器。
    :param url: 要访问的网址
    :param thread_name: 线程名称,用于日志区分
    """
    print(f"[{thread_name}] 开始任务,准备访问: {url}")
    # 每个线程都必须创建自己的 WebDriver 实例
    # 这里使用 Service 对象是更现代的方式
    # 请确保你的 chromedriver.exe 的路径正确,或者已经在系统PATH中
    service = Service(executable_path='chromedriver.exe') 
    driver = webdriver.Chrome(service=service)
    try:
        driver.get(url)
        # 模拟一些操作
        time.sleep(2) 
        title = driver.title
        print(f"[{thread_name}] 成功访问! 网站标题是: '{title}'")
    except WebDriverException as e:
        print(f"[{thread_name}] 访问 {url} 时出错: {e}")
    finally:
        # 确保无论如何,浏览器都会被关闭
        print(f"[{thread_name}] 任务完成,关闭浏览器。")
        driver.quit()
if __name__ == '__main__':
    # --- 主程序 ---
    print("主程序启动,准备创建线程...")
    # 定义一个任务列表
    websites = [
        ("https://www.python.org", "Python-Thread"),
        ("https://www.github.com", "GitHub-Thread"),
        ("https://www.selenium.dev", "Selenium-Thread"),
        ("https://www.example.com", "Example-Thread")
    ]
    threads = []
    # 为每个网站创建并启动一个线程
    for url, name in websites:
        # 创建线程对象
        # target 指定线程要执行的函数
        # args 是一个元组,传递给 target 函数的位置参数
        thread = threading.Thread(target=visit_website, args=(url, name))
        threads.append(thread)
        thread.start()
        print(f"[Main] 已启动线程: {name}")
    print("\n所有线程已启动,主线程等待子线程完成...")
    # 等待所有线程执行完毕
    for thread in threads:
        thread.join()
    print("\n所有子线程已结束,主程序退出。")

如何运行和解读输出

  1. 将上述代码保存为 selenium_multithreading.py
  2. 确保你的 chromedriver.exe 和代码在同一个目录下,或者修改 Service(executable_path=...) 中的路径。
  3. 运行脚本:python selenium_multithreading.py

你将看到类似下面的输出(顺序可能略有不同):

主程序启动,准备创建线程...
[Main] 已启动线程: Python-Thread
[Python-Thread] 开始任务,准备访问: https://www.python.org
[Main] 已启动线程: GitHub-Thread
[GitHub-Thread] 开始任务,准备访问: https://www.github.com
[Main] 已启动线程: Selenium-Thread
[Selenium-Thread] 开始任务,准备访问: https://www.selenium.dev
[Main] 已启动线程: Example-Thread
[Example-Thread] 开始任务,准备访问: https://www.example.com
所有线程已启动,主线程等待子线程完成...
[Python-Thread] 成功访问! 网站标题是: 'Welcome to Python.org'
[Python-Thread] 任务完成,关闭浏览器。
[GitHub-Thread] 成功访问! 网站标题是: 'GitHub: Let’s build from here · GitHub'
[GitHub-Thread] 任务完成,关闭浏览器。
[Selenium-Thread] 成功访问! 网站标题是: 'Selenium'
[Selenium-Thread] 任务完成,关闭浏览器。
[Example-Thread] 成功访问! 网站标题是: 'Example Domain'
[Example-Thread] 任务完成,关闭浏览器。
所有子线程已结束,主程序退出。

从输出中可以清晰地看到:

  • 主线程快速启动了4个子线程。
  • 4个浏览器实例(在后台)几乎同时开始打开各自的网站。
  • 每个线程独立完成自己的任务,并打印出自己的日志。
  • 只有在所有子线程都结束后(join() 完成),主线程的最后一行才会打印。

进阶方案:使用 concurrent.futures.ThreadPoolExecutor

对于更现代和简洁的并发编程,Python 的 concurrent.futures 模块是更好的选择,它提供了更高级的 API,可以让你用更少的代码实现线程池(或进程池)。

ThreadPoolExecutor 会自动管理一个线程池,你只需要提交任务即可。

ThreadPoolExecutor 示例

import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.common.exceptions import WebDriverException
def visit_website_future(url):
    print(f"开始访问: {url}")
    service = Service(executable_path='chromedriver.exe')
    driver = webdriver.Chrome(service=service)
    try:
        driver.get(url)
        time.sleep(2)
        title = driver.title
        return f"{url} -> {title}"
    except WebDriverException as e:
        return f"访问 {url} 失败: {e}"
    finally:
        driver.quit()
if __name__ == '__main__':
    websites = [
        "https://www.python.org",
        "https://www.github.com",
        "https://www.selenium.dev",
        "https://www.example.com"
    ]
    # 创建一个线程池,最大线程数为4
    with ThreadPoolExecutor(max_workers=4) as executor:
        # 提交所有任务到线程池
        future_to_url = {executor.submit(visit_website_future, url): url for url in websites}
        print("所有任务已提交到线程池,等待结果...")
        # as_completed 会按照任务完成的顺序返回 future 对象
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                result = future.result() # 获取任务返回的结果
                print(f"任务完成: {result}")
            except Exception as e:
                print(f"任务 {url} 产生了一个异常: {e}")
    print("\n所有任务处理完毕。")

ThreadPoolExecutor 的优点

  1. 代码更简洁:无需手动创建和管理 Thread 对象。
  2. 资源管理更方便with 语句会确保线程池在使用完毕后被正确关闭。
  3. 结果获取更灵活as_completed 可以让你在任务完成时立即获取结果,而不需要等待所有任务都结束。
  4. 易于扩展:如果想从多线程切换到多进程(ProcessPoolExecutor),只需要改动一行代码即可。
特性 threading.Thread concurrent.futures.ThreadPoolExecutor
复杂度 较低,需要手动管理线程 推荐,更高层,代码更简洁
线程创建 手动 thread = Thread(...) 自动管理,通过 executor.submit()
资源管理 需要手动 join() with 语句自动管理
结果处理 需要通过共享变量或队列获取 通过 future.result() 获取
适用场景 简单、需要精细控制线程行为的场景 大多数并发任务,特别是需要处理返回结果的场景

核心要点回顾:

  1. 切勿共享 WebDriver:每个线程必须创建自己的 driver 实例。
  2. 务必关闭 driver:使用 try...finallywith 语句确保浏览器进程被正确回收。
  3. 优先使用 ThreadPoolExecutor:它提供了更现代、更健壮、更易用的多线程解决方案。
分享:
扫描分享到社交APP
上一篇
下一篇