杰瑞科技汇

python sqlite 多线程

核心结论先行

在 Python 的 sqlite3 模块中进行多线程编程,最简单、最推荐、也是最安全的做法是:每个线程使用自己独立的数据库连接

# 推荐:每个线程一个连接
import sqlite3
import threading
def worker_function(thread_id):
    # 每个线程创建自己的连接
    conn = sqlite3.connect('my_database.db')
    cursor = conn.cursor()
    try:
        cursor.execute("INSERT INTO logs (message) VALUES (?)", (f"Message from thread {thread_id}",))
        conn.commit()
        print(f"Thread {thread_id}: Inserted successfully.")
    except Exception as e:
        conn.rollback()
        print(f"Thread {thread_id}: Error - {e}")
    finally:
        conn.close()
# 创建并启动多个线程
threads = []
for i in range(5):
    t = threading.Thread(target=worker_function, args=(i,))
    threads.append(t)
    t.start()
# 等待所有线程完成
for t in threads:
    t.join()
print("All threads finished.")

为什么必须每个线程一个连接?

这背后是 SQLite 的核心设计哲学决定的,SQLite 采用了文件级锁的并发控制模型。

SQLite 的并发模型

  • 串行访问:默认情况下,SQLite 对数据库的访问是串行化的,这意味着在任何给定时间,只有一个数据库连接可以写入数据库,如果另一个连接尝试写入,它必须等待第一个连接完成其事务并释放锁。
  • 两种模式:SQLite 提供了两种不同的线程模式,通过 sqlite3.threadsafety 可以检查:
    • 1 (串行访问 - 默认): 这是 Python sqlite3 模块的默认设置,它意味着同一个数据库连接不能在多个线程之间共享,如果你尝试在一个线程中使用连接,同时在另一个线程中操作同一个连接,会立即抛出 sqlite3.ProgrammingError
    • 2 (多线程访问): 在这种模式下,同一个连接可以安全地被多个线程使用,但前提是所有线程都只进行读操作,一旦有线程开始写操作,其他线程(无论是读还是写)都必须等待。
    • 3 (串行访问): 这种模式允许连接在多个线程间共享,但要求应用程序自己处理同步问题(例如使用 threading.Lock),Python 的 sqlite3 模块不支持这种模式。

SQLite 的锁规则:

操作类型 访问模式 是否需要锁 备注
多线程访问 不需要 多个线程可以同时读取。
任何模式 需要 任何时候,只有一个线程可以写。
读 + 写 任何模式 需要 写操作会阻塞所有读操作。
写 + 写 任何模式 需要 写操作会阻塞其他所有写操作。

Python sqlite3 的行为

Python 的 sqlite3 模块为了安全和简单,默认采用了最严格的 1 (串行访问) 模式,这意味着:

  1. 禁止连接共享:你绝对不能将一个 sqlite3.Connection 对象传递给另一个线程使用,如果尝试这样做,sqlite3 模块会检测到并抛出 sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread. 这样的错误。
  2. 隐式事务sqlite3 在执行 INSERT, UPDATE, DELETE 语句时会自动开始一个事务,并在你调用 commit()rollback() 时结束它,这个事务会持有数据库的锁,直到事务结束,如果你忘记 commit(),连接可能会一直持有锁,导致其他线程无限期等待。

多线程场景下的最佳实践

每个线程一个连接(强烈推荐)

这是最符合 SQLite 设计、最简单、最不容易出错的方式。

  • 优点
    • 安全:完全避免了线程间共享连接带来的问题。
    • 简单:你不需要手动管理锁,代码逻辑清晰。
    • 高效:每个线程的读操作可以并行进行,不会相互阻塞。
  • 缺点

    会创建多个数据库连接对象,但对于大多数应用来说,这个开销微不足道。

代码示例:

import sqlite3
import threading
def worker(thread_id):
    # 1. 每个线程创建自己的连接
    conn = sqlite3.connect('test.db')
    cursor = conn.cursor()
    try:
        # 2. 执行操作
        cursor.execute("INSERT INTO products (name, price) VALUES (?, ?)", 
                       (f"Product {thread_id}", thread_id * 10))
        conn.commit()
        print(f"Thread {thread_id}: Product inserted.")
    except Exception as e:
        conn.rollback()
        print(f"Thread {thread_id}: Failed. Error: {e}")
    finally:
        # 3. 关闭连接
        conn.close()
