You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
654 lines
30 KiB
654 lines
30 KiB
from fastapi import Request
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from exceptions.exception import ServiceException
|
|
from module_admin.dao.metaSecurity_dao import MetaSecurityDao
|
|
from module_admin.entity.vo.common_vo import CrudResponseModel
|
|
from module_admin.entity.vo.metasecurity_vo import MetaSecurityColModel, MetaSecurityRowModel,DeleteMetaSecurityModel,MetaSecurityApiModel
|
|
from utils.common_util import CamelCaseUtil
|
|
import uuid
|
|
from module_admin.dao.login_dao import login_by_account
|
|
from module_admin.dao.user_dao import UserDao
|
|
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
|
|
from sqlalchemy.orm import sessionmaker
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from sqlalchemy import text
|
|
from config.env import AppConfig
|
|
import requests
|
|
from sqlalchemy.exc import OperationalError
|
|
import json
|
|
import re
|
|
from decimal import Decimal
|
|
|
|
class MetaSecurityService:
|
|
"""
|
|
数据源安全管理模块服务层
|
|
"""
|
|
|
|
@classmethod
async def get_meta_security_col_list_services(
    cls, query_db: AsyncSession, query_object: MetaSecurityColModel, is_page: bool = False
):
    """
    Return the column-security configuration list.

    :param query_db: ORM session
    :param query_object: query filter object
    :param is_page: whether pagination is enabled
    :return: whatever MetaSecurityDao.get_meta_security_col_list returns
    """
    # Pure delegation: the DAO does the filtering and the optional paging.
    return await MetaSecurityDao.get_meta_security_col_list(query_db, query_object, is_page)
|
|
|
|
@classmethod
async def get_meta_security_row_list_services(
    cls, query_db: AsyncSession, query_object: MetaSecurityRowModel, is_page: bool = False
):
    """
    Return the row-security configuration list.

    :param query_db: ORM session
    :param query_object: query filter object
    :param is_page: whether pagination is enabled
    :return: whatever MetaSecurityDao.get_meta_security_row_list returns
    """
    # Pure delegation: the DAO does the filtering and the optional paging.
    return await MetaSecurityDao.get_meta_security_row_list(query_db, query_object, is_page)
|
|
|
|
@classmethod
async def get_meta_security_col_by_id_services(cls, query_db: AsyncSession, colId: str):
    """
    Load one column-security configuration by id.

    :param query_db: ORM session
    :param colId: column configuration id
    :return: a MetaSecurityColModel; built from no fields when the DAO finds nothing
    """
    record = await MetaSecurityDao.get_meta_security_col_by_id(query_db, colId)
    # A missing record still yields a model instance, just constructed without data.
    fields = CamelCaseUtil.transform_result(record) if record else {}
    return MetaSecurityColModel(**fields)
|
|
|
|
@classmethod
async def get_meta_security_row_by_id_services(cls, query_db: AsyncSession, rowId: str):
    """
    Load one row-security configuration by id.

    :param query_db: ORM session
    :param rowId: row configuration id
    :return: a MetaSecurityRowModel; built from no fields when the DAO finds nothing
    """
    record = await MetaSecurityDao.get_meta_security_row_by_id(query_db, rowId)
    # A missing record still yields a model instance, just constructed without data.
    fields = CamelCaseUtil.transform_result(record) if record else {}
    return MetaSecurityRowModel(**fields)
