import asyncio
import json
import re
import uuid
from decimal import Decimal
from functools import partial
from typing import Optional
from urllib.parse import quote

import requests
from fastapi import Request
from sqlalchemy import text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

from exceptions.exception import ServiceException
from module_admin.dao.metaSecurity_dao import MetaSecurityDao
from module_admin.entity.vo.common_vo import CrudResponseModel
from module_admin.entity.vo.metasecurity_vo import (
    MetaSecurityColModel,
    MetaSecurityRowModel,
    DeleteMetaSecurityModel,
    MetaSecurityApiModel,
)
from utils.common_util import CamelCaseUtil
from module_admin.dao.login_dao import login_by_account
from module_admin.dao.user_dao import UserDao
from config.env import AppConfig


class MetaSecurityService:
    """
    Service layer for the data-source security (row/column control) module.
    """

    @classmethod
    async def get_meta_security_col_list_services(
        cls, query_db: AsyncSession, query_object: MetaSecurityColModel, is_page: bool = False
    ):
        """
        Return the list of column-control configurations.

        :param query_db: ORM session
        :param query_object: query filter object
        :param is_page: whether pagination is enabled
        :return: whatever ``MetaSecurityDao.get_meta_security_col_list`` returns
        """
        return await MetaSecurityDao.get_meta_security_col_list(query_db, query_object, is_page)

    @classmethod
    async def get_meta_security_row_list_services(
        cls, query_db: AsyncSession, query_object: MetaSecurityRowModel, is_page: bool = False
    ):
        """
        Return the list of row-control configurations.

        :param query_db: ORM session
        :param query_object: query filter object
        :param is_page: whether pagination is enabled
        :return: whatever ``MetaSecurityDao.get_meta_security_row_list`` returns
        """
        return await MetaSecurityDao.get_meta_security_row_list(query_db, query_object, is_page)

    @classmethod
    async def get_meta_security_col_by_id_services(cls, query_db: AsyncSession, colId: str):
        """
        Load one column-control configuration by id.

        :param query_db: ORM session
        :param colId: column configuration id
        :return: a populated ``MetaSecurityColModel``, or an empty model when
            the DAO finds nothing (callers must check ``colId``, not truthiness)
        """
        col = await MetaSecurityDao.get_meta_security_col_by_id(query_db, colId)
        if col:
            return MetaSecurityColModel(**CamelCaseUtil.transform_result(col))
        return MetaSecurityColModel(**dict())

    @classmethod
    async def get_meta_security_row_by_id_services(cls, query_db: AsyncSession, rowId: str):
        """
        Load one row-control configuration by id.

        :param query_db: ORM session
        :param rowId: row configuration id
        :return: a populated ``MetaSecurityRowModel``, or an empty model when
            the DAO finds nothing (callers must check ``rowId``, not truthiness)
        """
        row = await MetaSecurityDao.get_meta_security_row_by_id(query_db, rowId)
        if row:
            return MetaSecurityRowModel(**CamelCaseUtil.transform_result(row))
        return MetaSecurityRowModel(**dict())

    @classmethod
    async def add_meta_security_col_services(
        cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityColModel
    ):
        """
        Create column-control configurations.

        ``obj_value`` and ``obj_name`` are comma-separated lists; one record
        is inserted per (value, name) pair, each with a fresh UUID as ``colId``.

        :param request: Request object (unused here)
        :param query_db: ORM session
        :param page_object: column configuration to create
        :return: ``CrudResponseModel`` reporting success
        """
        try:
            if isinstance(page_object.obj_value, str) and page_object.obj_value:
                obj_values = page_object.obj_value.split(",")
                obj_names = page_object.obj_name.split(",")
                for value, name in zip(obj_values, obj_names):
                    # Work on a copy so the caller's object is left untouched.
                    new_page_object = MetaSecurityColModel(**page_object.model_dump(by_alias=True))
                    new_page_object.obj_value = value.strip()
                    new_page_object.obj_name = name.strip()
                    new_page_object.colId = str(uuid.uuid4())
                    await MetaSecurityDao.add_meta_security_col(query_db, new_page_object)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='新增列配置成功')
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def add_meta_security_row_services(
        cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityRowModel
    ):
        """
        Create row-control configurations.

        ``obj_value`` and ``obj_name`` are comma-separated lists; one record
        is inserted per (value, name) pair, each with a fresh UUID as ``rowId``.

        :param request: Request object (unused here)
        :param query_db: ORM session
        :param page_object: row configuration to create
        :return: ``CrudResponseModel`` reporting success
        """
        try:
            if isinstance(page_object.obj_value, str) and page_object.obj_value:
                obj_values = page_object.obj_value.split(",")
                obj_names = page_object.obj_name.split(",")
                for value, name in zip(obj_values, obj_names):
                    # Work on a copy so the caller's object is left untouched.
                    new_page_object = MetaSecurityRowModel(**page_object.model_dump(by_alias=True))
                    new_page_object.obj_value = value.strip()
                    new_page_object.obj_name = name.strip()
                    new_page_object.rowId = str(uuid.uuid4())
                    await MetaSecurityDao.add_meta_security_row(query_db, new_page_object)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='新增行配置成功')
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def col_detail_services(cls, query_db: AsyncSession, col: str):
        """
        Return the detail of one column-control configuration.

        :param query_db: ORM session
        :param col: column configuration id
        :return: a populated ``MetaSecurityColModel``, or an empty model when
            the id is unknown
        """
        config = await MetaSecurityDao.get_meta_security_col_by_id(query_db, col)
        if config:
            return MetaSecurityColModel(**CamelCaseUtil.transform_result(config))
        return MetaSecurityColModel(**dict())

    @classmethod
    async def row_detail_services(cls, query_db: AsyncSession, row_id: str):
        """
        Return the detail of one row-control configuration.

        :param query_db: ORM session
        :param row_id: row configuration id
        :return: a populated ``MetaSecurityRowModel``, or an empty model when
            the id is unknown
        """
        config = await MetaSecurityDao.get_meta_security_row_by_id(query_db, row_id)
        if config:
            return MetaSecurityRowModel(**CamelCaseUtil.transform_result(config))
        return MetaSecurityRowModel(**dict())

    @classmethod
    async def edit_meta_security_col_services(
        cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityColModel
    ):
        """
        Update one column-control configuration.

        :param request: Request object (unused here)
        :param query_db: ORM session
        :param page_object: column configuration carrying the new values
        :return: ``CrudResponseModel`` reporting success
        :raises ServiceException: when no configuration with that id exists
        """
        edit_col = page_object.model_dump(exclude_unset=True)
        col_info = await cls.get_meta_security_col_by_id_services(query_db, page_object.colId)
        # The lookup returns an empty model (always truthy) when nothing is
        # found, so existence has to be decided on the id field.
        if not col_info.colId:
            raise ServiceException(message=f'列配置{page_object.colId}不存在')
        try:
            await MetaSecurityDao.update_meta_security_col(query_db, edit_col)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='编辑列配置成功')
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def edit_meta_security_row_services(
        cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityRowModel
    ):
        """
        Update one row-control configuration.

        :param request: Request object (unused here)
        :param query_db: ORM session
        :param page_object: row configuration carrying the new values
        :return: ``CrudResponseModel`` reporting success
        :raises ServiceException: when no configuration with that id exists
        """
        edit_row = page_object.model_dump(exclude_unset=True)
        row_info = await cls.get_meta_security_row_by_id_services(query_db, page_object.rowId)
        # See edit_meta_security_col_services: the model itself is always truthy.
        if not row_info.rowId:
            raise ServiceException(message=f'行配置{page_object.rowId}不存在')
        try:
            await MetaSecurityDao.update_meta_security_row(query_db, edit_row)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='编辑行配置成功')
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def delete_meta_security_col_services(
        cls, request: Request, query_db: AsyncSession, page_object: DeleteMetaSecurityModel
    ):
        """
        Delete column-control configurations.

        :param request: Request object (unused here)
        :param query_db: ORM session
        :param page_object: carries ``metaSecurity_ids``, a comma-separated id list
        :return: ``CrudResponseModel`` reporting success
        :raises ServiceException: when ``metaSecurity_ids`` is empty
        """
        if not page_object.metaSecurity_ids:
            raise ServiceException(message='传入列配置ID为空')
        try:
            for col_id in page_object.metaSecurity_ids.split(','):
                col_info = await cls.get_meta_security_col_by_id_services(query_db, col_id)
                # Unknown ids are skipped; only existing records are deleted.
                if col_info.colId:
                    await MetaSecurityDao.delete_meta_security_col(query_db, col_id)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='删除列配置成功')
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def delete_meta_security_row_services(
        cls, request: Request, query_db: AsyncSession, page_object: DeleteMetaSecurityModel
    ):
        """
        Delete row-control configurations.

        :param request: Request object (unused here)
        :param query_db: ORM session
        :param page_object: carries ``metaSecurity_ids``, a comma-separated id list
        :return: ``CrudResponseModel`` reporting success
        :raises ServiceException: when ``metaSecurity_ids`` is empty
        """
        if not page_object.metaSecurity_ids:
            raise ServiceException(message='传入行配置ID为空')
        try:
            for row_id in page_object.metaSecurity_ids.split(','):
                row_info = await cls.get_meta_security_row_by_id_services(query_db, row_id)
                # Unknown ids are skipped; only existing records are deleted.
                if row_info.rowId:
                    await MetaSecurityDao.delete_meta_security_row(query_db, row_id)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='删除行配置成功')
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def getMetaSercuitybysql(
        cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityApiModel
    ):
        """
        Run a caller-supplied SQL statement with row/column control applied.

        Steps: authenticate the user, resolve the data source, run the
        original SQL once (its result is discarded), detect the referenced
        tables, load their columns and the user's/roles' control rules,
        rewrite the SQL so each table is replaced by a controlled subquery,
        and finally execute the rewritten SQL.

        :param request: Request object, forwarded to ``get_data_source_tree``
        :param query_db: ORM session of the application database
        :param page_object: carries ``username``, ``password``, ``dbRId`` and ``sqlStr``
        :return: dict with ``ctrlSql`` (rewritten SQL), ``data`` (rows of the
            rewritten SQL) and ``tablesRowCol`` (the rules that were applied)
        :raises ServiceException: on missing user name, unknown user or wrong password
        """
        # 1. Authenticate the caller.
        if not page_object.username:
            raise ServiceException(data='', message='用户名不能为空!')
        user = await login_by_account(query_db, page_object.username)
        if not user:
            raise ServiceException(data='', message='用户不存在')
        # NOTE(review): compares the supplied value directly with the stored
        # one; confirm whether callers are expected to send the stored form.
        if page_object.password != user[0].password:
            raise ServiceException(data='', message='用户密码错误!')
        query_user = await UserDao.get_user_by_id(query_db, user_id=user[0].user_id)
        role_id_list = [item.role_id for item in query_user.get('user_role_info')]

        # 2. Resolve the data source and build an engine for it.
        dsDataResource = await get_data_source_tree(request, page_object)
        dbConnent = cls.get_db_engine(dsDataResource["type"], dsDataResource["connectionParams"])
        try:
            # 3. Run the original SQL; the result is discarded, so this only
            #    surfaces errors in the caller's statement early.
            await cls.execute_sql(dbConnent, page_object.sqlStr, "原始")
            # 4. Tables referenced by the SQL.
            sqlTableNames = await cls.get_tables_from_sql(page_object.sqlStr)
            # 5. Columns of those tables.
            table_columns = await cls.get_columns_from_tables(
                dbConnent, sqlTableNames, dsDataResource["type"]
            )
            # 6. Row/column rules of the user and of each of the user's roles.
            tablesRowCol = {}
            for table_name in sqlTableNames:
                tablesRowCol[table_name] = await get_table_configs(
                    query_db, page_object, user, role_id_list, table_name
                )
            ctrSqlDict = await generate_sql(tablesRowCol, table_columns)
            # 7. Rewrite the original SQL with the controlled subqueries.
            newStrSql = await replace_table_with_subquery(ctrSqlDict, page_object.sqlStr)
            # 8. Execute the REWRITTEN SQL so that the controls take effect.
            result = await cls.execute_sql(dbConnent, newStrSql, "控制后")
        finally:
            # The engine is created per call; release its connection pool.
            await dbConnent.dispose()

        return {
            "ctrlSql": newStrSql,
            "data": result,
            "tablesRowCol": tablesRowCol,
        }

    @staticmethod
    def get_db_engine(db_type: str, db_params: dict):
        """
        Build an async SQLAlchemy engine for a MySQL or PostgreSQL source.

        :param db_type: database type, case-insensitive (``mysql`` / ``postgresql``)
        :param db_params: dict with ``user``, ``password``, ``address``
            (optionally prefixed with ``jdbc:mysql://`` / ``jdbc:postgresql://``)
            and ``database``
        :return: the engine returned by ``create_async_engine``
        :raises ConnectionError: when SQLAlchemy rejects the connection settings
        :raises RuntimeError: on any other failure, including an unsupported type
        """
        try:
            address = db_params['address']
            # Strip a JDBC scheme prefix; the caller's dict is not modified.
            for prefix in ("jdbc:mysql://", "jdbc:postgresql://"):
                if address.startswith(prefix):
                    address = address[len(prefix):]
                    break

            drivers = {"mysql": "mysql+aiomysql", "postgresql": "postgresql+asyncpg"}
            driver = drivers.get(db_type.lower())
            if driver is None:
                raise ValueError("不支持的数据库类型")

            # Percent-encode the credentials so characters such as '@' or '/'
            # cannot break URL parsing. The URL is deliberately not logged:
            # it contains the password.
            user = quote(str(db_params['user']), safe='')
            password = quote(str(db_params['password']), safe='')
            return create_async_engine(
                f"{driver}://{user}:{password}@{address}/{db_params['database']}"
            )
        except SQLAlchemyError as e:
            raise ConnectionError(f"数据库连接失败: {e}") from e
        except Exception as e:
            raise RuntimeError(f"连接过程中发生了未知错误: {e}") from e

    @classmethod
    async def execute_sql(cls, dbConnent, sql_query: str, sql_type: str, params: Optional[dict] = None):
        """
        Execute a SQL statement and return all rows as dictionaries.

        :param dbConnent: async engine to run against
        :param sql_query: SQL text (``:name`` placeholders are bound from ``params``)
        :param sql_type: label prepended to the error message
        :param params: optional bind parameters for ``sql_query``
        :return: list of ``{column_name: value}`` dicts, one per row
        :raises RuntimeError: when SQLAlchemy reports an error during execution
        """
        async with dbConnent.begin():
            async_session = sessionmaker(dbConnent, class_=AsyncSession, expire_on_commit=False)
            async with async_session() as session:
                try:
                    result = await session.execute(text(sql_query), params)
                    rows = result.fetchall()
                    columns = result.keys()
                    return [dict(zip(columns, row)) for row in rows]
                except SQLAlchemyError as e:
                    raise RuntimeError(f"{sql_type}执行 SQL 查询时发生错误: {e}") from e

    @staticmethod
    async def get_tables_from_sql(sql_query: str):
        """
        Extract the table names that follow FROM / JOIN / INTO / UPDATE.

        Both ``table`` and ``schema.table`` are recognised; the schema part is
        dropped. This is a regex heuristic, not a SQL parser: constructs such
        as ``EXTRACT(YEAR FROM col)`` are reported as tables too.

        :param sql_query: SQL text
        :return: list of bare table names in order of appearance (may repeat)
        """
        table_pattern = r"\b(FROM|JOIN|INTO|UPDATE)\s+([a-zA-Z0-9_]+(?:\.[a-zA-Z0-9_]+)?)"
        table_matches = re.findall(table_pattern, sql_query, re.IGNORECASE)
        # split('.')[-1] keeps only the table part of "schema.table".
        return [match[1].split('.')[-1] for match in table_matches]

    @classmethod
    async def get_columns_from_tables(cls, dbConnent, table_names, db_type: str):
        """
        Look up the column names of each table in ``information_schema``.

        :param dbConnent: async engine of the target database
        :param table_names: iterable of table names
        :param db_type: database type, case-insensitive (``mysql`` / ``postgresql``)
        :return: ``{table_name: [column, ...]}``
        :raises ValueError: when ``db_type`` is unsupported and there is at
            least one table to look up
        """
        columns = {}
        if not table_names:
            return columns

        normalized_type = db_type.lower()
        if normalized_type == "postgresql":
            query = "SELECT column_name FROM information_schema.columns WHERE table_name = :table_name"
        elif normalized_type == "mysql":
            query = "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = :table_name"
        else:
            raise ValueError(f"暂不支持数据库类型: {db_type}")

        # dict.fromkeys de-duplicates while keeping order, avoiding repeat queries.
        for table_name in dict.fromkeys(table_names):
            result = await cls.execute_sql(dbConnent, query, "字段查询", {"table_name": table_name})
            # The result key is "column_name" or "COLUMN_NAME" depending on the
            # backend, so read the single selected value positionally.
            columns[table_name] = [next(iter(row.values())) for row in result]
        return columns


