
How Do You Guard Long-Running Jobs in Python?

"Job guarding" in Python is a common and essential pattern for ensuring that long-running, critical tasks complete successfully, even if the process running them crashes or is terminated unexpectedly.


The core idea is to have a guardian process that monitors a worker process. If the worker dies, the guardian can either restart it or perform other critical cleanup actions.

Here’s a complete guide to implementing a robust job-guarding system in Python, covering the concepts, code, and best practices.


The Core Concept: Guardian vs. Worker

  • Worker Process: This is the process that does the actual work. It could be a data analysis script, a video encoder, a web scraper, etc. It's designed to run for a long time.
  • Guardian Process (or Watchdog): This is a separate, lightweight process whose only job is to watch the worker. It periodically checks if the worker is still alive. If the worker dies, the guardian takes action (e.g., restart it, log the failure, send an alert).

We will use Python's standard library (primarily the subprocess and multiprocessing modules), as it is the most reliable way to manage processes on most operating systems.
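
At its core, the pattern looks like this; a minimal sketch with illustrative names, using multiprocessing.Process as the worker:

# A minimal guardian/worker sketch (illustrative only).
import multiprocessing
import time

def worker():
    """Stand-in for the real long-running job."""
    while True:
        time.sleep(1)

if __name__ == "__main__":
    process = multiprocessing.Process(target=worker)
    process.start()
    while True:
        if not process.is_alive():  # The guardian's only job: watch the worker
            print("Worker died, restarting...")
            process = multiprocessing.Process(target=worker)
            process.start()
        time.sleep(1)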


Why is Job Guarding Important?

  1. Crash Resilience: If your worker process encounters an unhandled exception, a segmentation fault, or runs out of memory, it will terminate. The guardian can restart it, making your system self-healing.
  2. Handling SIGINT (Ctrl+C): When you press Ctrl+C, a KeyboardInterrupt is sent to the main process, and the worker processes might not receive it cleanly. A guardian can ensure that when the main program is stopped, the worker is properly terminated and any cleanup is performed (see the sketch after this list).
  3. Resource Management: It prevents orphaned worker processes from continuing to consume CPU or memory after the main program has exited.
  4. Automation: It allows you to build systems that run unattended for long periods, automatically recovering from transient errors.
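
Regarding point 2, the worker itself can also install a signal handler so it has a chance to clean up when the guardian terminates it. A minimal sketch for POSIX systems (the handler and cleanup steps are illustrative):

# Inside a worker: react to the SIGTERM that a guardian sends via terminate().
import signal
import sys
import time

def handle_sigterm(signum, frame):
    print("[Worker] Received SIGTERM, cleaning up...")
    # ... close files, flush buffers, release locks ...
    sys.exit(0)

signal.signal(signal.SIGTERM, handle_sigterm)

while True:
    time.sleep(1)  # Stand-in for real work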

Implementation: A Robust Job Guardian

We'll create two Python scripts: one for the worker and one for the guardian. The guardian will periodically poll the worker process to check whether it is still alive.


Step 1: The Worker Script (worker.py)

This script contains the long-running task. For this example, it simply counts in a loop (with a small simulated chance of failure), but you would replace this with your actual job logic.

# worker.py
import time
import random
import sys
def long_running_task():
    """
    This is the function that does the actual work.
    It simulates a long-running process.
    """
    print("[Worker] Starting long-running task...")
    try:
        # Simulate work by counting in a loop.
        # Each iteration has a small random chance to fail, for demonstration;
        # with a 0.1 s sleep the worker "crashes" after roughly a minute on average.
        count = 0
        while True:
            count += 1
            if random.random() < 0.002:
                print("[Worker] Oh no! A simulated error has occurred!")
                raise ValueError("Simulated worker failure")
            if count % 100 == 0:  # Print progress roughly every 10 seconds
                print(f"[Worker] Still running... count: {count}")
            time.sleep(0.1)  # Prevent the CPU from maxing out
    except Exception as e:
        print(f"[Worker] Error: {e}")
        # In a real scenario, you might log this to a file or database
        raise # Re-raise the exception so the process exits with an error code
    finally:
        print("[Worker] Task finished or was interrupted.")
if __name__ == "__main__":
    long_running_task()
    sys.exit(0) # Exit cleanly

Step 2: The Guardian Script (guardian.py)

This is the main script you will run. It will create and monitor the worker process.

# guardian.py
import time
import subprocess
import sys
import signal
import os
class JobGuardian:
    def __init__(self, worker_script_path, max_restarts=5, restart_delay=5):
        """
        Initializes the JobGuardian.
        :param worker_script_path: Path to the worker script (e.g., 'worker.py').
        :param max_restarts: Maximum number of times to restart the worker.
        :param restart_delay: Seconds to wait before restarting a failed worker.
        """
        self.worker_script_path = worker_script_path
        self.max_restarts = max_restarts
        self.restart_delay = restart_delay
        self.worker_process = None
        self.restart_count = 0
        # Set up signal handler for graceful shutdown
        signal.signal(signal.SIGINT, self._handle_shutdown)
        signal.signal(signal.SIGTERM, self._handle_shutdown)
    def _handle_shutdown(self, signum, frame):
        """Handles shutdown signals (SIGINT, SIGTERM)."""
        print(f"\n[Guardian] Received signal {signum}. Shutting down gracefully...")
        self.stop_worker()
        print("[Guardian] Guardian stopped.")
        sys.exit(0)
    def start_worker(self):
        """Starts the worker process."""
        # We use subprocess so the worker runs in its own Python interpreter,
        # fully isolated from the guardian. "-u" keeps the worker's output
        # unbuffered; stdout/stderr are inherited, so the worker's messages
        # appear directly in the guardian's terminal.
        self.worker_process = subprocess.Popen(
            [sys.executable, "-u", self.worker_script_path]
        )
        print(f"[Guardian] Worker process started with PID: {self.worker_process.pid}")
    def is_worker_alive(self):
        """Checks if the worker process is still running."""
        if self.worker_process:
            # poll() returns None if the process is still running, otherwise it returns the exit code.
            return self.worker_process.poll() is None
        return False
    def stop_worker(self):
        """Gracefully stops the worker process."""
        if self.worker_process and self.is_worker_alive():
            print(f"[Guardian] Terminating worker process (PID: {self.worker_process.pid})...")
            self.worker_process.terminate() # Sends SIGTERM
            # Wait for the process to terminate, but don't wait forever
            try:
                self.worker_process.wait(timeout=5)
                print("[Guardian] Worker terminated successfully.")
            except subprocess.TimeoutExpired:
                print("[Guardian] Worker did not terminate gracefully, killing it.")
                self.worker_process.kill() # Sends SIGKILL
                self.worker_process.wait()
                print("[Guardian] Worker killed.")
    def monitor(self):
        """The main monitoring loop."""
        print("[Guardian] Guardian is starting...")
        self.start_worker()
        while self.restart_count < self.max_restarts:
            time.sleep(1)  # Check every second
            if not self.is_worker_alive():
                # Report how the worker exited
                if self.worker_process.returncode != 0:
                    print(f"[Guardian] Worker exited with code: {self.worker_process.returncode}")
                self.restart_count += 1
                print(f"[Guardian] Worker is not alive. Restart count: {self.restart_count}/{self.max_restarts}")
                print(f"[Guardian] Restarting in {self.restart_delay} seconds...")
                time.sleep(self.restart_delay)
                self.start_worker()
        print(f"[Guardian] Reached maximum restarts ({self.max_restarts}). Guardian is stopping.")
        self.stop_worker()
if __name__ == "__main__":
    # Make sure the worker script exists
    if not os.path.exists("worker.py"):
        print("Error: worker.py not found. Please create it.")
        sys.exit(1)
    guardian = JobGuardian(
        worker_script_path="worker.py",
        max_restarts=10,
        restart_delay=2
    )
    guardian.monitor()

How to Run It

  1. Save the code above into two files: worker.py and guardian.py in the same directory.

  2. Open your terminal or command prompt.

  3. Run the guardian script:

    python guardian.py

What to expect:

  • The guardian will start the worker.
  • You'll see output from the worker and the guardian in the same terminal.
  • When the worker "crashes" (because of the simulated random failure), the guardian will detect it, wait restart_delay seconds, and then restart it.
  • If you press Ctrl+C, the guardian will catch the signal and gracefully terminate the worker before exiting itself.
  • After the worker has been restarted 10 times, the guardian will give up and shut down.

Advanced Techniques & Best Practices

Using a Queue for Communication

Instead of just checking if a process is alive, you can use a multiprocessing.Queue for more sophisticated communication. The worker can send status updates ("heartbeat"), and the guardian can send commands ("stop", "restart").

# In worker.py
import time

def worker_with_heartbeat(queue):
    while True:
        queue.put("I am alive!")  # Send a heartbeat
        # ... do a slice of real work here ...
        time.sleep(5)

# In guardian.py
import multiprocessing
import queue  # for the queue.Empty exception

# (On Windows/macOS, this setup code should live under `if __name__ == "__main__":`.)
heartbeat_queue = multiprocessing.Queue()
p = multiprocessing.Process(target=worker_with_heartbeat, args=(heartbeat_queue,))
p.start()
while True:
    try:
        message = heartbeat_queue.get(timeout=10)  # Wait up to 10 seconds for a heartbeat
        print(f"Heartbeat received: {message}")
    except queue.Empty:
        print("Heartbeat missed! Worker may be dead.")
        # Restart logic here...
        break

Using a PID File

For long-running services, it's common to write the worker's Process ID (PID) to a file (worker.pid). This allows other scripts or system services (systemd, supervisor) to know which process to monitor or kill.

# In guardian.py, after starting the worker
with open("worker.pid", "w") as f:
    f.write(str(self.worker_process.pid))
# When stopping the worker
if os.path.exists("worker.pid"):
    os.remove("worker.pid")

Using a Process Management Library

For production-level systems, consider using a dedicated library that handles these complexities for you. They provide features like process logging, automatic restarts, and dependency management.

  • supervisor: A client/server system that lets you monitor and control a number of processes on UNIX-like operating systems. You configure it with a simple .conf file (see the sketch after this list). It's extremely robust and widely used.
  • pm2: A popular process manager for Node.js, but it can also manage Python applications. It's very feature-rich.
  • systemd: The standard init system and service manager for most modern Linux distributions. You can create a .service file to define your application and let systemd handle the lifecycle.
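
For instance, a supervisor program entry might look roughly like this (the program name and paths are placeholders; see the supervisor documentation for the full set of options):

[program:my-worker]
command=/usr/bin/python3 /path/to/your/app/worker.py
directory=/path/to/your/app
user=your_user
autostart=true
autorestart=true
startretries=10
stdout_logfile=/var/log/my-worker.out.log
stderr_logfile=/var/log/my-worker.err.log

Note that with supervisor the separate guardian script becomes unnecessary: supervisor itself restarts worker.py whenever it exits.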

For example, a systemd service file (my-worker.service) would look like this:

[Unit]
Description=My Python Worker
After=network.target
[Service]
Type=simple
User=your_user
WorkingDirectory=/path/to/your/app
ExecStart=/usr/bin/python3 /path/to/your/app/guardian.py
# Restart the service whenever it exits
Restart=always
RestartSec=10s
[Install]
WantedBy=multi-user.target

You would then enable and start this service with systemctl, and systemd would become your "guardian".
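
The usual commands are along these lines (unit name taken from the example above; paths may differ by distribution):

sudo cp my-worker.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable --now my-worker.service
sudo systemctl status my-worker.service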