|
|
|
|
@classmethod
async def add_meta_security_col_services(
    cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityColModel
):
    """
    Create column-security configurations.

    ``obj_value`` and ``obj_name`` are comma-separated lists; one record with a
    fresh UUID is inserted per (value, name) pair.

    :param request: Request object (not used here)
    :param query_db: ORM session
    :param page_object: column configuration to add
    :return: CrudResponseModel describing the outcome
    """
    try:
        raw_values = page_object.obj_value
        if isinstance(raw_values, str) and raw_values:
            # NOTE(review): zip stops at the shorter list, so a value without a
            # matching name is skipped silently -- confirm this is intended.
            pairs = zip(raw_values.split(","), page_object.obj_name.split(","))
            for raw_value, raw_name in pairs:
                # Work on a copy so the incoming object is never modified.
                entry = MetaSecurityColModel(**page_object.model_dump(by_alias=True))
                entry.obj_value = raw_value.strip()
                entry.obj_name = raw_name.strip()
                entry.colId = str(uuid.uuid4())
                await MetaSecurityDao.add_meta_security_col(query_db, entry)
        await query_db.commit()
        return CrudResponseModel(is_success=True, message='新增列配置成功')
    except Exception:
        # Undo every insert of this request, then let the error propagate.
        await query_db.rollback()
        raise
|
|
|
|
@classmethod
async def add_meta_security_row_services(
    cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityRowModel
):
    """
    Create row-security configurations.

    ``obj_value`` and ``obj_name`` are comma-separated lists; one record with a
    fresh UUID is inserted per (value, name) pair.

    :param request: Request object (not used here)
    :param query_db: ORM session
    :param page_object: row configuration to add
    :return: CrudResponseModel describing the outcome
    """
    try:
        raw_values = page_object.obj_value
        if isinstance(raw_values, str) and raw_values:
            # NOTE(review): zip stops at the shorter list, so a value without a
            # matching name is skipped silently -- confirm this is intended.
            pairs = zip(raw_values.split(","), page_object.obj_name.split(","))
            for raw_value, raw_name in pairs:
                # Work on a copy so the incoming object is never modified.
                entry = MetaSecurityRowModel(**page_object.model_dump(by_alias=True))
                entry.obj_value = raw_value.strip()
                entry.obj_name = raw_name.strip()
                entry.rowId = str(uuid.uuid4())
                await MetaSecurityDao.add_meta_security_row(query_db, entry)
        await query_db.commit()
        return CrudResponseModel(is_success=True, message='新增行配置成功')
    except Exception:
        # Undo every insert of this request, then let the error propagate.
        await query_db.rollback()
        raise
|
|
@classmethod
async def col_detail_services(cls, query_db: AsyncSession, col: str):
    """
    Return the details of one column-security configuration.

    :param query_db: ORM session
    :param col: column configuration id
    :return: a MetaSecurityColModel; built from no fields when the DAO finds nothing
    """
    record = await MetaSecurityDao.get_meta_security_col_by_id(query_db, col)
    if not record:
        return MetaSecurityColModel(**{})
    return MetaSecurityColModel(**CamelCaseUtil.transform_result(record))
|
|
@classmethod
async def row_detail_services(cls, query_db: AsyncSession, row_id: str):
    """
    Return the details of one row-security configuration.

    :param query_db: ORM session
    :param row_id: row configuration id
    :return: a MetaSecurityRowModel; built from no fields when the DAO finds nothing
    """
    record = await MetaSecurityDao.get_meta_security_row_by_id(query_db, row_id)
    if not record:
        return MetaSecurityRowModel(**{})
    return MetaSecurityRowModel(**CamelCaseUtil.transform_result(record))
|
|
@classmethod
async def edit_meta_security_col_services(
    cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityColModel
):
    """
    Update an existing column-security configuration.

    :param request: Request object (not used here)
    :param query_db: ORM session
    :param page_object: column configuration carrying the new values
    :return: CrudResponseModel describing the outcome
    :raises ServiceException: when no configuration with ``page_object.colId`` exists
    """
    edit_col = page_object.model_dump(exclude_unset=True)

    # Check the DAO result directly. The *_by_id service wraps a missing record
    # in a model instance built from no data, so testing that instance for
    # truthiness could not detect a missing row.
    existing = await MetaSecurityDao.get_meta_security_col_by_id(query_db, page_object.colId)
    if not existing:
        raise ServiceException(message=f'列配置{page_object.colId}不存在')

    try:
        await MetaSecurityDao.update_meta_security_col(query_db, edit_col)
        await query_db.commit()
        return CrudResponseModel(is_success=True, message='编辑列配置成功')
    except Exception:
        await query_db.rollback()
        raise
