Python sqlglot库来解析SQL语句,提取目标表字段来源,查询表的表名、别名、查询字段、筛选字段以及多表关联字段和聚合条件。查询样例sql如下 insert into VT_ATST_1 ( Cust_UID ,Atst_Elg_Inv_Exp_Ind ) select S1.Cust_UID as Cust_UID ,max(S2.Atst_Elg_Inv_Exp_Ind) as Atst_Elg_Inv_Exp_Ind from ARTS_VIEW.ARTS_XXXXX as S1 inner join ARTS_VIEW.ARTS_HHHHH as S2 on S1.Cust_UID = S2.Cust_UID where S1.DW_Snsh_DT = date'2025-01-12' group by S1.Cust_UID ;
时间: 2025-03-10 13:01:28 浏览: 412
<think>好的,我需要帮助用户使用Python的sqlglot库来解析SQL语句,提取目标表的字段来源,包括表名、别名、查询字段、筛选条件、连接条件和聚合条件。用户还提供了一个具体的SQL示例,我需要根据这个示例来演示解析过程。
首先,我需要确认用户的需求:他们想解析SQL语句,提取特定的元素。可能的应用场景包括分析查询结构、优化SQL性能,或者进行数据血缘分析。用户提到的sqlglot库我之前有所了解,它是一个SQL解析和转换工具,支持多种SQL方言,能够将SQL语句解析成抽象语法树(AST),然后遍历这个树来提取所需的信息。
接下来,我需要回顾sqlglot的基本用法。安装库之后,通常的步骤是解析SQL语句,生成语法树,然后通过遍历节点来收集信息。例如,提取SELECT语句中的字段、FROM子句中的表名和别名、JOIN条件、WHERE条件以及GROUP BY子句等。
用户提供的示例SQL是一个INSERT INTO ... SELECT语句,其中SELECT部分包含了多个表连接、别名、聚合函数和筛选条件。我需要解析这个SELECT部分的结构。首先,可能需要将整个SQL语句解析,找到其中的SELECT子查询,然后处理该子查询的结构。
接下来,我需要考虑如何提取各个部分:
1. **表名和别名**:在FROM和JOIN子句中,表可能被赋予别名。例如,示例中的ARTS_XXXXX被别名为S1,ARTS_HHHHH被别名为S2。需要提取这些表名和对应的别名。
2. **查询字段**:SELECT子句中的字段,如S1.Cust_UID和max(S2.Atst_Elg_Inv_Exp_Ind)。这些字段可能来自不同的表,并且可能有别名(如Cust_UID和Atst_Elg_Inv_Exp_Ind)。需要确定每个字段的来源表,尤其是当使用别名时如何关联到原始表。
3. **筛选字段**:WHERE子句中的条件,比如S1.DW_Snsh_DT = date'2025-01-12'。这里需要提取涉及的字段及其来源表。
4. **多表关联字段**:JOIN条件中的字段,例如ON S1.Cust_UID = S2.Cust_UID。需要提取连接条件的左右两边的字段及其对应的表。
5. **聚合条件**:GROUP BY子句中的字段,以及SELECT中的聚合函数(如max)。需要识别哪些字段被聚合,以及聚合函数使用的字段来源。
接下来,我需要思考如何使用sqlglot的API来访问这些部分。sqlglot提供了访问AST的各种方法,比如查找特定类型的节点(如Select、From、Join、Where、Group等),然后提取相关信息。
例如,解析后的AST中,SELECT语句的from属性会包含FROM和JOIN的信息,而joins属性可能包含具体的连接条件。在SELECT的expressions中可以遍历每个表达式,判断是否是别名(Alias),如果是,则获取其来源表和字段。对于聚合函数,可能需要检查表达式是否是AggFunc(如Max),并提取其参数。
此外,需要注意处理别名的情况,比如在FROM子句中表可能有别名,后续的字段引用可能使用这个别名。因此,需要建立一个映射,将别名对应到原始表名,这样在解析字段时能够正确找到来源表。
在编写代码时,可能需要递归或遍历AST的各个节点。例如,处理FROM子句时,获取主表和JOIN的表,记录它们的别名和真实表名。然后处理SELECT的表达式,解析每个字段的来源,可能需要根据别名来查找对应的表。同样,WHERE条件和JOIN条件中的字段也需要类似的处理。
对于聚合函数,需要检查SELECT中的表达式是否有聚合函数(如MAX),并记录这些函数及其参数字段。同时,GROUP BY子句中的字段也需要提取,因为它们决定了聚合的分组依据。
现在,针对用户提供的示例SQL,我需要确保代码能够正确解析并提取所需的信息。例如,在示例中,SELECT子句中的S1.Cust_UID和MAX(S2.Atst_Elg_Inv_Exp_Ind),FROM子句中的ARTS_XXXXX AS S1和ARTS_HHHHH AS S2,JOIN条件中的S1.Cust_UID = S2.Cust_UID,WHERE条件中的S1.DW_Snsh_DT = date'2025-01-12',以及GROUP BY S1.Cust_UID。
可能的步骤分解:
1. 解析SQL语句,获取AST。
2. 提取INSERT语句中的SELECT子查询。
3. 处理SELECT子查询的FROM部分,提取主表和JOIN表,记录别名和表名。
4. 处理SELECT的表达式,提取每个字段的来源表和别名,同时检查是否包含聚合函数。
5. 处理WHERE子句,提取涉及的字段和条件。
6. 处理JOIN条件,提取连接字段及其来源表。
7. 处理GROUP BY子句,提取分组字段。
在代码实现中,可能需要使用sqlglot的解析器,例如使用parse_one函数来解析SQL语句,然后访问其各个部分。例如:
```python
import sqlglot
sql = """
insert into VT_ATST_1 (Cust_UID, Atst_Elg_Inv_Exp_Ind)
select
S1.Cust_UID as Cust_UID,
max(S2.Atst_Elg_Inv_Exp_Ind) as Atst_Elg_Inv_Exp_Ind
from ARTS_VIEW.ARTS_XXXXX as S1
inner join ARTS_VIEW.ARTS_HHHHH as S2
on S1.Cust_UID = S2.Cust_UID
where S1.DW_Snsh_DT = date'2025-01-12'
group by S1.Cust_UID;
"""
# 解析SQL
parsed = sqlglot.parse_one(sql, read="snowflake") # 可能需要根据SQL方言调整
```
然后,找到SELECT语句:
```python
select_stmt = parsed.find(sqlglot.exp.Select)
```
接下来,提取FROM和JOIN的表:
```python
tables = []
for from_ in select_stmt.find_all(sqlglot.exp.From):
for source in from_.args.get("expressions", []):
if isinstance(source, sqlglot.exp.Table):
tables.append({
"name": source.name,
"alias": source.alias
})
elif isinstance(source, sqlglot.exp.Join):
# 处理JOIN的表
for join_source in source.args.get("this", []):
if isinstance(join_source, sqlglot.exp.Table):
tables.append({
"name": join_source.name,
"alias": join_source.alias
})
```
不过,可能需要更精确地处理FROM和JOIN的结构。例如,主表和JOIN的表可能都在From表达式中。或者使用select_stmt.args.get("from")来获取FROM部分。
另一个方法是遍历FROM中的表达式,包括JOIN:
```python
from_clause = select_stmt.args.get("from")[0]
tables = []
current_from = from_clause
while current_from:
if isinstance(current_from, sqlglot.exp.Table):
tables.append({
"name": current_from.name,
"alias": current_from.alias
})
elif isinstance(current_from, sqlglot.exp.Join):
# 处理JOIN的表
join_table = current_from.this
if isinstance(join_table, sqlglot.exp.Table):
tables.append({
"name": join_table.name,
"alias": join_table.alias
})
# 可能有多个JOIN,需要遍历
current_from = current_from.args.get("joins")[0] if current_from.args.get("joins") else None
```
不过这可能不够准确,需要更仔细的处理。可能需要递归或遍历所有Join节点。
接下来,处理SELECT的表达式:
```python
select_expressions = select_stmt.args.get("expressions", [])
selected_fields = []
aggregates = []
for expr in select_expressions:
if isinstance(expr, sqlglot.exp.Alias):
alias = expr.alias
expr = expr.this
else:
alias = None
# 检查是否是聚合函数
if isinstance(expr, sqlglot.exp.AggFunc):
aggregates.append({
"function": expr.key,
"args": [arg.sql() for arg in expr.args.get("expressions", [])],
"alias": alias
})
else:
# 普通字段,解析来源表和字段名
# 这里需要判断字段是否有表别名
if isinstance(expr, sqlglot.exp.Column):
table_alias = expr.table
column_name = expr.name
selected_fields.append({
"table_alias": table_alias,
"column": column_name,
"alias": alias
})
```
处理WHERE子句:
```python
where_clause = select_stmt.args.get("where")
where_conditions = []
if where_clause:
# 可能需要递归处理条件表达式中的各个字段
# 例如,提取等号两边的字段或值
for condition in where_clause.find_all(sqlglot.exp.EQ):
left = condition.left
right = condition.right
# 假设左边是字段,右边是值
if isinstance(left, sqlglot.exp.Column):
where_conditions.append({
"table_alias": left.table,
"column": left.name,
"operator": "=",
"value": right.sql()
})
```
处理JOIN条件:
```python
join_conditions = []
for join in select_stmt.find_all(sqlglot.exp.Join):
on_clause = join.args.get("on")
if on_clause:
# 处理ON条件,比如等式
for condition in on_clause.find_all(sqlglot.exp.EQ):
left = condition.left
right = condition.right
if isinstance(left, sqlglot.exp.Column) and isinstance(right, sqlglot.exp.Column):
join_conditions.append({
"left_table": left.table,
"left_column": left.name,
"right_table": right.table,
"right_column": right.name
})
```
处理GROUP BY:
```python
group_by_fields = []
group_by_clause = select_stmt.args.get("group")
if group_by_clause:
for expr in group_by_clause.expressions:
if isinstance(expr, sqlglot.exp.Column):
group_by_fields.append({
"table_alias": expr.table,
"column": expr.name
})
```
不过,上述代码可能需要根据具体的AST结构进行调整,因为不同版本的sqlglot可能有不同的API。此外,处理复杂的表达式可能需要更细致的方法,比如递归遍历表达式树。
最后,将这些提取的信息整理成用户需要的结构,比如字典或JSON格式,包含表名、别名、查询字段、筛选条件、连接条件和聚合函数。
需要注意的是,用户提供的示例中有一个INSERT INTO语句,SELECT部分是它的子查询。因此,在解析时需要先找到INSERT语句中的SELECT部分。这可以通过sqlglot的AST结构来定位。例如,INSERT语句可能有一个expression属性指向SELECT子句。
总结步骤:
1. 解析SQL,获取AST。
2. 定位到SELECT子句。
3. 提取FROM和JOIN中的表及别名。
4. 提取SELECT中的字段及其来源,区分普通字段和聚合函数。
5. 提取WHERE条件中的字段。
6. 提取JOIN条件中的字段。
7. 提取GROUP BY字段。
8. 将所有提取的信息结构化返回。
在实现过程中,可能需要处理各种边界情况,例如子查询、多个JOIN、复杂的表达式等。但对于用户提供的示例,上述方法应该足够应对。现在将这些思路转化为具体的代码示例,并验证是否能正确解析用户给出的SQL示例。</think>以下是使用 Python sqlglot 库解析 SQL 查询的逐步实现方案:
```python
import sqlglot
from sqlglot import expressions as exp
def parse_sql(sql):
parsed = sqlglot.parse_one(sql, read=None)
select_stmt = parsed.find(exp.Select)
# 提取表名和别名
tables = []
for from_ in select_stmt.find_all(exp.From):
for source in from_.args.get("expressions", []):
if isinstance(source, exp.Table):
tables.append({
"source_table": source.name,
"alias": source.alias or source.name
})
elif isinstance(source, exp.Join):
join_table = source.this
if isinstance(join_table, exp.Table):
tables.append({
"source_table": join_table.name,
"alias": join_table.alias or join_table.name
})
# 提取查询字段
selected_fields = []
for expr in select_stmt.expressions:
if isinstance(expr, exp.Alias):
alias = expr.alias
expr = expr.this
else:
alias = None
if isinstance(expr, exp.Column):
selected_fields.append({
"field": expr.name,
"source_table": expr.table,
"alias": alias
})
elif isinstance(expr, exp.AggFunc):
arg = expr.this.this if expr.this else None
selected_fields.append({
"function": expr.key,
"field": arg.name if arg else None,
"source_table": arg.table if arg else None,
"alias": alias
})
# 提取筛选条件
where_conditions = []
where_clause = select_stmt.args.get("where")
if where_clause:
for condition in where_clause.find_all(exp.EQ):
left = condition.left
if isinstance(left, exp.Column):
where_conditions.append({
"field": left.name,
"source_table": left.table,
"operator": "=",
"value": condition.right.sql()
})
# 提取连接条件
join_conditions = []
for join in select_stmt.find_all(exp.Join):
on_clause = join.args.get("on")
if on_clause:
for condition in on_clause.find_all(exp.EQ):
left = condition.left
right = condition.right
if isinstance(left, exp.Column) and isinstance(right, exp.Column):
join_conditions.append({
"left_table": left.table,
"left_field": left.name,
"right_table": right.table,
"right_field": right.name
})
# 提取聚合条件
group_by = []
if select_stmt.group:
for expr in select_stmt.group.expressions:
if isinstance(expr, exp.Column):
group_by.append({
"field": expr.name,
"source_table": expr.table
})
return {
"tables": tables,
"selected_fields": selected_fields,
"where_conditions": where_conditions,
"join_conditions": join_conditions,
"group_by": group_by
}
# 示例SQL
sql = """
insert into VT_ATST_1 (Cust_UID, Atst_Elg_Inv_Exp_Ind)
select
S1.Cust_UID as Cust_UID,
max(S2.Atst_Elg_Inv_Exp_Ind) as Atst_Elg_Inv_Exp_Ind
from ARTS_VIEW.ARTS_XXXXX as S1
inner join ARTS_VIEW.ARTS_HHHHH as S2
on S1.Cust_UID = S2.Cust_UID
where S1.DW_Snsh_DT = date'2025-01-12'
group by S1.Cust_UID;
"""
result = parse_sql(sql)
print(result)
```
阅读全文
相关推荐



















