杰瑞科技汇

Python MySQL 如何实现自动重连?

Python MySQL 自动重连实现

在Python中使用MySQL时,网络不稳定或服务器短暂重启可能导致连接中断,实现自动重连功能可以增强程序的健壮性,以下是几种实现自动重连的方法:

Python MySQL 如何实现自动重连?-图1
(图片来源网络,侵删)

使用连接池(推荐)

使用连接池是最可靠的方式,推荐使用PyMySQLmysql-connector-python配合连接池。

使用PyMySQL + DBUtils

import pymysql
from dbutils.pooled_db import PooledDB
# 创建连接池
pool = PooledDB(
    creator=pymysql,
    maxconnections=6,  # 连接池最大连接数
    mincached=2,      # 初始化时,链接池中至少创建的空闲的链接
    maxcached=4,      # 链接池中最多闲置的链接
    maxshared=3,      # 链接池中最多共享的链接数量,0表示不共享
    blocking=True,    # 连接池中如果没有可用连接后,是否阻塞等待
    maxusage=None,    # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],    # 开始会话前执行的命令列表
    ping=1,           # ping MySQL服务器,检查是否服务可用
    host='localhost',
    port=3306,
    user='root',
    password='password',
    database='test',
    charset='utf8mb4'
)
def get_connection():
    """从连接池获取连接"""
    return pool.connection()
# 使用示例
try:
    conn = get_connection()
    cursor = conn.cursor()
    cursor.execute("SELECT 1")
    result = cursor.fetchone()
    print(result)
    cursor.close()
    conn.close()  # 实际是归还连接到连接池
except Exception as e:
    print(f"Error: {e}")

使用mysql-connector-python + pooling

import mysql.connector
from mysql.connector import pooling
# 创建连接池
dbconfig = {
    "host": "localhost",
    "user": "root",
    "password": "password",
    "database": "test"
    # 其他配置...
}
connection_pool = pooling.MySQLConnectionPool(
    pool_name="mypool",
    pool_size=5,
    **dbconfig
)
def get_connection():
    """从连接池获取连接"""
    return connection_pool.get_connection()
# 使用示例
try:
    conn = get_connection()
    cursor = conn.cursor()
    cursor.execute("SELECT 1")
    result = cursor.fetchone()
    print(result)
    cursor.close()
    conn.close()  # 归还连接
except mysql.connector.Error as err:
    print(f"Error: {err}")

手动实现自动重连

如果不使用连接池,可以手动实现重连逻辑:

import pymysql
import time
def get_db_connection():
    return pymysql.connect(
        host='localhost',
        user='root',
        password='password',
        database='test',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
def execute_query_with_retry(query, max_retries=3, retry_interval=5):
    last_error = None
    for attempt in range(max_retries):
        try:
            conn = get_db_connection()
            with conn.cursor() as cursor:
                cursor.execute(query)
                result = cursor.fetchall()
                conn.close()
                return result
        except (pymysql.Error, pymysql.OperationalError) as e:
            last_error = e
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                print(f"Retrying in {retry_interval} seconds...")
                time.sleep(retry_interval)
    print(f"All {max_retries} attempts failed. Last error: {last_error}")
    return None
# 使用示例
result = execute_query_with_retry("SELECT * FROM users")
print(result)

使用装饰器实现自动重连

import pymysql
import time
from functools import wraps
def auto_reconnect(max_retries=3, retry_interval=5):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_error = None
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except (pymysql.Error, pymysql.OperationalError) as e:
                    last_error = e
                    print(f"Attempt {attempt + 1} failed: {e}")
                    if attempt < max_retries - 1:
                        print(f"Retrying in {retry_interval} seconds...")
                        time.sleep(retry_interval)
            print(f"All {max_retries} attempts failed. Last error: {last_error}")
            return None
        return wrapper
    return decorator
# 使用示例
@auto_reconnect(max_retries=3, retry_interval=2)
def get_user(user_id):
    conn = pymysql.connect(
        host='localhost',
        user='root',
        password='password',
        database='test',
        charset='utf8mb4'
    )
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        result = cursor.fetchone()
    conn.close()
    return result
# 测试
user = get_user(1)
print(user)

最佳实践建议

  1. 优先使用连接池:连接池能更好地管理连接资源,自动处理重连和连接验证
  2. 设置合理的超时:在连接配置中设置连接超时和读取超时
  3. 实现心跳检测:定期执行简单查询保持连接活跃
  4. 记录重连日志:记录重连尝试和失败情况,便于排查问题
  5. 考虑使用ORM:如SQLAlchemy等ORM框架通常内置了连接池和重连机制

方法可以根据你的具体需求和环境选择合适的实现方式。

Python MySQL 如何实现自动重连?-图2
(图片来源网络,侵删)
分享:
扫描分享到社交APP
上一篇
下一篇