You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
			
				
					176 lines
				
				6.6 KiB
			
		
		
			
		
	
	
					176 lines
				
				6.6 KiB
			| 
											7 months ago
										 | from sqlalchemy.ext.asyncio import AsyncSession | ||
|  | from module_admin.dao.data_asset_dao import DataAssetDao | ||
|  | from module_admin.entity.vo.data_asset_vo import DataAssetItemModel, DataAssetBatchModel, DataAssetResultModel, DataAssetPageQueryModel | ||
|  | from exceptions.exception import ServiceException | ||
|  | from typing import Dict, Any, List | ||
|  | from utils.common_util import export_list2excel | ||
|  | 
 | ||
|  | 
 | ||
|  | class DataAssetService: | ||
|  |     """
 | ||
|  |     数据资产信息模块服务层 | ||
|  |     """
 | ||
|  | 
 | ||
|  |     @classmethod | ||
|  |     async def get_data_asset_list_services( | ||
|  |         cls, query_db: AsyncSession, query_object: DataAssetPageQueryModel, is_page: bool = False | ||
|  |     ): | ||
|  |         """
 | ||
|  |         获取数据资产信息列表service | ||
|  | 
 | ||
|  |         :param query_db: orm对象 | ||
|  |         :param query_object: 查询参数对象 | ||
|  |         :param is_page: 是否开启分页 | ||
|  |         :return: 数据资产信息列表对象 | ||
|  |         """
 | ||
|  |         data_asset_list_result = await DataAssetDao.get_data_asset_list(query_db, query_object, is_page) | ||
|  |         return data_asset_list_result | ||
|  | 
 | ||
|  |     @classmethod | ||
|  |     async def batch_process_data_asset_services( | ||
|  |         cls, query_db: AsyncSession, batch_object: DataAssetBatchModel | ||
|  |     ): | ||
|  |         """
 | ||
|  |         批量处理数据资产信息service | ||
|  | 
 | ||
|  |         :param query_db: orm对象 | ||
|  |         :param batch_object: 批量处理参数对象 | ||
|  |         :return: 处理结果对象 | ||
|  |         """
 | ||
|  |         result = DataAssetResultModel() | ||
|  |          | ||
|  |         for item in batch_object.data_assets: | ||
|  |             try: | ||
|  |                 # 转换对象为字典,并移除ctrl_flag,version_no字段 | ||
|  |                 data_dict = item.model_dump() | ||
|  |                 ctrl_flag = data_dict.pop("ctrl_flag") | ||
|  |                 version_no = data_dict.pop("version_no") | ||
|  |                  | ||
|  |                 # 检查数据资产是否存在 | ||
|  |                 existing_asset = await DataAssetDao.get_data_asset_by_ast_no(query_db, item.ast_no) | ||
|  |                  | ||
|  |                 # 根据ctrl_flag进行不同的操作 | ||
|  |                 if ctrl_flag == "1":  # 插入 | ||
|  |                     if existing_asset: | ||
|  |                         raise ServiceException(f"数据资产编号 {item.ast_no} 已存在,无法插入") | ||
|  |                     await DataAssetDao.insert_data_asset(query_db, data_dict) | ||
|  |                     result.success_count += 1 | ||
|  |                     result.success_items.append(item.ast_no) | ||
|  |                 elif ctrl_flag == "2":  # 删除 | ||
|  |                     if not existing_asset: | ||
|  |                         raise ServiceException(f"数据资产编号 {item.ast_no} 不存在,无法删除") | ||
|  |                     await DataAssetDao.delete_data_asset(query_db, item.ast_no) | ||
|  |                     result.success_count += 1 | ||
|  |                     result.success_items.append(item.ast_no) | ||
|  |                 elif ctrl_flag == "3":  # 更新 | ||
|  |                     if not existing_asset: | ||
|  |                         raise ServiceException(f"数据资产编号 {item.ast_no} 不存在,无法更新") | ||
|  |                     await DataAssetDao.update_data_asset(query_db, item.ast_no, data_dict) | ||
|  |                     result.success_count += 1 | ||
|  |                     result.success_items.append(item.ast_no) | ||
|  |                 else: | ||
|  |                     raise ServiceException(f"未知的操作类型: {ctrl_flag}") | ||
|  |                  | ||
|  |             except Exception as e: | ||
|  |                 result.failed_count += 1 | ||
|  |                 result.failed_items.append({"ast_no": item.ast_no, "error": str(e)}) | ||
|  |          | ||
|  |         # 提交事务 | ||
|  |         await query_db.commit() | ||
|  |          | ||
|  |         return result  | ||
|  |          | ||
|  |     @classmethod | ||
|  |     async def get_data_asset_sources_services(cls, query_db: AsyncSession): | ||
|  |         """
 | ||
|  |         获取所有数据资产来源service | ||
|  |          | ||
|  |         :param query_db: orm对象 | ||
|  |         :return: 数据资产来源列表 | ||
|  |         """
 | ||
