Browse Source

数据安全接口优化

master
si@aidatagov.com 3 weeks ago
parent
commit
fc06b0c3db
  1. 141
      vue-fastapi-backend/module_admin/service/metasecurity_service.py

141
vue-fastapi-backend/module_admin/service/metasecurity_service.py

@ -1,5 +1,4 @@
from fastapi import Request
from sqlalchemy.ext.asyncio import AsyncSession
from exceptions.exception import ServiceException
from module_admin.dao.metaSecurity_dao import MetaSecurityDao
from module_admin.entity.vo.common_vo import CrudResponseModel
@ -567,7 +566,7 @@ async def generate_sql(tablesRowCol:dict, table_columns:dict):
# 获取该表的字段名
columns = {col.lower(): col for col in table_columns[table_name]} # 将字段名转为小写
# 初始化 SELECT 部分:用字典存储字段名,值是 null 字段名
select_columns = {col: f"null {col}" for col in columns}
select_columns = {col: f"null as {col}" for col in columns}
# 处理角色列配置
for col in config["role_col_list"]:
@ -575,7 +574,7 @@ async def generate_sql(tablesRowCol:dict, table_columns:dict):
if col.dbCName == "ALL":
if col.ctrl_type == '0': # If ctrl_type is '0', prefix all columns with null
for db_column in columns: # Assuming 'user' is the table name
select_columns[db_column] = f"null {db_column}" # 仍然保留 null 前缀
select_columns[db_column] = f"null as {db_column}" # 仍然保留 null 前缀
elif col.ctrl_type == '1': # If ctrl_type is '1', use actual column names
for db_column in columns:
select_columns[db_column] = db_column # 使用实际字段名
@ -586,7 +585,7 @@ async def generate_sql(tablesRowCol:dict, table_columns:dict):
db_column = db_column.strip()
if db_column in columns: # Check if the column exists in the table
if col.ctrl_type == '0': # If ctrl_type is '0', prefix with null
select_columns[db_column] = f"null {db_column}" # 仍然保留 null 前缀
select_columns[db_column] = f"null as {db_column}" # 仍然保留 null 前缀
elif col.ctrl_type == '1': # If ctrl_type is '1', use actual column name
select_columns[db_column] = db_column # 使用实际字段名
# 处理用户列配置
@ -594,7 +593,7 @@ async def generate_sql(tablesRowCol:dict, table_columns:dict):
if col.dbCName == "ALL": # 如果 dbCName 为 "ALL"
if col.ctrl_type == "0": # ctrlType 为 0,字符串字段
for db_column in columns: # 对所有字段加上 null
select_columns[db_column] = f"null {db_column}" # 仍然保留 null 前缀
select_columns[db_column] = f"null as {db_column}" # 仍然保留 null 前缀
elif col.ctrl_type == "1": # ctrlType 为 1,实际数据库字段
for db_column in columns: # 使用实际字段名,不加 null
select_columns[db_column] = db_column # 使用实际字段名
@ -604,7 +603,7 @@ async def generate_sql(tablesRowCol:dict, table_columns:dict):
db_column = db_column.strip()
if db_column in columns:
if col.ctrl_type == "0":
select_columns[db_column] = f"null {db_column}" # 仍然保留 null 前缀
select_columns[db_column] = f"null as {db_column}" # 仍然保留 null 前缀
elif col.ctrl_type == "1":
select_columns[db_column] = db_column # 使用实际字段名
# 生成 SQL 查询
@ -656,41 +655,121 @@ async def generate_sql(tablesRowCol:dict, table_columns:dict):
if where_conditions:
sql_queries[table_name] += " WHERE " + where_conditions
return sql_queries
# async def replace_table_with_subquery(ctrSqlDict, oldStrSql):
# table_alias_map = {} # 存储表名和别名的映射
# for table_name, subquery in ctrSqlDict.items():
# # 构建正则表达式,匹配表名及可能的别名
# pattern = (
# r'(\b(?:[a-zA-Z_][a-zA-Z0-9_]*\.)?' # 匹配模式名(可选)
# + re.escape(table_name) # 转义表名
# + r'\b)' # 结束表名
# r'(\s+(?:AS\s+)?(\w+))?' # 捕获别名部分(含 AS 或直接别名)
# r'(?=\s*[\w\(\)]*)' # 确保后面是合法 SQL 语法,不是 SQL 关键字
# )
# def replace(match):
# original_table = match.group(1) # 原始表名(可能含模式名)
# alias_part = match.group(2) # 别名部分(含空格、AS 或直接别名)
# alias_name = match.group(3) # 别名名称(无 AS 前缀)
# if original_table not in table_alias_map:
# # 处理表名后直接跟着 SQL 关键字的情况
# sql_keywords = {"LIMIT", "WHERE", "ORDER", "GROUP", "HAVING", "JOIN", "ON", "USING", "UNION",
# "EXCEPT", "INTERSECT", "FETCH", "OFFSET"}
# if alias_name and alias_name.upper().split()[0] not in sql_keywords:
# # 已存在别名,且别名后没有紧跟 SQL 关键字,保留原别名
# replaced = f"({subquery}) {alias_part}"
# table_alias_map[original_table] = alias_part
# else:
# # 无别名时,或者别名无效(如 LIMIT),添加默认别名
# alias = original_table.split('.')[-1]
# replaced = f"({subquery}) AS {alias}{alias_part}"
# table_alias_map[original_table] = alias
# else:
# alias = table_alias_map[original_table]
# replaced = f"{alias}" # 使用别名
# return replaced
# # 执行替换(忽略大小写)
# oldStrSql = re.sub(pattern, replace, oldStrSql, flags=re.IGNORECASE)
# return oldStrSql
async def replace_table_with_subquery(ctrSqlDict, oldStrSql):
table_alias_map = {} # 存储表名和别名的映射
"""
SQL 中的表替换成子查询并自动生成别名同时把字段引用替换为别名.字段
"""
table_alias_map = {} # 存储表名和别名的映射
for table_name, subquery in ctrSqlDict.items():
# 构建正则表达式,匹配表名及可能的别名
pattern = (
r'(\b(?:[a-zA-Z_][a-zA-Z0-9_]*\.)?' # 匹配模式名(可选)
+ re.escape(table_name) # 转义表名
+ r'\b)' # 结束表名
r'(\s+(?:AS\s+)?(\w+))?' # 捕获别名部分(含 AS 或直接别名)
r'(?=\s*[\w\(\)]*)' # 确保后面是合法 SQL 语法,不是 SQL 关键字
# 1️⃣ 匹配 FROM / JOIN 中的表名及别名(不使用 lookbehind)
from_join_pattern = (
r'\b(FROM|JOIN)\s+' # 捕获关键字
r'((?:[a-zA-Z_][a-zA-Z0-9_]*\.)?' # 模式名(可选)
+ re.escape(table_name) + r')' # 表名
r'(\s+(?:AS\s+)?(\w+))?' # 可选别名
)
def replace(match):
original_table = match.group(1) # 原始表名(可能含模式名)
alias_part = match.group(2) # 别名部分(含空格、AS 或直接别名)
alias_name = match.group(3) # 别名名称(无 AS 前缀)
# 替换 FROM / JOIN 部分
def from_join_replace(match):
keyword = match.group(1) # FROM / JOIN
original_table = match.group(2)
alias_part = match.group(3) # " AS xxx" 或 " xxx"
alias_name = match.group(4) # xxx
if original_table not in table_alias_map:
# 处理表名后直接跟着 SQL 关键字的情况
sql_keywords = {"LIMIT", "WHERE", "ORDER", "GROUP", "HAVING", "JOIN", "ON", "USING", "UNION",
"EXCEPT", "INTERSECT", "FETCH", "OFFSET"}
# 判断 alias 是否为关键字
# 判断 alias 是否为关键字
sql_keywords = {
# 数据操作关键字
"SELECT", "INSERT", "UPDATE", "DELETE", "MERGE", "TRUNCATE",
"VALUES", "RETURNING",
# 查询关键字
"FROM", "WHERE", "GROUP", "HAVING", "ORDER", "LIMIT", "OFFSET",
"DISTINCT", "ALL", "UNION", "INTERSECT", "EXCEPT",
# 连接关键字
"JOIN", "INNER", "LEFT", "RIGHT", "FULL", "CROSS", "NATURAL", "USING", "ON",
# 数据类型和约束关键字
"TABLE", "VIEW", "INDEX", "PRIMARY", "KEY", "FOREIGN", "REFERENCES",
"NOT", "NULL", "UNIQUE", "CHECK", "DEFAULT",
# 控制关键字
"IF", "ELSE", "CASE", "WHEN", "THEN", "END", "LOOP", "FOR", "WHILE",
# 其他
"CREATE", "ALTER", "DROP", "TRUNCATE", "COMMENT",
"EXISTS", "IN", "IS", "LIKE", "ILIKE", "SIMILAR", "BETWEEN",
"AND", "OR", "ANY", "ALL", "SOME",
"FETCH", "NEXT", "ONLY", "ASC", "DESC",
"GRANT", "REVOKE", "ROLE", "USER",
"CURRENT_DATE", "CURRENT_TIME", "CURRENT_TIMESTAMP",
}
if alias_name and alias_name.upper().split()[0] not in sql_keywords:
# 已存在别名,且别名后没有紧跟 SQL 关键字,保留原别名
replaced = f"({subquery}) {alias_part}"
table_alias_map[original_table] = alias_part
replaced = f"{keyword} ({subquery}) {alias_part}"
table_alias_map[original_table] = alias_name
else:
# 无别名时,或者别名无效(如 LIMIT),添加默认别名
alias = original_table.split('.')[-1]
replaced = f"({subquery}) AS {alias}{alias_part}"
replaced = f"{keyword} ({subquery}) AS {alias}{alias_part or ''}"
table_alias_map[original_table] = alias
else:
alias = table_alias_map[original_table]
replaced = f"{alias}" # 使用别名
alias = table_alias_map[original_table]
replaced = f"{keyword} {alias}"
return replaced
# 执行替换(忽略大小写)
oldStrSql = re.sub(pattern, replace, oldStrSql, flags=re.IGNORECASE)
oldStrSql = re.sub(from_join_pattern, from_join_replace, oldStrSql, flags=re.IGNORECASE)
# 2️⃣ 替换字段引用 table_name.column → alias.column
column_ref_pattern = re.escape(table_name) + r'\.(\w+)'
def column_replace(match):
col = match.group(1)
alias = table_alias_map.get(table_name, table_name.split('.')[-1])
return f"{alias}.{col}"
oldStrSql = re.sub(column_ref_pattern, column_replace, oldStrSql, flags=re.IGNORECASE)
return oldStrSql

Loading…
Cancel
Save