安装 schedule 库
schedule 库并不是 Python 的标准库,所以你需要先使用 pip 来安装它,打开你的终端或命令行工具,运行以下命令:

pip install schedule
如果你使用的是 Python 3,可能需要使用 pip3:
pip3 install schedule
基本导入
安装成功后,你就可以在你的 Python 脚本中像导入其他标准库一样导入它了。
import schedule import time
通常我们还会导入 time 库,因为在实际应用中,我们常常需要让主程序保持运行,以便 schedule 能够在后台持续检查并执行任务。
核心使用方法
schedule 库的使用非常直观,主要遵循以下三个步骤:

- 定义任务:创建一个你希望定时执行的函数。
- 调度任务:使用
schedule.every()来指定任务的执行频率和间隔。 - 运行调度器:启动一个无限循环来持续检查并运行已安排好的任务。
代码示例
下面通过几个例子来展示如何使用。
示例 1:最简单的定时任务(每 10 秒执行一次)
这是一个最经典的入门示例,每隔 10 秒打印一条消息。
import schedule
import time
# 1. 定义任务函数
def job():
print("I'm working...")
# 2. 调度任务:每 10 秒执行一次 job()
schedule.every(10).seconds.do(job)
# 3. 运行调度器
while True:
schedule.run_pending() # 运行所有已到期的任务
time.sleep(1) # 暂停 1 秒,避免 CPU 过载
如何运行:
- 将上述代码保存为
.py文件(my_scheduler.py)。 - 在终端中运行
python my_scheduler.py。 - 你会看到每隔大约 10 秒,终端就会打印出 "I'm working..."。
示例 2:不同的执行频率
schedule 提供了非常灵活的调度方式。

import schedule
import time
def job():
print("任务执行!")
# 每隔 10 秒执行
schedule.every(10).seconds.do(job)
# 每隔 10 分钟执行
schedule.every(10).minutes.do(job)
# 每个小时执行
schedule.every().hour.do(job)
# 每天在特定时间执行,例如在 10:30
schedule.every().day.at("10:30").do(job)
# 每周一执行
schedule.every().monday.do(job)
# 每周一的 13:15 执行
schedule.every().monday.at("13:15").do(job)
# 每周三的特定时间执行
schedule.every().wednesday.at("13:15").do(job)
while True:
schedule.run_pending()
time.sleep(1)
示例 3:带参数的任务
如果你的任务函数需要参数,do() 方法可以接受它们。
import schedule
import time
def greet(name):
print(f"你好, {name}!")
# 每隔 2 秒调用 greet 函数,并传入参数 "Alice"
schedule.every(2).seconds.do(greet, name="Alice")
# 每隔 4 秒调用 greet 函数,并传入参数 "Bob"
schedule.every(4).seconds.do(greet, name="Bob")
while True:
schedule.run_pending()
time.sleep(1)
示例 4:取消任务
有时候你可能需要动态地取消一个已调度的任务。
import schedule
import time
def some_task():
print("这个任务正在运行...")
# 安排一个任务
job = schedule.every(3).seconds.do(some_task)
# 运行几秒钟后取消它
time.sleep(6)
print("准备取消任务...")
schedule.cancel_job(job)
# 之后这个任务就不会再执行了
while True:
schedule.run_pending()
time.sleep(1)
示例 5:一次性的任务
如果你只想让任务执行一次,可以使用 .do_once()。
import schedule
import time
def run_once():
print("这个任务只会运行一次!")
# 安排一个只运行一次的任务
schedule.every(5).seconds.do_once(run_once)
while True:
schedule.run_pending()
time.sleep(1)
高级用法和最佳实践
使用线程 (Threading)
上面的 while True 循环会阻塞你的主程序,如果你的主程序还需要处理其他事情(Web 服务器、GUI 等),你应该使用线程来让调度器在后台运行。
import schedule
import time
import threading
def job():
print("线程任务执行中...")
def run_schedule():
while True:
schedule.run_pending()
time.sleep(1)
# 创建并启动一个线程来运行调度器
schedule_thread = threading.Thread(target=run_schedule, daemon=True)
schedule_thread.start()
# 主线程可以继续执行其他任务
print("主线程正在运行,调度器在后台运行...")
for i in range(10):
time.sleep(1)
print(f"主线程计数: {i}")
print("主线程结束。")
注意:
daemon=True设置了该线程为守护线程,这意味着当主线程结束时,这个守护线程也会随之结束,如果你的程序需要一直运行,可以去掉这个参数。
结合 asyncio (异步)
如果你的项目是基于 asyncio 的,schedule 库也支持异步任务。
import schedule
import asyncio
import time
async def async_job():
print("异步任务执行中...")
# 模拟一个异步操作
await asyncio.sleep(2)
print("异步任务完成!")
def run_async_job():
# 创建一个新的事件循环来运行异步任务
# 注意:在生产环境中,事件循环的管理会更复杂
asyncio.run(async_job())
# 安排异步任务
schedule.every(5).seconds.do(run_async_job)
# 在主线程中运行调度器
while True:
schedule.run_pending()
time.sleep(1)
总结与注意事项
| 功能 | 方法 | 示例 |
|---|---|---|
| 定义任务 | 创建一个函数 | def my_job(): ... |
| 调度任务 | schedule.every(...).do(...) |
schedule.every(10).minutes.do(my_job) |
| 运行调度器 | while True: schedule.run_pending() |
while True: schedule.run_pending(); time.sleep(1) |
| 取消任务 | schedule.cancel_job(job_object) |
job = schedule.every(1).hour.do(my_job); schedule.cancel_job(job) |
| 一次性任务 | schedule.every(...).do_once(...) |
schedule.every(5).seconds.do_once(my_job) |
注意事项:
schedule的局限性:schedule库非常轻量且易于上手,但它不是一个专业的生产级调度系统,它依赖于一个持续的循环,如果程序崩溃,所有未执行的任务都会丢失。- 生产环境建议:对于需要高可靠性、持久化、分布式执行的生产环境,建议使用更强大的工具,
- APScheduler (Advanced Python Scheduler):功能强大,支持多种后端(如内存、数据库、Redis)和多种调度类型(如 cron 表达式)。
- Celery:一个分布式任务队列,非常适合处理异步任务和定时任务。
- 系统级的 Cron Job:对于简单的、系统级的定时任务,可以直接使用 Linux/Unix 的
cron或 Windows 的任务计划程序。
希望这份详细的指南能帮助你顺利地在 Python 中导入和使用 schedule 库!