# --- 主程序 ---
if __name__ == "__main__":
    # 初始化数据库
    conn = sqlite3.connect('test.db')
    cursor = conn.cursor()
    cursor.execute('''CREATE TABLE IF NOT EXISTS products
                      (id INTEGER PRIMARY KEY, name TEXT NOT NULL, price REAL)''')
    conn.commit()
    conn.close()
    threads = []
    for i in range(5):
        t = threading.Thread(target=worker, args=(i,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    print("All products inserted.")

使用连接池(高级用法)

如果你的应用有大量短生命周期的线程,频繁地创建和销毁连接可能会带来性能开销,这时可以考虑使用一个简单的连接池。

  • 注意:SQLite 的连接池不能像其他数据库(如 PostgreSQL)那样让连接在多个线程间自由切换,这里的“池”更像是每个线程从池中“借用”一个连接,用完后再“归还”到池中,供自己或其他线程后续使用。

代码示例:

import sqlite3
import threading
from queue import Queue
class SQLiteConnectionPool:
    def __init__(self, db_path, max_connections=5):
        self.db_path = db_path
        self.pool = Queue(maxsize=max_connections)
        self.max_connections = max_connections
        # 预先创建一些连接
        for _ in range(max_connections):
            self.pool.put(self._create_connection())
    def _create_connection(self):
        conn = sqlite3.connect(self.db_path)
        # 设置一个较短的超时,防止无限期等待锁
        conn.execute("PRAGMA busy_timeout = 5000") # 5秒超时
        return conn
    def get_connection(self):
        # 如果池为空,会阻塞直到有连接可用
        return self.pool.get()
    def return_connection(self, conn):
        # 将连接放回池中
        if conn:
            self.pool.put(conn)
# --- 使用连接池的线程函数 ---
def worker_with_pool(pool, thread_id):
    conn = pool.get_connection()
    cursor = conn.cursor()
    try:
        cursor.execute("INSERT INTO logs (message) VALUES (?)", (f"Pool message {thread_id}",))
        conn.commit()
    except Exception as e:
        conn.rollback()
        print(f"Thread {thread_id} (Pool): Error - {e}")
    finally:
        pool.return_connection(conn)
# --- 主程序 ---
if __name__ == "__main__":
    # 初始化数据库
    conn = sqlite3.connect('test.db')
    cursor = conn.cursor()
    cursor.execute('''CREATE TABLE IF NOT EXISTS logs (id INTEGER PRIMARY KEY, message TEXT)''')
    conn.commit()
    conn.close()
    # 创建连接池
    pool = SQLiteConnectionPool('test.db', max_connections=3)
    threads = []
    for i in range(5): # 5个线程,但池只有3个连接
        t = threading.Thread(target=worker_with_pool, args=(pool, i))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    # 关闭池中所有连接
    while not pool.pool.empty():
        conn = pool.pool.get()
        conn.close()
    print("All threads with pool finished.")

需要避免的常见错误

  1. 共享连接对象

    # 错误示范!
    conn = sqlite3.connect('test.db')
    def worker():
        conn.execute("INSERT ...") # 这会或立即报错,或导致数据竞争
  2. 忘记提交或回滚

    # 错误示范!
    def worker():
        conn = sqlite3.connect('test.db')
        conn.execute("INSERT ...") # 自动开始一个事务
        # 如果忘记 conn.commit(),连接会一直持有锁
        # 其他线程可能会在这里无限期等待
  3. 忽略 busy_timeout: 当一个线程正在写入,其他线程尝试写入时会遇到 sqlite3.OperationalError: database is locked,可以通过设置超时来优雅地处理这种情况。

    conn = sqlite3.connect('test.db')
    # 设置5秒超时,如果5秒内锁被释放,就继续执行,否则抛出异常
    conn.execute("PRAGMA busy_timeout = 5000") 
方案 适用场景 优点 缺点
每个线程一个连接 绝大多数情况下的首选方案 简单、安全、代码清晰 创建多个连接对象(开销可忽略)
连接池 大量短生命周期线程,需要复用连接 减少连接创建/销毁的开销 实现稍复杂,管理逻辑更复杂

对于绝大多数 Python 请始终遵循“每个线程一个连接”的原则,这能让你避免 99% 的 SQLite 多线程相关问题,让你的代码既健壮又易于维护。

分享:
扫描分享到社交APP
上一篇
下一篇