Browse Source

代码优化

master
si@aidatagov.com 2 months ago
parent
commit
d21c37d0e7
  1. 76
      vue-fastapi-backend/module_admin/service/metasecurity_service.py
  2. 29
      vue-fastapi-backend/module_admin/service/metatask_service.py

76
vue-fastapi-backend/module_admin/service/metasecurity_service.py

@ -14,7 +14,7 @@ from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import text
from config.env import AppConfig
import requests
from sqlalchemy.exc import OperationalError
import json
import re
from decimal import Decimal
@ -299,6 +299,7 @@ class MetaSecurityService:
dsDataResource=await get_data_source_tree(request,page_object)
# dbConnent= cls.get_db_engine("postgresql",dataParams])
dbConnent= cls.get_db_engine(dsDataResource["type"],dsDataResource["connectionParams"])
# await test_connection(dbConnent)
#3.执行原始sql
result = await cls.execute_sql(dbConnent, page_object.sqlStr,"原始")
#4.获取sql中涉及的表名
@ -313,14 +314,14 @@ class MetaSecurityService:
for table_name in sqlTableNames:
table_configs = await get_table_configs(query_db, page_object, user, role_id_list, table_name)
tablesRowCol[table_name] = table_configs
# 返回最终的结果字典
ctrSqlDict = await generate_sql(tablesRowCol,table_columns)
oldStrSql= page_object.sqlStr
#7.根据行列配置控制原始sql
newStrSql =await replace_table_with_subquery(ctrSqlDict,oldStrSql)
#8.执行结果
result = await cls.execute_sql(dbConnent, page_object.sqlStr,"控制后")
result = await cls.execute_sql(dbConnent, newStrSql,"控制后")
resultDict={
"ctrlSql": newStrSql,
"data": result,
@ -347,12 +348,13 @@ class MetaSecurityService:
if db_type.lower() == "mysql":
conn_str=f"mysql+aiomysql://{db_params['user']}:{db_params['password']}@{db_params['address']}/{db_params['database']}"
print(f"数据库连接字符串: {conn_str}") # 输出调试信息
return create_async_engine(conn_str)
dbContent= create_async_engine(conn_str)
return dbContent
elif db_type.lower() == "postgresql":
return create_async_engine(f"postgresql+asyncpg://{db_params['user']}:{db_params['password']}@{db_params['address']}/{db_params['database']}")
# 你可以根据需求添加更多数据库类型
dbContent= create_async_engine(f"postgresql+asyncpg://{db_params['user']}:{db_params['password']}@{db_params['address']}/{db_params['database']}")
return dbContent
else:
raise ValueError("不支持的数据库类型")
raise ValueError("不支持的数据库类型")
except SQLAlchemyError as e:
# 捕获SQLAlchemy相关的数据库连接错误
raise ConnectionError(f"数据库连接失败: {e}")
@ -456,21 +458,37 @@ async def get_table_configs(query_db, page_object, user, role_id_list, table_nam
query_db, page_object.dbRId, table_name, '1', role_id
)
role_row_list.extend(role_rows) # 将每个角色的行配置合并到列表中
isHave = any([
user_col_list,
role_col_list,
user_row_list,
role_row_list
])
return {
"user_col_list": user_col_list,
"role_col_list": role_col_list,
"role_row_list": role_row_list,
"user_row_list": user_row_list
"user_row_list": user_row_list,
"isHave":isHave
}
async def generate_sql(tablesRowCol:dict, table_columns:dict):
sql_queries = {}
# 1. 列控制
# 遍历每个表
isHave=False
no_configTable_name=""
for table_name, table_configs in tablesRowCol.items():
if table_configs["isHave"]:
isHave=True
else:
no_configTable_name += table_name + ","
if not isHave:
no_configTable_name = no_configTable_name.rstrip(',')
raise ValueError(f"表:{no_configTable_name}均未配置行列数据安全")
for table_name, config in tablesRowCol.items():
# 获取该表的字段名
columns = {col.lower(): col for col in table_columns[table_name]} # 将字段名转为小写
# 初始化 SELECT 部分:用字典存储字段名,值是 null 字段名
select_columns = {col: f"null {col}" for col in columns}
@ -556,20 +574,42 @@ async def generate_sql(tablesRowCol:dict, table_columns:dict):
if len(tab_col_value) != 2:
raise RuntimeError(f"{row.dbCName}字段控制类型为表字段,未维护正确的值")
select_rows[row.dbCName] = f"{row.dbCName} in (select {tab_col_value[1]} from {tab_col_value[0]})"
if select_rows:
if select_rows.values():
where_conditions = " AND ".join(select_rows.values())
sql_queries[table_name] += " WHERE " + where_conditions
if where_conditions:
sql_queries[table_name] += " WHERE " + where_conditions
return sql_queries
# async def replace_table_with_subquery(ctrSqlDict, oldStrSql):
# # 遍历 ctrSqlDict 并替换 SQL 查询中的表名
# for table_name, subquery in ctrSqlDict.items():
# # 创建一个正则表达式,匹配原始 SQL 中的表名(注意大小写问题,正则会忽略大小写)
# # 匹配类似 "模式名.tab1" 或 "tab1" 的表名
# table_name_pattern = r'\b(?:[a-zA-Z_][a-zA-Z0-9_]*\.)?' + re.escape(table_name) + r'\b'
# # 替换原始 SQL 中的表名为对应的子查询
# oldStrSql = re.sub(table_name_pattern, f"({subquery})", oldStrSql, flags=re.IGNORECASE)
# return oldStrSql
async def replace_table_with_subquery(ctrSqlDict, oldStrSql):
# 遍历 ctrSqlDict 并替换 SQL 查询中的表名
for table_name, subquery in ctrSqlDict.items():
# 创建一个正则表达式,匹配原始 SQL 中的表名(注意大小写问题,正则会忽略大小写)
# 匹配类似 "模式名.tab1" 或 "tab1" 的表名
table_name_pattern = r'\b(?:[a-zA-Z_][a-zA-Z0-9_]*\.)?' + re.escape(table_name) + r'\b'
# 替换原始 SQL 中的表名为对应的子查询
oldStrSql = re.sub(table_name_pattern, f"({subquery})", oldStrSql, flags=re.IGNORECASE)
def add_alias(match):
# 子查询替换后,检查是否有 AS 关键字
replaced = f"({subquery})"
# 检查是否已经有 AS
if ' AS ' not in match.group(0).upper():
# 如果没有 AS 关键字,则添加一个默认别名
alias = table_name # 你可以根据需求设置别名格式,这里使用表名作为别名
replaced += f" AS {alias}"
return replaced
oldStrSql = re.sub(table_name_pattern, add_alias, oldStrSql, flags=re.IGNORECASE)
return oldStrSql
@ -591,3 +631,11 @@ async def get_data_source_tree(request: Request, current_user: MetaSecurityApiMo
else:
raise Exception(f'根据数据源ID:{current_user.dbRId}获取数据源信息失败,状态: {response.reason}')
async def test_connection(db_content):
try:
# 尝试执行一个简单的查询来测试连接
async with db_content.connect() as connection:
# 这里执行一个简单的查询,例如“SELECT 1”
await connection.scalar("SELECT 1")
except Exception as e:
raise Exception("数据源连接失败") from e

29
vue-fastapi-backend/module_admin/service/metatask_service.py

@ -496,9 +496,9 @@ class MetataskService:
for config in processConfigList:
# mysql表字段
if config.ac_target=='0':
modified_json_str = config.taskDefinitionJson.replace("16199683466336", str_list[0]).replace("16199683466337", str_list[1]).replace("16199683466338", str_list[2]).replace("'dash_test_w'", f"'{page_object.dbSName}'").replace("'mysql_conn'", f"'{page_object.dbRName}'")
modified_json_str2=config.taskRelationJson.replace("16199683466336", str_list[0]).replace("16199683466337", str_list[1]).replace("16199683466338", str_list[2])
modified_json_str3=config.locations.replace("16199683466336", str_list[0]).replace("16199683466337", str_list[1]).replace("16199683466338", str_list[2])
modified_json_str = config.taskDefinitionJson.replace("16699723296864", str_list[0]).replace("16699723296865", str_list[1]).replace("16699723296866", str_list[2]).replace("'ORCL_USER'", f"'{page_object.dbSName}'").replace("'orcl_conn'", f"'{page_object.dbRName}'")
modified_json_str2=config.taskRelationJson.replace("16699723296864", str_list[0]).replace("16699723296865", str_list[1]).replace("16699723296866", str_list[2])
modified_json_str3=config.locations.replace("16699723296864", str_list[0]).replace("16699723296865", str_list[1]).replace("16699723296866", str_list[2])
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description=page_object.remark, # 替换工作流备注
locations=modified_json_str3,# 替换locations
@ -526,9 +526,9 @@ class MetataskService:
message += ", "
message += page_object.metatask_name + "-表字段采集新增失败"
if config.ac_target=='1':
modified_json_str = config.taskDefinitionJson.replace("16286410625888", str_list[0]).replace("16286410625889", str_list[1]).replace("16286410625890", str_list[2]).replace("16286410625891", str_list[3]).replace("'dash_test_w'", f"'{page_object.dbSName}'").replace("'mysql_conn'", f"'{page_object.dbRName}'").replace("mysql_conn dash_test_w", f"{page_object.dbRName} {page_object.dbSName}")
modified_json_str2=config.taskRelationJson.replace("16286410625888", str_list[0]).replace("16286410625889", str_list[1]).replace("16286410625890", str_list[2]).replace("16286410625891", str_list[3])
modified_json_str3=config.locations.replace("16286410625888", str_list[0]).replace("16286410625889", str_list[1]).replace("16286410625890", str_list[2]).replace("16286410625891", str_list[3])
modified_json_str = config.taskDefinitionJson.replace("16699841738592", str_list[0]).replace("16699841738593", str_list[1]).replace("16699841738594", str_list[2]).replace("16699841738595", str_list[3]).replace("'ORCL_USER'", f"'{page_object.dbSName}'").replace("'orcl_conn'", f"'{page_object.dbRName}'")
modified_json_str2=config.taskRelationJson.replace("16699841738592", str_list[0]).replace("16699841738593", str_list[1]).replace("16699841738594", str_list[2]).replace("16699841738595", str_list[3])
modified_json_str3=config.locations.replace("16699841738592", str_list[0]).replace("16699841738593", str_list[1]).replace("16699841738594", str_list[2]).replace("16699841738595", str_list[3])
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description=page_object.remark, # 替换工作流备注
locations=modified_json_str3,# 替换locations
@ -585,9 +585,9 @@ class MetataskService:
for config in processConfigList:
# mysql表字段
if config.ac_target=='0':
modified_json_str = config.taskDefinitionJson.replace("16199683466336", str_list[0]).replace("16199683466337", str_list[1]).replace("16199683466338", str_list[2]).replace("'dash_test_w'", f"'{page_object.dbSName}'").replace("'mysql_conn'", f"'{page_object.dbRName}'")
modified_json_str2=config.taskRelationJson.replace("16199683466336", str_list[0]).replace("16199683466337", str_list[1]).replace("16199683466338", str_list[2])
modified_json_str3=config.locations.replace("16199683466336", str_list[0]).replace("16199683466337", str_list[1]).replace("16199683466338", str_list[2])
modified_json_str = config.taskDefinitionJson.replace("16699625480160", str_list[0]).replace("16699625480161", str_list[1]).replace("16699625480162", str_list[2]).replace("'MYSCHEMA'", f"'{page_object.dbSName}'").replace("'db2_conn'", f"'{page_object.dbRName}'")
modified_json_str2=config.taskRelationJson.replace("16699625480160", str_list[0]).replace("16699625480161", str_list[1]).replace("16699625480162", str_list[2])
modified_json_str3=config.locations.replace("16699625480160", str_list[0]).replace("16699625480161", str_list[1]).replace("16699625480162", str_list[2])
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description=page_object.remark, # 替换工作流备注
locations=modified_json_str3,# 替换locations
@ -601,7 +601,6 @@ class MetataskService:
).model_dump(exclude_unset=True, by_alias=True)
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response_post0 = requests.post(url2, headers=headers2, data=form_data, verify=False)
# text= '{"code":0,"msg":"success","data":{"id":null,"code":16244510692320,"name":"测试2250-表字段采集","version":0,"releaseState":null,"projectCode":15094503753824,"description":"111","globalParams":"[]","globalParamList":null,"globalParamMap":{},"createTime":"2025-01-08 12:50:39","updateTime":"2025-01-08 12:50:39","flag":"YES","userId":1,"userName":null,"projectName":null,"locations":"[{\\"taskCode\\":16244510678624,\\"x\\":334,\\"y\\":265},{\\"taskCode\\":16244510678625,\\"x\\":334,\\"y\\":390},{\\"taskCode\\":16244510678626,\\"x\\":687,\\"y\\":335}]","scheduleReleaseState":null,"timeout":0,"tenantId":1,"tenantCode":null,"modifyBy":null,"warningGroupId":0,"executionType":"PARALLEL"},"failed":false,"success":true}'
text= response_post0.text
responsJson = json.loads(text)
if responsJson['msg'] == 'success':
@ -615,9 +614,9 @@ class MetataskService:
message += ", "
message += page_object.metatask_name + "-表字段采集新增失败"
if config.ac_target=='1':
modified_json_str = config.taskDefinitionJson.replace("16286410625888", str_list[0]).replace("16286410625889", str_list[1]).replace("16286410625890", str_list[2]).replace("16286410625891", str_list[3]).replace("'dash_test_w'", f"'{page_object.dbSName}'").replace("'mysql_conn'", f"'{page_object.dbRName}'").replace("mysql_conn dash_test_w", f"{page_object.dbRName} {page_object.dbSName}")
modified_json_str2=config.taskRelationJson.replace("16286410625888", str_list[0]).replace("16286410625889", str_list[1]).replace("16286410625890", str_list[2]).replace("16286410625891", str_list[3])
modified_json_str3=config.locations.replace("16286410625888", str_list[0]).replace("16286410625889", str_list[1]).replace("16286410625890", str_list[2]).replace("16286410625891", str_list[3])
modified_json_str = config.taskDefinitionJson.replace("16699623866592", str_list[0]).replace("16699623866593", str_list[1]).replace("16699623866594", str_list[2]).replace("16699623866595", str_list[3]).replace("'MYSCHEMA'", f"'{page_object.dbSName}'").replace("'db2_conn'", f"'{page_object.dbRName}'")
modified_json_str2=config.taskRelationJson.replace("16699623866592", str_list[0]).replace("16699623866593", str_list[1]).replace("16699623866594", str_list[2]).replace("16699623866595", str_list[3])
modified_json_str3=config.locations.replace("16699623866592", str_list[0]).replace("16699623866593", str_list[1]).replace("16699623866594", str_list[2]).replace("16699623866595", str_list[3])
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description=page_object.remark, # 替换工作流备注
locations=modified_json_str3,# 替换locations
@ -771,7 +770,7 @@ class MetataskService:
for config in processConfigList:
# POSTGRESQL表字段
if config.ac_target=='0':
modified_json_str = config.taskDefinitionJson.replace("16688213802592", str_list[0]).replace("16688213802593", str_list[1]).replace("16688213802594", str_list[2])
modified_json_str = config.taskDefinitionJson.replace("16688213802592", str_list[0]).replace("16688213802593", str_list[1]).replace("16688213802594", str_list[2]).replace("'public'", f"'{page_object.dbSName}'").replace("'pg_conn'", f"'{page_object.dbRName}'")
modified_json_str2=config.taskRelationJson.replace("16688213802592", str_list[0]).replace("16688213802593", str_list[1]).replace("16688213802594", str_list[2])
modified_json_str3=config.locations.replace("16688213802592", str_list[0]).replace("16688213802593", str_list[1]).replace("16688213802594", str_list[2])
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
@ -800,7 +799,7 @@ class MetataskService:
message += ", "
message += page_object.metatask_name + "-表字段采集新增失败"
elif config.ac_target=='1':
modified_json_str = config.taskDefinitionJson.replace("16688477124320", str_list[0]).replace("16688477124321", str_list[1]).replace("16688477124322", str_list[2]).replace("16688477124323", str_list[3])
modified_json_str = config.taskDefinitionJson.replace("16688477124320", str_list[0]).replace("16688477124321", str_list[1]).replace("16688477124322", str_list[2]).replace("16688477124323", str_list[3]).replace("'public'", f"'{page_object.dbSName}'").replace("'pg_conn'", f"'{page_object.dbRName}'")
modified_json_str2=config.taskRelationJson.replace("16688477124320", str_list[0]).replace("16688477124321", str_list[1]).replace("16688477124322", str_list[2]).replace("16688477124323", str_list[3])
modified_json_str3=config.locations.replace("16688477124320", str_list[0]).replace("16688477124321", str_list[1]).replace("16688477124322", str_list[2]).replace("16688477124323", str_list[3])
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson

Loading…
Cancel
Save