from sqlalchemy.ext.asyncio import AsyncSession from module_admin.dao.data_asset_dao import DataAssetDao from module_admin.entity.vo.data_asset_vo import DataAssetItemModel, DataAssetBatchModel, DataAssetResultModel, DataAssetPageQueryModel from exceptions.exception import ServiceException from typing import Dict, Any, List from utils.common_util import export_list2excel class DataAssetService: """ 数据资产信息模块服务层 """ @classmethod async def get_data_asset_list_services( cls, query_db: AsyncSession, query_object: DataAssetPageQueryModel, is_page: bool = False ): """ 获取数据资产信息列表service :param query_db: orm对象 :param query_object: 查询参数对象 :param is_page: 是否开启分页 :return: 数据资产信息列表对象 """ data_asset_list_result = await DataAssetDao.get_data_asset_list(query_db, query_object, is_page) return data_asset_list_result @classmethod async def batch_process_data_asset_services( cls, query_db: AsyncSession, batch_object: DataAssetBatchModel ): """ 批量处理数据资产信息service :param query_db: orm对象 :param batch_object: 批量处理参数对象 :return: 处理结果对象 """ result = DataAssetResultModel() for item in batch_object.data_assets: try: # 转换对象为字典,并移除ctrl_flag,version_no字段 data_dict = item.model_dump() ctrl_flag = data_dict.pop("ctrl_flag") version_no = data_dict.pop("version_no") # 检查数据资产是否存在 existing_asset = await DataAssetDao.get_data_asset_by_ast_no(query_db, item.ast_no) # 根据ctrl_flag进行不同的操作 if ctrl_flag == "1": # 插入 if existing_asset: raise ServiceException(f"数据资产编号 {item.ast_no} 已存在,无法插入") await DataAssetDao.insert_data_asset(query_db, data_dict) result.success_count += 1 result.success_items.append(item.ast_no) elif ctrl_flag == "2": # 删除 if not existing_asset: raise ServiceException(f"数据资产编号 {item.ast_no} 不存在,无法删除") await DataAssetDao.delete_data_asset(query_db, item.ast_no) result.success_count += 1 result.success_items.append(item.ast_no) elif ctrl_flag == "3": # 更新 if not existing_asset: raise ServiceException(f"数据资产编号 {item.ast_no} 不存在,无法更新") await DataAssetDao.update_data_asset(query_db, item.ast_no, data_dict) result.success_count += 1 result.success_items.append(item.ast_no) else: raise ServiceException(f"未知的操作类型: {ctrl_flag}") except Exception as e: result.failed_count += 1 result.failed_items.append({"ast_no": item.ast_no, "error": str(e)}) # 提交事务 await query_db.commit() return result @classmethod async def get_data_asset_sources_services(cls, query_db: AsyncSession): """ 获取所有数据资产来源service :param query_db: orm对象 :return: 数据资产来源列表 """ sources = await DataAssetDao.get_data_asset_sources(query_db) # 构造父节点 formatted_data = [ { "name": "表数据资产", "children": [ { "id": str(index), # 转换为字符串类型id "name": source # 原始来源名称 } for index, source in enumerate(sources, 1) # 从1开始编号 ] }, { "name": "接口数据资产", "children": [] }, { "name": "数据治理资产", "children": [] } ] return formatted_data @classmethod async def search_data_assets_services( cls, query_db: AsyncSession, search_params: dict, page_num: int = 1, page_size: int = 10, is_page: bool = True ): """ 综合查询数据资产信息service :param query_db: orm对象 :param search_params: 查询参数字典 :param page_num: 页码 :param page_size: 每页大小 :param is_page: 是否分页 :return: 查询结果 """ search_result = await DataAssetDao.search_data_assets( query_db, search_params, is_page, page_num, page_size ) return search_result @staticmethod async def export_data_asset_list_services(data_assets_list: list): mapping_dict = { "astNo": "资产编号", "dataAstEngName": "表英文名称", "dataAstCnName": "表中文名称", "dataAstType": "表类型", "dataAstStat": "表状态", "dataAstDesc": "表描述", "dataAstScreen": "资产应用场景", "dataAstScrenClas": "应用场景分类", "dataAstClas": "资产标签", "dataAstCont": "资产内容", "dataAstFaq": "资产常见问题", "dataAstSrc": "资产来源", "dataAstEstbTime": "资产建立时间", "dataAstUpdTime": "资产更新时间" } # 1. 获取映射关系的键顺序(确保列顺序一致) keys_order = list(mapping_dict.keys()) new_data = [] for item in data_assets_list: item_dict = item.__dict__ if not isinstance(item, dict) else item cleaned_item = {k: (v if v is not None else "") for k, v in item_dict.items()} # 处理状态字段 cleaned_item["dataAstStat"] = "有效" if cleaned_item.get("dataAstStat") == "有效" else "废弃" # 按mapping_dict的键顺序构建数据 mapped_item = { mapping_dict[key]: cleaned_item.get(key, "") for key in keys_order } new_data.append(mapped_item) # 直接调用export_list2excel,只传入数据参数 return export_list2excel(new_data)