杰瑞科技汇

Python如何高效驱动PostgreSQL?

下面我将为你详细介绍如何使用这两个库来操作 PostgreSQL 数据库。

Python如何高效驱动PostgreSQL?-图1
(图片来源网络,侵删)

核心推荐:psycopg (psycopg3)

这是目前 强烈推荐 的版本,它是 psycopg2 的现代化重写,修复了许多历史问题,并提供了更现代的 Python API。

安装

你需要安装 psycopg 库,推荐使用 pip

# 安装 psycopg3
pip install psycopg[binary]
  • [binary] 会安装预编译的二进制包,安装速度更快,兼容性更好,这是绝大多数情况下的首选。
  • 如果你想从源码编译(需要特殊功能或自定义依赖),可以安装 psycopg[pool] 或不带任何标签的 psycopg

基本使用

psycopg 使用 with 语句来管理连接和游标,这能确保资源(如连接和游标)在使用后被正确关闭。

示例 1:连接、查询、关闭

import psycopg
from psycopg import sql, OperationalError
# --- 1. 定义数据库连接信息 ---
# 建议使用环境变量来存储敏感信息,而不是硬编码在代码中
DB_USER = "your_username"
DB_PASSWORD = "your_password"
DB_HOST = "localhost"  # 或你的数据库服务器IP
DB_PORT = "5432"
DB_NAME = "your_database"
# --- 2. 建立连接并执行查询 ---
try:
    # 使用 with 语句管理连接
    # conn 对象会在 with 代码块结束时自动关闭
    with psycopg.connect(
        user=DB_USER,
        password=DB_PASSWORD,
        host=DB_HOST,
        port=DB_PORT,
        dbname=DB_NAME
    ) as conn:
        print("成功连接到 PostgreSQL 数据库!")
        # 使用 with 语句管理游标
        # cur 对象会在 with 代码块结束时自动关闭
        with conn.cursor() as cur:
            # --- 3. 执行 SQL 查询 ---
            # 使用 sql 模块来构建 SQL 语句,可以防止 SQL 注入
            query = sql.SQL("SELECT version()")
            cur.execute(query)
            # --- 4. 获取查询结果 ---
            #.fetchone() 获取一行结果
            db_version = cur.fetchone()
            print(f"PostgreSQL 版本: {db_version[0]}")
            # 查询示例表
            # 假设我们有一个名为 'users' 的表
            create_table_query = sql.SQL("""
                CREATE TABLE IF NOT EXISTS users (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(100) NOT NULL,
                    email VARCHAR(100) UNIQUE NOT NULL,
                    age INT
                );
            """)
            cur.execute(create_table_query)
            print("表 'users' 已准备就绪。")
            # 插入数据
            insert_query = sql.SQL("INSERT INTO users (name, email, age) VALUES (%s, %s, %s) RETURNING id")
            # 注意:参数必须是一个元组或列表,即使只有一个参数也要加逗号
            user_data = ("Alice", "alice@example.com", 30)
            cur.execute(insert_query, user_data)
            # 获取刚刚插入的行的 ID
            inserted_user_id = cur.fetchone()[0]
            print(f"成功插入用户,ID: {inserted_user_id}")
            # 查询所有用户
            cur.execute("SELECT id, name, email, age FROM users;")
            # fetchall() 获取所有结果
            users = cur.fetchall()
            print("\n所有用户列表:")
            for user in users:
                print(f"ID: {user[0]}, Name: {user[1]}, Email: {user[2]}, Age: {user[3]}")
            # --- 5. 提交事务 ---
            # 在 psycopg 中,DML 操作(如 INSERT, UPDATE, DELETE)需要手动提交
            conn.commit()
            print("\n事务已提交。")
except OperationalError as e:
    print(f"连接数据库失败: {e}")
except Exception as e:
    print(f"发生错误: {e}")
    # 如果发生错误,回滚事务
    if 'conn' in locals() and conn is not None:
        conn.rollback()

示例 2:使用 DictCursor 以字典形式获取结果

默认情况下,fetchall() 返回的是元组,如果你想通过列名来访问数据,可以使用 DictCursor

Python如何高效驱动PostgreSQL?-图2
(图片来源网络,侵删)
import psycopg
from psycopg import sql
# ... (连接代码与上面相同) ...
with psycopg.connect(...) as conn:
    with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
        cur.execute("SELECT id, name, email FROM users;")
        users = cur.fetchall()
        print("\n使用 DictCursor 获取用户:")
        for user in users:
            # 现在可以通过列名访问数据,更直观
            print(f"ID: {user['id']}, Name: {user['name']}, Email: {user['email']}")

示例 3:参数化查询 (防止 SQL 注入)

永远不要使用字符串拼接来构建 SQL 查询! 这会带来巨大的 SQL 注入风险。psycopg 提供了安全的方式来传递参数。

import psycopg
# ... (连接代码与上面相同) ...
with psycopg.connect(...) as conn:
    with conn.cursor() as cur:
        user_name = "Bob"
        # 使用 %s 作为占位符
        # 参数作为第二个参数传递,必须是一个元组或列表
        query = "SELECT * FROM users WHERE name = %s;"
        cur.execute(query, (user_name,)) # 注意这里的逗号!
        bob = cur.fetchone()
        if bob:
            print(f"找到用户: {bob}")
        else:
            print(f"未找到名为 '{user_name}' 的用户。")

传统选择:psycopg2

psycopg2 是一个非常成熟和稳定的库,在过去很长一段时间内是事实上的标准,如果你正在维护一个旧项目,或者某些特定工具只支持 psycopg2,你可能会用到它。

安装

