杰瑞科技汇

How Can PostgreSQL and Python Interact Efficiently?

Using Python with PostgreSQL is a common and powerful combination. This guide walks through the whole process, from installation to writing robust, secure database applications.

We'll cover the most popular and recommended library, psycopg2, and the modern, async-friendly alternative, asyncpg.


The Core Library: psycopg2

psycopg2 is the most widely used PostgreSQL adapter for Python. It's a mature, robust, and feature-rich library that conforms to the Python DB-API 2.0 specification.

Step 1: Installation

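First, install the library with pip. The psycopg2-binary package ships pre-compiled binaries, so it needs no compiler or PostgreSQL development headers and is the quickest way to get started. The psycopg2 documentation recommends it for development and testing, and recommends building the psycopg2 package from source for production.

# Quick start: pre-compiled binaries, no build tools needed
pip install psycopg2-binary
# Build from source (needs a C compiler and the libpq development headers)
# pip install psycopg2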

Step 2: Connecting to a Database

To connect, you need your database connection details:

  • dbname: The name of the database.
  • user: The username.
  • password: The password.
  • host: The server address (e.g., localhost or an IP).
  • port: The port number (default is 5432).

Keep these details out of your source code, for example in environment variables or in a configuration file that is excluded from version control.

import os

import psycopg2

# Best practice: read credentials from environment variables
# Example: export DB_NAME="mydb" DB_USER="myuser" DB_PASS="mypassword" DB_HOST="localhost"
# The fallback values below are placeholders for local development only.
dbname = os.getenv('DB_NAME', 'mydb')
user = os.getenv('DB_USER', 'myuser')
password = os.getenv('DB_PASS', 'mypassword')
host = os.getenv('DB_HOST', 'localhost')
port = os.getenv('DB_PORT', '5432')

conn = None
try:
    # Establish a connection
    conn = psycopg2.connect(
        dbname=dbname,
        user=user,
        password=password,
        host=host,
        port=port
    )
    # Create a cursor object to execute SQL commands
    cur = conn.cursor()
    print("Connection successful!")
    # You can now execute SQL commands...
    # Close the cursor when you are done with it
    cur.close()
except psycopg2.OperationalError as e:
    print(f"Could not connect to the database: {e}")
finally:
    # Close the connection even if a later command fails
    if conn is not None:
        conn.close()
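
The connection details listed above can also be collected in one place before connecting. The following is a minimal sketch, not part of psycopg2: the helper name load_db_params is hypothetical, and it assumes the same DB_NAME, DB_USER, DB_PASS, DB_HOST, and DB_PORT variables used in the example. Unlike the example, it refuses to fall back to a default password.

```python
import os


def load_db_params(env=None):
    """Build keyword arguments for psycopg2.connect() from environment variables.

    `env` defaults to os.environ; passing a plain dict makes the function easy to test.
    """
    env = os.environ if env is None else env
    password = env.get("DB_PASS")
    if not password:
        # Fail early with a clear message instead of a late authentication error.
        raise RuntimeError("DB_PASS is not set")
    return {
        "dbname": env.get("DB_NAME", "mydb"),
        "user": env.get("DB_USER", "myuser"),
        "password": password,
        "host": env.get("DB_HOST", "localhost"),
        "port": int(env.get("DB_PORT", "5432")),
    }
```

With this in place, the connection call becomes psycopg2.connect(**load_db_params()), and a missing password is reported before any network traffic happens.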

Step 3: Basic CRUD Operations

Let's assume you have a table named employees:

CREATE TABLE employees (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    position VARCHAR(100),
    salary NUMERIC(10, 2)
);

A) Creating (INSERT) Data

Important: Always use parameterized queries to prevent SQL injection.

# (Assuming 'conn' and 'cur' are open from the previous step)
# --- Using a tuple for parameters ---
insert_query = "INSERT INTO employees (name, position, salary) VALUES (%s, %s, %s)"
employee_data = ("Alice Smith", "Software Engineer", 90000.00)
cur.execute(insert_query, employee_data)
# --- Using a dictionary for named parameters ---
# psycopg2 supports named placeholders of the form %(name)s
insert_query_dict = "INSERT INTO employees (name, position, salary) VALUES (%(name)s, %(position)s, %(salary)s)"
employee_data_dict = {
    "name": "Bob Johnson",
    "position": "Project Manager",
    "salary": 105000.50
}
cur.execute(insert_query_dict, employee_data_dict)
# Commit the transaction to save the changes
conn.commit()
print("Employee added successfully.")
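
Because id is a SERIAL column, the database generates it, and the calling code often needs that value right after the insert. One way to get it, sketched here under the assumption that cur is an open psycopg2 cursor, is PostgreSQL's RETURNING clause. The function name insert_employee is hypothetical.

```python
def insert_employee(cur, name, position, salary):
    """Insert one employee and return the id PostgreSQL generated for it."""
    cur.execute(
        "INSERT INTO employees (name, position, salary) "
        "VALUES (%s, %s, %s) RETURNING id",
        (name, position, salary),
    )
    # RETURNING makes the INSERT produce a one-row result set.
    return cur.fetchone()[0]
```

The caller still has to call conn.commit() afterwards; the function only executes the statement.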

B) Reading (SELECT) Data

# (Assuming 'conn' and 'cur' are open)
# Fetch a single record
cur.execute("SELECT id, name, position FROM employees WHERE name = %s", ("Alice Smith",))
alice_record = cur.fetchone() # Returns a tuple or None
if alice_record:
    print(f"Found Alice: ID: {alice_record[0]}, Name: {alice_record[1]}, Position: {alice_record[2]}")
# Fetch all records
cur.execute("SELECT id, name, position, salary FROM employees;")
all_employees = cur.fetchall() # Returns a list of tuples
print("\nAll Employees:")
for emp in all_employees:
    print(f"ID: {emp[0]}, Name: {emp[1]}, Position: {emp[2]}, Salary: {emp[3]}")
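
Accessing columns by position (emp[0], emp[1]) breaks silently when the column list changes. A small sketch of an alternative, using only the DB-API attribute cur.description, turns each row into a dictionary keyed by column name. The helper name fetch_dicts is hypothetical; psycopg2 also ships ready-made dictionary cursors in psycopg2.extras (for example RealDictCursor), which may be the better fit in real code.

```python
def fetch_dicts(cur):
    """Return the remaining rows of an executed query as a list of dicts."""
    # cur.description has one entry per result column; item 0 is the column name.
    columns = [col[0] for col in cur.description]
    return [dict(zip(columns, row)) for row in cur.fetchall()]
```

After cur.execute("SELECT id, name, position FROM employees"), each element of fetch_dicts(cur) can be read as row["name"] instead of row[1].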

C) Updating (UPDATE) Data

# (Assuming 'conn' and 'cur' are open)
update_query = "UPDATE employees SET salary = %s WHERE name = %s"
new_salary = 95000.00
name_to_update = "Alice Smith"
cur.execute(update_query, (new_salary, name_to_update))
conn.commit()
print(f"Alice's salary updated to {new_salary}.")
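
The example above prints a success message even if no employee is named "Alice Smith", because an UPDATE that matches nothing is not an error. A minimal sketch of a check, assuming cur is an open psycopg2 cursor, reads the DB-API attribute cur.rowcount after the statement. The function name update_salary is hypothetical.

```python
def update_salary(cur, name, new_salary):
    """Set a new salary and return how many rows the UPDATE changed."""
    cur.execute(
        "UPDATE employees SET salary = %s WHERE name = %s",
        (new_salary, name),
    )
    # rowcount is the number of rows affected by the last execute() call.
    return cur.rowcount
```

A return value of 0 means no row matched the name, so the caller can report that instead of claiming success. Committing remains the caller's job.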

D) Deleting (DELETE) Data

# (Assuming 'conn' and 'cur' are open)
delete_query = "DELETE FROM employees WHERE name = %s"
name_to_delete = "Bob Johnson"
cur.execute(delete_query, (name_to_delete,))
conn.commit()
print(f"{name_to_delete} has been deleted.")
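
Each CRUD example above calls conn.commit() by hand, and none of them rolls back when a statement fails. The pattern can be sketched as a small context manager that commits on success and rolls back on any exception. The name transaction is hypothetical, and conn is assumed to be an open psycopg2 connection; psycopg2 connections can also be used directly in a with statement, which commits or rolls back but does not close the connection.

```python
from contextlib import contextmanager


@contextmanager
def transaction(conn):
    """Yield a cursor; commit if the block succeeds, roll back if it raises."""
    cur = conn.cursor()
    try:
        yield cur
        conn.commit()
    except Exception:
        # Undo everything done inside the block, then re-raise for the caller.
        conn.rollback()
        raise
    finally:
        cur.close()


# Usage sketch (needs a live connection, so it is left as a comment):
# with transaction(conn) as cur:
#     cur.execute("DELETE FROM employees WHERE name = %s", ("Bob Johnson",))
#     cur.execute("UPDATE employees SET salary = %s WHERE name = %s",
#                 (95000, "Alice Smith"))
```

Both statements in the usage sketch take effect together or not at all, which is usually what a multi-step change needs.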

The Modern Asynchronous Alternative: asyncpg

For applications that handle many concurrent database operations (such as web servers), an asynchronous driver is often a better choice. asyncpg is a fast, full-featured PostgreSQL driver built for asyncio.

Step 1: Installation

pip install asyncpg

Step 2: Connecting and Using with asyncio

The code structure is different because it uses async and await keywords.

import asyncio
import os

import asyncpg


async def main():
    # Best practice: read credentials from environment variables
    dbname = os.getenv('DB_NAME', 'mydb')
    user = os.getenv('DB_USER', 'myuser')
    password = os.getenv('DB_PASS', 'mypassword')
    host = os.getenv('DB_HOST', 'localhost')
    port = int(os.getenv('DB_PORT', '5432'))
    conn = None
    try:
        # Establish an asynchronous connection
        conn = await asyncpg.connect(
            user=user,
            password=password,
            database=dbname,
            host=host,
            port=port
        )
        print("Async connection successful!")
        # --- INSERT ---
        await conn.execute(
            "INSERT INTO employees (name, position, salary) VALUES ($1, $2, $3)",
            "Charlie Brown", "Data Scientist", 120000.00
        )
        print("Charlie added.")
        # --- SELECT (fetching one row) ---
        charlie = await conn.fetchrow(
            "SELECT id, name FROM employees WHERE name = $1", "Charlie Brown"
        )
        if charlie:
            print(f"Found Charlie: ID: {charlie['id']}, Name: {charlie['name']}")
        # --- SELECT (fetching all rows) ---
        all_employees = await conn.fetch("SELECT * FROM employees;")
        print("\nAll Employees (async):")
        for emp in all_employees:
            print(f"ID: {emp['id']}, Name: {emp['name']}, Position: {emp['position']}, Salary: {emp['salary']}")
        # --- UPDATE ---
        await conn.execute(
            "UPDATE employees SET salary = $1 WHERE name = $2",
            125000.00, "Charlie Brown"
        )
        print("Charlie's salary updated.")
        # --- DELETE ---
        await conn.execute("DELETE FROM employees WHERE name = $1", "Charlie Brown")
        print("Charlie deleted.")
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        # Close the connection even if one of the statements failed
        if conn is not None:
            await conn.close()


# Run the async main function
asyncio.run(main())

Note: asyncpg uses PostgreSQL's native positional placeholders ($1, $2, and so on), whereas psycopg2 uses %s or %(name)s. The arguments are passed as separate positional values rather than as a tuple.
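
asyncpg runs in auto-commit mode, so each statement in the example above takes effect on its own. To group several statements so that they succeed or fail together, asyncpg provides conn.transaction(), which can be used as an async context manager. The sketch below assumes conn is an open asyncpg connection and that the employees table from earlier exists. The function name add_employees is hypothetical.

```python
async def add_employees(conn, employees):
    """Insert all given (name, position, salary) tuples, or none of them."""
    # The transaction commits when the block exits normally
    # and rolls back if an exception escapes it.
    async with conn.transaction():
        for name, position, salary in employees:
            await conn.execute(
                "INSERT INTO employees (name, position, salary) "
                "VALUES ($1, $2, $3)",
                name, position, salary,
            )
```

Inside main(), this would be called as await add_employees(conn, [("Eve Adams", "Designer", 80000), ("Frank Lee", "Tester", 70000)]), where the names and values are made up for illustration.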


Best Practices & Advanced Topics

A) Using Connection Pooling

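Creating a new database connection for every request is expensive: each one costs a network handshake, authentication, and a new server process on the PostgreSQL side. A connection pool keeps a set of connections open and reuses them, which removes that setup cost from each request.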

  • With psycopg2: The psycopg2.pool module provides a simple pool.
  • With asyncpg: asyncpg.create_pool() is the standard and highly recommended way.

Example with asyncpg pool:

import asyncio
import asyncpg
async def use_pool(pool):
    # Acquire a connection from the pool
    async with pool.acquire() as connection:
        # Use the connection
        data = await connection.fetch("SELECT * FROM employees;")
        print(f"Fetched {len(data)} records using a pooled connection.")
        # The connection is automatically released back to the pool
async def main():
    pool = await asyncpg.create_pool(
        user='myuser',
        password='mypassword',
        database='mydb',
        host='localhost',
        min_size=2,  # Minimum number of connections in the pool
        max_size=10  # Maximum number of connections in the pool
    )
    # Run multiple tasks concurrently
    await asyncio.gather(
        use_pool(pool),
        use_pool(pool),
        use_pool(pool)
    )
    await pool.close() # Close the pool when done
asyncio.run(main())
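
For psycopg2, a comparable pattern can be sketched with the pools in psycopg2.pool, which hand out connections through getconn() and take them back through putconn(). The helper below is a hypothetical convenience wrapper, not part of psycopg2. It assumes pool is a psycopg2 pool such as SimpleConnectionPool (single-threaded use) or ThreadedConnectionPool (multi-threaded use).

```python
from contextlib import contextmanager


@contextmanager
def pooled_connection(pool):
    """Borrow a connection from a psycopg2 pool and always give it back."""
    conn = pool.getconn()
    try:
        yield conn
        conn.commit()
    except Exception:
        # Leave the connection in a clean state before it is reused.
        conn.rollback()
        raise
    finally:
        pool.putconn(conn)


# Usage sketch (needs a running server, so it is left as a comment):
# from psycopg2.pool import SimpleConnectionPool
# pool = SimpleConnectionPool(2, 10, dbname="mydb", user="myuser",
#                             password="mypassword", host="localhost")
# with pooled_connection(pool) as conn:
#     with conn.cursor() as cur:
#         cur.execute("SELECT COUNT(*) FROM employees;")
#         print(cur.fetchone()[0])
# pool.closeall()
```

Committing or rolling back before putconn() matters because the next borrower receives the same connection object, including any transaction left open on it.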

B) Using an ORM (Object-Relational Mapper)

An ORM like SQLAlchemy allows you to interact with your database using Python objects and methods, rather than writing raw SQL. This can make your code more Pythonic and less error-prone.

SQLAlchemy with psycopg2 (using the modern 2.0 style):

from decimal import Decimal
from typing import Optional

from sqlalchemy import Numeric, String, create_engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


# 1. Setup
class Base(DeclarativeBase):
    pass


class Employee(Base):
    __tablename__ = 'employees'
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    position: Mapped[Optional[str]] = mapped_column(String(100))
    # Numeric matches the NUMERIC(10, 2) column and avoids float rounding
    salary: Mapped[Optional[Decimal]] = mapped_column(Numeric(10, 2))


# The engine manages the connection pool
engine = create_engine('postgresql+psycopg2://myuser:mypassword@localhost/mydb')
# Create the table if it doesn't exist
Base.metadata.create_all(engine)

# 2. Open a Session (closed automatically when the block ends)
with Session(engine) as session:
    # 3. Use the ORM
    # --- Create ---
    new_employee = Employee(name="Diana Prince", position="Hero", salary=Decimal("200000.00"))
    session.add(new_employee)
    session.commit()
    print("Diana added via ORM.")
    # --- Read ---
    diana = session.scalars(
        select(Employee).filter_by(name="Diana Prince")
    ).first()
    if diana:
        print(f"Found Diana: {diana.name}, Position: {diana.position}")
        # --- Update ---
        diana.salary = Decimal("210000.00")
        session.commit()
        # --- Delete ---
        session.delete(diana)
        session.commit()
        print("Diana deleted via ORM.")

Summary: Which One to Choose?

| Library | Type | Best For | Pros | Cons |
| --- | --- | --- | --- | --- |
| psycopg2 | Synchronous | General-purpose scripts, simple applications, data analysis (e.g., with Pandas) | Most popular and mature; simple, direct SQL control; excellent performance for single-threaded tasks | Blocking I/O can be slow in high-concurrency web apps |
| asyncpg | Asynchronous | High-performance web servers (FastAPI, aiohttp), microservices, real-time apps | Extremely fast and non-blocking; ideal for many concurrent DB connections; modern asyncio integration | Requires understanding of async/await |
| SQLAlchemy | ORM (Synchronous/Async) | Large applications where developer productivity and maintainability are key | Abstracts SQL away; prevents SQL injection; handles relationships between tables | Adds a layer of abstraction (can be slower) |
| Tortoise ORM | ORM (Asynchronous) | Modern async-first Python applications (e.g., FastAPI) | Built from the ground up for asyncio; simple and elegant API | Smaller community than SQLAlchemy |