|  |         sources = await DataAssetDao.get_data_asset_sources(query_db) | ||
|  | 
 | ||
|  |         # 构造父节点 | ||
|  |         formatted_data = [ | ||
|  |             { | ||
|  |                 "name": "表数据资产", | ||
|  |                 "children": [ | ||
|  |                     { | ||
|  |                         "id": str(index),  # 转换为字符串类型id | ||
|  |                         "name": source  # 原始来源名称 | ||
|  |                     } | ||
|  |                     for index, source in enumerate(sources, 1)  # 从1开始编号 | ||
|  |                 ] | ||
|  |             }, | ||
|  |             { | ||
|  |                 "name": "接口数据资产", | ||
|  |                 "children": [] | ||
|  |             }, | ||
|  |             { | ||
|  |                 "name": "数据治理资产", | ||
|  |                 "children": [] | ||
|  |             } | ||
|  |         ] | ||
|  | 
 | ||
|  |         return formatted_data | ||
|  | 
 | ||
|  |     @classmethod | ||
|  |     async def search_data_assets_services( | ||
|  |         cls, query_db: AsyncSession, search_params: dict, page_num: int = 1, page_size: int = 10, is_page: bool = True | ||
|  |     ): | ||
|  |         """
 | ||
|  |         综合查询数据资产信息service | ||
|  |          | ||
|  |         :param query_db: orm对象 | ||
|  |         :param search_params: 查询参数字典 | ||
|  |         :param page_num: 页码 | ||
|  |         :param page_size: 每页大小 | ||
|  |         :param is_page: 是否分页 | ||
|  |         :return: 查询结果 | ||
|  |         """
 | ||
|  |         search_result = await DataAssetDao.search_data_assets( | ||
|  |             query_db, search_params, is_page, page_num, page_size | ||
|  |         ) | ||
|  |         return search_result | ||
|  | 
 | ||
|  | 
 | ||
|  | 
 | ||
|  |     @staticmethod | ||
|  |     async def export_data_asset_list_services(data_assets_list: list): | ||
|  |         mapping_dict = { | ||
|  |             "astNo": "资产编号", | ||
|  |             "dataAstEngName": "表英文名称", | ||
|  |             "dataAstCnName": "表中文名称", | ||
|  |             "dataAstType": "表类型", | ||
|  |             "dataAstStat": "表状态", | ||
|  |             "dataAstDesc": "表描述", | ||
|  |             "dataAstScreen": "资产应用场景", | ||
|  |             "dataAstScrenClas": "应用场景分类", | ||
|  |             "dataAstClas": "资产标签", | ||
|  |             "dataAstCont": "资产内容", | ||
|  |             "dataAstFaq": "资产常见问题", | ||
|  |             "dataAstSrc": "资产来源", | ||
|  |             "dataAstEstbTime": "资产建立时间", | ||
|  |             "dataAstUpdTime": "资产更新时间" | ||
|  |         } | ||
|  | 
 | ||
|  |         # 1. 获取映射关系的键顺序(确保列顺序一致) | ||
|  |         keys_order = list(mapping_dict.keys()) | ||
|  | 
 | ||
|  |         new_data = [] | ||
|  |         for item in data_assets_list: | ||
|  |             item_dict = item.__dict__ if not isinstance(item, dict) else item | ||
|  |             cleaned_item = {k: (v if v is not None else "") for k, v in item_dict.items()} | ||
|  |              | ||
|  |             # 处理状态字段 | ||
|  |             cleaned_item["dataAstStat"] = "有效" if cleaned_item.get("dataAstStat") == "有效" else "废弃" | ||
|  | 
 | ||
|  |             # 按mapping_dict的键顺序构建数据 | ||
|  |             mapped_item = { | ||
|  |                 mapping_dict[key]: cleaned_item.get(key, "") | ||
|  |                 for key in keys_order | ||
|  |             } | ||
|  |             new_data.append(mapped_item) | ||
|  | 
 | ||
|  |         # 直接调用export_list2excel,只传入数据参数 | ||
|  |         return export_list2excel(new_data) |