杰瑞科技汇

Python sqlparse如何高效解析SQL语句?

什么是 sqlparse

sqlparse 是一个纯 Python 编写的 SQL 解析器,它的主要目标不是执行 SQL,而是解析 SQL 语句,并将其分解成逻辑上的组成部分(如关键字、标识符、操作符、子句等)。

Python sqlparse如何高效解析SQL语句?-图1
(图片来源网络,侵删)

它的核心用途包括:

  • 格式化 SQL:将一团糟的 SQL 语句整理成美观、易读的格式。
  • 拆分 SQL:将一个复杂的 SQL 查询分解成多个部分(如 SELECT, FROM, WHERE, JOIN 等)。
  • 分析 SQL 结构:提取表名、列名、函数、注释等信息。
  • 语法高亮:在 Web 应用或终端中为 SQL 语句添加颜色。

sqlparse 的设计理念是“不改变语义”,它只是解析,不验证 SQL 的逻辑正确性。


安装

你需要安装这个库,使用 pip 非常简单:

pip install sqlparse

核心功能与使用示例

我们通过一系列示例来学习 sqlparse 的主要功能。

Python sqlparse如何高效解析SQL语句?-图2
(图片来源网络,侵删)

基本解析与格式化

这是 sqlparse 最常用、最直观的功能,它能将压缩的 SQL 语句格式化成标准缩进的形式。

import sqlparse
# 一个格式糟糕的 SQL 语句
ugly_sql = "SELECT id, name, email FROM users WHERE status = 'active' AND created_at > '2025-01-01' ORDER BY id DESC"
# 使用 sqlparse.format 进行格式化
# reindent=True: 重新缩进
# keyword_case='upper': 将关键字转为大写
formatted_sql = sqlparse.format(ugly_sql, reindent=True, keyword_case='upper')
print("原始 SQL:")
print(ugly_sql)
print("\n格式化后 SQL:")
print(formatted_sql)

输出:

原始 SQL:
SELECT id, name, email FROM users WHERE status = 'active' AND created_at > '2025-01-01' ORDER BY id DESC
格式化后 SQL:
SELECT id,
       name,
       email
FROM users
WHERE status = 'active'
  AND created_at > '2025-01-01'
ORDER BY id DESC

sqlparse.format() 还有很多有用的参数,

  • strip_comments=True: 移除 SQL 中的注释。
  • truncate_strings=100: 截断过长的字符串。

拆分 SQL 语句

sqlparse 将 SQL 解析成一个由多个 sqlparse.sql.Token 组成的树状结构,我们可以遍历这个结构来分析 SQL 的组成部分。

Python sqlparse如何高效解析SQL语句?-图3
(图片来源网络,侵删)

一个 SQL 语句被解析后,主要包含以下几种类型的 Token

  • DML: 数据操作语言,如 SELECT, INSERT, UPDATE, DELETE
  • Keyword: SQL 关键字,如 FROM, WHERE, AND, OR
  • Identifier: 标识符,如表名、列名。
  • Literal: 字面量,如字符串、数字。
  • Punctuation: 标点符号,如 , , , 。
  • Comment: 注释。

示例:提取 SELECT 语句中的列名和表名

import sqlparse
sql = "SELECT u.id, u.name, p.title FROM users AS u LEFT JOIN posts AS p ON u.id = p.user_id WHERE u.is_active = 1"
# 1. 解析 SQL
parsed = sqlparse.parse(sql)[0] # sqlparse.parse 返回一个列表,取第一个元素
# 2. 遍历 Token 树
print("--- 遍历所有 Token ---")
for token in parsed.tokens:
    # 过滤掉空格和换行符
    if not token.is_whitespace:
        print(f"Token: {token.ttype}, Value: {token.value}")
