核心结论先行
在 Python 的 sqlite3 模块中进行多线程编程,最简单、最推荐、也是最安全的做法是:每个线程使用自己独立的数据库连接。
# 推荐:每个线程一个连接
import sqlite3
import threading
def worker_function(thread_id):
# 每个线程创建自己的连接
conn = sqlite3.connect('my_database.db')
cursor = conn.cursor()
try:
cursor.execute("INSERT INTO logs (message) VALUES (?)", (f"Message from thread {thread_id}",))
conn.commit()
print(f"Thread {thread_id}: Inserted successfully.")
except Exception as e:
conn.rollback()
print(f"Thread {thread_id}: Error - {e}")
finally:
conn.close()
# 创建并启动多个线程
threads = []
for i in range(5):
t = threading.Thread(target=worker_function, args=(i,))
threads.append(t)
t.start()
# 等待所有线程完成
for t in threads:
t.join()
print("All threads finished.")
为什么必须每个线程一个连接?
这背后是 SQLite 的核心设计哲学决定的,SQLite 采用了文件级锁的并发控制模型。
SQLite 的并发模型
- 串行访问:默认情况下,SQLite 对数据库的访问是串行化的,这意味着在任何给定时间,只有一个数据库连接可以写入数据库,如果另一个连接尝试写入,它必须等待第一个连接完成其事务并释放锁。
- 两种模式:SQLite 提供了两种不同的线程模式,通过
sqlite3.threadsafety可以检查:1(串行访问 - 默认): 这是 Pythonsqlite3模块的默认设置,它意味着同一个数据库连接不能在多个线程之间共享,如果你尝试在一个线程中使用连接,同时在另一个线程中操作同一个连接,会立即抛出sqlite3.ProgrammingError。2(多线程访问): 在这种模式下,同一个连接可以安全地被多个线程使用,但前提是所有线程都只进行读操作,一旦有线程开始写操作,其他线程(无论是读还是写)都必须等待。3(串行访问): 这种模式允许连接在多个线程间共享,但要求应用程序自己处理同步问题(例如使用threading.Lock),Python 的sqlite3模块不支持这种模式。
SQLite 的锁规则:
| 操作类型 | 访问模式 | 是否需要锁 | 备注 |
|---|---|---|---|
| 读 | 多线程访问 | 不需要 | 多个线程可以同时读取。 |
| 写 | 任何模式 | 需要 | 任何时候,只有一个线程可以写。 |
| 读 + 写 | 任何模式 | 需要 | 写操作会阻塞所有读操作。 |
| 写 + 写 | 任何模式 | 需要 | 写操作会阻塞其他所有写操作。 |
Python sqlite3 的行为
Python 的 sqlite3 模块为了安全和简单,默认采用了最严格的 1 (串行访问) 模式,这意味着:
- 禁止连接共享:你绝对不能将一个
sqlite3.Connection对象传递给另一个线程使用,如果尝试这样做,sqlite3模块会检测到并抛出sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread.这样的错误。 - 隐式事务:
sqlite3在执行INSERT,UPDATE,DELETE语句时会自动开始一个事务,并在你调用commit()或rollback()时结束它,这个事务会持有数据库的锁,直到事务结束,如果你忘记commit(),连接可能会一直持有锁,导致其他线程无限期等待。
多线程场景下的最佳实践
每个线程一个连接(强烈推荐)
这是最符合 SQLite 设计、最简单、最不容易出错的方式。
- 优点:
- 安全:完全避免了线程间共享连接带来的问题。
- 简单:你不需要手动管理锁,代码逻辑清晰。
- 高效:每个线程的读操作可以并行进行,不会相互阻塞。
- 缺点:
会创建多个数据库连接对象,但对于大多数应用来说,这个开销微不足道。
代码示例:
import sqlite3
import threading
def worker(thread_id):
# 1. 每个线程创建自己的连接
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
try:
# 2. 执行操作
cursor.execute("INSERT INTO products (name, price) VALUES (?, ?)",
(f"Product {thread_id}", thread_id * 10))
conn.commit()
print(f"Thread {thread_id}: Product inserted.")
except Exception as e:
conn.rollback()
print(f"Thread {thread_id}: Failed. Error: {e}")
finally:
# 3. 关闭连接
conn.close()
# --- 主程序 ---
if __name__ == "__main__":
# 初始化数据库
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS products
(id INTEGER PRIMARY KEY, name TEXT NOT NULL, price REAL)''')
conn.commit()
conn.close()
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
print("All products inserted.")
使用连接池(高级用法)
如果你的应用有大量短生命周期的线程,频繁地创建和销毁连接可能会带来性能开销,这时可以考虑使用一个简单的连接池。
- 注意:SQLite 的连接池不能像其他数据库(如 PostgreSQL)那样让连接在多个线程间自由切换,这里的“池”更像是每个线程从池中“借用”一个连接,用完后再“归还”到池中,供自己或其他线程后续使用。
代码示例:
import sqlite3
import threading
from queue import Queue
class SQLiteConnectionPool:
def __init__(self, db_path, max_connections=5):
self.db_path = db_path
self.pool = Queue(maxsize=max_connections)
self.max_connections = max_connections
# 预先创建一些连接
for _ in range(max_connections):
self.pool.put(self._create_connection())
def _create_connection(self):
conn = sqlite3.connect(self.db_path)
# 设置一个较短的超时,防止无限期等待锁
conn.execute("PRAGMA busy_timeout = 5000") # 5秒超时
return conn
def get_connection(self):
# 如果池为空,会阻塞直到有连接可用
return self.pool.get()
def return_connection(self, conn):
# 将连接放回池中
if conn:
self.pool.put(conn)
# --- 使用连接池的线程函数 ---
def worker_with_pool(pool, thread_id):
conn = pool.get_connection()
cursor = conn.cursor()
try:
cursor.execute("INSERT INTO logs (message) VALUES (?)", (f"Pool message {thread_id}",))
conn.commit()
except Exception as e:
conn.rollback()
print(f"Thread {thread_id} (Pool): Error - {e}")
finally:
pool.return_connection(conn)
# --- 主程序 ---
if __name__ == "__main__":
# 初始化数据库
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS logs (id INTEGER PRIMARY KEY, message TEXT)''')
conn.commit()
conn.close()
# 创建连接池
pool = SQLiteConnectionPool('test.db', max_connections=3)
threads = []
for i in range(5): # 5个线程,但池只有3个连接
t = threading.Thread(target=worker_with_pool, args=(pool, i))
threads.append(t)
t.start()
for t in threads:
t.join()
# 关闭池中所有连接
while not pool.pool.empty():
conn = pool.pool.get()
conn.close()
print("All threads with pool finished.")
需要避免的常见错误
-
共享连接对象:
# 错误示范! conn = sqlite3.connect('test.db') def worker(): conn.execute("INSERT ...") # 这会或立即报错,或导致数据竞争 -
忘记提交或回滚:
# 错误示范! def worker(): conn = sqlite3.connect('test.db') conn.execute("INSERT ...") # 自动开始一个事务 # 如果忘记 conn.commit(),连接会一直持有锁 # 其他线程可能会在这里无限期等待 -
忽略
busy_timeout: 当一个线程正在写入,其他线程尝试写入时会遇到sqlite3.OperationalError: database is locked,可以通过设置超时来优雅地处理这种情况。conn = sqlite3.connect('test.db') # 设置5秒超时,如果5秒内锁被释放,就继续执行,否则抛出异常 conn.execute("PRAGMA busy_timeout = 5000")
| 方案 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| 每个线程一个连接 | 绝大多数情况下的首选方案 | 简单、安全、代码清晰 | 创建多个连接对象(开销可忽略) |
| 连接池 | 大量短生命周期线程,需要复用连接 | 减少连接创建/销毁的开销 | 实现稍复杂,管理逻辑更复杂 |
对于绝大多数 Python 请始终遵循“每个线程一个连接”的原则,这能让你避免 99% 的 SQLite 多线程相关问题,让你的代码既健壮又易于维护。