|
|
|
|
@classmethod
async def edit_meta_security_row_services(
    cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityRowModel
):
    """
    Update an existing row-security configuration.

    :param request: Request object (not used here)
    :param query_db: ORM session
    :param page_object: row configuration carrying the new values
    :return: CrudResponseModel describing the outcome
    :raises ServiceException: when no configuration with ``page_object.rowId`` exists
    """
    edit_row = page_object.model_dump(exclude_unset=True)

    # Check the DAO result directly. The *_by_id service wraps a missing record
    # in a model instance built from no data, so testing that instance for
    # truthiness could not detect a missing row.
    existing = await MetaSecurityDao.get_meta_security_row_by_id(query_db, page_object.rowId)
    if not existing:
        raise ServiceException(message=f'行配置{page_object.rowId}不存在')

    try:
        await MetaSecurityDao.update_meta_security_row(query_db, edit_row)
        await query_db.commit()
        return CrudResponseModel(is_success=True, message='编辑行配置成功')
    except Exception:
        await query_db.rollback()
        raise
|
|
|
|
@classmethod
async def delete_meta_security_col_services(
    cls, request: Request, query_db: AsyncSession, page_object: DeleteMetaSecurityModel
):
    """
    Delete one or more column-security configurations.

    :param request: Request object (not used here)
    :param query_db: ORM session
    :param page_object: carries ``metaSecurity_ids``, a comma-separated id list
    :return: CrudResponseModel describing the outcome
    :raises ServiceException: when ``metaSecurity_ids`` is empty
    """
    if not page_object.metaSecurity_ids:
        raise ServiceException(message='传入列配置ID为空')

    try:
        for col_id in page_object.metaSecurity_ids.split(','):
            # Test the DAO record itself; the *_by_id service returns a model
            # instance even for unknown ids, which made the old check a no-op.
            if await MetaSecurityDao.get_meta_security_col_by_id(query_db, col_id):
                await MetaSecurityDao.delete_meta_security_col(query_db, col_id)
        # All deletions are committed together.
        await query_db.commit()
        return CrudResponseModel(is_success=True, message='删除列配置成功')
    except Exception:
        await query_db.rollback()
        raise
|
|
|
|
@classmethod
async def delete_meta_security_row_services(
    cls, request: Request, query_db: AsyncSession, page_object: DeleteMetaSecurityModel
):
    """
    Delete one or more row-security configurations.

    :param request: Request object (not used here)
    :param query_db: ORM session
    :param page_object: carries ``metaSecurity_ids``, a comma-separated id list
    :return: CrudResponseModel describing the outcome
    :raises ServiceException: when ``metaSecurity_ids`` is empty
    """
    if not page_object.metaSecurity_ids:
        raise ServiceException(message='传入行配置ID为空')

    try:
        for row_id in page_object.metaSecurity_ids.split(','):
            # Test the DAO record itself; the *_by_id service returns a model
            # instance even for unknown ids, which made the old check a no-op.
            if await MetaSecurityDao.get_meta_security_row_by_id(query_db, row_id):
                await MetaSecurityDao.delete_meta_security_row(query_db, row_id)
        # All deletions are committed together.
        await query_db.commit()
        return CrudResponseModel(is_success=True, message='删除行配置成功')
    except Exception:
        await query_db.rollback()
        raise
