How Can Python Multiprocessing Achieve Efficient Concurrency?

Let's dive deep into Python's multiprocessing module. This is a crucial topic for any Python developer looking to write high-performance, CPU-bound applications.


Why Use Multiprocessing?

First, let's understand the problem it solves.

The Global Interpreter Lock (GIL)

Python has a mechanism called the Global Interpreter Lock (GIL). The GIL is a mutex that protects access to Python objects, preventing multiple native threads from executing Python bytecode at the same time within a single process.

  • For I/O-Bound Tasks: This is okay. When a thread is waiting for I/O (like reading a file or making a network request), it releases the GIL, allowing another thread to run. This is why Python's threading module is great for tasks that involve a lot of waiting.
  • For CPU-Bound Tasks: This is a problem. If your task is heavy on computation (e.g., mathematical calculations, image processing, data crunching), the GIL will prevent your threads from running in parallel on multi-core processors. They will still run, but one after the other, not simultaneously.

Multiprocessing is the solution. It bypasses the GIL by creating entirely new processes, each with its own Python interpreter and memory space. Since each process has its own GIL, they can run on different CPU cores at the same time, achieving true parallelism.
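To see the difference concretely, here is a small, illustrative benchmark (the function burn_cpu, the worker count of 4, and the loop size are arbitrary choices; it uses concurrent.futures from the standard library for brevity). On a multi-core machine, the process-based run should finish in a fraction of the thread-based time, because the threads serialize on the GIL:

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def burn_cpu(n: int) -> int:
    """Pure-Python loop that holds the GIL the whole time it runs."""
    total = 0
    for i in range(n):
        total += i * i
    return total

def run_with(executor_cls, label: str):
    start = time.perf_counter()
    with executor_cls(max_workers=4) as executor:
        # Four equal chunks of CPU-bound work
        list(executor.map(burn_cpu, [2_000_000] * 4))
    print(f"{label}: {time.perf_counter() - start:.2f} s")

if __name__ == "__main__":
    run_with(ThreadPoolExecutor, "4 threads (GIL-bound)")
    run_with(ProcessPoolExecutor, "4 processes (true parallelism)")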


Key Concepts

  1. Process Class: The core of the module. You create a Process object, target a function for it to run, and then start it.
  2. Pool Class: A high-level abstraction that manages a pool of worker processes. It's perfect for dividing a task among multiple workers and collecting the results. It's often easier to use than manually managing individual processes.
  3. Inter-Process Communication (IPC): Since processes have separate memory spaces, they can't share variables directly. To communicate or share data, you need to use special tools like Queues, Pipes, or Managers.
  4. if __name__ == "__main__":: This is a critical guard clause. On some platforms (like Windows and macOS), the child process re-imports the script's module. Without this guard, you could end up creating an infinite loop of new processes. Always put your multiprocessing code inside this block (a minimal skeleton follows this list).
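As a minimal skeleton for point 4 (the function name work is just a placeholder), every example below follows this shape:

import multiprocessing

def work():
    print("child running")

# On spawn-based platforms (Windows, and macOS since Python 3.8) each child
# process re-imports this module, so any top-level code runs again in the
# child. Keeping process creation inside the guard prevents children from
# spawning children of their own.
if __name__ == "__main__":
    p = multiprocessing.Process(target=work)
    p.start()
    p.join()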

Example 1: The Basics with Process

Let's start with the most fundamental example: running two functions in parallel.

import multiprocessing
import time
import os

def worker_function(name: str, duration: int):
    """A simple function that simulates work."""
    pid = os.getpid()
    print(f"Process {name} (PID: {pid}) has started.")
    time.sleep(duration)  # Stand-in for real work (sleep blocks this worker for 'duration' seconds)
    print(f"Process {name} (PID: {pid}) has finished after {duration} seconds.")

if __name__ == "__main__":
    print(f"Main process (PID: {os.getpid()}) starting.")
    # Create two Process objects
    # The 'target' argument is the function to run in the new process.
    # The 'args' argument is a tuple of arguments to pass to the target function.
    p1 = multiprocessing.Process(target=worker_function, args=("Worker 1", 2))
    p2 = multiprocessing.Process(target=worker_function, args=("Worker 2", 3))
    # Start the processes
    print("Starting processes...")
    p1.start()
    p2.start()
    # Wait for both processes to complete before the main script continues
    print("Main process waiting for workers to finish...")
    p1.join()
    p2.join()
    print("All processes have finished. Main process exiting.")

How to run it: Save as process_example.py and run python process_example.py.

Expected Output:

Main process (PID: 12345) starting.
Starting processes...
Process Worker 1 (PID: 12346) has started.
Process Worker 2 (PID: 12347) has started.
Main process waiting for workers to finish...
Process Worker 1 (PID: 12346) has finished after 2 seconds.
Process Worker 2 (PID: 12347) has finished after 3 seconds.
All processes have finished. Main process exiting.

Notice how the main process prints "waiting..." and then waits for the two workers to complete. The total execution time is roughly 3 seconds, not 5, because the two workers ran in parallel.
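Beyond start() and join(), a Process object exposes a few other useful controls. The following optional sketch (the 5-second sleep is arbitrary) waits one second for a worker and then force-stops it if it is still running:

import multiprocessing
import time

def slow_task():
    time.sleep(5)  # Stand-in for a long-running job

if __name__ == "__main__":
    p = multiprocessing.Process(target=slow_task)
    p.start()
    p.join(timeout=1)      # Wait at most 1 second
    if p.is_alive():       # Worker is still running after the timeout
        print("Worker still running, terminating it...")
        p.terminate()      # Forcefully stop the worker
        p.join()           # Reap it so it does not linger
    print("Worker exit code:", p.exitcode)  # Negative on Unix if killed by a signal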


Example 2: Using a Pool for a "Divide and Conquer" Strategy

The Pool is ideal when you have a list of items and want to apply a function to each one concurrently.


Let's calculate the square of numbers in a list.

import multiprocessing
import time

def calculate_square(n):
    """Function to calculate the square of a number."""
    time.sleep(0.5) # Simulate some work
    result = n * n
    print(f"Calculated square of {n} = {result}")
    return result

if __name__ == "__main__":
    numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    print("Using a Pool of 4 processes...")
    # Create a pool of 4 worker processes
    with multiprocessing.Pool(processes=4) as pool:
        # pool.map applies the function to every item in the iterable
        # It blocks until all results are ready and returns them in a list
        results = pool.map(calculate_square, numbers)
    print("\nFinal results from pool.map:", results)
    print("Main process finished.")

How to run it: Save as pool_example.py and run python pool_example.py.

Expected Output:

Using a Pool of 4 processes...
Calculated square of 1 = 1
Calculated square of 2 = 4
Calculated square of 3 = 9
Calculated square of 4 = 16
Calculated square of 5 = 25
Calculated square of 6 = 36
Calculated square of 7 = 49
Calculated square of 8 = 64
Calculated square of 9 = 81
Calculated square of 10 = 100
Final results from pool.map: [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
Main process finished.

Notice that the order of the "Calculated square of ..." lines is not guaranteed: the pool hands work to its 4 workers, which finish as they can (pool.map still returns the results in input order). With 10 items, 4 workers, and about 0.5 seconds per item, the total time is roughly 1.5 seconds instead of the 5 seconds a sequential loop would take.

Pool Methods:

  • pool.map(function, iterable): Simple, returns results in order.
  • pool.map_async(function, iterable): Non-blocking. Returns an AsyncResult object that you can check later.
  • pool.starmap(function, iterable_of_args): Like map, but for functions that take multiple arguments. The iterable should be a list of tuples.
  • pool.apply(function, args): Runs a single function call in a worker process and blocks until it returns.
  • pool.apply_async(function, args): Non-blocking version of apply; returns an AsyncResult. (starmap and apply_async are demonstrated in the sketch after this list.)
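Here is a brief, illustrative sketch of starmap and apply_async (the power function is made up for the example):

import multiprocessing

def power(base, exponent):
    return base ** exponent

if __name__ == "__main__":
    pairs = [(2, 3), (4, 2), (5, 3)]
    with multiprocessing.Pool(processes=2) as pool:
        # starmap unpacks each tuple into positional arguments
        print(pool.starmap(power, pairs))    # [8, 16, 125]
        # apply_async returns an AsyncResult immediately instead of blocking
        async_result = pool.apply_async(power, (10, 2))
        print(async_result.get(timeout=5))   # 100 (get() blocks until ready)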

Example 3: Inter-Process Communication with a Queue

Processes do not share memory by default. If one process needs to receive data from another, whether a worker reporting back to the main process or, as here, one worker feeding another, you need an IPC mechanism. A Queue is a safe and easy way to do this.

Let's have a producer process that puts numbers into a queue and a consumer process that reads them.

import multiprocessing
import time
import random

def producer(queue: multiprocessing.Queue):
    """Puts numbers into the queue."""
    for i in range(5):
        num = random.randint(1, 100)
        print(f"Producer: putting {num} into the queue.")
        queue.put(num)
        time.sleep(1) # Simulate work
    print("Producer: finished. Sending sentinel value.")
    queue.put(-1) # Sentinel value to signal the end

def consumer(queue: multiprocessing.Queue):
    """Takes numbers from the queue."""
    while True:
        num = queue.get() # Blocks until an item is available
        if num == -1: # Check for the sentinel value
            print("Consumer: received sentinel. Exiting.")
            break
        print(f"Consumer: got {num}. Squaring it to get {num * num}")
        time.sleep(0.5) # Simulate work

if __name__ == "__main__":
    # Create a shared queue object
    task_queue = multiprocessing.Queue()
    # Create and start producer and consumer processes
    p1 = multiprocessing.Process(target=producer, args=(task_queue,))
    p2 = multiprocessing.Process(target=consumer, args=(task_queue,))
    p1.start()
    p2.start()
    # Wait for both processes to finish
    p1.join()
    p2.join()
    print("Main process finished.")

How to run it: Save as queue_example.py and run python queue_example.py.

Expected Output (will vary due to randomness):


Producer: putting 42 into the queue.
Consumer: got 42. Squaring it to get 1764
... (four more numbers are produced and consumed) ...
Producer: finished. Sending sentinel value.
Consumer: received sentinel. Exiting.
Main process finished.
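The Key Concepts section also mentioned Pipes and Managers. As a final, minimal sketch (not one of the original examples; record_square is an illustrative name), a Manager can expose a shared dictionary that several processes write into:

import multiprocessing

def record_square(shared_dict, n):
    # Writes go through the manager process, so every process sees them
    shared_dict[n] = n * n

if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        results = manager.dict()
        workers = [
            multiprocessing.Process(target=record_square, args=(results, i))
            for i in range(1, 6)
        ]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        print(dict(results))  # {1: 1, 2: 4, 3: 9, 4: 16, 5: 25}

A Pipe (multiprocessing.Pipe) serves a similar purpose for point-to-point communication between exactly two processes.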