from fastapi import Request from sqlalchemy.ext.asyncio import AsyncSession from exceptions.exception import ServiceException from module_admin.dao.metaSecurity_dao import MetaSecurityDao from module_admin.entity.vo.common_vo import CrudResponseModel from module_admin.entity.vo.metasecurity_vo import MetaSecurityColModel, MetaSecurityRowModel,DeleteMetaSecurityModel,MetaSecurityApiModel from utils.common_util import CamelCaseUtil import uuid from module_admin.dao.login_dao import login_by_account from module_admin.dao.user_dao import UserDao from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.exc import SQLAlchemyError from sqlalchemy import text import json class MetaSecurityService: """ 数据源安全管理模块服务层 """ @classmethod async def get_meta_security_col_list_services( cls, query_db: AsyncSession, query_object: MetaSecurityColModel, is_page: bool = False ): """ 获取列配置列表信息service :param query_db: orm对象 :param query_object: 查询参数对象 :param is_page: 是否开启分页 :return: 列配置列表信息对象 """ col_list_result = await MetaSecurityDao.get_meta_security_col_list(query_db, query_object, is_page) return col_list_result @classmethod async def get_meta_security_row_list_services( cls, query_db: AsyncSession, query_object: MetaSecurityRowModel, is_page: bool = False ): """ 获取行配置列表信息service :param query_db: orm对象 :param query_object: 查询参数对象 :param is_page: 是否开启分页 :return: 行配置列表信息对象 """ row_list_result = await MetaSecurityDao.get_meta_security_row_list(query_db, query_object, is_page) return row_list_result @classmethod async def get_meta_security_col_by_id_services(cls, query_db: AsyncSession, colId: str): """ 获取列配置详细信息service :param query_db: orm对象 :param colId: 列配置ID :return: 列配置详细信息对象 """ col = await MetaSecurityDao.get_meta_security_col_by_id(query_db, colId) if col: result = MetaSecurityColModel(**CamelCaseUtil.transform_result(col)) else: result = MetaSecurityColModel(**dict()) return result @classmethod async def get_meta_security_row_by_id_services(cls, query_db: AsyncSession, rowId: str): """ 获取行配置详细信息service :param query_db: orm对象 :param rowId: 行配置ID :return: 行配置详细信息对象 """ row = await MetaSecurityDao.get_meta_security_row_by_id(query_db, rowId) if row: result = MetaSecurityRowModel(**CamelCaseUtil.transform_result(row)) else: result = MetaSecurityRowModel(**dict()) return result @classmethod async def add_meta_security_col_services(cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityColModel): """ 新增列配置服务 :param request: Request对象 :param query_db: orm对象 :param page_object: 新增的列配置对象 :return: 新增列配置校验结果 """ try: if isinstance(page_object.obj_value, str) and page_object.obj_value: obj_values = page_object.obj_value.split(",") obj_names = page_object.obj_name.split(",") for value, name in zip(obj_values, obj_names): # 创建新的 page_object 实例,避免修改原始对象 new_page_object = MetaSecurityColModel(**page_object.model_dump(by_alias=True)) new_page_object.obj_value = value.strip() # 去除空格并赋值 new_page_object.obj_name = name.strip() # 去除空格并赋值 new_page_object.colId = str(uuid.uuid4()) # 调用 DAO 方法插入数据 await MetaSecurityDao.add_meta_security_col(query_db, new_page_object) await query_db.commit() return CrudResponseModel(is_success=True, message='新增列配置成功') except Exception as e: await query_db.rollback() raise e @classmethod async def add_meta_security_row_services(cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityRowModel): """ 新增行配置服务 :param request: Request对象 :param query_db: orm对象 :param page_object: 新增的行配置对象 :return: 新增行配置校验结果 """ try: if isinstance(page_object.obj_value, str) and page_object.obj_value: obj_values = page_object.obj_value.split(",") obj_names = page_object.obj_name.split(",") for value, name in zip(obj_values, obj_names): # 创建新的 page_object 实例,避免修改原始对象 new_page_object = MetaSecurityRowModel(**page_object.model_dump(by_alias=True)) new_page_object.obj_value = value.strip() # 去除空格并赋值 new_page_object.obj_name = name.strip() # 去除空格并赋值 new_page_object.rowId = str(uuid.uuid4()) # 调用 DAO 方法插入数据 await MetaSecurityDao.add_meta_security_row(query_db, new_page_object) await query_db.commit() # 缓存相关操作,如果需要 # await request.app.state.redis.set(...) return CrudResponseModel(is_success=True, message='新增行配置成功') except Exception as e: await query_db.rollback() raise e @classmethod async def col_detail_services(cls, query_db: AsyncSession, col: str): """ 获取参数配置详细信息service :param query_db: orm对象 :param config_id: 参数配置id :return: 参数配置id对应的信息 """ config = await MetaSecurityDao.get_meta_security_col_by_id(query_db, col) if config: result = MetaSecurityColModel(**CamelCaseUtil.transform_result(config)) else: result = MetaSecurityColModel(**dict()) return result @classmethod async def row_detail_services(cls, query_db: AsyncSession, row_id: str): """ 获取参数配置详细信息service :param query_db: orm对象 :param config_id: 参数配置id :return: 参数配置id对应的信息 """ config = await MetaSecurityDao.get_meta_security_row_by_id(query_db, row_id) if config: result = MetaSecurityRowModel(**CamelCaseUtil.transform_result(config)) else: result = MetaSecurityRowModel(**dict()) return result @classmethod async def edit_meta_security_col_services(cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityColModel): """ 编辑列配置服务 :param request: Request对象 :param query_db: orm对象 :param page_object: 编辑的列配置对象 :return: 编辑列配置校验结果 """ edit_col = page_object.model_dump(exclude_unset=True) col_info = await cls.get_meta_security_col_by_id_services(query_db, page_object.colId) if col_info: try: await MetaSecurityDao.update_meta_security_col(query_db, edit_col) await query_db.commit() # 缓存更新,如果需要 # await request.app.state.redis.set(...) return CrudResponseModel(is_success=True, message='编辑列配置成功') except Exception as e: await query_db.rollback() raise e else: raise ServiceException(message=f'列配置{page_object.colId}不存在') @classmethod async def edit_meta_security_row_services(cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityRowModel): """ 编辑行配置服务 :param request: Request对象 :param query_db: orm对象 :param page_object: 编辑的行配置对象 :return: 编辑行配置校验结果 """ edit_row = page_object.model_dump(exclude_unset=True) row_info = await cls.get_meta_security_row_by_id_services(query_db, page_object.rowId) if row_info: try: await MetaSecurityDao.update_meta_security_row(query_db, edit_row) await query_db.commit() return CrudResponseModel(is_success=True, message='编辑行配置成功') except Exception as e: await query_db.rollback() raise e else: raise ServiceException(message=f'行配置{page_object.rowId}不存在') @classmethod async def delete_meta_security_col_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteMetaSecurityModel): """ 删除列配置服务 :param request: Request对象 :param query_db: orm对象 :param page_object: 删除列配置对象 :return: 删除列配置校验结果 """ if page_object.metaSecurity_ids: col_id_list = page_object.metaSecurity_ids.split(',') try: for col_id in col_id_list: col_info = await cls.get_meta_security_col_by_id_services(query_db, col_id) if col_info: # 校验不能删除的系统内置列 await MetaSecurityDao.delete_meta_security_col(query_db, col_id) await query_db.commit() return CrudResponseModel(is_success=True, message='删除列配置成功') except Exception as e: await query_db.rollback() raise e else: raise ServiceException(message='传入列配置ID为空') @classmethod async def delete_meta_security_row_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteMetaSecurityModel): """ 删除行配置服务 :param request: Request对象 :param query_db: orm对象 :param page_object: 删除行配置对象 :return: 删除行配置校验结果 """ if page_object.metaSecurity_ids: row_id_list = page_object.metaSecurity_ids.split(',') try: for row_id in row_id_list: row_info = await cls.get_meta_security_row_by_id_services(query_db, row_id) if row_info: await MetaSecurityDao.delete_meta_security_row(query_db, row_id) await query_db.commit() return CrudResponseModel(is_success=True, message='删除行配置成功') except Exception as e: await query_db.rollback() raise e else: raise ServiceException(message='传入行配置ID为空') @classmethod async def getMetaSercuitybysql(cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityApiModel): #1.校验用户 if not page_object.username: raise ServiceException(data='', message='用户名不能为空!') user = await login_by_account(query_db, page_object.username) if not user: raise ServiceException(data='', message='用户不存在') if not page_object.password == user[0].password: raise ServiceException(data='', message='用户密码错误!') query_user = await UserDao.get_user_by_id(query_db, user_id=user[0].user_id) role_id_list = [item.role_id for item in query_user.get('user_role_info')] #2.查询用户及该用户角色下的所有行列配置 # 列配置 user_col_list=await MetaSecurityDao.get_api_col_list(query_db, page_object.dbRId,page_object.tableName, '0',user[0].user_id) role_col_list = [] for role_id in role_id_list: role_cols = await MetaSecurityDao.get_api_col_list( query_db, page_object.dbRId, page_object.tableName, '1', role_id ) role_col_list.extend(role_cols) # 将每个角色的列配置合并到列表中 # 行配置 user_row_list=await MetaSecurityDao.get_api_row_list(query_db, page_object.dbRId,page_object.tableName, '0',user[0].user_id) role_row_list = [] for role_id in role_id_list: role_rows = await MetaSecurityDao.get_api_row_list( query_db, page_object.dbRId, page_object.tableName, '1', role_id ) role_row_list.extend(role_rows) # 将每个角色的行配置合并到列表中 result = { "user_col_list": user_col_list, "role_col_list": role_col_list, "role_row_list": role_row_list, "user_row_list": user_row_list } # return result #3.根据行列配置控制原始sql #4.测试数据源连接是否正常 dataParams ={"user":"dbf","password":"1q2w3e4r","address":"jdbc:mysql://47.113.147.166:3306","database":"dash_test_w","jdbcUrl":"jdbc:mysql://47.113.147.166:3306/dash_test_w","driverClassName":"com.mysql.cj.jdbc.Driver","validationQuery":"select 1"} dbConnent = cls.get_db_engine('mysql',dataParams) query = "SELECT * FROM msq_table_constraints" result = await cls.execute_sql(dbConnent, query) return result #5.执行原始sql #6.执行控制后的sql #7.执行结果 def get_db_engine(db_type: str, db_params: dict): try: if db_type == "mysql": address = db_params['address'] if address.startswith("jdbc:mysql://"): # 去掉 jdbc:mysql:// 部分 address = address[len("jdbc:mysql://"):] db_params['address'] = address conn_str=f"mysql+aiomysql://{db_params['user']}:{db_params['password']}@{db_params['address']}/{db_params['database']}" print(f"数据库连接字符串: {conn_str}") # 输出调试信息 return create_async_engine(conn_str) elif db_type == "postgresql": return create_async_engine(f"postgresql+asyncpg://{db_params['user']}:{db_params['password']}@{db_params['address']}/{db_params['database']}") # 你可以根据需求添加更多数据库类型 else: raise ValueError("不支持的数据库类型") except SQLAlchemyError as e: # 捕获SQLAlchemy相关的数据库连接错误 raise ConnectionError(f"数据库连接失败: {e}") except Exception as e: # 捕获其他非预期的错误 raise RuntimeError(f"连接过程中发生了未知错误: {e}") @classmethod async def execute_sql(cls, dbConnent, sql_query: str,sql_type: str): # 创建异步会话 async with dbConnent.begin(): # 获取会话对象 async_session = sessionmaker( dbConnent, class_=AsyncSession, expire_on_commit=False ) async with async_session() as session: try: # 执行原始SQL查询 query = text(sql_query) result = await session.execute(query) # 获取所有结果 rows = result.fetchall() # 获取列名 columns = result.keys() # 将每一行转化为字典,键为列名 result_dict = [dict(zip(columns, row)) for row in rows] # 转换为 JSON 字符串 return json.dumps(result_dict, ensure_ascii=False, indent=4) except SQLAlchemyError as e: raise RuntimeError(f"{sql_type}执行 SQL 查询时发生错误: {e}")