|
|
@classmethod
async def getMetaSercuitybysql(
    cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityApiModel
):
    """
    Run a caller-supplied SQL statement with row/column security applied.

    Steps: authenticate the caller, resolve the data source, execute the
    original SQL, and -- unless the caller holds role id 3 -- rewrite the SQL so
    each referenced table is replaced by a restricted subquery, then execute
    the rewritten SQL.

    :param request: Request object, forwarded to the data-source lookup
    :param query_db: ORM session of the application database
    :param page_object: carries username, password, dbRId and sqlStr
    :return: dict with ``ctrlSql`` and ``data``, plus ``message`` (role id 3)
        or ``tablesRowCol`` (everyone else)
    :raises ServiceException: when authentication fails
    """
    # 1. Authenticate the caller.
    if not page_object.username:
        raise ServiceException(data='', message='用户名不能为空!')
    user = await login_by_account(query_db, page_object.username)
    if not user:
        raise ServiceException(data='', message='用户不存在')
    # NOTE(review): the submitted password is compared to the stored value
    # as-is -- confirm how callers are expected to encode it.
    if not page_object.password == user[0].password:
        raise ServiceException(data='', message='用户密码错误!')
    query_user = await UserDao.get_user_by_id(query_db, user_id=user[0].user_id)
    role_id_list = [item.role_id for item in query_user.get('user_role_info')]

    # 2. Resolve the target data source and build an engine for it.
    dsDataResource = await get_data_source_tree(request, page_object)
    dbConnent = cls.get_db_engine(dsDataResource["type"], dsDataResource["connectionParams"])

    try:
        # 3. Execute the original SQL.
        # NOTE(review): this runs unrestricted for every caller and the result
        # is discarded for non-admin callers -- confirm it is wanted as a
        # validation step.
        result = await cls.execute_sql(dbConnent, page_object.sqlStr, "原始")
        if 3 in role_id_list:
            # Role id 3 gets the unrestricted result (reported as data-security admin).
            return {
                "ctrlSql": page_object.sqlStr,
                "data": result,
                "message": "数据安全管理员权限"
            }

        # 4. Tables referenced by the SQL.
        sqlTableNames = await cls.get_tables_from_sql(page_object.sqlStr)
        # 5. Column names of those tables in the target database.
        table_columns = await cls.get_columns_from_tables(
            dbConnent, sqlTableNames, dsDataResource["type"]
        )

        # 6. Row/column configuration of the user and the user's roles, per table.
        tablesRowCol = {}
        for table_name in sqlTableNames:
            tablesRowCol[table_name] = await get_table_configs(
                query_db, page_object, user, role_id_list, table_name
            )
        ctrSqlDict = await generate_sql(tablesRowCol, table_columns)

        # 7. Rewrite the original SQL with the restricted subqueries.
        newStrSql = await replace_table_with_subquery(ctrSqlDict, page_object.sqlStr)

        # 8. Execute the rewritten SQL.
        result = await cls.execute_sql(dbConnent, newStrSql, "控制后")
        return {
            "ctrlSql": newStrSql,
            "data": result,
            "tablesRowCol": tablesRowCol
        }
    finally:
        # The engine is created per call; dispose it so its connection pool
        # is closed rather than left behind after every request.
        await dbConnent.dispose()
|
|
|
|
|
|
|
|
def get_db_engine(db_type: str, db_params: dict):
    """
    Build an async SQLAlchemy engine for a MySQL or PostgreSQL data source.

    Invoked through the class (``cls.get_db_engine(...)``), so it takes no
    ``cls``/``self`` argument.

    :param db_type: "mysql" or "postgresql" (case-insensitive)
    :param db_params: dict with ``user``, ``password``, ``address`` (optionally
        prefixed with ``jdbc:mysql://`` or ``jdbc:postgresql://``) and
        ``database``; it is not modified
    :return: the engine returned by ``create_async_engine``
    :raises ConnectionError: on SQLAlchemy errors
    :raises RuntimeError: on any other failure, including an unsupported type
    """
    from urllib.parse import quote

    try:
        # Strip a JDBC prefix into a local variable; the caller's dict stays untouched.
        address = db_params['address']
        for prefix in ("jdbc:mysql://", "jdbc:postgresql://"):
            if address.startswith(prefix):
                address = address[len(prefix):]
                break

        drivers = {"mysql": "mysql+aiomysql", "postgresql": "postgresql+asyncpg"}
        driver = drivers.get(db_type.lower())
        if driver is None:
            raise ValueError("不支持的数据库类型")

        # Percent-encode credentials so characters such as '@', ':' or '/'
        # cannot corrupt the URL. The URL is deliberately never printed or
        # logged because it contains the password.
        user = quote(str(db_params['user']), safe="")
        password = quote(str(db_params['password']), safe="")
        return create_async_engine(
            f"{driver}://{user}:{password}@{address}/{db_params['database']}"
        )
    except SQLAlchemyError as e:
        # SQLAlchemy-level failures while creating the engine.
        raise ConnectionError(f"数据库连接失败: {e}") from e
    except Exception as e:
        # Anything else, including the unsupported-type ValueError above.
        raise RuntimeError(f"连接过程中发生了未知错误: {e}") from e
