杰瑞科技汇

Python schedule函数如何定时执行任务?

什么是 schedule 库?

schedule 库是一个轻量级的任务调度器,它允许你像写自然语言一样,以非常简洁的方式定义任务的执行频率,你可以轻松地设置一个任务“每天上午 10 点运行”或“每 5 分钟运行一次”。

它的核心思想是“声明式”的:你只需要告诉调度器“做什么”和“何时做”,调度器会负责在正确的时机触发你的任务函数。


安装

在使用之前,你需要先安装它,打开你的终端或命令行,运行:

pip install schedule

基本用法(核心概念)

schedule 的使用非常直观,主要涉及三个步骤:

  1. 定义任务:创建一个你希望定时执行的普通 Python 函数。
  2. 调度任务:使用 schedule 提供的函数(如 schedule.every().day.at())来定义任务的执行计划,并将其与你的任务函数关联起来。
  3. 运行调度器:启动一个无限循环,持续检查是否有任务到时执行,最简单的方式是 while True: schedule.run_pending()

示例:一个简单的定时任务

下面是一个最经典的 "Hello, World!" 式的定时任务示例。

import schedule
import time
# 1. 定义任务函数
def job():
    print("I'm working...")
# 2. 调度任务
# 每隔 10 分钟执行一次 job 函数
schedule.every(10).minutes.do(job)
# 每隔一小时执行一次
# schedule.every().hour.do(job)
# 每天在特定时间执行,10:30
# schedule.every().day.at("10:30").do(job)
# 每周一执行一次
# schedule.every().monday.do(job)
# 每周三的 13:15 执行一次
# schedule.every().wednesday.at("13:15").do(job)
# 3. 运行调度器
print("调度器已启动...")
while True:
    # run_pending() 会检查所有已调度的任务,如果到了执行时间,就运行它
    schedule.run_pending()
    # 让程序休眠 1 秒,避免 CPU 过载
    time.sleep(1)

代码解释:

  • import scheduleimport time:导入必要的库。
  • def job(): ...:这是我们想要定时执行的函数。
  • schedule.every(10).minutes.do(job):这是核心调度语句,它表示“ 10 分钟 job 这个任务”。
  • while True: ...:这是一个无限循环,是调度器运行的基础。
  • schedule.run_pending():这是调度器的“心跳”,它会检查所有计划好的任务,看看是否有任务已经准备好可以执行了。
  • time.sleep(1):在每次检查后让程序暂停 1 秒,这是非常重要的,否则 while 循环会以极高的速度运行,消耗大量 CPU 资源。

schedule 的丰富语法

schedule 提供了非常灵活的语法来定义各种时间周期。

1 基于时间间隔的调度

import schedule
import time
def job():
    print("执行基于间隔的任务...")
# 每隔 N 秒/分钟/小时/天 执行
schedule.every(10).seconds.do(job)
schedule.every(2).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.do(job)

2 基于具体时间的调度

import schedule
import time
def job():
    print("在特定时间执行任务...")
# 每天在特定时间执行
schedule.every().day.at("10:30").do(job)
schedule.every().day.at("22:00").do(job)
# 每周在特定时间执行
schedule.every().monday.at("13:15").do(job)
schedule.every().tuesday.at("15:45").do(job)
schedule.every().wednesday.at("11:00").do(job)
schedule.every().thursday.at("09:00").do(job)
schedule.every().friday.at("16:00").do(job)
# ... 依此类推

3 基于工作日的调度

import schedule
import time
def job():
    print("在特定工作日执行任务...")
# 每周一
schedule.every().monday.do(job)
# 每周二至周五
schedule.every().tuesday.do(job)
schedule.every().wednesday.do(job)
schedule.every().thursday.do(job)
schedule.every().friday.do(job)
# 每周六和周日
schedule.every().saturday.do(job)
schedule.every().sunday.do(job)

4 组合使用

你可以组合这些语法来创建更复杂的调度规则。schedule.every().monday.at("09:00").do(job) 表示“每周一早上 9 点”。


高级用法

1 传递参数给任务函数

如果你的任务函数需要参数,可以使用 do 方法的 *args**kwargs

import schedule
import time
def greet(name):
    print(f"你好, {name}!")
