为什么需要多线程?
在 Web 自动化测试或爬虫场景中,我们常常需要执行大量重复性任务,

- 对多个网站进行登录检查。
- 对一个电商网站的不同商品进行价格监控。
- 对一个列表中的 URL 进行数据抓取。
如果使用单线程,程序会一个接一个地执行这些任务,效率非常低下。多线程可以让你同时启动多个“任务流”(即线程),每个线程负责处理一个任务,从而极大地缩短总执行时间。
Python 中的多线程实现
Python 提供了 threading 模块来支持多线程,基本用法如下:
- 创建一个线程类:继承
threading.Thread,并重写其run()方法。run()方法就是线程要执行的代码。 - 创建线程对象:实例化你的线程类。
- 启动线程:调用线程对象的
start()方法,这会告诉 Python 解释器去执行该线程的run()方法。 - 等待线程结束 (可选):调用主线程的
join()方法,可以等待子线程执行完毕后再继续执行主线程的后续代码。
Selenium 多线程的最佳实践
在 Selenium 中使用多线程时,有几个极其重要的注意事项:
WebDriver 实例不能跨线程共享
这是最关键的一点,Selenium 的 WebDriver 对象(如 ChromeDriver)是线程不安全的,你不能在一个线程中创建一个 driver,然后把它传递给另一个线程去使用,每个线程都必须独立创建自己的 WebDriver 实例。

错误示范:
import threading
from selenium import webdriver
# 全局变量,在多个线程间共享
driver = webdriver.Chrome()
def thread_task():
try:
driver.get("https://www.example.com")
print(driver.title)
except Exception as e:
print(f"Error in thread: {e}")
if __name__ == '__main__':
t1 = threading.Thread(target=thread_task)
t2 = threading.Thread(target=thread_task)
t1.start()
t2.start()
t1.join()
t2.join()
driver.quit()
这个例子几乎肯定会失败或导致不可预测的行为,因为两个线程会同时操作同一个 driver 对象。
正确的资源管理
每个线程创建的 WebDriver 实例,在该线程任务结束后,应该被正确关闭,否则,会留下大量无法回收的浏览器进程,导致系统资源耗尽。
一个优雅的做法是使用 try...finally 结构来确保 driver.quit() 一定会被执行。