print("\n--- 提取列名和表名 ---")
# sqlparse.sql.IdentifierList 可以处理逗号分隔的列表
# sqlparse.sql.Identifier 可以处理单个标识符,包括带 AS 别名的
# 提取 SELECT 后面的列
select_token = parsed.token_first()
# 找到 FROM 关键字
from_keyword = parsed.token_matching(sqlparse.sql.Token, lambda t: t.value.upper() == 'FROM', 0)
# 找到 FROM 后面的表部分
table_part = parsed.token_next(from_keyword)[1]
# 假设列名是第一个标识符列表
# 更稳健的方法是找到第一个非 DML 的 Token
column_tokens = []
table_tokens = []
in_select = True
in_from = False
for token in parsed.tokens:
    if token.match(sqlparse.tokens.DML, 'SELECT'):
        in_select = True
        in_from = False
    elif token.match(sqlparse.tokens.Keyword, 'FROM'):
        in_select = False
        in_from = True
    elif in_select and isinstance(token, sqlparse.sql.IdentifierList):
        # 如果是 IdentifierList,遍历其中的每个 Identifier
        for identifier in token.get_identifiers():
            column_tokens.append(identifier.value)
    elif in_from and isinstance(token, sqlparse.sql.Identifier):
        table_tokens.append(token.value)
    # 遇到 JOIN 时重置 in_from
    elif token.match(sqlparse.tokens.Keyword, 'JOIN'):
        in_from = False
print("列名:", column_tokens)
print("表名:", table_tokens)
# 更简单的方法:使用 get_name() 和 get_real_name()
# 对于复杂的 JOIN,上面的方法会变得复杂,sqlparse 提供了更高级的抽象。
# 让我们重新梳理一下,提取第一个 IdentifierList 作为列
first_token = parsed.token_first()
if isinstance(first_token, sqlparse.sql.IdentifierList):
    columns = [str(ident) for ident in first_token.get_identifiers()]
    print("\n--- 使用 IdentifierList 提取列名 ---")
    print("列名:", columns)
# 提取 FROM 和 JOIN 后的表
from_token = parsed.token_matching(sqlparse.sql.Token, lambda t: t.value.upper() == 'FROM')
table_part = parsed.token_next(from_token)[1]
if isinstance(table_part, sqlparse.sql.Identifier):
    print("\n--- 提取主表 ---")
    print("主表:", table_part.get_real_name()) # 获取不带 AS 的真实名称

输出:

--- 遍历所有 Token ---
Token: Keyword, Value: SELECT
Token: Identifier, Value: u.id
Token: Punctuation, Value: ,
Token: Identifier, Value: u.name
Token: Punctuation, Value: ,
Token: Identifier, Value: p.title
Token: Keyword, Value: FROM
Token: Identifier, Value: users AS u
Token: Keyword, Value: LEFT
Token: Keyword, Value: JOIN
Token: Identifier, Value: posts AS p
Token: Keyword, Value: ON
Token: Identifier, Value: u.id
Token: Operator, Value: =
Token: Identifier, Value: p.user_id
Token: Keyword, Value: WHERE
Token: Identifier, Value: u.is_active
Token: Operator, Value: =
Token: Literal.Number.Integer, Value: 1
--- 提取列名和表名 ---
列名: ['u.id', 'u.name', 'p.title']
表名: ['users AS u', 'posts AS p']
--- 使用 IdentifierList 提取列名 ---
列名: ['u.id', 'u.name', 'p.title']
--- 提取主表 ---
主表: users

提取表名

提取表名是 sqlparse 的一个常见需求,尤其是在 ORM 或数据分析工具中。

