
Concurrent Python

Of course! Let's break down concurrency in Python, from the core concepts to the tools you'll use.

What is Concurrency?

At its heart, concurrency is about dealing with (and potentially completing) many tasks at the same time. It's not the same as parallelism, although they are related.

  • Concurrency is about structure. It's a way of writing code that can handle multiple operations seemingly simultaneously. It's like a single-core CPU rapidly switching between tasks, giving the illusion of parallelism. The goal is often to improve responsiveness (e.g., a GUI app that doesn't freeze) or throughput (e.g., handling many network requests).
  • Parallelism is about execution. It's when tasks are literally running at the same time, typically on multiple CPU cores. The goal is to improve performance and speed up computation.

In Python, concurrency is crucial because of the Global Interpreter Lock (GIL). The GIL is a mutex that protects access to Python objects, preventing multiple native threads from executing Python bytecode at the same time. This means that for CPU-bound tasks (heavy calculations), Python's threading module won't give you true parallelism. However, for I/O-bound tasks (waiting for network, disk, or database operations), concurrency is extremely effective because a thread can be paused (I/O blocked) while another runs, making excellent use of the CPU's idle time.
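This distinction is easy to see with a toy experiment: time.sleep releases the GIL just like blocking network or disk I/O does, so threads that are mostly waiting overlap almost perfectly. A minimal sketch (wait_for_io and run_in_threads are illustrative names, not standard library functions):

```python
import threading
import time

def wait_for_io(seconds):
    # time.sleep releases the GIL, just like a blocking socket or disk read
    time.sleep(seconds)

def run_in_threads(n, seconds):
    threads = [threading.Thread(target=wait_for_io, args=(seconds,)) for _ in range(n)]
    start = time.time()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.time() - start

# Three 0.3-second "I/O waits" overlap, so the total is ~0.3s, not ~0.9s
elapsed = run_in_threads(3, 0.3)
print(f"3 overlapping waits took {elapsed:.2f}s")
```

Running the same experiment with a busy loop instead of time.sleep would show little or no speedup, because a CPU-bound thread holds the GIL while it works.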


The Three Pillars of Concurrency in Python

Python primarily offers three ways to achieve concurrency:

  1. threading: For I/O-bound tasks. Uses OS-level threads.
  2. multiprocessing: For CPU-bound tasks. Uses separate processes, bypassing the GIL.
  3. asyncio: For high-concurrency I/O-bound tasks. Uses a single thread and an "event loop."

threading - The Classic Approach

The threading module is the most straightforward way to introduce concurrency. It's perfect for tasks that spend a lot of time waiting.

Key Concept: You create multiple threads of execution within a single process. While the GIL prevents them from running Python code in parallel, they can release the GIL when they perform I/O operations, allowing other threads to run.

When to use it:

  • Network requests (downloading files, API calls).
  • Reading/writing files.
  • Database queries.
  • Any task that is "waiting" more than "calculating."

Example: Downloading Multiple Files

Let's compare a synchronous approach to a concurrent one using threads.

import requests
import threading
import time
# A function that performs a blocking I/O operation
def download_file(url, filename):
    print(f"Starting download of {url}...")
    try:
        response = requests.get(url, timeout=10)
        with open(filename, 'wb') as f:
            f.write(response.content)
        print(f"Finished downloading {url}")
    except requests.RequestException as e:
        print(f"Error downloading {url}: {e}")
# --- Synchronous Version ---
def run_sequential():
    urls = [
        'https://example.com/file1.zip',
        'https://example.com/file2.zip',
        'https://example.com/file3.zip'
    ]
    start_time = time.time()
    for i, url in enumerate(urls):
        download_file(url, f"file_{i}.zip")
    end_time = time.time()
    print(f"Sequential download took: {end_time - start_time:.2f} seconds")
# --- Concurrent Version using Threads ---
def run_concurrent():
    urls = [
        'https://example.com/file1.zip',
        'https://example.com/file2.zip',
        'https://example.com/file3.zip'
    ]
    threads = []
    start_time = time.time()
    for i, url in enumerate(urls):
        thread = threading.Thread(target=download_file, args=(url, f"file_{i}.zip"))
        threads.append(thread)
        thread.start() # Start the thread
    # Wait for all threads to complete
    for thread in threads:
        thread.join()
    end_time = time.time()
    print(f"Concurrent download took: {end_time - start_time:.2f} seconds")
# To run this, you would need actual URLs.
# run_sequential()
# run_concurrent()

Output (Conceptual):

# Sequential:
Starting download of https://...
Finished downloading https://...
Starting download of https://...
Finished downloading https://...
...
Sequential download took: 15.02 seconds
# Concurrent:
Starting download of https://...
Starting download of https://...
Starting download of https://...
Finished downloading https://...
Finished downloading https://...
...
Concurrent download took: 5.10 seconds  # Much faster!
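As a side note, the standard library's concurrent.futures.ThreadPoolExecutor wraps the start/join bookkeeping above for you. Here is a sketch of the same pattern, with a hypothetical fake_download (a sleep standing in for the real network call) so it runs without live URLs:

```python
from concurrent.futures import ThreadPoolExecutor
import time

# A stand-in for download_file: sleeps to simulate network latency
def fake_download(url):
    time.sleep(0.2)
    return f"saved {url}"

urls = [f"https://example.com/file{i}.zip" for i in range(4)]

start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    # map() submits every task and returns results in input order
    results = list(pool.map(fake_download, urls))
elapsed = time.time() - start

print(results)
print(f"took {elapsed:.2f}s")  # ~0.2s rather than ~0.8s sequentially
```

Unlike manually joined threads, the executor also propagates return values and exceptions back to the caller, which makes error handling much simpler.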

multiprocessing - Bypassing the GIL

When your task is CPU-bound (e.g., complex math, data processing, video encoding), the GIL becomes a bottleneck. The multiprocessing module solves this by creating new processes, each with its own Python interpreter and memory space (and thus, its own GIL).

Key Concept: True parallelism. Each process gets its own slice of the CPU and can run simultaneously.
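The flip side of each process having its own memory space is that processes don't share state: a global variable mutated in a child process is invisible to the parent. A quick sketch:

```python
import multiprocessing

counter = 0  # lives in the parent process's memory

def bump():
    global counter
    counter += 1  # modifies the *child's* copy only

if __name__ == "__main__":
    p = multiprocessing.Process(target=bump)
    p.start()
    p.join()
    print(counter)  # still 0: the child incremented its own copy
```

To actually pass data between processes, use return values via a Pool (as in the example below), a multiprocessing.Queue, or shared-memory types such as multiprocessing.Value.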

When to use it:

  • Heavy numerical computations (e.g., with NumPy).
  • Data processing (e.g., with Pandas or custom algorithms).
  • Image or video processing.
  • Any task that is "calculating" more than "waiting."

Example: CPU-Bound Task

import multiprocessing
import time
# A CPU-bound task: checking if a number is prime
def is_prime(n):
    if n <= 1:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    for i in range(3, int(n**0.5) + 1, 2):
        if n % i == 0:
            return False
    return True
def check_numbers_chunk(numbers_chunk):
    results = []
    for num in numbers_chunk:
        results.append((num, is_prime(num)))
    return results
if __name__ == "__main__":
    # A large list of numbers to check
    numbers = list(range(1, 100000))
    # --- Sequential Version ---
    start_time = time.time()
    sequential_results = [is_prime(n) for n in numbers]
    end_time = time.time()
    print(f"Sequential check took: {end_time - start_time:.2f} seconds")
    # --- Concurrent Version using Processes ---
    num_processes = multiprocessing.cpu_count()
    chunk_size = len(numbers) // num_processes
    number_chunks = [numbers[i:i + chunk_size] for i in range(0, len(numbers), chunk_size)]
    start_time = time.time()
    with multiprocessing.Pool(processes=num_processes) as pool:
        process_results = pool.map(check_numbers_chunk, number_chunks)
    # Flatten the results from all processes
    final_results = [item for sublist in process_results for item in sublist]
    end_time = time.time()
    print(f"Concurrent check took: {end_time - start_time:.2f} seconds")

Output (Conceptual):

# On a 4-core machine:
Sequential check took: 4.50 seconds
Concurrent check took: 1.30 seconds # Significantly faster!

Note: The if __name__ == "__main__": guard is essential on platforms that start processes with the "spawn" method (Windows, and macOS by default since Python 3.8). Each child process re-imports the module, and without the guard it would execute the spawning code itself, recursively creating processes until it crashes.


asyncio - The Modern, High-Performance I/O Approach

asyncio is a different paradigm. It's a single-threaded, single-process model that uses an event loop to manage thousands of concurrent "tasks." Tasks yield control back to the event loop when they encounter an await on an I/O operation. This allows other tasks to run while the first one waits.

Key Concept: Cooperative multitasking. Code must be written explicitly to yield control using async and await.
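The mechanics are easiest to see without networking: asyncio.sleep is the async analogue of a blocking wait, and awaiting it yields to the event loop so other tasks can run. A minimal sketch:

```python
import asyncio
import time

async def task(name, delay):
    # 'await' hands control back to the event loop until the sleep finishes
    await asyncio.sleep(delay)
    return name

async def main():
    start = time.time()
    # All three tasks wait cooperatively, so the total is ~0.3s, not ~0.9s
    results = await asyncio.gather(task("a", 0.3), task("b", 0.3), task("c", 0.3))
    return results, time.time() - start

results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

Note that a plain time.sleep here would block the whole event loop, because nothing inside it ever awaits; cooperative multitasking only works if every task actually cooperates.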

When to use it:

  • High-performance web servers (e.g., FastAPI, Starlette).
  • Managing thousands of network connections (e.g., chat servers).
  • Web scraping many sites.
  • Any situation where you need to handle a massive number of I/O-bound tasks with minimal overhead.

Example: Fetching Web Pages with asyncio

import asyncio
import aiohttp # A popular async HTTP client library
import time
async def fetch_url(session, url):
    print(f"Starting fetch for {url}")
    try:
        # session.get() is non-blocking: 'await'-ing it pauses this coroutine
        # and lets the event loop run other tasks. Note that aiohttp expects
        # timeouts as a ClientTimeout object rather than a bare number.
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            # We can also 'await' reading the content
            data = await response.text()
            print(f"Finished fetching {url}, status: {response.status}")
            return data
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        return None
async def run_async():
    urls = [
        'https://example.com',
        'https://python.org',
        'https://github.com'
    ]
    # aiohttp.ClientSession is the equivalent of requests.Session for async
    async with aiohttp.ClientSession() as session:
        # Create a list of tasks to be run concurrently
        tasks = [fetch_url(session, url) for url in urls]
        # asyncio.gather runs all tasks concurrently and waits for them all to finish
        start_time = time.time()
        results = await asyncio.gather(*tasks)
        end_time = time.time()
        print(f"\nAsync fetch took: {end_time - start_time:.2f} seconds")
        # print(f"Results: {len([r for r in results if r])} successful fetches.")
# To run an asyncio program, you need an event loop.
# In modern Python (3.7+), asyncio.run() creates and manages one for you:
if __name__ == "__main__":
    asyncio.run(run_async())

Output (Conceptual):

Starting fetch for https://example.com
Starting fetch for https://python.org
Starting fetch for https://github.com
Finished fetching https://example.com, status: 200
Finished fetching https://python.org, status: 200
Finished fetching https://github.com, status: 200
Async fetch took: 0.85 seconds

Summary: Which One to Choose?

| Feature | threading | multiprocessing | asyncio |
|---|---|---|---|
| Primary Use Case | I/O-bound tasks | CPU-bound tasks | High-concurrency I/O-bound tasks |
| Execution Model | OS-level threads | Separate processes | Single-threaded event loop |
| Parallelism | No (due to GIL) | Yes | No (but appears concurrent) |
| Overhead | Low | High (memory, startup) | Very low |
| Best For... | Simple scripts, GUI apps, moderate I/O | Heavy calculations, data crunching | Web servers, 10k+ connections, microservices |
| Key Tools | threading.Thread | multiprocessing.Pool, Process | async def, await, asyncio.gather |

Final Rule of Thumb:

  1. Is your task waiting for I/O (network, disk)?
    • If you have a few tasks, use threading.
    • If you have thousands of tasks, use asyncio for better performance and scalability.
  2. Is your task doing heavy calculations (CPU-bound)?
    • Use multiprocessing to bypass the GIL and use all your CPU cores.
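These approaches also compose. When async code needs to call a blocking function (say, a library that only offers a synchronous API), asyncio.to_thread (Python 3.9+) runs it on a worker thread without stalling the event loop. A sketch, with blocking_io as a hypothetical stand-in for any synchronous call:

```python
import asyncio
import time

def blocking_io(n):
    # A blocking call (e.g. requests.get or a file read) that would
    # otherwise freeze the single-threaded event loop
    time.sleep(0.2)
    return n * 2

async def main():
    start = time.time()
    # asyncio.to_thread dispatches each call to the default thread pool,
    # so the three 0.2s blocks overlap instead of running back to back
    results = await asyncio.gather(*(asyncio.to_thread(blocking_io, i) for i in range(3)))
    return results, time.time() - start

results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

For CPU-bound work inside async code, the same idea applies with loop.run_in_executor and a ProcessPoolExecutor, which hands the calculation to a separate process.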