什么是 schedule 库?
schedule 库是一个轻量级的任务调度器,它允许你像写自然语言一样,以非常简洁的方式定义任务的执行频率,你可以轻松地设置一个任务“每天上午 10 点运行”或“每 5 分钟运行一次”。
它的核心思想是“声明式”的:你只需要告诉调度器“做什么”和“何时做”,调度器会负责在正确的时机触发你的任务函数。
安装
在使用之前,你需要先安装它,打开你的终端或命令行,运行:
pip install schedule
基本用法(核心概念)
schedule 的使用非常直观,主要涉及三个步骤:
- 定义任务:创建一个你希望定时执行的普通 Python 函数。
- 调度任务:使用
schedule提供的函数(如schedule.every().day.at())来定义任务的执行计划,并将其与你的任务函数关联起来。 - 运行调度器:启动一个无限循环,持续检查是否有任务到时执行,最简单的方式是
while True: schedule.run_pending()。
示例:一个简单的定时任务
下面是一个最经典的 "Hello, World!" 式的定时任务示例。
import schedule
import time
# 1. 定义任务函数
def job():
print("I'm working...")
# 2. 调度任务
# 每隔 10 分钟执行一次 job 函数
schedule.every(10).minutes.do(job)
# 每隔一小时执行一次
# schedule.every().hour.do(job)
# 每天在特定时间执行,10:30
# schedule.every().day.at("10:30").do(job)
# 每周一执行一次
# schedule.every().monday.do(job)
# 每周三的 13:15 执行一次
# schedule.every().wednesday.at("13:15").do(job)
# 3. 运行调度器
print("调度器已启动...")
while True:
# run_pending() 会检查所有已调度的任务,如果到了执行时间,就运行它
schedule.run_pending()
# 让程序休眠 1 秒,避免 CPU 过载
time.sleep(1)
代码解释:
import schedule和import time:导入必要的库。def job(): ...:这是我们想要定时执行的函数。schedule.every(10).minutes.do(job):这是核心调度语句,它表示“每 10 分钟 做 job 这个任务”。while True: ...:这是一个无限循环,是调度器运行的基础。schedule.run_pending():这是调度器的“心跳”,它会检查所有计划好的任务,看看是否有任务已经准备好可以执行了。time.sleep(1):在每次检查后让程序暂停 1 秒,这是非常重要的,否则while循环会以极高的速度运行,消耗大量 CPU 资源。
schedule 的丰富语法
schedule 提供了非常灵活的语法来定义各种时间周期。
1 基于时间间隔的调度
import schedule
import time
def job():
print("执行基于间隔的任务...")
# 每隔 N 秒/分钟/小时/天 执行
schedule.every(10).seconds.do(job)
schedule.every(2).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.do(job)
2 基于具体时间的调度
import schedule
import time
def job():
print("在特定时间执行任务...")
# 每天在特定时间执行
schedule.every().day.at("10:30").do(job)
schedule.every().day.at("22:00").do(job)
# 每周在特定时间执行
schedule.every().monday.at("13:15").do(job)
schedule.every().tuesday.at("15:45").do(job)
schedule.every().wednesday.at("11:00").do(job)
schedule.every().thursday.at("09:00").do(job)
schedule.every().friday.at("16:00").do(job)
# ... 依此类推
3 基于工作日的调度
import schedule
import time
def job():
print("在特定工作日执行任务...")
# 每周一
schedule.every().monday.do(job)
# 每周二至周五
schedule.every().tuesday.do(job)
schedule.every().wednesday.do(job)
schedule.every().thursday.do(job)
schedule.every().friday.do(job)
# 每周六和周日
schedule.every().saturday.do(job)
schedule.every().sunday.do(job)
4 组合使用
你可以组合这些语法来创建更复杂的调度规则。schedule.every().monday.at("09:00").do(job) 表示“每周一早上 9 点”。
高级用法
1 传递参数给任务函数
如果你的任务函数需要参数,可以使用 do 方法的 *args 和 **kwargs。
import schedule
import time
def greet(name):
print(f"你好, {name}!")
# 使用位置参数
schedule.every(2).seconds.do(greet, "张三")
# 使用关键字参数
def greet_with_title(title, name):
print(f"你好, {title} {name}!")
schedule.every(4).seconds.do(greet_with_title, title="Mr.", name="李四")
while True:
schedule.run_pending()
time.sleep(1)
2 取消任务
你可以通过 schedule.cancel_job(job) 来取消一个已经调度的任务,你需要用 schedule.every().do(...) 的返回值来获取一个 Job 对象。
import schedule
import time
def job():
print("这个任务只运行一次。")
# 获取 job 对象
job_obj = schedule.every().second.do(job)
# 运行 5 次后取消
count = 0
while True:
schedule.run_pending()
time.sleep(1)
count += 1
if count >= 5:
print("任务已取消。")
schedule.cancel_job(job_obj) # 取消任务
break
3 获取所有任务
你可以查看当前所有已调度的任务。
import schedule
def job1():
pass
def job2():
pass
schedule.every().day.do(job1)
schedule.every().hour.do(job2)
# 获取所有任务
jobs = schedule.get_jobs()
print(f"当前有 {len(jobs)} 个任务。")
for job in jobs:
print(f"- 任务: {job}, 下次运行时间: {job.next_run}")
4 清除所有任务
import schedule
def some_job():
print("运行中...")
schedule.every().second.do(some_job)
# 清除所有任务
schedule.clear()
print("所有任务已清除。")
重要注意事项和最佳实践
1 错误处理
如果任务函数内部发生未捕获的异常,整个调度循环可能会中断,强烈建议在任务函数内部使用 try...except 块来捕获和处理异常。
def unreliable_job():
try:
# 这里可能出错的代码
result = 1 / 0
except Exception as e:
print(f"任务执行出错: {e}")
schedule.every().second.do(unreliable_job)
2 多线程(推荐)
schedule 是单线程的,如果你的任务执行时间很长,它会阻塞整个调度循环,导致后续任务无法按时执行,一个任务需要 5 分钟才能完成,但下一个任务计划每 1 分钟执行一次,那么这个任务就会被严重延迟。
解决这个问题的最佳方法是使用多线程。
import schedule
import time
import threading
def long_running_task():
print("开始一个长时间任务...")
time.sleep(10) # 模拟任务执行 10 秒
print("长时间任务结束。")
def run_threaded(job_func):
"""在独立线程中运行任务"""
job_thread = threading.Thread(target=job_func)
job_thread.start()
# 调度一个长时间任务,让它在新线程中运行
schedule.every(5).seconds.do(run_threaded, long_running_task)
print("多线程调度器已启动...")
while True:
schedule.run_pending()
time.sleep(1)
在这个例子中,long_running_task 会在一个单独的线程中执行,主线程的 while True 循环不会被阻塞,可以继续检查和执行其他任务。
3 生产环境下的替代方案
对于生产环境,schedule 库虽然简单,但可能不够健壮,更强大的替代方案包括:
- APScheduler (Advanced Python Scheduler):功能非常强大,支持多种后端(如内存、数据库、Redis)和多种调度器(如日期、间隔、 cron),它是生产环境中的首选。
- Celery:一个专注于分布式任务队列的库,非常适合处理需要异步、可靠、可扩展的后台任务。
- 系统级调度器:如 Linux 的
cron或 Windows 的任务计划程序,你可以让你的 Python 脚本作为cron作业来运行,这通常是最稳定可靠的方式。
| 特性 | 描述 |
|---|---|
| 优点 | 语法简洁、易于上手、轻量级,非常适合快速原型开发、个人项目或简单的后台任务。 |
| 缺点 | 单线程设计,长时间任务会阻塞调度;不适合高精度、高可靠性的生产环境。 |
| 适用场景 | 爬虫定时运行、数据定期备份、发送周期性通知、个人自动化脚本等。 |
| 核心用法 | schedule.every(...).do(...) + while True: schedule.run_pending() |
| 最佳实践 | 在任务函数中加入 try...except,对于耗时任务使用多线程。 |
希望这份详细的指南能帮助你完全掌握 Python 的 schedule 函数!
