杰瑞科技汇

python sqlite3 多线程

核心结论先行

sqlite3 模块在 Python 中不是完全线程安全的,这意味着,如果你同时从多个线程对同一个数据库连接对象进行写操作,可能会导致数据库损坏(DatabaseError: database disk image is malformed)。

sqlite3 内部使用文件级锁,这意味着:

  1. 一个数据库文件在同一时间只能被一个连接修改,如果两个线程尝试同时写入同一个数据库文件,第二个线程的写入操作会等待第一个线程完成。
  2. 读操作和写操作不能同时进行,一个正在写入的连接会阻塞所有其他试图读取或写入的连接。
  3. 多个读操作可以同时进行,读取操作是并发安全的。

多线程使用 sqlite3 的关键在于如何管理数据库连接


推荐的最佳实践:为每个线程创建独立的连接

这是最简单、最安全、也是官方推荐的做法,每个线程都创建并使用自己的数据库连接对象。

为什么这是最佳实践?

  • 避免冲突:每个线程操作自己的连接,从根本上避免了多个线程同时操作同一个连接对象的问题。
  • 性能:SQLite 内部连接非常轻量,创建连接的开销极小。
  • 符合 SQLite 设计:SQLite 的并发模型就是围绕“每个连接一个写事务”来设计的。

示例代码:

import sqlite3
import threading
import time
# 数据库文件名
DB_FILE = 'mydatabase.db'
# 初始化数据库
def init_db():
    # 在主线程中创建一次表
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT NOT NULL,
            points INTEGER DEFAULT 0
        )
    ''')
    # 插入一些初始数据
    cursor.execute("INSERT OR IGNORE INTO users (username) VALUES ('Alice')")
    cursor.execute("INSERT OR IGNORE INTO users (username) VALUES ('Bob')")
    conn.commit()
    conn.close()
# 线程任务:更新用户的积分
def update_user_points(user_id, points_to_add):
    # 1. 每个线程创建自己的连接
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    try:
        # 使用 BEGIN IMMEDIATE 开始一个立即获取锁的写事务
        # 这可以防止多个线程同时开始写操作,但在这里不是必须的,
        # 因为每个线程有自己的连接,但这是一个好习惯。
        conn.execute("BEGIN IMMEDIATE")
        # 读取当前积分
        cursor.execute("SELECT points FROM users WHERE id = ?", (user_id,))
        current_points = cursor.fetchone()[0]
        # 模拟一些处理时间
        time.sleep(0.1)
        # 更新积分
        new_points = current_points + points_to_add
        cursor.execute("UPDATE users SET points = ? WHERE id = ?", (new_points, user_id))
        # 提交事务
        conn.commit()
        print(f"Thread {threading.current_thread().name}: Updated user {user_id} to {new_points} points.")
    except sqlite3.Error as e:
        print(f"Thread {threading.current_thread().name}: Error {e}")
        conn.rollback() # 出错时回滚
    finally:
        # 3. 确保连接被关闭
        conn.close()
if __name__ == '__main__':
    init_db()
    threads = []
    # 启动多个线程,同时更新 Alice 和 Bob 的积分
    for i in range(5):
        t1 = threading.Thread(target=update_user_points, args=(1, 10)) # 给 Alice 加 10 分
        t2 = threading.Thread(target=update_user_points, args=(2, 5))  # 给 Bob 加 5 分
        threads.append(t1)
        threads.append(t2)
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # 最终检查结果
    final_conn = sqlite3.connect(DB_FILE)
    final_cursor = final_conn.cursor()
    final_cursor.execute("SELECT username, points FROM users")
    print("\nFinal points:")
    for row in final_cursor.fetchall():
        print(f"  {row[0]}: {row[1]}")
    final_conn.close()

代码解析:

  1. init_db(): 在程序开始时,主线程创建数据库和表。
  2. update_user_points(): 这是线程要执行的任务,函数内部,conn = sqlite3.connect(DB_FILE) 是关键,它为当前线程建立了一个全新的、独立的连接。
  3. try...finally...: 确保无论操作成功还是失败,conn.close() 都会被执行,释放连接资源。
  4. main 部分:创建并启动多个线程,它们各自调用 update_user_points,各自管理自己的连接。join() 等待所有线程完成。

需要避免的做法:共享一个连接对象

绝对不要在多个线程之间共享同一个 sqlite3.Connection 对象,尤其是在有写操作的情况下。

错误示例:

import sqlite3
import threading
# 全局共享的连接对象
SHARED_CONN = None
def worker():
    global SHARED_CONN
    try:
        # 多个线程同时操作这个全局连接
        cursor = SHARED_CONN.cursor()
        cursor.execute("INSERT INTO logs (message) VALUES (?)", (f"Message from {threading.current_thread().name}",))
        SHARED_CONN.commit()
    except sqlite3.Error as e:
        print(f"Error in {threading.current_thread().name}: {e}")
if __name__ == '__main__':
    SHARED_CONN = sqlite3.connect('test.db')
    # 创建表...
    threads = []
    for i in range(10):
        t = threading.Thread(target=worker)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    SHARED_CONN.close()

这个例子很可能会导致 DatabaseError: database disk image is malformed,因为 SQLite 无法协调多个线程对同一个连接对象的并发写入。


高级用法:使用连接池(sqlite3 模块内置)

虽然 sqlite3 没有像其他数据库(如 psycopg2)那样显式的连接池类,但我们可以利用其线程安全sqlite3.connect() 函数本身来模拟一个简单的连接池,每个线程在需要时从“池”中获取一个连接(即调用 connect()),用完后放回“池”(即调用 close()),由于连接的创建和销毁非常快,这实际上就是一种高效的“按需分配”模式。

这本质上就是最佳实践的另一种说法,但更强调“获取-使用-释放”的模式。


check_same_thread 参数

sqlite3.connect() 函数有一个 check_same_thread 参数,默认值为 True

  • check_same_thread=True (默认): 这是安全模式。Connection 对象会检查所有它的操作(cursor(), commit(), execute() 等)是否在创建它的同一个线程中执行,如果不是,它会抛出 sqlite3.ProgrammingError,这可以有效防止意外地在多线程中共享同一个连接对象。
  • check_same_thread=False: 如果你非常确定你会在多线程环境中安全地使用一个连接(只进行读操作,或者你手动管理了事务锁),可以将其设为 False 来禁用这个检查。

警告:除非你完全理解 SQLite 的并发模型并进行了精心的设计,否则强烈建议保持默认值 True,它可以帮你提前发现代码中的错误。

场景 推荐做法 原因
多线程读写 为每个线程创建独立的连接 最简单、最安全,符合 SQLite 设计,避免数据损坏。
共享连接对象 绝对避免 会导致数据库损坏或不可预期的错误。
check_same_thread 保持默认 True 提供一个额外的安全层,防止意外地在多线程间共享连接。
性能考虑 连接创建/销毁很快,无需担心。 专注于正确的并发模型,而不是连接本身的开销。

记住这个黄金法则:在多线程环境中使用 SQLite,让每个线程都拥有自己的 sqlite3.Connection 对象。 这样你就可以安心地利用其并发读取能力,并安全地执行写入操作。

分享:
扫描分享到社交APP
上一篇
下一篇