|
|
@classmethod
async def execute_sql(cls, dbConnent, sql_query: str, sql_type: str):
    """
    Execute a raw SQL statement and return its rows as dictionaries.

    :param dbConnent: async engine to run the statement on
    :param sql_query: SQL text, executed as-is
    :param sql_type: label prefixed to the error message (e.g. "原始", "控制后")
    :return: list of ``{column_name: value}`` dicts, one per row
    :raises RuntimeError: when SQLAlchemy reports an error during execution
    """
    # Only the session is needed. The previous outer ``engine.begin()`` checked
    # out a second connection that was never used.
    session_factory = sessionmaker(dbConnent, class_=AsyncSession, expire_on_commit=False)
    async with session_factory() as session:
        try:
            result = await session.execute(text(sql_query))
            rows = result.fetchall()
            columns = list(result.keys())
            # Pair every row with the column names.
            return [dict(zip(columns, row)) for row in rows]
        except SQLAlchemyError as e:
            raise RuntimeError(f"{sql_type}执行 SQL 查询时发生错误: {e}") from e
|
|
async def get_tables_from_sql(sql_query:str):
    """
    Extract the table names referenced after FROM / JOIN / INTO / UPDATE.

    Invoked through the class (``cls.get_tables_from_sql(...)``), so it takes no
    ``cls``/``self`` argument.

    Only dotted references (``schema.table``) are matched; the part after the
    last dot is returned. Duplicates are kept, in order of appearance.
    NOTE(review): unqualified names such as ``FROM users`` are not matched --
    confirm that requiring a schema prefix is intended.

    :param sql_query: SQL text to scan
    :return: list of table names without their schema prefix
    """
    qualified_ref = re.compile(
        r"(FROM|JOIN|INTO|UPDATE)\s+([a-zA-Z0-9_]+(?:\.[a-zA-Z0-9_]+))",
        re.IGNORECASE,
    )
    # Group 2 is the dotted reference; keep only its last segment.
    return [found.group(2).rsplit('.', 1)[-1] for found in qualified_ref.finditer(sql_query)]
|
|
|
|
@classmethod
async def get_columns_from_tables(cls, dbConnent, table_names, db_type: str):
    """
    Look up the column names of each table via ``information_schema``.

    :param dbConnent: async engine of the target database
    :param table_names: iterable of table names (without schema)
    :param db_type: "postgresql" or "mysql" (case-insensitive)
    :return: dict ``{table_name: [column_name, ...]}``
    :raises ValueError: for any other database type
    """
    # Both queries expose the result under the key "column_name". MySQL labels
    # an un-aliased column exactly as written (COLUMN_NAME), which made the
    # lookup below fail with KeyError.
    query_templates = {
        "postgresql": "SELECT column_name FROM information_schema.columns WHERE table_name ='{name}'",
        "mysql": "SELECT COLUMN_NAME AS column_name FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME ='{name}'",
    }
    # NOTE(review): the lookup is not restricted to a schema/database, so
    # same-named tables in different schemas contribute to one column list.
    columns = {}
    for table_name in table_names:
        template = query_templates.get(db_type.lower())
        if template is None:
            raise ValueError(f"暂不支持数据库类型: {db_type}")
        # The name is embedded as a string literal; double any single quote so
        # it cannot terminate the literal.
        query = template.format(name=str(table_name).replace("'", "''"))
        result = await cls.execute_sql(dbConnent, query, "字段查询")
        columns[table_name] = [row["column_name"] for row in result]
    return columns