pip install psycopg2-binary

同样,推荐安装 psycopg2-binary 版本。

基本使用

psycopg2 的 API 与 psycopg3 非常相似,但有一些细微差别。

Python如何高效驱动PostgreSQL?-图3
(图片来源网络,侵删)
import psycopg2
from psycopg2 import sql, Error
# --- 1. 定义数据库连接信息 ---
# (与上面相同)
DB_USER = "your_username"
DB_PASSWORD = "your_password"
DB_HOST = "localhost"
DB_PORT = "5432"
DB_NAME = "your_database"
conn = None # 初始化连接对象为 None
try:
    # --- 2. 建立连接 ---
    conn = psycopg2.connect(
        user=DB_USER,
        password=DB_PASSWORD,
        host=DB_HOST,
        port=DB_PORT,
        dbname=DB_NAME
    )
    print("成功连接到 PostgreSQL 数据库!")
    # 创建一个游标对象
    cur = conn.cursor()
    # --- 3. 执行查询 ---
    # 使用 sql 模块构建查询
    query = sql.SQL("SELECT version()")
    cur.execute(query)
    db_version = cur.fetchone()
    print(f"PostgreSQL 版本: {db_version[0]}")
    # 插入数据
    insert_query = sql.SQL("INSERT INTO users (name, email, age) VALUES (%s, %s, %s) RETURNING id")
    user_data = ("Charlie", "charlie@example.com", 25)
    cur.execute(insert_query, user_data)
    inserted_user_id = cur.fetchone()[0]
    print(f"成功插入用户,ID: {inserted_user_id}")
    # 查询所有用户
    cur.execute("SELECT id, name, email, age FROM users;")
    users = cur.fetchall()
    print("\n所有用户列表:")
    for user in users:
        print(f"ID: {user[0]}, Name: {user[1]}, Email: {user[2]}, Age: {user[3]}")
    # --- 4. 提交事务 ---
    conn.commit()
    print("\n事务已提交。")
except Error as e:
    print(f"数据库错误: {e}")
    if conn is not None:
        conn.rollback() # 发生错误时回滚
finally:
    # --- 5. 关闭游标和连接 ---
    if conn is not None:
        cur.close()
        conn.close()
        print("数据库连接已关闭。")

进阶主题

使用连接池

频繁地创建和销毁数据库连接会消耗大量资源,连接池可以复用已建立的连接,显著提高应用程序的性能。

psycopg3 内置了对连接池的支持。

import psycopg
from psycopg import pool
# 创建一个连接池
# min_size: 最小连接数
# max_size: 最大连接数
connection_pool = pool.ConnectionPool(
    conninfo="dbname=your_database user=your_password host=localhost port=5432",
    min_size=1,
    max_size=10,
)
try:
    # 从池中获取一个连接
    with connection_pool.connection() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT count(*) FROM users;")
            user_count = cur.fetchone()[0]
            print(f"当前用户总数: {user_count}")
finally:
    # 关闭连接池
    connection_pool.closeall()
    print("连接池已关闭。")

使用 ORM (Object-Relational Mapping)

如果你不想直接写 SQL,可以使用 ORM 框架,它们将数据库表映射为 Python 对象,让你能用面向对象的方式操作数据库。

SQLAlchemy 是最流行的 Python ORM 之一,它底层可以使用 psycopg 作为驱动。

# 安装 SQLAlchemy 和 psycopg
pip install sqlalchemy psycopg[binary]

SQLAlchemy 示例:

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
# 1. 定义模型 (映射到数据库的表)
Base = declarative_base()
class User(Base):
    __tablename__ = 'users'  # 表名
    id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String)
    age = Column(Integer)
    def __repr__(self):
        return f"<User(name='{self.name}', email='{self.email}')>"
# 2. 创建数据库引擎 (psycopg 作为驱动)
# 'postgresql+psycopg://' 是协议
engine = create_engine('postgresql+psycopg://your_username:your_password@localhost:5432/your_database')
# 3. 创建表 (如果不存在)
Base.metadata.create_all(engine)
# 4. 创建会话
Session = sessionmaker(bind=engine)
session = Session()
# 5. 使用 ORM 进行增删改查
# 创建新用户
new_user = User(name="David", email="david@example.com", age=42)
session.add(new_user)
# 查询用户
users = session.query(User).filter(User.age > 30).all()
print("\n使用 SQLAlchemy ORM 查询年龄大于30的用户:")
for user in users:
    print(user)
# 提交事务
session.commit()
# 关闭会话
session.close()

总结与对比

特性 psycopg (psycopg3) psycopg2
推荐度 ⭐⭐⭐⭐⭐ (首选) ⭐⭐ (仅用于旧项目)
API 更现代化,完全符合 Python 3 风格 经典 API,与 Python 2 兼容
类型转换 更强大,支持 asyncpg 风格的 typed queries 基本类型支持
连接池 内置支持,非常方便 需要第三方库(如 psycopg2.pool
异步支持 原生支持 (asyncpg 是其灵感来源) 需要配合 asyncpg 或第三方库
状态 活跃开发,是未来的方向 维护模式,修复关键 bug
安装 pip install psycopg[binary] pip install psycopg2-binary

对于任何新的 Python 项目,都应该毫不犹豫地选择 psycopg (psycopg3),它提供了更现代、更安全、更强大的功能,并且是 PostgreSQL 官方推荐的 Python 驱动。

如果你正在维护一个旧的 psycopg2 项目,可以继续使用它,但可以考虑在未来迁移到 psycopg3 以获得更好的性能和功能。

分享:
扫描分享到社交APP
上一篇
下一篇