from datetime import datetime from utils.log_util import logger from collections import defaultdict from pyecharts.options import LabelOpts, InitOpts from fastapi.responses import JSONResponse from pyecharts.charts import Pie, Bar, Page from pyecharts import options as opts from sqlalchemy.ext.asyncio import AsyncSession from exceptions.exception import ServiceException from module_admin.dao.data_ast_content_dao import DataCatalogDAO from module_admin.entity.vo.common_vo import CrudResponseModel from module_admin.entity.vo.data_ast_content_vo import DataCatalogRequest, DataCatalogResponse, DataCatalogPageQueryModel, DeleteDataCatalogModel,DataCatalogResponseWithChildren, DataAssetCatalogTreeResponse, DataAssetCatalogTreeNode,DataCatalogMovedRequest,DataCatalogMergeRequest,DataCatalogChild,DataCatalogMoverelRequest,DataAstBookmarkRelaRequest,DataAstIndxRequest from pyecharts.globals import CurrentConfig class DataCatalogService: """ 数据目录管理模块服务层 """ @classmethod async def get_catalog_list_services( cls, query_db: AsyncSession, query_object: DataCatalogPageQueryModel, user_id: int, user_name: str, is_page: bool = False ): """ 获取数据目录列表信息service :param query_db: orm对象 :param query_object: 查询参数对象 :param is_page: 是否开启分页 :return: 数据目录列表信息对象 """ catalog_list_result = await DataCatalogDAO.get_catalog_list(query_db, query_object, user_id, user_name, is_page) print('获取数据清单内容111:',catalog_list_result) # 按contentOnum分组 grouped = defaultdict(list) for item in catalog_list_result.rows: grouped[item['contentOnum']].append(item) nodes = {} # 存储所有处理后的节点 # 处理每个组,生成节点 for content_onum, items in grouped.items(): first_item = items[0] is_leaf = first_item['leafNodeFlag'] == 1 rela_onum = first_item['relaOnum'] is not None # 公共字段提取 common_fields = { 'contentOnum': first_item['contentOnum'], 'contentName': first_item['contentName'], 'contentStat': first_item['contentStat'], 'contentIntr': first_item['contentIntr'], 'contentPic': first_item['contentPic'], 'suprContentOnum': first_item['suprContentOnum'], 'leafNodeFlag': first_item['leafNodeFlag'], 'updPrsn': first_item['updPrsn'], 'updTime': first_item['updTime'], 'children': [] } # 处理叶子节点的数据资产子节点 if is_leaf and rela_onum : for item in items: asset_child = { 'relaOnum': item['relaOnum'], 'contentOnum': first_item['contentOnum'], 'astOnum': item['astOnum'], 'relaType': item['relaType'], 'relaEffBegnDate': item['relaEffBegnDate'], 'relaEffEndDate': item['relaEffEndDate'], 'updPrsn1': item['updPrsn1'], 'dataAstNo': item['dataAstNo'], 'dataAstEngName': item['dataAstEngName'], 'dataAstCnName': item['dataAstCnName'], 'dataAstType': item['dataAstType'], 'dataAstStat': item['dataAstStat'], 'dataAstDesc': item['dataAstDesc'], 'dataAstClas': item['dataAstClas'], 'dataAstCont': item['dataAstCont'], 'dataAstFaq': item['dataAstFaq'], 'dataAstEstbTime': item['dataAstEstbTime'], 'dataAstUpdTime': item['dataAstUpdTime'], 'dataAstSrc': item['dataAstSrc'], 'astNo': item['astNo'], 'relaOnum': item['relaOnum'], 'bookmarkOrde': item['bookmarkOrde'], 'bookmarkFlag': item['bookmarkFlag'], 'sczcFlag': item['sczcFlag'] } common_fields['children'].append(asset_child) nodes[content_onum] = common_fields # 构建父子关系 root = None for content_onum, node in nodes.items(): supr = node['suprContentOnum'] if supr is None: root = node else: parent = nodes.get(supr) if parent: parent['children'].append(node) catalog_list_result.rows = [root] return catalog_list_result @classmethod async def get_catalog_detail_services(cls, query_db: AsyncSession, content_onum: int): """ 获取数据目录详细信息service :param query_db: orm对象 :param content_onum: 数据目录ID :return: 数据目录详细信息对象 """ catalog_detail_result = await DataCatalogDAO.get_catalog_by_id(query_db, content_onum) return catalog_detail_result @classmethod async def add_catalog_services(cls, query_db: AsyncSession, request: DataCatalogResponseWithChildren): """ 新增数据目录信息service :param query_db: orm对象 :param request: 新增数据目录请求对象 :return: 新增目录操作结果 """ catalog_data1 = { 'content_name': request.content_name, 'content_stat': request.content_stat, 'content_intr': request.content_intr, 'content_pic': request.content_pic, 'supr_content_onum': request.supr_content_onum, 'leaf_node_flag': request.leaf_node_flag, 'upd_prsn': request.upd_prsn } catalog_data2 = { 'content_name': request.content_name, 'content_stat': request.content_stat, 'content_intr': request.content_intr, 'content_pic': request.content_pic, 'supr_content_onum': request.supr_content_onum, 'leaf_node_flag': request.leaf_node_flag, 'upd_prsn': request.upd_prsn, 'children': [child.model_dump() for child in request.children] # 将 children 转换为字典列表 } try: for child in catalog_data2["children"]: child["rela_eff_begn_date"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 设置默认值,当前时间 child["rela_eff_end_date"] = datetime(year=2999, month=12, day=31, hour=0, minute=0, second=0).strftime("%Y-%m-%d %H:%M:%S"), # 设置默认值,2999-12-31 child["upd_prsn"] = request.upd_prsn, child["rela_status"] = "1" new_catalog = await DataCatalogDAO.add_catalog_dao(query_db, catalog_data1, catalog_data2) await query_db.commit() return CrudResponseModel(is_success=True, message='新增成功', data=new_catalog) except Exception as e: await query_db.rollback() raise ServiceException(message=f"创建目录时发生错误: {str(e)}") @classmethod async def edit_catalog_leaf_services(cls, query_db: AsyncSession,content_onum : int, leaf_node_flag : int): """ 编辑数据目录信息service :param query_db: orm对象 :param request: 编辑数据目录请求对象 :return: 编辑目录操作结果 """ catalog_data1 = { 'content_onum': content_onum, 'leaf_node_flag': leaf_node_flag } try: await DataCatalogDAO.edit_catalog_leaf_dao(query_db, catalog_data1) await query_db.commit() return CrudResponseModel(is_success=True, message='更新成功') except Exception as e: await query_db.rollback() raise ServiceException(message=f"更新目录时发生错误: {str(e)}") @classmethod async def edit_catalog_child_services(cls, query_db: AsyncSession, request: DataCatalogResponseWithChildren): """ 编辑数据目录信息service :param query_db: orm对象 :param request: 编辑数据目录请求对象 :return: 编辑目录操作结果 """ catalog_data = { 'content_onum': request.content_onum, 'content_name': request.content_name, 'content_stat': request.content_stat, 'content_intr': request.content_intr, 'content_pic': request.content_pic, 'supr_content_onum': request.supr_content_onum, 'leaf_node_flag': request.leaf_node_flag, 'upd_prsn': request.upd_prsn, 'children': [child.model_dump() for child in request.children] # 将 children 转换为字典列表 } try: for child in catalog_data["children"]: # 设置 rela_eff_begn_date if child.get("rela_eff_begn_date"): child["rela_eff_begn_date"] = child["rela_eff_begn_date"].strftime("%Y-%m-%d %H:%M:%S") else: child["rela_eff_begn_date"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 设置 rela_eff_end_date if child.get("rela_eff_end_date"): child["rela_eff_end_date"] = child["rela_eff_end_date"].strftime("%Y-%m-%d %H:%M:%S") else: child["rela_eff_end_date"] = datetime(year=2999, month=12, day=31, hour=0, minute=0, second=0).strftime("%Y-%m-%d %H:%M:%S") child["upd_prsn"] = request.upd_prsn child["rela_status"] = "1" await DataCatalogDAO.edit_catalog_child_dao(query_db, catalog_data) await query_db.commit() return CrudResponseModel(is_success=True, message='更新成功') except Exception as e: await query_db.rollback() raise ServiceException(message=f"更新目录时发生错误: {str(e)}") @classmethod async def delete_catalog_services(cls, query_db: AsyncSession, request: DeleteDataCatalogModel,user_id): """ 删除数据目录信息service :param query_db: orm对象 :param request: 删除数据目录请求对象 :return: 删除目录操作结果 """ if request.content_onums: content_onum_list = request.content_onums.split(',') try: for content_onum in content_onum_list: catalog = await cls.get_catalog_detail_services(query_db, int(content_onum)) if not catalog: raise ServiceException(message=f'目录ID {content_onum} 不存在') await DataCatalogDAO.delete_catalog_dao(query_db, DeleteDataCatalogModel(content_onums=content_onum)) await DataCatalogDAO.delete_ast_book_mark_rela_by_content_onum(query_db, int(content_onum), user_id) await query_db.commit() return CrudResponseModel(is_success=True, message='删除成功') except Exception as e: await query_db.rollback() raise e else: raise ServiceException(message='传入目录id为空') @classmethod async def get_data_asset_catalog_tree_services(cls, query_db: AsyncSession): """ 获取数据资产树信息service :param query_db: orm对象 :return: 数据资产树信息对象 """ # 从 DAO 层获取数据 rows = await DataCatalogDAO.get_data_asset_catalog_tree(query_db) # 使用字典暂存分组数据 sys_groups = {} for item in rows: sys_name, eng_name, cn_name, ast_no, rel_status = item # 创建或获取系统分组 if sys_name not in sys_groups: sys_groups[sys_name] = { "dataAssetSysName": sys_name, "children": [] } # 添加子节点 sys_groups[sys_name]["children"].append({ "dataAssetCatalogNo": eng_name, "dataAssetCatalogName": cn_name, "dataAssetCatalogAstno": ast_no, "rel_status": rel_status }) results = list(sys_groups.values()) # 转换为最终列表格式 return results @classmethod async def moved_catalog_instr_services(cls, query_db: AsyncSession, request: DataCatalogMovedRequest): """ 移动数据目录service """ moved_catalog_data = { 'content_onum': request.content_onum, 'supr_content_onum': request.supr_content_onum, 'supr_content_onum_after': request.supr_content_onum_after } try: await DataCatalogDAO.moved_catalog_instr_dao(query_db, moved_catalog_data) await query_db.commit() return CrudResponseModel(is_success=True, message='目录移动成功') except Exception as e: await query_db.rollback() raise ServiceException(message=f"移动目录时发生错误: {str(e)}") @classmethod async def merge_catalog_instr_services(cls, query_db: AsyncSession, request: DataCatalogMergeRequest): """ 移动数据目录service """ merge_catalog_data = { 'content_onum': request.content_onum, 'supr_content_onum': request.supr_content_onum, 'content_onum_after': request.content_onum_after, 'supr_content_onum_after': request.supr_content_onum_after } try: await DataCatalogDAO.merge_catalog_instr_dao(query_db, merge_catalog_data) await query_db.commit() return CrudResponseModel(is_success=True, message='目录合并成功') except Exception as e: await query_db.rollback() raise ServiceException(message=f"目录合并时发生错误: {str(e)}") @classmethod async def removerel_data_ast_catalog_services(cls, query_db: AsyncSession, request: DataCatalogChild): """ 移除数据资产目录service """ removerel_catalog_data = { 'rela_onum': request.rela_onum, 'content_onum': request.content_onum, 'ast_onum': request.ast_onum, 'rela_type': request.rela_type, 'rela_eff_begn_date': request.rela_eff_begn_date, 'rela_eff_end_date': request.rela_eff_end_date, 'upd_prsn': request.upd_prsn, 'rela_status': request.rela_status } try: await DataCatalogDAO.removerel_data_ast_catalog_dao(query_db, removerel_catalog_data) await query_db.commit() return CrudResponseModel(is_success=True, message='资产移除成功') except Exception as e: await query_db.rollback() raise ServiceException(message=f"移除资产时发生错误: {str(e)}") @classmethod async def moverel_data_ast_catalog_services(cls, query_db: AsyncSession, request: DataCatalogMoverelRequest): """ 移动数据目录service """ moverel_catalog_data = { 'rela_onum': request.rela_onum, 'content_onum': request.content_onum, 'content_onum_after': request.content_onum_after } try: await DataCatalogDAO.moverel_data_ast_catalog_dao(query_db, moverel_catalog_data) await query_db.commit() return CrudResponseModel(is_success=True, message='资产移动成功') except Exception as e: await query_db.rollback() raise ServiceException(message=f"资产移动时发生错误: {str(e)}") @classmethod async def delete_ast_book_mark_rela_services(cls, db: AsyncSession, request: DataAstBookmarkRelaRequest, user_name: str, user_id: str): """ 取消收藏数据库操作 """ delete_rela_onum = { 'rela_onum': request.rela_onum } try: await DataCatalogDAO.delete_ast_book_mark_rela_dao(db, delete_rela_onum,user_name,user_id) await db.commit() return CrudResponseModel(is_success=True, message='取消收藏,操作成功') except Exception as e: await db.rollback() logger.error(" 取消收藏,操作失败") @classmethod async def add_ast_book_mark_rela_services(cls, db: AsyncSession, request: DataAstBookmarkRelaRequest, user_name: str): """ 添加收藏数据库操作 :param db: 异步会话对象 :param request: 收藏请求对象,包含用户ID、资产编号等信息 :param user_name: 当前操作用户的名称 :return: 标准化响应模型,包含操作是否成功、消息和返回数据 """ # 构建DAO参数 add_rela_onum = { 'user_id': request.user_id, 'data_ast_no': request.data_ast_no, # 我的收藏 'content_onum': request.content_onum, 'rela_type': "归属关系", 'rela_eff_begn_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 设置默认值,当前时间 'rela_eff_end_date': datetime(year=2999, month=12, day=31, hour=0, minute=0, second=0).strftime("%Y-%m-%d %H:%M:%S") } try: # 调用DAO层操作 dao_result = await DataCatalogDAO.add_ast_book_mark_rela_dao(db, add_rela_onum, user_name) if not dao_result.get("is_success"): # 业务逻辑错误(如重复收藏) logger.warning(f"用户{request.user_id}收藏资产{request.data_ast_no}失败: {dao_result.get('message')}") return CrudResponseModel( is_success=False, message=dao_result.get("message", "操作失败"), data=None ) # 执行成功时提交事务 await db.commit() # 记录成功日志 if dao_result.get("data") is not None: logger.info(f"用户{request.user_id}收藏资产{request.data_ast_no}成功,顺序号: {dao_result.get('data').bookmark_orde}") return CrudResponseModel( is_success=True, message=dao_result.get("message", "操作成功"), data={ "rela_onum": dao_result.get("data").rela_onum, "bookmark_orde": dao_result.get("data").bookmark_orde } ) else: logger.warning(f"用户{request.user_id}收藏资产{request.data_ast_no}失败: 返回数据为空") return CrudResponseModel( is_success=False, message="操作失败:返回数据为空", data=None ) except Exception as e: await db.rollback() logger.error( f"用户 {request.user_id} 收藏操作失败: {str(e)}", # 直接使用 request.user_id exc_info=True ) return CrudResponseModel( is_success=False, message=f"收藏操作失败: {str(e)}" ) # 添加到 DataCatalogService 类中 @classmethod async def add_bookmark_folder_services(cls, db: AsyncSession, folder: DataCatalogRequest): """ 添加收藏子目录服务 :param db: 数据库会话 :param folder: 目录请求对象 :return: 操作结果 """ try: # 检查是否已存在同名目录 existing_folder = await DataCatalogDAO.get_bookmark_folder_by_name( db, folder.content_name, folder.upd_prsn ) if existing_folder: return CrudResponseModel(is_success=False, message="已存在同名收藏目录") # 转换为字典并添加 folder_dict = folder.model_dump(exclude_unset=True) result = await DataCatalogDAO.add_catalog_dao(db, folder_dict, {"children": []}) # 提交事务 await db.commit() return CrudResponseModel(is_success=True, message="收藏目录添加成功") except Exception as e: await db.rollback() logger.error(f"添加收藏目录失败: {str(e)}", exc_info=True) return CrudResponseModel(is_success=False, message=f"添加收藏目录失败: {str(e)}") @classmethod async def edit_bookmark_folder_services(cls, db: AsyncSession, folder: DataCatalogRequest): """ 修改收藏子目录服务 :param db: 数据库会话 :param folder: 目录请求对象 :return: 操作结果 """ try: # 检查目录是否存在且属于当前用户 existing_folder = await DataCatalogDAO.get_bookmark_folder_by_id( db, folder.content_onum, folder.upd_prsn ) if not existing_folder: return CrudResponseModel(is_success=False, message="收藏目录不存在或无权限修改") # 检查是否已存在同名目录(排除自身) same_name_folder = await DataCatalogDAO.get_bookmark_folder_by_name_exclude_id( db, folder.content_name, folder.upd_prsn, folder.content_onum ) if same_name_folder: return CrudResponseModel(is_success=False, message="已存在同名收藏目录") # 转换为字典并更新 folder_dict = folder.model_dump(exclude_unset=True) folder_dict.update({ "supr_content_onum": 2, # 确保父目录不变 "content_stat": "1", # 添加状态字段 "leaf_node_flag": 1, # 添加叶子节点标志 "content_intr": folder_dict.get("content_intr", None), # 可选字段 "content_pic": folder_dict.get("content_pic", None) # 可选字段 }) await DataCatalogDAO.edit_catalog_dao(db, folder_dict) # 提交事务 await db.commit() return CrudResponseModel(is_success=True, message="收藏目录修改成功") except Exception as e: await db.rollback() logger.error(f"修改收藏目录失败: {str(e)}", exc_info=True) return CrudResponseModel(is_success=False, message=f"修改收藏目录失败: {str(e)}") @classmethod async def delete_bookmark_folder_services(cls, db: AsyncSession, content_onum: int, user_name: str,user_id:str): """ 删除收藏子目录服务 :param db: 数据库会话 :param content_onum: 目录ID :param user_name: 用户名 :return: 操作结果 """ try: # 检查目录是否存在且属于当前用户 existing_folder = await DataCatalogDAO.get_bookmark_folder_by_id( db, content_onum, user_name ) if not existing_folder: return CrudResponseModel(is_success=False, message="收藏目录不存在或无权限删除") # 使用专用方法执行删除操作 delete_result = await DataCatalogDAO.delete_bookmark_folder_dao(db, content_onum, user_name,user_id) if not delete_result["success"]: return CrudResponseModel(is_success=False, message=delete_result["message"]) # 提交事务 await db.commit() return CrudResponseModel(is_success=True, message="收藏目录删除成功") except Exception as e: await db.rollback() logger.error(f"删除收藏目录失败: {str(e)}", exc_info=True) return CrudResponseModel(is_success=False, message=f"删除收藏目录失败: {str(e)}") @classmethod async def get_bookmark_folders_services(cls, db: AsyncSession, user_name: str): """ 获取用户收藏目录列表服务 :param db: 数据库会话 :param user_name: 用户名 :return: 目录列表 """ try: # 获取用户的收藏目录列表 folders = await DataCatalogDAO.get_bookmark_folders(db, user_name) return folders except Exception as e: logger.error(f"获取收藏目录列表失败: {str(e)}", exc_info=True) return [] @classmethod async def move_bookmark_asset_services(cls, query_db: AsyncSession, request: DataCatalogMoverelRequest): """ 在收藏目录间移动资产服务 """ try: # 1. 验证源目录和目标目录是否都属于收藏体系 source_is_bookmark = await DataCatalogDAO.is_bookmark_folder(query_db, request.content_onum, request.upd_prsn) target_is_bookmark = await DataCatalogDAO.is_bookmark_folder( query_db, request.content_onum_after, request.upd_prsn ) if not source_is_bookmark: return CrudResponseModel(is_success=False, message="源目录不是收藏目录") if not target_is_bookmark: return CrudResponseModel(is_success=False, message="目标目录不是收藏目录或不属于当前用户") # 2. 构建移动数据 moverel_catalog_data = { 'rela_onum': request.rela_onum, 'content_onum': request.content_onum, 'content_onum_after': request.content_onum_after, 'upd_prsn': request.upd_prsn } # 3. 执行移动操作 await DataCatalogDAO.move_bookmark_asset_dao(query_db, moverel_catalog_data) await query_db.commit() return CrudResponseModel(is_success=True, message="收藏资产移动成功") except Exception as e: await query_db.rollback() logger.error(f"移动收藏资产失败: {str(e)}", exc_info=True) return CrudResponseModel(is_success=False, message=f"移动收藏资产失败: {str(e)}") @classmethod async def get_data_ast_indx_list_services(cls, query_db: AsyncSession, query_object: DataAstIndxRequest): """获取数据资产指标列表信息service""" try: indx_list_dict = await DataCatalogDAO.get_data_ast_indx_list(query_db, query_object) indx_names = [item["indx_name"] for item in indx_list_dict] indx_vals = [item["indx_val"] for item in indx_list_dict] CurrentConfig.ONLINE_HOST = "/assets/js/echarts.min.js" # 创建独立图表 pie = ( Pie(init_opts=InitOpts(width="100%", height="180px")) .add("", list(zip(indx_names, indx_vals))) .set_global_opts( title_opts=opts.TitleOpts(title="指标饼图", padding=[5,0,0,0]), legend_opts=opts.LegendOpts( type_="scroll", orient="vertical", pos_right="2%", pos_top="10%" ) ) .set_series_opts(label_opts=LabelOpts(formatter="{b}: {c}")) ) bar = ( Bar(init_opts=InitOpts(width="100%", height="250px")) .add_xaxis(indx_names) .add_yaxis("指标值", indx_vals) .set_global_opts( title_opts=opts.TitleOpts(title="指标柱状图", padding=[20,0,0,0]), datazoom_opts=[opts.DataZoomOpts(type_="inside")] ) ) # 生成独立图表HTML pie_html = pie.render_embed() bar_html = bar.render_embed() # 构建最终HTML结构 responsive_html = f"""
{pie_html}
{bar_html}
""" except Exception as e: logger.error(f"获取数据资产指标列表信息失败: {str(e)}") raise ServiceException(message=f"获取数据资产指标列表信息失败: {str(e)}") else: return responsive_html