|
|
|
|
def convert_decimal(obj):
    """
    Recursively replace ``Decimal`` values with ``float``.

    Dicts and lists are rebuilt with converted contents; any other value
    (including tuples and sets) is returned unchanged.

    :param obj: value to convert
    :return: converted copy for dicts/lists, a float for a Decimal, else ``obj``
    """
    if isinstance(obj, dict):
        return {name: convert_decimal(item) for name, item in obj.items()}
    if isinstance(obj, list):
        return [convert_decimal(element) for element in obj]
    if isinstance(obj, Decimal):
        # float loses precision for large/precise decimals; str(obj) would keep it.
        return float(obj)
    return obj
|
|
async def get_table_configs(query_db, page_object, user, role_id_list, table_name):
    """
    Collect the column and row security rules that apply to one table.

    Rules are fetched for the user (object type '0') and for each of the user's
    roles (object type '1').

    :param query_db: ORM session
    :param page_object: request object; ``dbRId`` identifies the data source
    :param user: login query result; ``user[0].user_id`` is the user id
    :param role_id_list: ids of the user's roles
    :param table_name: table the rules are looked up for
    :return: dict with ``user_col_list``, ``role_col_list``, ``role_row_list``,
        ``user_row_list`` and ``isHave`` (True when any of the lists is non-empty)
    """
    source_id = page_object.dbRId
    user_id = user[0].user_id

    # Column rules: user first, then every role.
    user_col_list = await MetaSecurityDao.get_api_col_list(
        query_db, source_id, table_name, '0', user_id
    )
    role_col_list = []
    for role_id in role_id_list:
        role_col_list += await MetaSecurityDao.get_api_col_list(
            query_db, source_id, table_name, '1', role_id
        )

    # Row rules: user first, then every role.
    user_row_list = await MetaSecurityDao.get_api_row_list(
        query_db, source_id, table_name, '0', user_id
    )
    role_row_list = []
    for role_id in role_id_list:
        role_row_list += await MetaSecurityDao.get_api_row_list(
            query_db, source_id, table_name, '1', role_id
        )

    has_any_rule = bool(user_col_list or role_col_list or user_row_list or role_row_list)
    return {
        "user_col_list": user_col_list,
        "role_col_list": role_col_list,
        "role_row_list": role_row_list,
        "user_row_list": user_row_list,
        "isHave": has_any_rule
    }
|
|
def _apply_column_rules(select_columns, rules):
    """Apply column rules in order: ctrl_type '0' masks a column as NULL, '1' exposes it."""
    for rule in rules:
        if rule.ctrl_type == '0':
            render = "null {}".format
        elif rule.ctrl_type == '1':
            render = "{}".format
        else:
            # Unknown control types leave the current selection untouched.
            continue
        if rule.dbCName == "ALL":
            targets = list(select_columns)
        else:
            targets = [name.strip().lower() for name in rule.dbCName.split(",")]
        for name in targets:
            # Rules naming columns the table does not have are ignored.
            if name in select_columns:
                select_columns[name] = render(name)


def _quote_sql_literal(value):
    """Wrap a value in single quotes, doubling embedded quotes so it stays one literal."""
    return "'" + value.replace("'", "''") + "'"