# 使用位置参数
schedule.every(2).seconds.do(greet, "张三")
# 使用关键字参数
def greet_with_title(title, name):
    print(f"你好, {title} {name}!")
schedule.every(4).seconds.do(greet_with_title, title="Mr.", name="李四")
while True:
    schedule.run_pending()
    time.sleep(1)

2 取消任务

你可以通过 schedule.cancel_job(job) 来取消一个已经调度的任务,你需要用 schedule.every().do(...) 的返回值来获取一个 Job 对象。

import schedule
import time
def job():
    print("这个任务只运行一次。")
# 获取 job 对象
job_obj = schedule.every().second.do(job)
# 运行 5 次后取消
count = 0
while True:
    schedule.run_pending()
    time.sleep(1)
    count += 1
    if count >= 5:
        print("任务已取消。")
        schedule.cancel_job(job_obj) # 取消任务
        break

3 获取所有任务

你可以查看当前所有已调度的任务。

import schedule
def job1():
    pass
def job2():
    pass
schedule.every().day.do(job1)
schedule.every().hour.do(job2)
# 获取所有任务
jobs = schedule.get_jobs()
print(f"当前有 {len(jobs)} 个任务。")
for job in jobs:
    print(f"- 任务: {job}, 下次运行时间: {job.next_run}")

4 清除所有任务

import schedule
def some_job():
    print("运行中...")
schedule.every().second.do(some_job)
# 清除所有任务
schedule.clear()
print("所有任务已清除。")

重要注意事项和最佳实践

1 错误处理

如果任务函数内部发生未捕获的异常,整个调度循环可能会中断,强烈建议在任务函数内部使用 try...except 块来捕获和处理异常。

def unreliable_job():
    try:
        # 这里可能出错的代码
        result = 1 / 0
    except Exception as e:
        print(f"任务执行出错: {e}")
schedule.every().second.do(unreliable_job)

2 多线程(推荐)

schedule 是单线程的,如果你的任务执行时间很长,它会阻塞整个调度循环,导致后续任务无法按时执行,一个任务需要 5 分钟才能完成,但下一个任务计划每 1 分钟执行一次,那么这个任务就会被严重延迟。

解决这个问题的最佳方法是使用多线程。

import schedule
import time
import threading
def long_running_task():
    print("开始一个长时间任务...")
    time.sleep(10) # 模拟任务执行 10 秒
    print("长时间任务结束。")
def run_threaded(job_func):
    """在独立线程中运行任务"""
    job_thread = threading.Thread(target=job_func)
    job_thread.start()
# 调度一个长时间任务,让它在新线程中运行
schedule.every(5).seconds.do(run_threaded, long_running_task)
print("多线程调度器已启动...")
while True:
    schedule.run_pending()
    time.sleep(1)

在这个例子中,long_running_task 会在一个单独的线程中执行,主线程的 while True 循环不会被阻塞,可以继续检查和执行其他任务。

3 生产环境下的替代方案

对于生产环境,schedule 库虽然简单,但可能不够健壮,更强大的替代方案包括:

  • APScheduler (Advanced Python Scheduler):功能非常强大,支持多种后端(如内存、数据库、Redis)和多种调度器(如日期、间隔、 cron),它是生产环境中的首选。
  • Celery:一个专注于分布式任务队列的库,非常适合处理需要异步、可靠、可扩展的后台任务。
  • 系统级调度器:如 Linux 的 cron 或 Windows 的任务计划程序,你可以让你的 Python 脚本作为 cron 作业来运行,这通常是最稳定可靠的方式。

特性 描述
优点 语法简洁、易于上手、轻量级,非常适合快速原型开发、个人项目或简单的后台任务。
缺点 单线程设计,长时间任务会阻塞调度;不适合高精度、高可靠性的生产环境。
适用场景 爬虫定时运行、数据定期备份、发送周期性通知、个人自动化脚本等。
核心用法 schedule.every(...).do(...) + while True: schedule.run_pending()
最佳实践 在任务函数中加入 try...except,对于耗时任务使用多线程。

希望这份详细的指南能帮助你完全掌握 Python 的 schedule 函数!

分享:
扫描分享到社交APP
上一篇
下一篇