
How can Python multiprocessing efficiently achieve parallel computing?

Of course! Let's dive deep into Python's multiprocessing module. It's a powerful tool for achieving parallelism, which means running multiple processes simultaneously to speed up your code, especially on multi-core CPUs.

Why Do We Need multiprocessing? The GIL Problem

In Python, the Global Interpreter Lock (GIL) is a mutex that protects access to Python objects, preventing multiple native threads from executing Python bytecode at the same time within a single process. This means that even if you use the threading module, you won't get true parallelism for CPU-bound tasks (tasks that involve a lot of computation).

For example, if you have a task that performs heavy calculations, using threading won't make it run faster on a multi-core CPU because only one thread can execute at a time due to the GIL.

The Solution: The multiprocessing module sidesteps this GIL limitation by creating separate processes, each with its own Python interpreter and memory space. Since each interpreter has its own GIL, the processes can run on different CPU cores truly in parallel.
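
To see the difference yourself, here is a minimal timing sketch (the busy-loop workload and the worker count of 4 are arbitrary stand-ins for real work). On a multi-core machine, the process version should finish noticeably faster than the thread version:

import multiprocessing
import threading
import time
def cpu_bound(n):
    """A pure-Python busy loop that holds the GIL the whole time it runs."""
    total = 0
    for i in range(n):
        total += i * i
    return total
def timed_run(worker_cls, n, workers):
    """Run `workers` copies of cpu_bound via Thread or Process and time them."""
    tasks = [worker_cls(target=cpu_bound, args=(n,)) for _ in range(workers)]
    start = time.perf_counter()
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    return time.perf_counter() - start
if __name__ == "__main__":
    N, WORKERS = 5_000_000, 4  # arbitrary workload; adjust for your machine
    print(f"threads:   {timed_run(threading.Thread, N, WORKERS):.2f}s")
    print(f"processes: {timed_run(multiprocessing.Process, N, WORKERS):.2f}s")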


Key Concepts in multiprocessing

  1. Process: The core unit of execution. A multiprocessing.Process object represents a separate process.
  2. if __name__ == "__main__":: This is crucial. Under the "spawn" start method (the default on Windows, and on macOS since Python 3.8), every new process re-imports the main script. Without this guard, the process-creating code at module level would run again in each child, potentially creating an infinite loop of new processes.
  3. IPC (Inter-Process Communication): Since processes have separate memory spaces, they can't directly share variables like threads can. To communicate, they need special mechanisms like Queues, Pipes, and Managers.
  4. Pool: A high-level utility that manages a pool of worker processes. It's the easiest and most common way to parallelize a function across multiple inputs.

Core Components and Examples

Let's go through the main building blocks with practical examples.

The Process Class

This is the most basic way to create a parallel process. You define a target function and pass it to the Process constructor.

Example: Running a function in parallel

import multiprocessing
import time
import os
def worker_function(seconds):
    """A simple function that simulates a task."""
    process_id = os.getpid()
    print(f"Process {process_id}: Starting task, will sleep for {seconds} seconds.")
    time.sleep(seconds)
    print(f"Process {process_id}: Task finished.")
if __name__ == "__main__":
    print(f"Main Process ID: {os.getpid()}")
    # Create a list to hold the process objects
    processes = []
    # Create and start 5 processes
    for i in range(5):
        p = multiprocessing.Process(target=worker_function, args=(i+1,))
        processes.append(p)
        p.start()
    # Wait for all processes to complete
    # This is important! Without it, the main script might exit before the
    # child processes have finished.
    for p in processes:
        p.join()
    print("Main process has finished.")

Example output (actual PIDs and interleaving will vary):

Main Process ID: 12345
Process 12346: Starting task, will sleep for 1 seconds.
Process 12347: Starting task, will sleep for 2 seconds.
Process 12348: Starting task, will sleep for 3 seconds.
Process 12349: Starting task, will sleep for 4 seconds.
Process 12350: Starting task, will sleep for 5 seconds.
Process 12346: Task finished.
Process 12347: Task finished.
Process 12348: Task finished.
Process 12349: Task finished.
Process 12350: Task finished.
Main process has finished.

Notice how the processes start and finish at different times, but the main script waits for all of them to complete before printing its final message.


multiprocessing.Pool

This is the recommended approach for "embarrassingly parallel" tasks, where you want to apply the same function to a large number of inputs.

Example: Calculating squares of numbers in parallel

import multiprocessing
import time
def square(n):
    """Function to calculate the square of a number."""
    # Simulate some work
    time.sleep(0.1)
    result = n * n
    print(f"Calculated square of {n} = {result}")
    return result
if __name__ == "__main__":
    numbers = range(1, 11) # Numbers from 1 to 10
    print("Using a Pool of 4 processes...")
    # Create a pool of 4 worker processes (passing no argument defaults to os.cpu_count())
    with multiprocessing.Pool(processes=4) as pool:
        # Use pool.map to apply the 'square' function to every item in 'numbers'
        # map returns the results in the same order as the inputs
        results = pool.map(square, numbers)
    print("\nFinal results from pool.map:", results)

Example output (the per-item print lines may interleave in any order, but pool.map returns the results in input order):

Using a Pool of 4 processes...
Calculated square of 1 = 1
Calculated square of 2 = 4
Calculated square of 3 = 9
Calculated square of 4 = 16
Calculated square of 5 = 25
Calculated square of 6 = 36
Calculated square of 7 = 49
Calculated square of 8 = 64
Calculated square of 9 = 81
Calculated square of 10 = 100
Final results from pool.map: [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

The Pool efficiently distributes the work among the 4 processes. You don't have to manage the Process objects or join them manually.

Pool.map vs. Pool.imap

  • pool.map(function, iterable): Blocks until all results are ready and returns them as a list, in input order. Good when you need all results at once.
  • pool.imap(function, iterable): Returns an iterator immediately; each result is yielded, still in input order, as soon as it is ready. Good for processing results incrementally, which can save memory on large inputs (see the sketch below). There is also pool.imap_unordered, which yields results in whatever order they complete.
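
Here is a minimal imap sketch (slow_double and its sleep are just stand-ins for real per-item work): results are printed as they arrive, instead of waiting for the whole batch.

import multiprocessing
import time
def slow_double(n):
    """Stand-in for real per-item work."""
    time.sleep(0.2)
    return n * 2
if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        # imap yields each result as soon as it is ready, in input order,
        # so we can start processing long before the whole batch is done
        for result in pool.imap(slow_double, range(10)):
            print(f"Got result: {result}")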

Inter-Process Communication (IPC)

Since processes don't share memory, you need explicit ways to pass data.

a) Queue

A Queue is a data structure that lets you safely pass messages from one process to another. It's a First-In, First-Out (FIFO) structure.

Example: Producer-Consumer Model

import multiprocessing
import time
import random
def producer(queue, count):
    """Generates numbers and puts them into the queue."""
    for i in range(count):
        item = random.randint(1, 100)
        queue.put(item)
        print(f"Producer: Produced {item}")
        time.sleep(0.5) # Simulate production time
    # Signal the consumer that we're done
    queue.put(None) # Using None as a sentinel value
def consumer(queue):
    """Consumes numbers from the queue."""
    while True:
        item = queue.get() # This blocks until an item is available
        if item is None: # Check for the sentinel value
            print("Consumer: Received sentinel. Exiting.")
            break
        print(f"Consumer: Consumed {item}")
        time.sleep(1) # Simulate consumption time
if __name__ == "__main__":
    # Create a shared queue
    task_queue = multiprocessing.Queue()
    # Create producer and consumer processes
    p_producer = multiprocessing.Process(target=producer, args=(task_queue, 5))
    p_consumer = multiprocessing.Process(target=consumer, args=(task_queue,))
    p_producer.start()
    p_consumer.start()
    # Wait for both to finish
    p_producer.join()
    p_consumer.join()
    print("Main process finished.")

b) Pipe

A Pipe creates a two-way communication channel. It returns a pair of Connection objects, one for each end of the pipe.

Example: Using a Pipe

import multiprocessing
def sender(conn):
    """Sends a message through the pipe."""
    conn.send("Hello from the sender process!")
    conn.close()
def receiver(conn):
    """Receives a message from the pipe."""
    msg = conn.recv()
    print(f"Receiver got message: {msg}")
    conn.close()
if __name__ == "__main__":
    # Create a pipe, which gives us two connection objects
    parent_conn, child_conn = multiprocessing.Pipe()
    p_sender = multiprocessing.Process(target=sender, args=(child_conn,))
    p_receiver = multiprocessing.Process(target=receiver, args=(parent_conn,))
    p_sender.start()
    p_receiver.start()
    p_sender.join()
    p_receiver.join()
    print("Main process finished.")

Manager for Shared State

If you absolutely need to share more complex data structures (like lists or dictionaries) between processes, you can use a Manager. A Manager object controls a server process that manages shared objects. Access to these objects is coordinated by the manager.

Example: Sharing a list

import multiprocessing
import time
def worker(shared_list, process_id):
    """Appends the process ID to the shared list."""
    shared_list.append(f"Data from process {process_id}")
    time.sleep(1)
if __name__ == "__main__":
    # Create a manager and a shared list
    with multiprocessing.Manager() as manager:
        shared_list = manager.list()
        processes = []
        for i in range(5):
            p = multiprocessing.Process(target=worker, args=(shared_list, i))
            processes.append(p)
            p.start()
        for p in processes:
            p.join()
        print("Shared list contents:", list(shared_list))

Warning: Using a Manager can be slow because every access to the shared object goes through the manager's server process: the data is pickled, sent over a local socket or pipe, and unpickled on the other side. For performance-critical code, prefer Queue or another IPC mechanism; one lighter-weight alternative is sketched below.
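
For example, for a single shared counter or a small fixed-size array, multiprocessing.Value and multiprocessing.Array live in shared memory and avoid the manager's server process entirely. A minimal sketch:

import multiprocessing
def increment(counter, times):
    for _ in range(times):
        # get_lock() guards the read-modify-write cycle against races
        with counter.get_lock():
            counter.value += 1
if __name__ == "__main__":
    counter = multiprocessing.Value("i", 0)  # a C int living in shared memory
    procs = [multiprocessing.Process(target=increment, args=(counter, 1000))
             for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print("Final counter value:", counter.value)  # 4000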


When to Use multiprocessing vs. threading

  • Parallelism: multiprocessing delivers true parallelism across multiple CPU cores; threading delivers concurrency only, because the GIL lets just one thread execute Python bytecode at a time.
  • Use case: multiprocessing suits CPU-bound tasks (heavy calculations, data processing, image manipulation); threading suits I/O-bound tasks (network requests, reading/writing files, waiting on a database).
  • Memory: processes each get a separate memory space, so memory usage is higher; threads share one memory space, so usage is lower.
  • Communication: between processes it is slower and requires explicit IPC (Queues, Pipes); between threads it is faster via shared variables (but needs locks for safety).
  • Overhead: creating a process is more expensive than creating a thread; threads are relatively cheap.

Summary

  1. Use multiprocessing for CPU-bound tasks to speed them up using all available CPU cores.
  2. Always wrap your main execution block in if __name__ == "__main__": to prevent infinite process creation.
  3. For simple parallel mapping of a function over data, multiprocessing.Pool is your best friend.
  4. Use Queue or Pipe for safe and efficient communication between processes.
  5. Be aware of the overhead. Creating processes is more resource-intensive than creating threads.