def _apply_row_rules(conditions, rules):
    """Translate row rules into WHERE fragments keyed by column; later rules override earlier ones."""
    for rule in rules:
        column, kind, value = rule.dbCName, rule.ctrl_type, rule.ctrl_value
        if kind == '0':
            # Fixed-value control.
            if value == "ALL":
                # Empty fragment = no restriction; it also cancels an earlier
                # restriction on the same column.
                conditions[column] = ""
            elif "," in value:
                literals = ", ".join(_quote_sql_literal(v.strip()) for v in value.split(","))
                conditions[column] = f"{column} IN ({literals})"
            else:
                conditions[column] = f"{column} = {_quote_sql_literal(value)}"
        elif kind == '1':
            # Table-column control: value must look like "table.column".
            parts = value.split(".")
            if len(parts) != 2:
                raise RuntimeError(f"{column}字段控制类型为表字段,未维护正确的值")
            conditions[column] = f"{column} in (select {parts[1]} from {parts[0]})"


async def generate_sql(tablesRowCol:dict, table_columns:dict):
    """
    Build one restricted SELECT statement per table from its security rules.

    Every column starts masked (``null <column>``); role rules are applied
    first and user rules afterwards, so user rules win. Row rules become a
    WHERE clause whose fragments are joined with AND.

    :param tablesRowCol: ``{table: config}`` where config holds ``isHave``,
        ``role_col_list``, ``user_col_list``, ``role_row_list``, ``user_row_list``
    :param table_columns: ``{table: [column names]}``
    :return: dict ``{table: sql}``
    :raises ValueError: when none of the tables has any rule configured
    :raises RuntimeError: when a table-column row rule is not ``table.column``
    """
    if not any(config["isHave"] for config in tablesRowCol.values()):
        raise ValueError(f"表:{','.join(tablesRowCol)}均未配置行列数据安全")

    sql_queries = {}
    for table_name, config in tablesRowCol.items():
        # 1. Column control. Column names are handled in lower case.
        select_columns = {
            col.lower(): f"null {col.lower()}" for col in table_columns[table_name]
        }
        _apply_column_rules(select_columns, config["role_col_list"])
        _apply_column_rules(select_columns, config["user_col_list"])
        query = f"SELECT {', '.join(select_columns.values())} FROM {table_name}"

        # 2. Row control.
        conditions = {}
        _apply_row_rules(conditions, config["role_row_list"])
        _apply_row_rules(conditions, config["user_row_list"])
        # Skip the empty "no restriction" fragments; joining them produced
        # invalid SQL such as "WHERE  AND region = 'x'".
        where_clause = " AND ".join(fragment for fragment in conditions.values() if fragment)
        if where_clause:
            query += " WHERE " + where_clause

        sql_queries[table_name] = query
    return sql_queries
|
|
# Words that may directly follow a table reference and must not be mistaken
# for its alias.
_NON_ALIAS_KEYWORDS = frozenset({
    "LIMIT", "WHERE", "ORDER", "GROUP", "HAVING", "JOIN", "ON", "USING", "UNION",
    "EXCEPT", "INTERSECT", "FETCH", "OFFSET",
    "LEFT", "RIGHT", "INNER", "OUTER", "FULL", "CROSS", "NATURAL",
    "FOR", "WINDOW", "SET", "VALUES",
})