完整示例:多线程打开不同网站
下面是一个完整且正确的多线程 Selenium 示例,我们将创建多个线程,每个线程独立打开一个不同的网站,打印标题,然后安全地关闭浏览器。
准备工作
- 安装必要的库:
pip install selenium
- 确保你已经下载了与你的 Chrome 浏览器版本匹配的
ChromeDriver,并将其路径添加到系统环境变量中,或者直接在代码中指定路径。
代码实现
import threading
import time
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.common.exceptions import WebDriverException
# --- 线程任务定义 ---
def visit_website(url, thread_name):
"""
每个线程要执行的任务:打开指定网站,打印标题,然后关闭浏览器。
:param url: 要访问的网址
:param thread_name: 线程名称,用于日志区分
"""
print(f"[{thread_name}] 开始任务,准备访问: {url}")
# 每个线程都必须创建自己的 WebDriver 实例
# 这里使用 Service 对象是更现代的方式
# 请确保你的 chromedriver.exe 的路径正确,或者已经在系统PATH中
service = Service(executable_path='chromedriver.exe')
driver = webdriver.Chrome(service=service)
try:
driver.get(url)
# 模拟一些操作
time.sleep(2)
title = driver.title
print(f"[{thread_name}] 成功访问! 网站标题是: '{title}'")
except WebDriverException as e:
print(f"[{thread_name}] 访问 {url} 时出错: {e}")
finally:
# 确保无论如何,浏览器都会被关闭
print(f"[{thread_name}] 任务完成,关闭浏览器。")
driver.quit()
if __name__ == '__main__':
# --- 主程序 ---
print("主程序启动,准备创建线程...")
# 定义一个任务列表
websites = [
("https://www.python.org", "Python-Thread"),
("https://www.github.com", "GitHub-Thread"),
("https://www.selenium.dev", "Selenium-Thread"),
("https://www.example.com", "Example-Thread")
]
threads = []
# 为每个网站创建并启动一个线程
for url, name in websites:
# 创建线程对象
# target 指定线程要执行的函数
# args 是一个元组,传递给 target 函数的位置参数
thread = threading.Thread(target=visit_website, args=(url, name))
threads.append(thread)
thread.start()
print(f"[Main] 已启动线程: {name}")
print("\n所有线程已启动,主线程等待子线程完成...")
# 等待所有线程执行完毕
for thread in threads:
thread.join()
print("\n所有子线程已结束,主程序退出。")
如何运行和解读输出
- 将上述代码保存为
selenium_multithreading.py。 - 确保你的
chromedriver.exe和代码在同一个目录下,或者修改Service(executable_path=...)中的路径。 - 运行脚本:
python selenium_multithreading.py
你将看到类似下面的输出(顺序可能略有不同):
主程序启动,准备创建线程...
[Main] 已启动线程: Python-Thread
[Python-Thread] 开始任务,准备访问: https://www.python.org
[Main] 已启动线程: GitHub-Thread
[GitHub-Thread] 开始任务,准备访问: https://www.github.com
[Main] 已启动线程: Selenium-Thread
[Selenium-Thread] 开始任务,准备访问: https://www.selenium.dev
[Main] 已启动线程: Example-Thread
[Example-Thread] 开始任务,准备访问: https://www.example.com
所有线程已启动,主线程等待子线程完成...
[Python-Thread] 成功访问! 网站标题是: 'Welcome to Python.org'
[Python-Thread] 任务完成,关闭浏览器。
[GitHub-Thread] 成功访问! 网站标题是: 'GitHub: Let’s build from here · GitHub'
[GitHub-Thread] 任务完成,关闭浏览器。
[Selenium-Thread] 成功访问! 网站标题是: 'Selenium'
[Selenium-Thread] 任务完成,关闭浏览器。
[Example-Thread] 成功访问! 网站标题是: 'Example Domain'
[Example-Thread] 任务完成,关闭浏览器。
所有子线程已结束,主程序退出。
从输出中可以清晰地看到:
- 主线程快速启动了4个子线程。
- 4个浏览器实例(在后台)几乎同时开始打开各自的网站。
- 每个线程独立完成自己的任务,并打印出自己的日志。
- 只有在所有子线程都结束后(
join()完成),主线程的最后一行才会打印。
进阶方案:使用 concurrent.futures.ThreadPoolExecutor
对于更现代和简洁的并发编程,Python 的 concurrent.futures 模块是更好的选择,它提供了更高级的 API,可以让你用更少的代码实现线程池(或进程池)。
ThreadPoolExecutor 会自动管理一个线程池,你只需要提交任务即可。
ThreadPoolExecutor 示例
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.common.exceptions import WebDriverException
def visit_website_future(url):
print(f"开始访问: {url}")
service = Service(executable_path='chromedriver.exe')
driver = webdriver.Chrome(service=service)
try:
driver.get(url)
time.sleep(2)
title = driver.title
return f"{url} -> {title}"
except WebDriverException as e:
return f"访问 {url} 失败: {e}"
finally:
driver.quit()
if __name__ == '__main__':
websites = [
"https://www.python.org",
"https://www.github.com",
"https://www.selenium.dev",
"https://www.example.com"
]
# 创建一个线程池,最大线程数为4
with ThreadPoolExecutor(max_workers=4) as executor:
# 提交所有任务到线程池
future_to_url = {executor.submit(visit_website_future, url): url for url in websites}
print("所有任务已提交到线程池,等待结果...")
# as_completed 会按照任务完成的顺序返回 future 对象
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
result = future.result() # 获取任务返回的结果
print(f"任务完成: {result}")
except Exception as e:
print(f"任务 {url} 产生了一个异常: {e}")
print("\n所有任务处理完毕。")
ThreadPoolExecutor 的优点
- 代码更简洁:无需手动创建和管理
Thread对象。 - 资源管理更方便:
with语句会确保线程池在使用完毕后被正确关闭。 - 结果获取更灵活:
as_completed可以让你在任务完成时立即获取结果,而不需要等待所有任务都结束。 - 易于扩展:如果想从多线程切换到多进程(
ProcessPoolExecutor),只需要改动一行代码即可。
| 特性 | threading.Thread |
concurrent.futures.ThreadPoolExecutor |
|---|---|---|
| 复杂度 | 较低,需要手动管理线程 | 推荐,更高层,代码更简洁 |
| 线程创建 | 手动 thread = Thread(...) |
自动管理,通过 executor.submit() |
| 资源管理 | 需要手动 join() |
with 语句自动管理 |
| 结果处理 | 需要通过共享变量或队列获取 | 通过 future.result() 获取 |
| 适用场景 | 简单、需要精细控制线程行为的场景 | 大多数并发任务,特别是需要处理返回结果的场景 |
核心要点回顾:
- 切勿共享
WebDriver:每个线程必须创建自己的driver实例。 - 务必关闭
driver:使用try...finally或with语句确保浏览器进程被正确回收。 - 优先使用
ThreadPoolExecutor:它提供了更现代、更健壮、更易用的多线程解决方案。
