我们需要自己编写逻辑来包裹 schedule 的任务,实现当任务失败时,按照一定的策略(如固定间隔、指数退避)重新执行。

下面我将为你介绍几种实现重试的方法,从简单到健壮,并提供完整的代码示例。
简单的固定间隔重试
这是最基础的重试方式,如果任务失败,就等待一个固定的时间后再次尝试,直到成功或达到最大重试次数。
核心思想:
- 创建一个外层函数,它接收原始任务函数、最大重试次数和重试间隔作为参数。
- 在这个外层函数内部,使用一个循环来执行任务。
- 捕获任务函数可能抛出的异常。
- 如果发生异常且重试次数未用完,则等待指定间隔后继续下一次循环。
- 如果成功或重试次数用完,则退出循环。
代码示例:

import schedule
import time
import random
# --- 1. 定义一个可能会失败的任务 ---
def my_task_that_might_fail():
print(f"任务执行中... 时间: {time.ctime()}")
# 模拟随机失败,比如有 50% 的概率失败
if random.random() < 0.5:
print("任务执行失败!")
raise ValueError("模拟一个随机错误")
print("任务执行成功!")
# --- 2. 创建一个带重试逻辑的包装器 ---
def retry_wrapper(task_func, max_retries=3, retry_interval=10):
"""
一个通用的重试包装器。
:param task_func: 要执行的任务函数
:param max_retries: 最大重试次数
:param retry_interval: 重试间隔(秒)
"""
attempts = 0
while attempts <= max_retries:
attempts += 1
try:
print(f"--- 尝试第 {attempts} 次 ---")
task_func()
# 如果任务成功执行,跳出循环
break
except Exception as e:
print(f"第 {attempts} 次尝试失败: {e}")
if attempts <= max_retries:
print(f"将在 {retry_interval} 秒后进行第 {attempts + 1} 次重试...")
time.sleep(retry_interval)
else:
print(f"已达到最大重试次数 ({max_retries}),任务最终失败。")
# 可以选择在这里记录日志或发送通知
# raise # 可以选择重新抛出异常
# --- 3. 使用 schedule ---
# 将我们的包装器函数交给 schedule
schedule.every(30).seconds.do(retry_wrapper, task_func=my_task_that_might_fail, max_retries=3, retry_interval=5)
print("调度器已启动,按 Ctrl+C 停止。")
while True:
schedule.run_pending()
time.sleep(1)
如何运行和观察:
- 运行脚本。
- 你会看到
my_task_that_might_fail每 30 秒被schedule调用一次。 - 每次被调用时,它会进入
retry_wrapper的逻辑。 - 如果任务失败,它会等待 5 秒后重试,最多重试 3 次。
- 如果某次成功了,就不再重试,等待下一次 30 秒的调度周期。
更健壮的指数退避重试
对于网络请求或数据库连接等外部依赖服务,固定间隔重试可能不够高效,更好的策略是“指数退避”,即每次重试的等待时间 exponentially(指数级)增加,这样可以避免在服务短暂不可用时给服务器造成过大压力。
核心思想:
与方法一类似,只是将固定的 retry_interval 替换为一个动态计算的值,通常是 initial_interval * (2 ** (attempt - 1))。
代码示例:

import schedule
import time
import random
# 1. 定义一个可能会失败的任务(和方法一一样)
def my_task_that_might_fail():
print(f"任务执行中... 时间: {time.ctime()}")
if random.random() < 0.6: # 提高失败概率,更容易看到退避效果
print("任务执行失败!")
raise ConnectionError("模拟网络连接超时")
print("任务执行成功!")
# 2. 创建指数退避重试的包装器
def exponential_backoff_retry(task_func, max_retries=5, initial_interval=1):
"""
指数退避重试包装器。
:param task_func: 要执行的任务函数
:param max_retries: 最大重试次数
:param initial_interval: 初始重试间隔(秒)
"""
attempts = 0
while attempts <= max_retries:
attempts += 1
try:
print(f"--- 尝试第 {attempts} 次 ---")
task_func()
break # 成功则退出
except Exception as e:
print(f"第 {attempts} 次尝试失败: {e}")
if attempts <= max_retries:
# 计算等待时间:1, 2, 4, 8, 16...
wait_time = initial_interval * (2 ** (attempts - 1))
print(f"将在 {wait_time} 秒后进行第 {attempts + 1} 次重试...")
time.sleep(wait_time)
else:
print(f"已达到最大重试次数 ({max_retries}),任务最终失败。")
# 3. 使用 schedule
schedule.every(60).seconds.do(exponential_backoff_retry, task_func=my_task_that_might_fail, max_retries=5, initial_interval=2)
print("调度器已启动(指数退避重试),按 Ctrl+C 停止。")
while True:
schedule.run_pending()
time.sleep(1)
如何运行和观察:
- 运行脚本。
- 当任务第一次失败后,它会等待 2 秒(
initial_interval)。 - 第二次失败后,等待 4 秒(
2 * 2^1)。 - 第三次失败后,等待 8 秒(
2 * 2^2),以此类推。
结合日志和更高级的库(tenacity)
当重试逻辑变得复杂时(需要针对特定异常类型重试、在特定条件下停止重试等),手动编写重试代码会变得繁琐,这时,使用专门的重试库如 tenacity 是更好的选择。
tenacity 是一个非常强大和灵活的 Python 重试库,可以轻松实现各种复杂的重试策略。
核心思想:
使用 tenacity 的装饰器或重试器来修饰你的任务函数,然后将这个被“增强”后的函数交给 schedule。
首先安装 tenacity:
pip install tenacity
代码示例:
import schedule
import time
import random
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
# 1. 定义任务函数
# 注意:我们直接修饰原始任务,而不是在 schedule 外面包装
@retry(
stop=stop_after_attempt(5), # 最多重试5次
wait=wait_exponential(multiplier=1, min=4, max=10), # 指数退避,基础间隔1秒,最小4秒,最大10秒
retry=retry_if_exception_type((ConnectionError, TimeoutError)) # 只对网络类错误重试
)
def my_task_with_tenacity():
print(f"任务执行中... 时间: {time.ctime()}")
if random.random() < 0.7:
# 模拟不同类型的错误
if random.random() < 0.5:
raise ConnectionError("模拟网络连接错误")
else:
raise ValueError("这是一个数据错误,不应该重试")
print("任务执行成功!")
# 2. 使用 schedule
# 直接将 tenacity 装饰过的函数传给 schedule
schedule.every(45).seconds.do(my_task_with_tenacity)
print("调度器已启动(使用 tenacity 库),按 Ctrl+C 停止。")
while True:
schedule.run_pending()
time.sleep(1)
代码解析:
@retry(...): 这是一个装饰器,它会修改my_task_with_tenacity的行为。stop=stop_after_attempt(5): 设置重试停止的条件,这里是最多尝试 5 次(包括第一次)。wait=wait_exponential(...): 设置重试的等待策略,这里使用指数退避,multiplier是基础乘数,min和max限制了等待时间的范围,使其不至于过长或过短。retry=retry_if_exception_type(...): 设置重试的条件,这里只有当异常是ConnectionError或TimeoutError时才重试,如果是ValueError,它将立即停止重试并抛出异常。
总结与选择
| 方法 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 简单重试 | 实现简单,无需额外依赖 | 不够灵活,无法处理复杂重试逻辑 | 简单的内部任务,失败原因单一,且不希望引入新依赖。 |
| 指数退避 | 更高效,能更好地保护外部服务 | 代码比方法一复杂 | 网络请求、API调用、数据库连接等与外部服务交互的任务。 |
tenacity库 |
功能最强大、最灵活,代码清晰,可读性好 | 需要额外安装一个库 | 对重试逻辑有复杂要求的场景,如特定异常重试、动态停止条件、重试回调等,是生产环境中的推荐做法。 |
对于大多数项目,我强烈推荐使用方法三(tenacity),因为它将重试逻辑和业务逻辑清晰地分离开来,使得代码更易于维护和扩展。