async def replace_table_with_subquery(ctrSqlDict, oldStrSql):
    """
    Replace table references in a SQL string with their restricted subqueries.

    Each reference to a table in ``ctrSqlDict`` (optionally schema-qualified,
    matched case-insensitively) becomes ``(<subquery>) <alias>``. An existing
    alias is kept; otherwise the bare table name is used as the alias so that
    ``table.column`` qualifiers elsewhere in the statement still resolve.
    Qualifiers themselves (a name directly followed by ``.``) are left alone.

    All tables are substituted in one pass, so the text of an injected
    subquery is never scanned again.

    NOTE(review): matching is textual, not a SQL parse -- a column or string
    literal spelled exactly like a table name is still rewritten.

    :param ctrSqlDict: ``{table_name: subquery_sql}``
    :param oldStrSql: original SQL text
    :return: rewritten SQL text (the input unchanged when ``ctrSqlDict`` is empty)
    """
    if not ctrSqlDict:
        return oldStrSql

    subqueries = {name.lower(): subquery for name, subquery in ctrSqlDict.items()}
    # Longest names first so the alternation prefers the most specific table name.
    names = "|".join(re.escape(name) for name in sorted(ctrSqlDict, key=len, reverse=True))
    pattern = re.compile(
        r'\b((?:[a-zA-Z_][a-zA-Z0-9_]*\.)?(' + names + r'))\b'  # optional schema + table name
        r'(?!\s*\.)'                                            # not a "table.column" qualifier
        r'(\s+(?:AS\s+)?(\w+))?',                               # optional trailing token / alias
        re.IGNORECASE,
    )

    def substitute(match):
        table_name = match.group(2)   # table name as written, without schema
        trailing = match.group(3)     # whitespace + optional AS + next word, or None
        token = match.group(4)        # the next word alone, or None
        subquery = subqueries[table_name.lower()]
        if token is not None and token.upper() not in _NON_ALIAS_KEYWORDS:
            # The reference already carries an alias: keep it verbatim.
            return f"({subquery}){trailing}"
        # No alias (end of clause, or a keyword follows): alias the subquery
        # with the table name and put back whatever text was consumed.
        return f"({subquery}) AS {table_name}{trailing or ''}"

    return pattern.sub(substitute, oldStrSql)
|
|
|
|
|
|
|
|
async def get_data_source_tree(request: Request, current_user: MetaSecurityApiModel):
    """
    Fetch the data source identified by ``current_user.dbRId`` from DolphinScheduler.

    Calls the ``datasources/withpwdlist`` endpoint (first page, 100 entries)
    with the caller's credentials in the ``dashUserName`` / ``dashPassword``
    headers and returns the entry whose ``id`` equals ``dbRId``, with its
    ``connectionParams`` JSON string parsed into a dict.

    :param request: Request object (not used here)
    :param current_user: carries username, password and dbRId
    :return: the matching data-source dict
    :raises Exception: when the HTTP status is not 200 or no entry matches
    """
    import asyncio
    from functools import partial

    url = f'{AppConfig.ds_server_url}/dolphinscheduler/datasources/withpwdlist?pageNo=1&pageSize=100'
    headers = {'dashUserName': current_user.username, 'dashPassword': current_user.password}

    # requests is blocking: run it in the default executor so the event loop
    # keeps serving other requests, and bound the wait with a timeout.
    # NOTE(review): verify=False disables TLS certificate verification.
    fetch = partial(requests.get, url, headers=headers, verify=False, timeout=30)
    response = await asyncio.get_running_loop().run_in_executor(None, fetch)

    # Test the status code; the reason phrase is optional and server-dependent.
    if response.status_code != 200:
        raise Exception(f'根据数据源ID:{current_user.dbRId}获取数据源信息失败,状态: {response.reason}')

    data = json.loads(response.text)
    # NOTE(review): only the first 100 data sources are searched.
    for item in data["data"]["totalList"]:
        if item["id"] == current_user.dbRId:
            # connectionParams arrives as a JSON string; hand it back as a dict.
            item["connectionParams"] = json.loads(item["connectionParams"])
            return item
    raise Exception(f'根据数据源ID:{current_user.dbRId}获取数据源信息失败,状态: {response.reason}')
|
|
async def test_connection(db_content):
    """
    Verify that a connection to the data source can be opened.

    :param db_content: async engine to probe
    :raises Exception: ("数据源连接失败") when connecting or the probe query fails
    """
    try:
        async with db_content.connect() as connection:
            # Async connections only execute SQL constructs, so the probe
            # statement must be wrapped in text(); a bare string is rejected.
            await connection.scalar(text("SELECT 1"))
    except Exception as e:
        raise Exception("数据源连接失败") from e
|