def convert_decimal(obj):
    """
    Recursively convert ``Decimal`` values inside dicts/lists to ``float``.

    :param obj: a Decimal, dict, list or any other value
    :return: the same structure with every Decimal replaced by a float;
        other values are returned unchanged
    """
    if isinstance(obj, Decimal):
        return float(obj)
    if isinstance(obj, dict):
        return {key: convert_decimal(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [convert_decimal(item) for item in obj]
    return obj


async def get_table_configs(query_db, page_object, user, role_id_list, table_name):
    """
    Collect the column and row control rules that apply to one table.

    Object type ``'0'`` is queried with the user id and ``'1'`` with each
    role id, as passed to ``MetaSecurityDao.get_api_col_list`` /
    ``get_api_row_list``.

    :param query_db: ORM session
    :param page_object: request object carrying ``dbRId``
    :param user: login lookup result; ``user[0].user_id`` is used
    :param role_id_list: role ids of the user
    :param table_name: table the rules are looked up for
    :return: dict with ``user_col_list``, ``role_col_list``,
        ``role_row_list`` and ``user_row_list``
    """
    db_id = page_object.dbRId
    user_id = user[0].user_id

    user_col_list = await MetaSecurityDao.get_api_col_list(query_db, db_id, table_name, '0', user_id)
    role_col_list = []
    for role_id in role_id_list:
        role_col_list.extend(
            await MetaSecurityDao.get_api_col_list(query_db, db_id, table_name, '1', role_id)
        )

    user_row_list = await MetaSecurityDao.get_api_row_list(query_db, db_id, table_name, '0', user_id)
    role_row_list = []
    for role_id in role_id_list:
        role_row_list.extend(
            await MetaSecurityDao.get_api_row_list(query_db, db_id, table_name, '1', role_id)
        )

    return {
        "user_col_list": user_col_list,
        "role_col_list": role_col_list,
        "role_row_list": role_row_list,
        "user_row_list": user_row_list,
    }


def _apply_column_rules(rules, columns, select_columns):
    """Apply column rules in order: ctrl_type '0' masks a column as NULL, '1' exposes it."""
    for rule in rules:
        if rule.dbCName == "ALL":
            targets = list(columns)
        else:
            targets = [name.strip().lower() for name in rule.dbCName.split(",")]
        for name in targets:
            if name not in columns:
                continue  # rule names a column the table does not have
            if rule.ctrl_type == '0':
                select_columns[name] = f"null {name}"
            elif rule.ctrl_type == '1':
                select_columns[name] = name


def _quote_sql_literal(value):
    """Wrap a value in single quotes, doubling embedded quotes."""
    return "'" + value.replace("'", "''") + "'"


def _apply_row_rules(rules, conditions):
    """Translate row rules into WHERE fragments keyed by column; later rules override earlier ones."""
    for rule in rules:
        column = rule.dbCName
        if rule.ctrl_type == '0':
            # Fixed value(s).
            if rule.ctrl_value == "ALL":
                # No restriction on this column; the empty marker also
                # cancels an earlier restriction for the same column.
                conditions[column] = ""
            elif "," in rule.ctrl_value:
                values = ", ".join(
                    _quote_sql_literal(value.strip()) for value in rule.ctrl_value.split(",")
                )
                conditions[column] = f"{column} IN ({values})"
            else:
                conditions[column] = f"{column} = {_quote_sql_literal(rule.ctrl_value)}"
        elif rule.ctrl_type == '1':
            # Value comes from another table, configured as "table.column".
            tab_col_value = rule.ctrl_value.split(".")
            if len(tab_col_value) != 2:
                raise RuntimeError(f"{rule.dbCName}字段控制类型为表字段,未维护正确的值")
            conditions[column] = f"{column} in (select {tab_col_value[1]} from {tab_col_value[0]})"


async def generate_sql(tablesRowCol: dict, table_columns: dict):
    """
    Build one controlled ``SELECT`` per table from its row/column rules.

    Every column starts masked (``null <col>``). Role rules are applied
    first, then user rules, so user rules win on conflict. Row rules become
    ``WHERE`` conditions joined with ``AND``.

    :param tablesRowCol: ``{table: {"role_col_list", "user_col_list",
        "role_row_list", "user_row_list"}}``
    :param table_columns: ``{table: [column, ...]}``
    :return: ``{table: "SELECT ... FROM table [WHERE ...]"}``
    :raises RuntimeError: when a table-field row rule is not ``table.column``
    """
    sql_queries = {}
    for table_name, config in tablesRowCol.items():
        # Columns are handled in lower case.
        columns = {col.lower(): col for col in table_columns[table_name]}

        # 1. Column control: default is masked.
        select_columns = {col: f"null {col}" for col in columns}
        _apply_column_rules(config["role_col_list"], columns, select_columns)
        _apply_column_rules(config["user_col_list"], columns, select_columns)
        query = f"SELECT {', '.join(select_columns.values())} FROM {table_name}"

        # 2. Row control.
        select_rows = {}
        _apply_row_rules(config["role_row_list"], select_rows)
        _apply_row_rules(config["user_row_list"], select_rows)
        # Drop the empty "no restriction" markers so they cannot yield an
        # empty WHERE clause or a dangling AND.
        where_conditions = [condition for condition in select_rows.values() if condition]
        if where_conditions:
            query += " WHERE " + " AND ".join(where_conditions)

        sql_queries[table_name] = query
    return sql_queries


# Words that may directly follow a table reference without being its alias.
_NON_ALIAS_KEYWORDS = frozenset({
    "WHERE", "JOIN", "INNER", "LEFT", "RIGHT", "FULL", "OUTER", "CROSS",
    "NATURAL", "ON", "USING", "GROUP", "ORDER", "HAVING", "LIMIT", "OFFSET",
    "UNION", "INTERSECT", "EXCEPT", "WINDOW", "FETCH", "FOR", "SET", "VALUES",
    "SELECT", "RETURNING", "STRAIGHT_JOIN", "LATERAL", "PARTITION", "WITH",
})
_NEXT_WORD_PATTERN = re.compile(r"\s*(?:AS\s+)?([A-Za-z_][A-Za-z0-9_]*)", re.IGNORECASE)


def _has_table_alias(sql, position):
    """Return True when the text at ``position`` starts with a table alias (optionally after AS)."""
    next_word = _NEXT_WORD_PATTERN.match(sql, position)
    return bool(next_word) and next_word.group(1).upper() not in _NON_ALIAS_KEYWORDS


async def replace_table_with_subquery(ctrSqlDict, oldStrSql):
    """
    Replace each controlled table in the SQL with its controlled subquery.

    Matches ``table`` and ``schema.table`` case-insensitively. A match that
    is followed by ``.`` is a column qualifier (``table.col``) and is left
    alone. When the reference has no alias of its own, the table name is
    appended as the alias, because derived tables need one on MySQL and on
    older PostgreSQL versions; this also keeps ``table.col`` references valid.

    NOTE(review): this is regex substitution, not SQL parsing. A table name
    that also appears as a bare column name or inside a string literal is
    replaced as well.

    :param ctrSqlDict: ``{table: controlled SELECT}`` from ``generate_sql``
    :param oldStrSql: the original SQL
    :return: the rewritten SQL
    """
    for table_name, subquery in ctrSqlDict.items():
        table_name_pattern = re.compile(
            r'\b(?:[a-zA-Z_][a-zA-Z0-9_]*\.)?' + re.escape(table_name) + r'\b(?!\s*\.)',
            re.IGNORECASE,
        )

        def _substitute(match, _table=table_name, _subquery=subquery):
            # A callable replacement keeps backslashes in the subquery literal.
            replacement = f"({_subquery})"
            if not _has_table_alias(match.string, match.end()):
                replacement += f" {_table}"
            return replacement

        oldStrSql = table_name_pattern.sub(_substitute, oldStrSql)
    return oldStrSql


# Seconds to wait for the data-source service before giving up.
_DS_REQUEST_TIMEOUT = 30


async def get_data_source_tree(request: Request, current_user: MetaSecurityApiModel):
    """
    Fetch the data source identified by ``current_user.dbRId``.

    Calls the ``/dolphinscheduler/datasources/withpwdlist`` endpoint under
    ``AppConfig.ds_server_url`` with the user's credentials as headers and
    returns the matching entry with ``connectionParams`` parsed from JSON.

    :param request: Request object (unused here)
    :param current_user: carries ``username``, ``password`` and ``dbRId``
    :return: the matching data source dict
    :raises Exception: when the call does not succeed or no entry matches
    """
    url = f'{AppConfig.ds_server_url}/dolphinscheduler/datasources/withpwdlist?pageNo=1&pageSize=100'
    headers = {'dashUserName': current_user.username, 'dashPassword': current_user.password}

    # requests is blocking; run it in the default executor so the event loop
    # stays responsive.
    # NOTE(review): verify=False disables TLS certificate verification.
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(
        None,
        partial(requests.get, url, headers=headers, verify=False, timeout=_DS_REQUEST_TIMEOUT),
    )

    if response.status_code == 200:
        data = json.loads(response.text)
        wanted_id = str(current_user.dbRId)
        for item in data["data"]["totalList"]:
            # Compare as strings so an int id matches a str dbRId.
            if str(item["id"]) == wanted_id:
                item["connectionParams"] = json.loads(item["connectionParams"])
                return item
    raise Exception(f'根据数据源ID:{current_user.dbRId}获取数据源信息失败,状态: {response.reason}')