From fc06b0c3dbc9a18ca2fcb93548bac2baca3e3637 Mon Sep 17 00:00:00 2001 From: "si@aidatagov.com" Date: Sat, 27 Sep 2025 19:33:19 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=AE=89=E5=85=A8=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/metasecurity_service.py | 141 ++++++++++++++---- 1 file changed, 110 insertions(+), 31 deletions(-) diff --git a/vue-fastapi-backend/module_admin/service/metasecurity_service.py b/vue-fastapi-backend/module_admin/service/metasecurity_service.py index 97b20fb..2c2fea4 100644 --- a/vue-fastapi-backend/module_admin/service/metasecurity_service.py +++ b/vue-fastapi-backend/module_admin/service/metasecurity_service.py @@ -1,5 +1,4 @@ from fastapi import Request -from sqlalchemy.ext.asyncio import AsyncSession from exceptions.exception import ServiceException from module_admin.dao.metaSecurity_dao import MetaSecurityDao from module_admin.entity.vo.common_vo import CrudResponseModel @@ -567,7 +566,7 @@ async def generate_sql(tablesRowCol:dict, table_columns:dict): # 获取该表的字段名 columns = {col.lower(): col for col in table_columns[table_name]} # 将字段名转为小写 # 初始化 SELECT 部分:用字典存储字段名,值是 null 字段名 - select_columns = {col: f"null {col}" for col in columns} + select_columns = {col: f"null as {col}" for col in columns} # 处理角色列配置 for col in config["role_col_list"]: @@ -575,7 +574,7 @@ async def generate_sql(tablesRowCol:dict, table_columns:dict): if col.dbCName == "ALL": if col.ctrl_type == '0': # If ctrl_type is '0', prefix all columns with null for db_column in columns: # Assuming 'user' is the table name - select_columns[db_column] = f"null {db_column}" # 仍然保留 null 前缀 + select_columns[db_column] = f"null as {db_column}" # 仍然保留 null 前缀 elif col.ctrl_type == '1': # If ctrl_type is '1', use actual column names for db_column in columns: select_columns[db_column] = db_column # 使用实际字段名 @@ -586,7 +585,7 @@ async def generate_sql(tablesRowCol:dict, table_columns:dict): db_column = db_column.strip() if db_column in columns: # Check if the column exists in the table if col.ctrl_type == '0': # If ctrl_type is '0', prefix with null - select_columns[db_column] = f"null {db_column}" # 仍然保留 null 前缀 + select_columns[db_column] = f"null as {db_column}" # 仍然保留 null 前缀 elif col.ctrl_type == '1': # If ctrl_type is '1', use actual column name select_columns[db_column] = db_column # 使用实际字段名 # 处理用户列配置 @@ -594,7 +593,7 @@ async def generate_sql(tablesRowCol:dict, table_columns:dict): if col.dbCName == "ALL": # 如果 dbCName 为 "ALL" if col.ctrl_type == "0": # ctrlType 为 0,字符串字段 for db_column in columns: # 对所有字段加上 null - select_columns[db_column] = f"null {db_column}" # 仍然保留 null 前缀 + select_columns[db_column] = f"null as {db_column}" # 仍然保留 null 前缀 elif col.ctrl_type == "1": # ctrlType 为 1,实际数据库字段 for db_column in columns: # 使用实际字段名,不加 null select_columns[db_column] = db_column # 使用实际字段名 @@ -604,7 +603,7 @@ async def generate_sql(tablesRowCol:dict, table_columns:dict): db_column = db_column.strip() if db_column in columns: if col.ctrl_type == "0": - select_columns[db_column] = f"null {db_column}" # 仍然保留 null 前缀 + select_columns[db_column] = f"null as {db_column}" # 仍然保留 null 前缀 elif col.ctrl_type == "1": select_columns[db_column] = db_column # 使用实际字段名 # 生成 SQL 查询 @@ -656,41 +655,121 @@ async def generate_sql(tablesRowCol:dict, table_columns:dict): if where_conditions: sql_queries[table_name] += " WHERE " + where_conditions return sql_queries +# async def replace_table_with_subquery(ctrSqlDict, oldStrSql): +# table_alias_map = {} # 存储表名和别名的映射 +# for table_name, subquery in ctrSqlDict.items(): +# # 构建正则表达式,匹配表名及可能的别名 +# pattern = ( +# r'(\b(?:[a-zA-Z_][a-zA-Z0-9_]*\.)?' # 匹配模式名(可选) +# + re.escape(table_name) # 转义表名 +# + r'\b)' # 结束表名 +# r'(\s+(?:AS\s+)?(\w+))?' # 捕获别名部分(含 AS 或直接别名) +# r'(?=\s*[\w\(\)]*)' # 确保后面是合法 SQL 语法,不是 SQL 关键字 +# ) +# def replace(match): +# original_table = match.group(1) # 原始表名(可能含模式名) +# alias_part = match.group(2) # 别名部分(含空格、AS 或直接别名) +# alias_name = match.group(3) # 别名名称(无 AS 前缀) +# if original_table not in table_alias_map: +# # 处理表名后直接跟着 SQL 关键字的情况 +# sql_keywords = {"LIMIT", "WHERE", "ORDER", "GROUP", "HAVING", "JOIN", "ON", "USING", "UNION", +# "EXCEPT", "INTERSECT", "FETCH", "OFFSET"} +# if alias_name and alias_name.upper().split()[0] not in sql_keywords: +# # 已存在别名,且别名后没有紧跟 SQL 关键字,保留原别名 +# replaced = f"({subquery}) {alias_part}" +# table_alias_map[original_table] = alias_part +# else: +# # 无别名时,或者别名无效(如 LIMIT),添加默认别名 +# alias = original_table.split('.')[-1] +# replaced = f"({subquery}) AS {alias}{alias_part}" +# table_alias_map[original_table] = alias +# else: +# alias = table_alias_map[original_table] +# replaced = f"{alias}" # 使用别名 +# return replaced +# # 执行替换(忽略大小写) +# oldStrSql = re.sub(pattern, replace, oldStrSql, flags=re.IGNORECASE) + +# return oldStrSql + async def replace_table_with_subquery(ctrSqlDict, oldStrSql): - table_alias_map = {} # 存储表名和别名的映射 + """ + 将 SQL 中的表替换成子查询,并自动生成别名,同时把字段引用替换为别名.字段 + """ + table_alias_map = {} # 存储表名和别名的映射 + for table_name, subquery in ctrSqlDict.items(): - # 构建正则表达式,匹配表名及可能的别名 - pattern = ( - r'(\b(?:[a-zA-Z_][a-zA-Z0-9_]*\.)?' # 匹配模式名(可选) - + re.escape(table_name) # 转义表名 - + r'\b)' # 结束表名 - r'(\s+(?:AS\s+)?(\w+))?' # 捕获别名部分(含 AS 或直接别名) - r'(?=\s*[\w\(\)]*)' # 确保后面是合法 SQL 语法,不是 SQL 关键字 + # 1️⃣ 匹配 FROM / JOIN 中的表名及别名(不使用 lookbehind) + from_join_pattern = ( + r'\b(FROM|JOIN)\s+' # 捕获关键字 + r'((?:[a-zA-Z_][a-zA-Z0-9_]*\.)?' # 模式名(可选) + + re.escape(table_name) + r')' # 表名 + r'(\s+(?:AS\s+)?(\w+))?' # 可选别名 ) - def replace(match): - original_table = match.group(1) # 原始表名(可能含模式名) - alias_part = match.group(2) # 别名部分(含空格、AS 或直接别名) - alias_name = match.group(3) # 别名名称(无 AS 前缀) + + # 替换 FROM / JOIN 部分 + def from_join_replace(match): + keyword = match.group(1) # FROM / JOIN + original_table = match.group(2) + alias_part = match.group(3) # " AS xxx" 或 " xxx" + alias_name = match.group(4) # xxx + if original_table not in table_alias_map: - # 处理表名后直接跟着 SQL 关键字的情况 - sql_keywords = {"LIMIT", "WHERE", "ORDER", "GROUP", "HAVING", "JOIN", "ON", "USING", "UNION", - "EXCEPT", "INTERSECT", "FETCH", "OFFSET"} + # 判断 alias 是否为关键字 + # 判断 alias 是否为关键字 + sql_keywords = { + # 数据操作关键字 + "SELECT", "INSERT", "UPDATE", "DELETE", "MERGE", "TRUNCATE", + "VALUES", "RETURNING", + + # 查询关键字 + "FROM", "WHERE", "GROUP", "HAVING", "ORDER", "LIMIT", "OFFSET", + "DISTINCT", "ALL", "UNION", "INTERSECT", "EXCEPT", + + # 连接关键字 + "JOIN", "INNER", "LEFT", "RIGHT", "FULL", "CROSS", "NATURAL", "USING", "ON", + + # 数据类型和约束关键字 + "TABLE", "VIEW", "INDEX", "PRIMARY", "KEY", "FOREIGN", "REFERENCES", + "NOT", "NULL", "UNIQUE", "CHECK", "DEFAULT", + + # 控制关键字 + "IF", "ELSE", "CASE", "WHEN", "THEN", "END", "LOOP", "FOR", "WHILE", + + # 其他 + "CREATE", "ALTER", "DROP", "TRUNCATE", "COMMENT", + "EXISTS", "IN", "IS", "LIKE", "ILIKE", "SIMILAR", "BETWEEN", + "AND", "OR", "ANY", "ALL", "SOME", + "FETCH", "NEXT", "ONLY", "ASC", "DESC", + "GRANT", "REVOKE", "ROLE", "USER", + "CURRENT_DATE", "CURRENT_TIME", "CURRENT_TIMESTAMP", + } + if alias_name and alias_name.upper().split()[0] not in sql_keywords: - # 已存在别名,且别名后没有紧跟 SQL 关键字,保留原别名 - replaced = f"({subquery}) {alias_part}" - table_alias_map[original_table] = alias_part + replaced = f"{keyword} ({subquery}) {alias_part}" + table_alias_map[original_table] = alias_name else: - # 无别名时,或者别名无效(如 LIMIT),添加默认别名 alias = original_table.split('.')[-1] - replaced = f"({subquery}) AS {alias}{alias_part}" + replaced = f"{keyword} ({subquery}) AS {alias}{alias_part or ''}" table_alias_map[original_table] = alias else: - alias = table_alias_map[original_table] - replaced = f"{alias}" # 使用别名 + alias = table_alias_map[original_table] + replaced = f"{keyword} {alias}" + return replaced - # 执行替换(忽略大小写) - oldStrSql = re.sub(pattern, replace, oldStrSql, flags=re.IGNORECASE) - + + oldStrSql = re.sub(from_join_pattern, from_join_replace, oldStrSql, flags=re.IGNORECASE) + + # 2️⃣ 替换字段引用 table_name.column → alias.column + column_ref_pattern = re.escape(table_name) + r'\.(\w+)' + + def column_replace(match): + col = match.group(1) + alias = table_alias_map.get(table_name, table_name.split('.')[-1]) + return f"{alias}.{col}" + + oldStrSql = re.sub(column_ref_pattern, column_replace, oldStrSql, flags=re.IGNORECASE) + return oldStrSql