Python MySQL 自动重连实现
在Python中使用MySQL时,网络不稳定或服务器短暂重启可能导致连接中断,实现自动重连功能可以增强程序的健壮性,以下是几种实现自动重连的方法:

使用连接池(推荐)
使用连接池是最可靠的方式,推荐使用PyMySQL或mysql-connector-python配合连接池。
使用PyMySQL + DBUtils
import pymysql
from dbutils.pooled_db import PooledDB
# 创建连接池
pool = PooledDB(
creator=pymysql,
maxconnections=6, # 连接池最大连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接
maxcached=4, # 链接池中最多闲置的链接
maxshared=3, # 链接池中最多共享的链接数量,0表示不共享
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表
ping=1, # ping MySQL服务器,检查是否服务可用
host='localhost',
port=3306,
user='root',
password='password',
database='test',
charset='utf8mb4'
)
def get_connection():
"""从连接池获取连接"""
return pool.connection()
# 使用示例
try:
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT 1")
result = cursor.fetchone()
print(result)
cursor.close()
conn.close() # 实际是归还连接到连接池
except Exception as e:
print(f"Error: {e}")
使用mysql-connector-python + pooling
import mysql.connector
from mysql.connector import pooling
# 创建连接池
dbconfig = {
"host": "localhost",
"user": "root",
"password": "password",
"database": "test"
# 其他配置...
}
connection_pool = pooling.MySQLConnectionPool(
pool_name="mypool",
pool_size=5,
**dbconfig
)
def get_connection():
"""从连接池获取连接"""
return connection_pool.get_connection()
# 使用示例
try:
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT 1")
result = cursor.fetchone()
print(result)
cursor.close()
conn.close() # 归还连接
except mysql.connector.Error as err:
print(f"Error: {err}")
手动实现自动重连
如果不使用连接池,可以手动实现重连逻辑:
import pymysql
import time
def get_db_connection():
return pymysql.connect(
host='localhost',
user='root',
password='password',
database='test',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
def execute_query_with_retry(query, max_retries=3, retry_interval=5):
last_error = None
for attempt in range(max_retries):
try:
conn = get_db_connection()
with conn.cursor() as cursor:
cursor.execute(query)
result = cursor.fetchall()
conn.close()
return result
except (pymysql.Error, pymysql.OperationalError) as e:
last_error = e
print(f"Attempt {attempt + 1} failed: {e}")
if attempt < max_retries - 1:
print(f"Retrying in {retry_interval} seconds...")
time.sleep(retry_interval)
print(f"All {max_retries} attempts failed. Last error: {last_error}")
return None
# 使用示例
result = execute_query_with_retry("SELECT * FROM users")
print(result)
使用装饰器实现自动重连
import pymysql
import time
from functools import wraps
def auto_reconnect(max_retries=3, retry_interval=5):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
last_error = None
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except (pymysql.Error, pymysql.OperationalError) as e:
last_error = e
print(f"Attempt {attempt + 1} failed: {e}")
if attempt < max_retries - 1:
print(f"Retrying in {retry_interval} seconds...")
time.sleep(retry_interval)
print(f"All {max_retries} attempts failed. Last error: {last_error}")
return None
return wrapper
return decorator
# 使用示例
@auto_reconnect(max_retries=3, retry_interval=2)
def get_user(user_id):
conn = pymysql.connect(
host='localhost',
user='root',
password='password',
database='test',
charset='utf8mb4'
)
with conn.cursor() as cursor:
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
result = cursor.fetchone()
conn.close()
return result
# 测试
user = get_user(1)
print(user)
最佳实践建议
- 优先使用连接池:连接池能更好地管理连接资源,自动处理重连和连接验证
- 设置合理的超时:在连接配置中设置连接超时和读取超时
- 实现心跳检测:定期执行简单查询保持连接活跃
- 记录重连日志:记录重连尝试和失败情况,便于排查问题
- 考虑使用ORM:如SQLAlchemy等ORM框架通常内置了连接池和重连机制
方法可以根据你的具体需求和环境选择合适的实现方式。

