
How to Implement Efficient Batch Inserts in Python?

"Batch insert" in Python means inserting multiple records into a database in a single operation rather than one at a time. It is a critical performance optimization technique.


Here's a comprehensive guide covering:

  1. Why Batch Insert? (The Performance Benefit)
  2. The Standard Way: Using executemany() (The most common approach)
  3. The Modern Way: Chunked Inserts with psycopg2.extras.execute_batch() (A significant improvement for large datasets)
  4. The High-Performance Way: Using COPY (For PostgreSQL, the fastest method)
  5. A Complete Example (Tying it all together)
  6. Best Practices

Why Batch Insert? The Performance Benefit

Imagine you need to insert 10,000 records into a database.

  • Naive Approach (Loop + execute()):

    for record in records:
        cursor.execute("INSERT INTO my_table (col1, col2) VALUES (%s, %s)", (record['val1'], record['val2']))

    This results in 10,000 separate database round-trips. Each round-trip has significant overhead (network latency, parsing the query, planning the execution, etc.). This is extremely slow.

  • Batch Insert (executemany()):

    cursor.executemany("INSERT INTO my_table (col1, col2) VALUES (%s, %s)", records)

    This hands the whole batch to the driver in a single call. How much that helps depends on the driver: mysql-connector-python rewrites it into one multi-row INSERT, while psycopg2's executemany() still runs the statement once per row internally (its execute_batch() and execute_values() helpers, covered below, fix that). Done properly, batching can improve load times by an order of magnitude or more compared to the naive loop.


The Standard Way: executemany()

This is the most common and portable method across different database libraries (like psycopg2 for PostgreSQL and mysql-connector-python for MySQL).

The executemany() method takes an SQL statement and a sequence of parameter tuples, and executes the statement once for each tuple.


How it Works:

The driver takes a single parameterized statement plus a sequence of parameter tuples. What happens next varies by driver: mysql-connector-python rewrites the call into one multi-row INSERT, while psycopg2 executes the statement once per tuple and leaves true batching to its extras helpers (next section). Either way, your code makes one call and the driver handles the iteration.

Example with psycopg2 (PostgreSQL):

import psycopg2

conn = None
cursor = None
# --- Setup: Create a table and some data ---
# (In a real app, you'd use a connection pool)
try:
    conn = psycopg2.connect(
        dbname="testdb",
        user="user",
        password="password",
        host="localhost"
    )
    cursor = conn.cursor()
    # Create a table for our example
    cursor.execute("DROP TABLE IF EXISTS products;")
    cursor.execute("""
        CREATE TABLE products (
            id SERIAL PRIMARY KEY,
            name VARCHAR(255) NOT NULL,
            price NUMERIC(10, 2),
            stock INT
        );
    """)
    # Sample data to insert
    products_to_insert = [
        ('Laptop', 1200.50, 50),
        ('Mouse', 25.00, 200),
        ('Keyboard', 75.75, 150),
        ('Monitor', 300.00, 75),
        ('Webcam', 50.25, 120)
    ]
    # --- The Batch Insert ---
    insert_query = "INSERT INTO products (name, price, stock) VALUES (%s, %s, %s);"
    # executemany() takes the query and the list of tuples
    cursor.executemany(insert_query, products_to_insert)
    # Commit the transaction
    conn.commit()
    print(f"{cursor.rowcount} records inserted successfully.")
except (Exception, psycopg2.Error) as error:
    print("Error while connecting to PostgreSQL or inserting data:", error)
    if conn:
        conn.rollback() # Roll back in case of error
finally:
    # Close the cursor and connection if they were opened.
    if cursor:
        cursor.close()
    if conn:
        conn.close()
        print("PostgreSQL connection is closed.")

The Modern Way: Chunked Inserts with psycopg2.extras.execute_batch()

For very large batches (e.g., 100,000+ records), handing executemany() one giant list has two problems: the whole dataset must sit in client memory, and in psycopg2 executemany() is not actually faster than a plain loop, because it still executes the statement once per row.

psycopg2 ships helpers in psycopg2.extras that fix this: execute_batch() packs many statements into far fewer server round-trips, and execute_values() folds many rows into a single multi-row VALUES clause. Combined with a generator that yields the data in chunks, client memory stays flat and the data is sent efficiently.

How it Works:

You write a generator that yields batches of rows, then call execute_batch() once per batch on a regular cursor. Only one chunk is ever held in memory, and each chunk needs only a handful of round-trips.

Example with psycopg2:

import psycopg2
from psycopg2.extras import execute_batch

def batch_generator(data, batch_size=1000):
    """Yields slices of `data`, batch_size rows at a time."""
    for i in range(0, len(data), batch_size):
        yield data[i:i + batch_size]

# --- Simulating a very large dataset ---
all_products = [(f"Product {i}", i * 10.50, i % 100) for i in range(1, 100001)]

conn = None
cursor = None
try:
    conn = psycopg2.connect(
        dbname="testdb",
        user="user",
        password="password",
        host="localhost"
    )
    cursor = conn.cursor()
    cursor.execute("DROP TABLE IF EXISTS large_products;")
    cursor.execute("""
        CREATE TABLE large_products (
            id SERIAL PRIMARY KEY,
            name VARCHAR(255) NOT NULL,
            price NUMERIC(10, 2),
            stock INT
        );
    """)
    conn.commit()
    insert_query = "INSERT INTO large_products (name, price, stock) VALUES (%s, %s, %s);"
    # Feed the data to execute_batch() one chunk at a time.
    # page_size controls how many statements are packed per round-trip.
    total = 0
    for batch in batch_generator(all_products, batch_size=2000):
        execute_batch(cursor, insert_query, batch, page_size=500)
        total += len(batch)
    conn.commit()
    print(f"{total} records inserted successfully in chunks.")
except (Exception, psycopg2.Error) as error:
    print("Error:", error)
    if conn:
        conn.rollback()
finally:
    if cursor:
        cursor.close()
    if conn:
        conn.close()
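
For plain inserts, psycopg2.extras.execute_values() is usually even faster than execute_batch(), because it folds each chunk into a single multi-row VALUES statement. A minimal sketch of the same chunked loop, reusing all_products, batch_generator(), and the open cursor/conn from the example above:

from psycopg2.extras import execute_values

insert_query = "INSERT INTO large_products (name, price, stock) VALUES %s"
for batch in batch_generator(all_products, batch_size=2000):
    # execute_values expands the single %s into a multi-row VALUES list
    execute_values(cursor, insert_query, batch, page_size=500)
conn.commit()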

The High-Performance Way: Using COPY (PostgreSQL)

For PostgreSQL, the absolute fastest way to load large amounts of data is by using the COPY command. It bypasses most of the SQL query parsing and execution overhead and is highly optimized for bulk data loading.

The psycopg2 library provides a copy_from() method to do this.

How it Works:

  1. Prepare your data as a file-like object (e.g., an io.StringIO buffer in memory or a real file), formatted as delimiter-separated text the way COPY expects.
  2. The cursor.copy_from() method streams this data to the PostgreSQL server using COPY ... FROM STDIN.

Example with psycopg2:

import psycopg2
import io
import csv
conn = None
cursor = None
try:
    conn = psycopg2.connect(
        dbname="testdb",
        user="user",
        password="password",
        host="localhost"
    )
    cursor = conn.cursor()
    # Create a table for the COPY example
    cursor.execute("DROP TABLE IF EXISTS products_copy;")
    cursor.execute("""
        CREATE TABLE products_copy (
            name VARCHAR(255),
            price NUMERIC(10, 2),
            stock INT
        );
    """)
    conn.commit()
    # Sample data
    products_data = [
        ('Laptop', 1200.50, 50),
        ('Mouse', 25.00, 200),
        ('Keyboard', 75.75, 150),
        ('Monitor', 300.00, 75),
        ('Webcam', 50.25, 120)
    ]
    # Create a buffer in memory to act as a file
    output = io.StringIO()
    # csv.writer is a convenient way to build comma-separated lines.
    # NOTE: copy_from() expects PostgreSQL's plain-text COPY format, so CSV
    # quoting is NOT interpreted; that is fine here because none of the values
    # contain commas, quotes, or newlines. For messier data, use copy_expert()
    # with an explicit CSV COPY statement instead (see below).
    writer = csv.writer(output, delimiter=',', quotechar='"')
    writer.writerows(products_data)
    # Reset the buffer's position to the beginning
    output.seek(0)
    # Stream the buffer to the server; specify the separator, the NULL marker,
    # and the target columns explicitly.
    cursor.copy_from(output, 'products_copy', sep=',', null='', columns=('name', 'price', 'stock'))
    conn.commit()
    print("Data inserted successfully using COPY.")
except (Exception, psycopg2.Error) as error:
    print("Error:", error)
    if conn:
        conn.rollback()
finally:
    if cursor:
        cursor.close()
    if conn:
        conn.close()
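
If your values can contain commas, quotes, or newlines, the plain-text format used by copy_from() is not enough. A safer variant is cursor.copy_expert() with an explicit CSV COPY statement, so the server parses the buffer as real CSV. A sketch, assuming the products_copy table and the output buffer built above:

copy_sql = """
    COPY products_copy (name, price, stock)
    FROM STDIN WITH (FORMAT csv, DELIMITER ',', QUOTE '"')
"""
output.seek(0)
cursor.copy_expert(copy_sql, output)
conn.commit()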

Complete Example: Comparing All Methods

Here is a single script that sets up a table and then times the different methods to show the performance difference.

import psycopg2
from psycopg2.extras import execute_batch
import time
import io
import csv
# --- Configuration ---
DB_NAME = "testdb"
DB_USER = "user"
DB_PASS = "password"
DB_HOST = "localhost"
TABLE_NAME = "performance_test"
NUM_RECORDS = 50000
# --- Generate sample data ---
def generate_data(n):
    return [(f"Item-{i}", i * 1.1, i % 1000) for i in range(n)]
# --- Helper function to run a test ---
def run_test(test_name, func, *args):
    print(f"\n--- Running Test: {test_name} ---")
    conn = None
    cursor = None
    try:
        conn = psycopg2.connect(dbname=DB_NAME, user=DB_USER, password=DB_PASS, host=DB_HOST)
        cursor = conn.cursor()
        # Clean up table before each test
        cursor.execute(f"DROP TABLE IF EXISTS {TABLE_NAME};")
        cursor.execute(f"CREATE TABLE {TABLE_NAME} (name VARCHAR(255), price NUMERIC(10, 2), stock INT);")
        conn.commit()
        start_time = time.time()
        func(cursor, *args) # Pass cursor and other args
        conn.commit()
        end_time = time.time()
        duration = end_time - start_time
        print(f"Success! Inserted {cursor.rowcount} rows in {duration:.4f} seconds.")
    except (Exception, psycopg2.Error) as error:
        print(f"Error in {test_name}:", error)
        if conn:
            conn.rollback()
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()
# --- Test Functions ---
# 1. Standard executemany
def test_executemany(cursor, data):
    insert_query = f"INSERT INTO {TABLE_NAME} (name, price, stock) VALUES (%s, %s, %s);"
    cursor.executemany(insert_query, data)
# 2. Chunked execute_batch (memory-friendly for very large datasets)
def test_execute_batch(cursor, data):
    # Yield the data in chunks so only batch_size rows are handled at a time
    def batch_generator(d, batch_size=5000):
        for i in range(0, len(d), batch_size):
            yield d[i:i + batch_size]
    insert_query = f"INSERT INTO {TABLE_NAME} (name, price, stock) VALUES (%s, %s, %s);"
    for batch in batch_generator(data):
        execute_batch(cursor, insert_query, batch, page_size=500)
# 3. COPY command
def test_copy(cursor, data):
    output = io.StringIO()
    writer = csv.writer(output, delimiter=',', quotechar='"')
    writer.writerows(data)
    output.seek(0)
    cursor.copy_from(output, TABLE_NAME, sep=',', null='', columns=('name', 'price', 'stock'))
# --- Main execution ---
if __name__ == "__main__":
    all_data = generate_data(NUM_RECORDS)
    # Test 1
    run_test("Standard executemany", test_executemany, all_data)
    # Test 2
    run_test("Chunked execute_batch", test_execute_batch, all_data)
    # Test 3
    run_test("COPY Command", test_copy, all_data)

When you run this, you should see COPY come out significantly faster than the other methods, with the chunked execute_batch() approach well ahead of plain executemany(), especially as NUM_RECORDS grows.


Best Practices

  1. Always Use Parameterized Queries: Never splice values into your SQL with f-strings or string formatting. This is a major security risk (SQL injection) and defeats the driver's parameter handling.

    • Bad: cursor.execute(f"INSERT INTO my_table (val) VALUES ({my_value})")
    • Good: cursor.execute("INSERT INTO my_table (val) VALUES (%s)", (my_value,))
  2. Commit in Batches: For extremely large operations, even COPY or chunked inserts benefit from periodic commits. Keeping individual transactions short avoids holding locks for long stretches and limits how much work is lost or rolled back if the load fails partway through.

    # Inside a loop over chunks of data
    for batch in batch_generator(huge_data, 10000):
        execute_batch(cursor, query, batch)
        conn.commit() # Commit every 10,000 rows
  3. Disable Indexes and Constraints (for huge loads): If you are loading millions of rows into a table with indexes or foreign key constraints, consider:

    • Dropping the indexes and constraints before the load.
    • Performing the batch insert.
    • Re-creating the indexes and constraints afterward. Building an index once over the fully loaded table is much faster than maintaining it row by row during the load. (A short sketch of this pattern follows this list.)
  4. Choose the Right Tool:

    • executemany(): The standard, go-to method for most use cases. It's portable and easy to use.
    • Chunked execute_batch() / execute_values(): Best for very large psycopg2 workloads where client memory and round-trip overhead are a concern.
    • COPY: The undisputed champion for performance in PostgreSQL. Use it for ETL, data migrations, and other large-scale data loading tasks.
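
A minimal sketch of the drop-and-recreate pattern from point 3 (the index name idx_products_name is a hypothetical example; use the names from your own schema, and note that the index is unavailable to queries while it is dropped):

cursor.execute("DROP INDEX IF EXISTS idx_products_name;")
conn.commit()

# ... run the bulk load here (COPY, execute_batch, etc.) ...

cursor.execute("CREATE INDEX idx_products_name ON products (name);")
conn.commit()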