杰瑞科技汇

Python schedule 如何实现任务重试机制?

我们需要自己编写逻辑来包裹 schedule 的任务,实现当任务失败时,按照一定的策略(如固定间隔、指数退避)重新执行。

Python schedule 如何实现任务重试机制?-图1
(图片来源网络,侵删)

下面我将为你介绍几种实现重试的方法,从简单到健壮,并提供完整的代码示例。

简单的固定间隔重试

这是最基础的重试方式,如果任务失败,就等待一个固定的时间后再次尝试,直到成功或达到最大重试次数。

核心思想:

  1. 创建一个外层函数,它接收原始任务函数、最大重试次数和重试间隔作为参数。
  2. 在这个外层函数内部,使用一个循环来执行任务。
  3. 捕获任务函数可能抛出的异常。
  4. 如果发生异常且重试次数未用完,则等待指定间隔后继续下一次循环。
  5. 如果成功或重试次数用完,则退出循环。

代码示例:

Python schedule 如何实现任务重试机制?-图2
(图片来源网络,侵删)
import schedule
import time
import random
# --- 1. 定义一个可能会失败的任务 ---
def my_task_that_might_fail():
    print(f"任务执行中... 时间: {time.ctime()}")
    # 模拟随机失败,比如有 50% 的概率失败
    if random.random() < 0.5:
        print("任务执行失败!")
        raise ValueError("模拟一个随机错误")
    print("任务执行成功!")
# --- 2. 创建一个带重试逻辑的包装器 ---
def retry_wrapper(task_func, max_retries=3, retry_interval=10):
    """
    一个通用的重试包装器。
    :param task_func: 要执行的任务函数
    :param max_retries: 最大重试次数
    :param retry_interval: 重试间隔(秒)
    """
    attempts = 0
    while attempts <= max_retries:
        attempts += 1
        try:
            print(f"--- 尝试第 {attempts} 次 ---")
            task_func()
            # 如果任务成功执行,跳出循环
            break
        except Exception as e:
            print(f"第 {attempts} 次尝试失败: {e}")
            if attempts <= max_retries:
                print(f"将在 {retry_interval} 秒后进行第 {attempts + 1} 次重试...")
                time.sleep(retry_interval)
            else:
                print(f"已达到最大重试次数 ({max_retries}),任务最终失败。")
                # 可以选择在这里记录日志或发送通知
                # raise  # 可以选择重新抛出异常
# --- 3. 使用 schedule ---
# 将我们的包装器函数交给 schedule
schedule.every(30).seconds.do(retry_wrapper, task_func=my_task_that_might_fail, max_retries=3, retry_interval=5)
print("调度器已启动,按 Ctrl+C 停止。")
while True:
    schedule.run_pending()
    time.sleep(1)

如何运行和观察:

  1. 运行脚本。
  2. 你会看到 my_task_that_might_fail 每 30 秒被 schedule 调用一次。
  3. 每次被调用时,它会进入 retry_wrapper 的逻辑。
  4. 如果任务失败,它会等待 5 秒后重试,最多重试 3 次。
  5. 如果某次成功了,就不再重试,等待下一次 30 秒的调度周期。

更健壮的指数退避重试

对于网络请求或数据库连接等外部依赖服务,固定间隔重试可能不够高效,更好的策略是“指数退避”,即每次重试的等待时间 exponentially(指数级)增加,这样可以避免在服务短暂不可用时给服务器造成过大压力。

核心思想: 与方法一类似,只是将固定的 retry_interval 替换为一个动态计算的值,通常是 initial_interval * (2 ** (attempt - 1))

代码示例:

Python schedule 如何实现任务重试机制?-图3
(图片来源网络,侵删)
import schedule
import time
import random
# 1. 定义一个可能会失败的任务(和方法一一样)
def my_task_that_might_fail():
    print(f"任务执行中... 时间: {time.ctime()}")
    if random.random() < 0.6: # 提高失败概率,更容易看到退避效果
        print("任务执行失败!")
        raise ConnectionError("模拟网络连接超时")
    print("任务执行成功!")
