You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
176 lines
6.6 KiB
176 lines
6.6 KiB
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from module_admin.dao.data_asset_dao import DataAssetDao
|
|
from module_admin.entity.vo.data_asset_vo import DataAssetItemModel, DataAssetBatchModel, DataAssetResultModel, DataAssetPageQueryModel
|
|
from exceptions.exception import ServiceException
|
|
from typing import Dict, Any, List
|
|
from utils.common_util import export_list2excel
|
|
|
|
|
|
class DataAssetService:
|
|
"""
|
|
数据资产信息模块服务层
|
|
"""
|
|
|
|
@classmethod
|
|
async def get_data_asset_list_services(
|
|
cls, query_db: AsyncSession, query_object: DataAssetPageQueryModel, is_page: bool = False
|
|
):
|
|
"""
|
|
获取数据资产信息列表service
|
|
|
|
:param query_db: orm对象
|
|
:param query_object: 查询参数对象
|
|
:param is_page: 是否开启分页
|
|
:return: 数据资产信息列表对象
|
|
"""
|
|
data_asset_list_result = await DataAssetDao.get_data_asset_list(query_db, query_object, is_page)
|
|
return data_asset_list_result
|
|
|
|
@classmethod
|
|
async def batch_process_data_asset_services(
|
|
cls, query_db: AsyncSession, batch_object: DataAssetBatchModel
|
|
):
|
|
"""
|
|
批量处理数据资产信息service
|
|
|
|
:param query_db: orm对象
|
|
:param batch_object: 批量处理参数对象
|
|
:return: 处理结果对象
|
|
"""
|
|
result = DataAssetResultModel()
|
|
|
|
for item in batch_object.data_assets:
|
|
try:
|
|
# 转换对象为字典,并移除ctrl_flag,version_no字段
|
|
data_dict = item.model_dump()
|
|
ctrl_flag = data_dict.pop("ctrl_flag")
|
|
version_no = data_dict.pop("version_no")
|
|
|
|
# 检查数据资产是否存在
|
|
existing_asset = await DataAssetDao.get_data_asset_by_ast_no(query_db, item.ast_no)
|
|
|
|
# 根据ctrl_flag进行不同的操作
|
|
if ctrl_flag == "1": # 插入
|
|
if existing_asset:
|
|
raise ServiceException(f"数据资产编号 {item.ast_no} 已存在,无法插入")
|
|
await DataAssetDao.insert_data_asset(query_db, data_dict)
|
|
result.success_count += 1
|
|
result.success_items.append(item.ast_no)
|
|
elif ctrl_flag == "2": # 删除
|
|
if not existing_asset:
|
|
raise ServiceException(f"数据资产编号 {item.ast_no} 不存在,无法删除")
|
|
await DataAssetDao.delete_data_asset(query_db, item.ast_no)
|
|
result.success_count += 1
|
|
result.success_items.append(item.ast_no)
|
|
elif ctrl_flag == "3": # 更新
|
|
if not existing_asset:
|
|
raise ServiceException(f"数据资产编号 {item.ast_no} 不存在,无法更新")
|
|
await DataAssetDao.update_data_asset(query_db, item.ast_no, data_dict)
|
|
result.success_count += 1
|
|
result.success_items.append(item.ast_no)
|
|
else:
|
|
raise ServiceException(f"未知的操作类型: {ctrl_flag}")
|
|
|
|
except Exception as e:
|
|
result.failed_count += 1
|
|
result.failed_items.append({"ast_no": item.ast_no, "error": str(e)})
|
|
|
|
# 提交事务
|
|
await query_db.commit()
|
|
|
|
return result
|
|
|
|
@classmethod
|
|
async def get_data_asset_sources_services(cls, query_db: AsyncSession):
|
|
"""
|
|
获取所有数据资产来源service
|
|
|
|
:param query_db: orm对象
|
|
:return: 数据资产来源列表
|
|
"""
|
|
sources = await DataAssetDao.get_data_asset_sources(query_db)
|
|
|
|
# 构造父节点
|
|
formatted_data = [
|
|
{
|
|
"name": "表数据资产",
|
|
"children": [
|
|
{
|
|
"id": str(index), # 转换为字符串类型id
|
|
"name": source # 原始来源名称
|
|
}
|
|
for index, source in enumerate(sources, 1) # 从1开始编号
|
|
]
|
|
},
|
|
{
|
|
"name": "接口数据资产",
|
|
"children": []
|
|
},
|
|
{
|
|
"name": "数据治理资产",
|
|
"children": []
|
|
}
|
|
]
|
|
|
|
return formatted_data
|
|
|
|
@classmethod
|
|
async def search_data_assets_services(
|
|
cls, query_db: AsyncSession, search_params: dict, page_num: int = 1, page_size: int = 10, is_page: bool = True
|
|
):
|
|
"""
|
|
综合查询数据资产信息service
|
|
|
|
:param query_db: orm对象
|
|
:param search_params: 查询参数字典
|
|
:param page_num: 页码
|
|
:param page_size: 每页大小
|
|
:param is_page: 是否分页
|
|
:return: 查询结果
|
|
"""
|
|
search_result = await DataAssetDao.search_data_assets(
|
|
query_db, search_params, is_page, page_num, page_size
|
|
)
|
|
return search_result
|
|
|
|
|
|
|
|
@staticmethod
|
|
async def export_data_asset_list_services(data_assets_list: list):
|
|
mapping_dict = {
|
|
"astNo": "资产编号",
|
|
"dataAstEngName": "表英文名称",
|
|
"dataAstCnName": "表中文名称",
|
|
"dataAstType": "表类型",
|
|
"dataAstStat": "表状态",
|
|
"dataAstDesc": "表描述",
|
|
"dataAstScreen": "资产应用场景",
|
|
"dataAstScrenClas": "应用场景分类",
|
|
"dataAstClas": "资产标签",
|
|
"dataAstCont": "资产内容",
|
|
"dataAstFaq": "资产常见问题",
|
|
"dataAstSrc": "资产来源",
|
|
"dataAstEstbTime": "资产建立时间",
|
|
"dataAstUpdTime": "资产更新时间"
|
|
}
|
|
|
|
# 1. 获取映射关系的键顺序(确保列顺序一致)
|
|
keys_order = list(mapping_dict.keys())
|
|
|
|
new_data = []
|
|
for item in data_assets_list:
|
|
item_dict = item.__dict__ if not isinstance(item, dict) else item
|
|
cleaned_item = {k: (v if v is not None else "") for k, v in item_dict.items()}
|
|
|
|
# 处理状态字段
|
|
cleaned_item["dataAstStat"] = "有效" if cleaned_item.get("dataAstStat") == "有效" else "废弃"
|
|
|
|
# 按mapping_dict的键顺序构建数据
|
|
mapped_item = {
|
|
mapping_dict[key]: cleaned_item.get(key, "")
|
|
for key in keys_order
|
|
}
|
|
new_data.append(mapped_item)
|
|
|
|
# 直接调用export_list2excel,只传入数据参数
|
|
return export_list2excel(new_data)
|