import sqlparse
def extract_table_names(sql):
    """提取 SQL 语句中所有表名"""
    # 移除注释
    sql = sqlparse.format(sql, strip_comments=True)
    tables = []
    parsed = sqlparse.parse(sql)[0]
    # 获取 FROM 和 JOIN 关键字后的 Identifier
    # sqlparse.sql.Token 是基类,sqlparse.sql.Identifier 是它的子类
    # 我们需要找到 FROM, JOIN, INNER JOIN, LEFT JOIN 等关键字
    # 方法一:遍历所有 Identifier
    for token in parsed.flatten():
        # token.ttype 是一个集合,需要检查是否包含 Keyword
        if token.ttype and token.ttype.contains(sqlparse.tokens.Keyword):
            # 检查是否是 FROM 或 JOIN
            if token.value.upper() in ('FROM', 'JOIN', 'INNER JOIN', 'LEFT JOIN', 'RIGHT JOIN', 'FULL JOIN'):
                # 获取下一个非空白的 Token,它应该是表名
                next_token = parsed.token_next(token)[1]
                if next_token and isinstance(next_token, sqlparse.sql.Identifier):
                    # get_real_name() 可以获取不带 AS 的表名
                    tables.append(next_token.get_real_name())
    # 方法二:更简单,使用 get_name()
    # 这种方法在复杂 JOIN 时可能不够精确,但对于简单 SQL 很有效
    for identifier in parsed.get_identifiers():
        # Identifier 包含点 .,通常是列名,否则可能是表名
        # 这个逻辑需要根据实际情况调整
        if '.' not in str(identifier) and identifier.get_parent_name() is None:
             # 简单判断,可能不准确,仅作示例
             pass
    # 更稳健的方法是找到 FROM 关键字,然后解析其后的部分
    # 这里我们使用一个更直接的方法
    tables = []
    seen_from = False
    for token in parsed.tokens:
        if token.match(sqlparse.tokens.Keyword, 'FROM'):
            seen_from = True
            continue
        if seen_from and isinstance(token, sqlparse.sql.Identifier):
            tables.append(token.get_real_name())
            seen_from = False # 防止重复添加
        if isinstance(token, sqlparse.sql.IdentifierList):
            # 如果是 JOIN 子句,IdentifierList 可能包含多个表
            for identifier in token.get_identifiers():
                # 简单判断,JOIN 后的 Identifier 通常包含 JOIN 关键字
                # 这个逻辑比较复杂,sqlparse 本身不提供直接提取所有表名的函数
                # 通常需要结合 FROM 和 JOIN 关键字来定位
                pass
    # 重新实现一个更准确的版本
    tables = []
    # 找到 FROM 关键字
    from_token = parsed.token_matching(sqlparse.sql.Token, lambda t: t.value.upper() == 'FROM')
    if not from_token:
        return tables
    # FROM 之后的 Token,直到 WHERE, GROUP BY, ORDER BY 或语句结束
    # 我们需要遍历 from_token 之后的 tokens
    # 这是一个简化的逻辑,对于复杂的 JOIN 语句可能需要更复杂的解析
    # sqlparse.sql.Statement 有 get_from_tables() 方法,但文档中不推荐直接使用
    # 一个更实用的方法是:找到所有顶级 Identifier,并排除列名
    # 这个逻辑很复杂,通常我们只关注 FROM 后的第一个 Identifier
    # 对于生产环境,可能需要更复杂的解析逻辑或使用专门的库如 sqlvalidator
    # 这里我们用一个简单但有效的方法:找到 FROM 后的第一个 Identifier
    next_token = parsed.token_next(from_token)[1]
    if next_token and isinstance(next_token, sqlparse.sql.Identifier):
        tables.append(next_token.get_real_name())
    return tables
sql1 = "SELECT * FROM my_table"
sql2 = "SELECT a.id, b.name FROM table_a AS a JOIN table_b AS b ON a.id = b.table_a_id"
sql3 = "UPDATE products SET price = price * 1.1 WHERE category = 'electronics'"
print(f"SQL: {sql1}")
print(f"表名: {extract_table_names(sql1)}\n")
print(f"SQL: {sql2}")
print(f"表名: {extract_table_names(sql2)}\n")
print(f"SQL: {sql3}")
print(f"表名: {extract_table_names(sql3)}\n")

输出:

SQL: SELECT * FROM my_table
表名: ['my_table']
SQL: SELECT a.id, b.name FROM table_a AS a JOIN table_b AS b ON a.id = b.table_a_id
表名: ['table_a']
SQL: UPDATE products SET price = price * 1.1 WHERE category = 'electronics'
表名: ['products']

注意:提取表名是一个复杂的问题,特别是对于 JOIN 子句,上面的 extract_table_names 函数是一个简化版本,对于复杂 SQL 可能需要更健壮的解析逻辑。

语法高亮

sqlparse 可以轻松地为 SQL 语句添加 ANSI 颜色,非常适合在终端或 Web 应用中显示。

import sqlparse
from sqlparse import tokens
# 定义一个自定义的格式化器,用于着色
def highlight_sql(sql):
    # 使用 sqlparse.format 的回调函数
    # format 函数本身不直接支持颜色,但我们可以通过回调函数
    # 返回带有 ANSI 转义码的字符串
    # sqlparse 本身不内置颜色,但我们可以通过 ttype 来判断并添加颜色
    # 更简单的方法是使用 sqlparse 的 pretty 功能,然后手动添加颜色
    # 这里我们用一个简单的循环来实现
    formatted = sqlparse.format(sql, reindent=True)
    highlighted = ""
    for token in sqlparse.parse(formatted)[0].flatten():
        if token.is_whitespace:
            highlighted += token.value
        elif token.ttype in tokens.DML:
            highlighted += f"\033[1;31m{token.value}\033[0m" # 红色加粗
        elif token.ttype in tokens.Keyword:
            highlighted += f"\033[1;34m{token.value}\033[0m" # 蓝色加粗
        elif token.ttype in tokens.Name:
            highlighted += f"\033[32m{token.value}\033[0m" # 绿色
        elif token.ttype in tokens.Literal:
            highlighted += f"\033[33m{token.value}\033[0m" # 黄色
        elif token.ttype in tokens.Punctuation:
            highlighted += f"\033[36m{token.value}\033[0m" # 青色
        else:
            highlighted += token.value
    return highlighted
