Of course! "Job guarding" in Python is a common and essential pattern for keeping long-running, critical tasks alive even when they crash or are terminated unexpectedly.

The core idea is to have a guardian process that monitors a worker process. If the worker dies, the guardian can either restart it or perform other critical cleanup actions.
Here’s a complete guide to implementing a robust job-guarding system in Python, covering the concepts, code, and best practices.
The Core Concept: Guardian vs. Worker
- Worker Process: This is the process that does the actual work. It could be a data analysis script, a video encoder, a web scraper, etc. It's designed to run for a long time.
- Guardian Process (or Watchdog): This is a separate, lightweight process whose only job is to watch the worker. It periodically checks if the worker is still alive. If the worker dies, the guardian takes action (e.g., restart it, log the failure, send an alert).
We will use Python's built-in `subprocess` and `multiprocessing` modules, which are the standard, portable way to create and manage processes from Python.
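Before walking through the full implementation, here is a minimal sketch of the pattern using `multiprocessing` alone (the worker function, the 2-second sleep, and the restart limit are purely illustrative):

import multiprocessing
import random
import time

def worker():
    """Placeholder job: runs briefly, then sometimes crashes."""
    time.sleep(2)
    if random.random() < 0.5:
        raise RuntimeError("simulated crash")

if __name__ == "__main__":
    restarts = 0
    process = multiprocessing.Process(target=worker)
    process.start()
    while restarts < 3:                  # illustrative restart limit
        process.join(timeout=1)          # poll roughly once per second
        if not process.is_alive():
            if process.exitcode == 0:    # finished cleanly, nothing left to guard
                break
            print(f"Worker died (exit code {process.exitcode}); restarting...")
            process = multiprocessing.Process(target=worker)
            process.start()
            restarts += 1

The full guardian below applies the same loop, but launches the worker as a separate script with `subprocess` and adds signal handling, a restart delay, and a restart limit.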
Why is Job Guarding Important?
- Crash Resilience: If your worker process encounters an unhandled exception, a segmentation fault, or runs out of memory, it will terminate. The guardian can restart it, making your system self-healing.
- Handling `SIGINT` (`Ctrl+C`): When you press `Ctrl+C`, a `KeyboardInterrupt` is raised in the main process, and the worker processes might not receive it cleanly. A guardian can ensure that when the main program is stopped, the worker is properly terminated and any cleanup is performed.
- Resource Management: It prevents "zombie" processes from continuing to consume CPU or memory after the main program has exited.
- Automation: It allows you to build systems that run unattended for long periods, automatically recovering from transient errors.
Implementation: A Robust Job Guardian
We'll create two Python scripts: one for the worker and one for the guardian. The guardian will poll the worker process once per second to check whether it is still alive.

Step 1: The Worker Script (worker.py)
This script contains the long-running task. For this example, it will just count up to a very large number, but you would replace this with your actual job logic.
# worker.py
import time
import random
import sys

def long_running_task():
    """
    This is the function that does the actual work.
    It simulates a long-running process.
    """
    print("[Worker] Starting long-running task...")
    try:
        # Simulate work by counting indefinitely.
        # Add a random chance to fail for demonstration.
        if random.random() < 0.1:  # 10% chance to "crash"
            print("[Worker] Oh no! A simulated error has occurred!")
            raise ValueError("Simulated worker failure")
        count = 0
        while True:
            count += 1
            if count % 100 == 0:  # report progress every ~10 seconds
                print(f"[Worker] Still running... count: {count}", flush=True)
            time.sleep(0.1)  # Prevent the CPU from maxing out
    except Exception as e:
        print(f"[Worker] Error: {e}")
        # In a real scenario, you might log this to a file or database
        raise  # Re-raise the exception so the process exits with a non-zero code
    finally:
        print("[Worker] Task finished or was interrupted.")

if __name__ == "__main__":
    long_running_task()
    sys.exit(0)  # Exit cleanly
Step 2: The Guardian Script (guardian.py)
This is the main script you will run. It will create and monitor the worker process.
# guardian.py
import time
import subprocess
import sys
import signal
import os

class JobGuardian:
    def __init__(self, worker_script_path, max_restarts=5, restart_delay=5):
        """
        Initializes the JobGuardian.
        :param worker_script_path: Path to the worker script (e.g., 'worker.py').
        :param max_restarts: Maximum number of times to restart the worker.
        :param restart_delay: Seconds to wait before restarting a failed worker.
        """
        self.worker_script_path = worker_script_path
        self.max_restarts = max_restarts
        self.restart_delay = restart_delay
        self.worker_process = None
        self.restart_count = 0

        # Set up signal handlers for graceful shutdown
        signal.signal(signal.SIGINT, self._handle_shutdown)
        signal.signal(signal.SIGTERM, self._handle_shutdown)

    def _handle_shutdown(self, signum, frame):
        """Handles shutdown signals (SIGINT, SIGTERM)."""
        print(f"\n[Guardian] Received signal {signum}. Shutting down gracefully...")
        self.stop_worker()
        print("[Guardian] Guardian stopped.")
        sys.exit(0)

    def start_worker(self):
        """Starts the worker process."""
        # We use subprocess to run the worker in a separate process with its
        # own Python interpreter. The worker inherits the guardian's stdout
        # and stderr, so its output appears directly in the terminal.
        self.worker_process = subprocess.Popen(
            [sys.executable, self.worker_script_path]
        )
        print(f"[Guardian] Worker process started with PID: {self.worker_process.pid}")

    def is_worker_alive(self):
        """Checks if the worker process is still running."""
        if self.worker_process:
            # poll() returns None while the process is running,
            # otherwise it returns the exit code.
            return self.worker_process.poll() is None
        return False

    def stop_worker(self):
        """Gracefully stops the worker process."""
        if self.worker_process and self.is_worker_alive():
            print(f"[Guardian] Terminating worker process (PID: {self.worker_process.pid})...")
            self.worker_process.terminate()  # Sends SIGTERM
            # Wait for the process to terminate, but don't wait forever
            try:
                self.worker_process.wait(timeout=5)
                print("[Guardian] Worker terminated successfully.")
            except subprocess.TimeoutExpired:
                print("[Guardian] Worker did not terminate gracefully, killing it.")
                self.worker_process.kill()  # Sends SIGKILL
                self.worker_process.wait()
                print("[Guardian] Worker killed.")

    def monitor(self):
        """The main monitoring loop."""
        print("[Guardian] Guardian is starting...")
        self.start_worker()
        while self.restart_count < self.max_restarts:
            if not self.is_worker_alive():
                print(f"[Guardian] Worker is not alive. "
                      f"Restart count: {self.restart_count}/{self.max_restarts}")
                # Report how the worker exited
                if self.worker_process and self.worker_process.returncode != 0:
                    print(f"[Guardian] Worker exited with code: {self.worker_process.returncode}")
                time.sleep(self.restart_delay)  # Back off before restarting
                self.start_worker()
                self.restart_count += 1
            time.sleep(1)  # Check every second

        print(f"[Guardian] Reached maximum restarts ({self.max_restarts}). Guardian is stopping.")
        self.stop_worker()

if __name__ == "__main__":
    # Make sure the worker script exists
    if not os.path.exists("worker.py"):
        print("Error: worker.py not found. Please create it.")
        sys.exit(1)

    guardian = JobGuardian(
        worker_script_path="worker.py",
        max_restarts=10,
        restart_delay=2
    )
    guardian.monitor()
How to Run It
- Save the code above into two files, `worker.py` and `guardian.py`, in the same directory.
- Open your terminal or command prompt.
- Run the guardian script:
  python guardian.py
What to expect:
- The guardian will start the worker.
- You'll see the worker's output appear in the terminal alongside the guardian's status messages.
- If the worker "crashes" (due to the 10% chance), the guardian will detect it, wait a couple of seconds, and then restart it.
- If you press `Ctrl+C`, the guardian will catch the signal and gracefully terminate the worker before exiting itself.
- After the worker has been restarted 10 times, the guardian will give up and shut down.
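One detail worth knowing: `terminate()` sends `SIGTERM`, and Python's default response to `SIGTERM` is to exit immediately, so the worker's `finally` block will not run. On POSIX systems the worker can install its own handler if it needs to flush files or release resources first. A minimal sketch (the flag-based loop is illustrative, not part of worker.py above):

import signal
import time

shutdown_requested = False

def handle_sigterm(signum, frame):
    """Record the request; the main loop exits cleanly on its next iteration."""
    global shutdown_requested
    shutdown_requested = True

signal.signal(signal.SIGTERM, handle_sigterm)

try:
    while not shutdown_requested:
        time.sleep(0.1)  # ... do one unit of work ...
finally:
    print("[Worker] Cleaning up before exit.")

On Windows, `terminate()` calls TerminateProcess, which cannot be intercepted, so any cleanup has to happen on the guardian's side there.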
Advanced Techniques & Best Practices
Using a Queue for Communication
Instead of just checking if a process is alive, you can use a multiprocessing.Queue for more sophisticated communication. The worker can send status updates ("heartbeat"), and the guardian can send commands ("stop", "restart").
# In worker.py
import time

def worker_with_heartbeat(heartbeat_queue):
    while True:
        heartbeat_queue.put("I am alive!")  # Send a heartbeat
        time.sleep(5)
        # ... do work ...

# In guardian.py
import multiprocessing
import queue  # standard-library module; provides the queue.Empty exception

from worker import worker_with_heartbeat

if __name__ == "__main__":
    heartbeat_queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker_with_heartbeat, args=(heartbeat_queue,))
    p.start()

    while True:
        try:
            message = heartbeat_queue.get(timeout=10)  # Wait up to 10 seconds for a heartbeat
            print(f"Heartbeat received: {message}")
        except queue.Empty:
            print("Heartbeat missed! Worker may be dead.")
            # Restart logic here...
            break
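In the other direction, the guardian can push commands onto a second queue and the worker can poll it between units of work. A minimal sketch along those lines (the queue names and the "stop" command are assumptions, not part of the example above):

# In worker.py (sketch): poll a command queue between units of work
import queue  # for the queue.Empty exception
import time

def worker_with_commands(heartbeat_queue, command_queue):
    while True:
        heartbeat_queue.put("I am alive!")        # heartbeat, as before
        try:
            command = command_queue.get_nowait()  # non-blocking check for a command
        except queue.Empty:
            command = None
        if command == "stop":
            print("[Worker] Stop command received, exiting cleanly.")
            break
        time.sleep(5)                             # ... do one unit of work ...

# In guardian.py (sketch): create the extra queue and ask the worker to stop
#   command_queue = multiprocessing.Queue()
#   ...
#   command_queue.put("stop")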
Using a PID File
For long-running services, it's common to write the worker's Process ID (PID) to a file (worker.pid). This allows other scripts or system services (systemd, supervisor) to know which process to monitor or kill.
# In guardian.py, after starting the worker
with open("worker.pid", "w") as f:
    f.write(str(self.worker_process.pid))

# When stopping the worker
if os.path.exists("worker.pid"):
    os.remove("worker.pid")
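Another script (or the guardian itself after a restart) can then read the PID file and check whether that process still exists. On POSIX systems, sending signal 0 performs the existence check without delivering an actual signal; a minimal sketch (the helper name and file name are illustrative):

import os

def is_worker_running(pid_file="worker.pid"):
    """Return True if the PID stored in the file refers to a live process (POSIX only)."""
    try:
        with open(pid_file) as f:
            pid = int(f.read().strip())
        os.kill(pid, 0)  # Signal 0: checks existence/permissions, delivers nothing
        return True
    except (FileNotFoundError, ValueError, ProcessLookupError):
        return False
    except PermissionError:
        return True  # The process exists but belongs to another user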
Using a Process Management Library
For production-level systems, consider using a dedicated library that handles these complexities for you. They provide features like process logging, automatic restarts, and dependency management.
- `supervisor`: A client/server system that lets you monitor and control a number of processes on UNIX-like operating systems. You configure it with a simple `.conf` file. It's extremely robust and widely used.
- `pm2`: A popular process manager built for Node.js, but it can also manage Python applications. It's very feature-rich.
- `systemd`: The standard init system and service manager on most modern Linux distributions. You can create a `.service` file to define your application and let `systemd` handle its lifecycle.
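To give a flavour of the supervisor option, a minimal program section in its `.conf` file might look like this (the program name, paths, and log locations are placeholders):

[program:my-worker]
command=/usr/bin/python3 /path/to/your/app/worker.py
directory=/path/to/your/app
autostart=true
autorestart=true
startretries=5
stdout_logfile=/var/log/my-worker.out.log
stderr_logfile=/var/log/my-worker.err.log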
For example, a systemd service file (my-worker.service) would look like this:
[Unit]
Description=My Python Worker
After=network.target

[Service]
Type=simple
User=your_user
WorkingDirectory=/path/to/your/app
ExecStart=/usr/bin/python3 /path/to/your/app/guardian.py
# Restart the service if it ever exits
Restart=always
RestartSec=10s

[Install]
WantedBy=multi-user.target
You would then enable and start this service with systemctl, and systemd would become your "guardian".
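A typical sequence to install and start it (assuming the unit file is saved as /etc/systemd/system/my-worker.service):

sudo systemctl daemon-reload
sudo systemctl enable my-worker.service
sudo systemctl start my-worker.service
sudo systemctl status my-worker.service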
