
How to Implement Scheduled Tasks with APScheduler

This guide covers using APScheduler (Advanced Python Scheduler) to run scheduled tasks in Python. The examples use the APScheduler 3.x API.

What is APScheduler?

APScheduler is a powerful, in-process task scheduler that lets you schedule jobs (Python functions or methods) to be executed at certain times or intervals. It's like a built-in cron for your Python application.

Key features:

  • Multiple Schedulers: Provides schedulers for different runtime environments, including BackgroundScheduler, BlockingScheduler, and AsyncIOScheduler (plus variants for Tornado, Twisted, gevent, and Qt).
  • Multiple Job Stores: Store job definitions in memory, in a database (like PostgreSQL, MySQL, SQLite), or in Redis.
  • Multiple Triggers: Schedule jobs using cron-like expressions, dates, or intervals.
  • Robust Execution: Supports job coalescing (collapsing several overdue runs of the same job into a single run) and misfire handling (deciding whether a job that could not start on time should still run).

Installation

First, install the library. If you plan to use a database-backed or Redis-backed job store, install the matching extra as well.

# Basic installation
pip install apscheduler
# Extra for SQLAlchemy (database-backed) job stores
# (the quotes keep shells such as zsh from expanding the brackets)
pip install "apscheduler[sqlalchemy]"
# Extra for Redis-backed job stores
pip install "apscheduler[redis]"

Core Concepts

Before we dive into code, let's understand the main components:

  • Scheduler: The central object that manages jobs and triggers. It's responsible for dispatching jobs to the correct executor.
  • Job: A single task to be run. A job has a function to execute, a trigger to define when it should run, and various other settings (like its ID, name, and parameters).
  • Job Store: Where the scheduler stores the job definitions. The default is an in-memory store, but you can use persistent stores like a database or Redis to survive application restarts.
  • Executor: The mechanism that actually runs the job. The default is a thread pool, which is great for I/O-bound tasks. For CPU-bound tasks, you might use a ProcessPoolExecutor.
  • Trigger: The rule that defines when a job should run. The most common are:
    • date: Run once at a specific point in time.
    • interval: Run at fixed intervals (e.g., every 5 seconds).
    • cron: Run at specific times (e.g., every day at 5 PM).

Basic Usage: The "Hello, World!" Example

Let's start with the simplest case: running a function once at a specific time. We'll use the BlockingScheduler, which runs the scheduler in the calling thread and blocks it until the scheduler is shut down.

from apscheduler.schedulers.blocking import BlockingScheduler
import time
def my_job():
    print("Hello, World! The time is", time.strftime("%Y-%m-%d %H:%M:%S"))
# 1. Create a scheduler instance
scheduler = BlockingScheduler()
# 2. Add a job
# - The target function is `my_job`
# - It should run once, on '2025-10-27 12:00:00' (use a future time when you try this)
scheduler.add_job(my_job, 'date', run_date='2025-10-27 12:00:00')
print("Job scheduled for 2025-10-27 12:00:00.")
print("The program will now block and wait for the job to run.")
# 3. Start the scheduler
# start() blocks the current thread until the scheduler is shut down (e.g. with Ctrl+C).
scheduler.start()

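To run this: Save it as a .py file and execute it. It prints the two status messages and then waits. If run_date is in the future, the job runs at that time. If run_date is already more than misfire_grace_time (1 second by default) in the past, APScheduler treats the run as a misfire, logs a warning, and skips it. Either way, the script keeps blocking until you stop it with Ctrl+C.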


Scheduling with Different Triggers

This is where APScheduler becomes truly useful. Let's explore the main trigger types.

a) Interval Trigger

Run a job repeatedly at a set interval.

from apscheduler.schedulers.blocking import BlockingScheduler
import time
def print_time():
    print(f"Current time: {time.strftime('%Y-%m-%d %H:%M:%S')}")
scheduler = BlockingScheduler()
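# Run the job every 5 seconds
# start_date: earliest time the job may run (intervals are counted from here)
# end_date: latest time the job may run; no runs are scheduled after it
# Adjust both dates to a window in the future when you try this.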
scheduler.add_job(
    print_time,
    trigger='interval',
    seconds=5,
    start_date='2025-10-27 12:00:00',
    end_date='2025-10-27 12:01:00'
)
print("Scheduler started. Job will run every 5 seconds for 1 minute.")
scheduler.start()

b) Cron Trigger

Run a job on a schedule similar to the Unix cron utility. This is perfect for recurring tasks like "every day at 5 AM".

from apscheduler.schedulers.blocking import BlockingScheduler
def report():
    print("Generating daily report...")
scheduler = BlockingScheduler()
# Run every weekday (Monday-Friday) at 17:00 (5 PM)
scheduler.add_job(report, trigger='cron', day_of_week='mon-fri', hour=17, minute=0)
print("Scheduler started. Job will run every weekday at 5 PM.")
# Note: This will run forever. Press Ctrl+C to stop.
scheduler.start()

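Common cron trigger arguments (fields less significant than the ones you set default to their minimum value, so omitting hour and minute means 00:00):

  • minute=0, hour=0: Run at midnight every day.
  • day_of_week='sun': Run every Sunday at midnight.
  • day=1: Run at midnight on the 1st of every month.
  • hour='*/2': Run every 2 hours on the hour (00:00, 02:00, 04:00, etc.).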

Advanced Features

a) Passing Arguments to a Job

You can pass arguments and keyword arguments to your job function.

from apscheduler.schedulers.blocking import BlockingScheduler
def greet(name, greeting="Hello"):
    print(f"{greeting}, {name}!")
scheduler = BlockingScheduler()
# Pass 'name' as a positional argument and 'greeting' as a keyword argument.
# No trigger is given, so the job runs once, as soon as the scheduler starts.
scheduler.add_job(greet, args=['Alice'], kwargs={'greeting': 'Good morning'})
scheduler.start()
# Output: Good morning, Alice!

b) Job ID and Name

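It's good practice to give your jobs an explicit ID and a name. The ID is what you pass to the scheduler to modify, pause, or remove the job later (if you omit it, APScheduler generates a random one), and the name makes logs easier to read.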

scheduler.add_job(
    my_job,
    trigger='interval',
    seconds=10,
    id='my_job_id',      # Unique ID for the job
    name='My Important Job' # Human-readable name
)

c) Modifying and Removing Jobs

You can control the scheduler after it has started.

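# ... (assuming a scheduler is running and has a job with the ID 'my_job_id')
# Change a job's trigger (use reschedule_job for this, not modify_job)
scheduler.reschedule_job('my_job_id', trigger='interval', seconds=15)
# Change other attributes of a job, such as its name
scheduler.modify_job('my_job_id', name='Renamed Job')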
# Pause a job
scheduler.pause_job('my_job_id')
# Resume a paused job
scheduler.resume_job('my_job_id')
# Remove a job completely
scheduler.remove_job('my_job_id')

d) Handling Job Completion and Errors

You can add listeners to be notified when jobs succeed or fail.

from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_MISSED, EVENT_JOB_EXECUTED
def my_listener(event):
    if event.exception:
        print(f"Job {event.job_id} failed with exception: {event.exception}")
    elif event.code == EVENT_JOB_MISSED:
        print(f"Job {event.job_id} missed its run time!")
    else:
        print(f"Job {event.job_id} executed successfully.")
scheduler.add_listener(my_listener, EVENT_JOB_ERROR | EVENT_JOB_MISSED | EVENT_JOB_EXECUTED)
# ... add and start jobs as usual

Choosing the Right Scheduler

The right scheduler depends on how the rest of your application runs:

Scheduler | When to Use | Main Thread | Use Case
BlockingScheduler | When your script's only purpose is to run scheduled tasks. | Blocks | A standalone script that just needs to run jobs on a schedule.
BackgroundScheduler | When you need to run the scheduler alongside other application logic (e.g., a web server). | Does not block | The most common choice for web apps (Flask, Django) and long-running services.
AsyncIOScheduler | When using asyncio and you want non-blocking scheduling. | Does not block | Asynchronous web frameworks like FastAPI, Sanic, or any other asyncio application.

Example: BackgroundScheduler (Very Common)

This is what you'd use in a web application.

from apscheduler.schedulers.background import BackgroundScheduler
import time
def scheduled_task():
    print("Running scheduled task...", time.strftime("%H:%M:%S"))
# Create a background scheduler
scheduler = BackgroundScheduler()
scheduler.add_job(scheduled_task, 'interval', seconds=10)
scheduler.start()
print("Scheduler started in the background. Main thread is free.")
print("Do some other work here...")
# The main thread can continue doing other things.
# For example, in a web app, this would be where you run your server.
try:
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    # Shut down the scheduler gracefully when exiting
    scheduler.shutdown()
    print("Scheduler shut down.")

Persistence with Job Stores

By default, jobs are stored in memory and will be lost if your application restarts. To make jobs persistent, you can use a database.

Example: Using SQLAlchemy (SQLite)

SQLAlchemyJobStore keeps job definitions in any database that SQLAlchemy supports.

  1. Setup: Make sure you have apscheduler[sqlalchemy] installed. You'll need a database URL. We'll use SQLite for simplicity.

  2. Code:

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
import time
import atexit
# 1. Configure the job store
# The URL points to your database file
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
# 2. Create the scheduler, passing in the job store
scheduler = BackgroundScheduler(jobstores=jobstores)
# 3. Define the job
def my_persistent_job():
    print("This job is persistent and will run every 5 seconds.")
    print(f"Time: {time.strftime('%Y-%m-%d %H:%M:%S')}")
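# 4. Add the job
# The job is saved to the 'jobs.sqlite' database.
# replace_existing=True avoids a ConflictingIdError on restart,
# when a job with this ID is already in the store.
scheduler.add_job(my_persistent_job, 'interval', seconds=5, id='persistent_job_1', replace_existing=True)
# 5. Shut the scheduler down cleanly when the interpreter exits
atexit.register(lambda: scheduler.shutdown())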
# 6. Start the scheduler
scheduler.start()
print("Scheduler started with a persistent job store.")
print("The job definition is saved in 'jobs.sqlite'.")
print("If you restart this script, the job will still be there.")
# Keep the main thread alive
try:
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    pass

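If you run this script, stop it, and run it again, the job persistent_job_1 is already present in the jobs.sqlite file when the scheduler starts. Because of that, add_job needs replace_existing=True on later runs; without it, starting the scheduler raises ConflictingIdError. Jobs that were added only at runtime are loaded from the file in the same way and keep running on their schedule.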