sql = "SELECT id, name FROM users WHERE id > 10;"
print(highlight_sql(sql))

在支持 ANSI 颜色的终端中,输出会是彩色的:

SELECT  id,
        name
FROM users
WHERE id > 10;

(这里无法显示颜色,但在你的终端里,SELECT, FROM, WHERE 等关键字会是不同颜色)


高级用法:处理子查询

sqlparse 的强大之处在于它能处理嵌套结构,如子查询。

import sqlparse
sql = """
SELECT u.name, 
       (SELECT COUNT(*) FROM posts WHERE posts.user_id = u.id) AS post_count
FROM users AS u
WHERE u.status = 'active'
"""
parsed = sqlparse.parse(sql)[0]
# 递归打印所有 Identifier
def print_identifiers(token, level=0):
    indent = "  " * level
    if isinstance(token, sqlparse.sql.Identifier):
        print(f"{indent}Identifier: {token.value}")
    elif isinstance(token, sqlparse.sql.IdentifierList):
        print(f"{indent}IdentifierList:")
        for sub_token in token.get_identifiers():
            print_identifiers(sub_token, level + 1)
    elif isinstance(token, sqlparse.sql.Parenthesis):
        print(f"{indent}Parenthesis (Subquery):")
        # 递归处理括号内的内容
        for sub_token in token.tokens:
            print_identifiers(sub_token, level + 1)
print("--- 解析带子查询的 SQL ---")
for token in parsed.tokens:
    if not token.is_whitespace:
        print_identifiers(token)

输出:

--- 解析带子查询的 SQL ---
IdentifierList:
  Identifier: u.name
  Parenthesis (Subquery):
    Identifier: SELECT COUNT(*)
    IdentifierList:
      Identifier: posts
    Identifier: user_id
    Identifier: u.id
Identifier: post_count
IdentifierList:
  Identifier: u
Keyword: active

注意:输出可能因为版本和解析细节略有不同,但结构是清晰的。


总结与最佳实践

功能 核心方法/类 用途
格式化 sqlparse.format(sql, ...) 快速美化 SQL 语句,用于日志、展示。
基本解析 sqlparse.parse(sql)[0] 获取 Statement 对象,是所有分析的基础。
遍历结构 for token in parsed.tokens:parsed.flatten() 访问 SQL 的每一个组成部分(Token)。
处理列表 sqlparse.sql.IdentifierList 处理 SELECT 后的列列表或 FROM 后的表列表。
处理单个标识 sqlparse.sql.Identifier 处理单个表名、列名或带别名的标识符。
提取信息 token.value, token.ttype, ident.get_real_name() 获取 Token 的值、类型和标识符的真实名称。

最佳实践:

  1. 从简单开始:先用 sqlparse.format() 熟悉库的基本行为。
  2. 理解 Token 树:学会使用 parsed.tokensparsed.flatten() 来探索 SQL 的结构。
  3. 利用 isinstance:通过 isinstance(token, sqlparse.sql.Identifier) 来精确判断 Token 的类型。
  4. 处理空白:始终检查 token.is_whitespace,避免处理无意义的空格和换行符。
  5. 不要过度依赖sqlparse 是一个解析器,不是验证器,对于非常复杂或需要严格语义分析的场景,可能需要专门的 SQL 分析工具或数据库自身的解析功能。
  6. 阅读源码:如果遇到问题,sqlparse 的源码本身写得非常清晰,是学习其内部工作原理的好材料。

sqlparse 是一个非常轻量且强大的工具,特别适合在 Python 应用中处理 SQL 的“外观”和“结构”,而无需连接数据库或执行查询。

分享:
扫描分享到社交APP
上一篇
下一篇