# 2. 创建指数退避重试的包装器
def exponential_backoff_retry(task_func, max_retries=5, initial_interval=1):
    """
    指数退避重试包装器。
    :param task_func: 要执行的任务函数
    :param max_retries: 最大重试次数
    :param initial_interval: 初始重试间隔(秒)
    """
    attempts = 0
    while attempts <= max_retries:
        attempts += 1
        try:
            print(f"--- 尝试第 {attempts} 次 ---")
            task_func()
            break # 成功则退出
        except Exception as e:
            print(f"第 {attempts} 次尝试失败: {e}")
            if attempts <= max_retries:
                # 计算等待时间:1, 2, 4, 8, 16...
                wait_time = initial_interval * (2 ** (attempts - 1))
                print(f"将在 {wait_time} 秒后进行第 {attempts + 1} 次重试...")
                time.sleep(wait_time)
            else:
                print(f"已达到最大重试次数 ({max_retries}),任务最终失败。")
# 3. 使用 schedule
schedule.every(60).seconds.do(exponential_backoff_retry, task_func=my_task_that_might_fail, max_retries=5, initial_interval=2)
print("调度器已启动(指数退避重试),按 Ctrl+C 停止。")
while True:
    schedule.run_pending()
    time.sleep(1)

如何运行和观察:

  1. 运行脚本。
  2. 当任务第一次失败后,它会等待 2 秒(initial_interval)。
  3. 第二次失败后,等待 4 秒(2 * 2^1)。
  4. 第三次失败后,等待 8 秒(2 * 2^2),以此类推。

结合日志和更高级的库(tenacity

当重试逻辑变得复杂时(需要针对特定异常类型重试、在特定条件下停止重试等),手动编写重试代码会变得繁琐,这时,使用专门的重试库如 tenacity 是更好的选择。

tenacity 是一个非常强大和灵活的 Python 重试库,可以轻松实现各种复杂的重试策略。

核心思想: 使用 tenacity 的装饰器或重试器来修饰你的任务函数,然后将这个被“增强”后的函数交给 schedule

首先安装 tenacity

pip install tenacity

代码示例:

import schedule
import time
import random
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
# 1. 定义任务函数
# 注意:我们直接修饰原始任务,而不是在 schedule 外面包装
@retry(
    stop=stop_after_attempt(5),                # 最多重试5次
    wait=wait_exponential(multiplier=1, min=4, max=10), # 指数退避,基础间隔1秒,最小4秒,最大10秒
    retry=retry_if_exception_type((ConnectionError, TimeoutError)) # 只对网络类错误重试
)
def my_task_with_tenacity():
    print(f"任务执行中... 时间: {time.ctime()}")
    if random.random() < 0.7:
        # 模拟不同类型的错误
        if random.random() < 0.5:
            raise ConnectionError("模拟网络连接错误")
        else:
            raise ValueError("这是一个数据错误,不应该重试")
    print("任务执行成功!")
# 2. 使用 schedule
# 直接将 tenacity 装饰过的函数传给 schedule
schedule.every(45).seconds.do(my_task_with_tenacity)
print("调度器已启动(使用 tenacity 库),按 Ctrl+C 停止。")
while True:
    schedule.run_pending()
    time.sleep(1)

代码解析:

  • @retry(...): 这是一个装饰器,它会修改 my_task_with_tenacity 的行为。
  • stop=stop_after_attempt(5): 设置重试停止的条件,这里是最多尝试 5 次(包括第一次)。
  • wait=wait_exponential(...): 设置重试的等待策略,这里使用指数退避,multiplier 是基础乘数,minmax 限制了等待时间的范围,使其不至于过长或过短。
  • retry=retry_if_exception_type(...): 设置重试的条件,这里只有当异常是 ConnectionErrorTimeoutError 时才重试,如果是 ValueError,它将立即停止重试并抛出异常。

总结与选择

方法 优点 缺点 适用场景
简单重试 实现简单,无需额外依赖 不够灵活,无法处理复杂重试逻辑 简单的内部任务,失败原因单一,且不希望引入新依赖。
指数退避 更高效,能更好地保护外部服务 代码比方法一复杂 网络请求、API调用、数据库连接等与外部服务交互的任务。
tenacity 功能最强大、最灵活,代码清晰,可读性好 需要额外安装一个库 对重试逻辑有复杂要求的场景,如特定异常重试、动态停止条件、重试回调等,是生产环境中的推荐做法。

对于大多数项目,我强烈推荐使用方法三(tenacity,因为它将重试逻辑和业务逻辑清晰地分离开来,使得代码更易于维护和扩展。

分享:
扫描分享到社交APP
上一篇
下一篇