import logging
import uuid
from collections import defaultdict
from datetime import datetime

from fastapi import Request
from sqlalchemy.ext.asyncio import AsyncSession

from config.constant import CommonConstant
from exceptions.exception import ServiceException
from module_admin.dao.datastd_dao import DataStdDao
from module_admin.entity.vo.approval_vo import ApplyModel
from module_admin.entity.vo.common_vo import CrudResponseModel
from module_admin.entity.vo.data_ast_content_vo import (
    DataCatalogChild,
    DataCatalogMergeRequest,
    DataCatalogMovedRequest,
    DataCatalogMoverelRequest,
    DataCatalogPageQueryModel,
    DataCatalogResponseWithChildren,
    DeleteDataCatalogModel,
)
from module_admin.entity.vo.datastd_vo import (
    DataStdCodeApprModel,
    DataStdCodeModel,
    DataStdCodePageQueryModel,
    DataStdDictApprModel,
    DataStdDictModel,
    DataStdMainApprModel,
    DataStdMainModel,
    DeleteDataStdModel,
)
from module_admin.entity.vo.user_vo import CurrentUserModel
from module_admin.service.approval_service import ApprovalService
from utils.common_util import CamelCaseUtil

logger = logging.getLogger(__name__)


class DataStdService:
    """
    Service layer for data standards: standard codes, data dictionaries,
    standard catalogs and main standards.
    """

    @classmethod
    async def get_std_code_list_services(
        cls, query_db: AsyncSession, query_object: DataStdCodeModel, is_page: bool = False
    ):
        """
        Get the standard code list.

        :param query_db: ORM session
        :param query_object: query parameters
        :param is_page: whether to paginate
        :return: the result of ``DataStdDao.get_std_code_list``
        """
        return await DataStdDao.get_std_code_list(query_db, query_object, is_page)

    @classmethod
    async def get_appr_std_code_list_services(
        cls, query_db: AsyncSession, query_object: DataStdCodeModel, is_page: bool = False
    ):
        """
        Get the approval-side standard code list with changes highlighted.

        For every row whose ``changeType`` is ``update`` and that carries a
        ``compareId``, the compared approval record is loaded and ``cdNo``,
        ``cdValCnMean`` and ``codeMapId`` are compared (two empty ``codeMapId``
        values count as equal). Rows without any difference are dropped; each
        differing field is rewritten as ``"<old> ==> <new>"``. All other rows
        are returned unchanged.

        :param query_db: ORM session
        :param query_object: query parameters
        :param is_page: whether to paginate
        :return: the DAO result object with its ``rows`` replaced by the filtered list
        """
        col_list_result = await DataStdDao.get_appr_std_code_list(query_db, query_object, is_page)
        filtered_rows = []
        for item in col_list_result.rows:
            compare_id = item.get("compareId")
            if item.get("changeType") != "update" or not compare_id:
                filtered_rows.append(item)
                continue

            old_item = await DataStdDao.get_std_code_appr_by_id(query_db, compare_id)
            if old_item is None:
                # Nothing to diff against: keep the row as-is instead of failing
                # with an AttributeError on the missing record.
                filtered_rows.append(item)
                continue

            is_code_num_same = item.get("cdNo") == old_item.cd_no
            is_code_name_same = item.get("cdValCnMean") == old_item.cd_val_cn_mean
            # Two empty values (None / "") are treated as equal
            is_code_map_id_same = (
                (not item.get("codeMapId") and not old_item.code_map_id)
                or item.get("codeMapId") == old_item.code_map_id
            )
            if is_code_num_same and is_code_name_same and is_code_map_id_same:
                # Identical to the compared record: skip it
                continue

            # Render each changed field as "old ==> new"; empty values become ""
            if not is_code_num_same:
                item["cdNo"] = f"{old_item.cd_no or ''} ==> {item.get('cdNo') or ''}"
            if not is_code_name_same:
                item["cdValCnMean"] = f"{old_item.cd_val_cn_mean or ''} ==> {item.get('cdValCnMean') or ''}"
            if not is_code_map_id_same:
                item["codeMapId"] = f"{old_item.code_map_id or ''} ==> {item.get('codeMapId') or ''}"
            filtered_rows.append(item)

        col_list_result.rows = filtered_rows
        return col_list_result

    @classmethod
    async def get_std_code_appr_list(
        cls, query_db: AsyncSession, query_object: DataStdCodeApprModel, is_page: bool = False
    ):
        """
        Get the standard code approval records of one flow.

        :param query_db: ORM session
        :param query_object: query object; only ``flowId`` is used
        :param is_page: accepted for signature parity, not used
        :return: the result of ``DataStdDao.get_std_code_appr_list``
        """
        return await DataStdDao.get_std_code_appr_list(query_object.flowId, query_db)

    @classmethod
    async def get_std_code_map_list_services(
        cls, query_db: AsyncSession, query_object: DataStdCodeModel, is_page: bool = False
    ):
        """
        Get the standard code mapping list.

        :param query_db: ORM session
        :param query_object: query parameters
        :param is_page: whether to paginate
        :return: the result of ``DataStdDao.get_std_code_map_list``
        """
        return await DataStdDao.get_std_code_map_list(query_db, query_object, is_page)

    @classmethod
    async def get_std_code_by_id_services(cls, query_db: AsyncSession, id: str):
        """
        Get one standard code by ID.

        :param query_db: ORM session
        :param id: standard code ID
        :return: a ``DataStdCodeModel``; an empty model when no row is found
        """
        col = await DataStdDao.get_std_code_by_id(query_db, id)
        if not col:
            return DataStdCodeModel()
        return DataStdCodeModel(**CamelCaseUtil.transform_result(col))

    @classmethod
    async def add_std_code_services(cls, request: Request, query_db: AsyncSession, page_object: DataStdCodeModel):
        """
        Insert a standard code (no commit is issued here).

        :param request: Request object
        :param query_db: ORM session
        :param page_object: standard code to insert
        :return: ``CrudResponseModel`` describing the result
        """
        try:
            # Work on a copy so the caller's object is not modified
            new_page_object = DataStdCodeModel(**page_object.model_dump(by_alias=True))
            new_page_object.cd_val_stat = "9"
            await DataStdDao.add_std_code(query_db, new_page_object)
            return CrudResponseModel(is_success=True, message='新增标准代码成功')
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def code_detail_services(cls, query_db: AsyncSession, col: str):
        """
        Get the detail of one standard code.

        :param query_db: ORM session
        :param col: standard code ID
        :return: a ``DataStdCodeModel``; an empty model when no row is found
        """
        config = await DataStdDao.get_std_code_by_id(query_db, col)
        if not config:
            return DataStdCodeModel()
        return DataStdCodeModel(**CamelCaseUtil.transform_result(config))

    @classmethod
    async def code_appr_detail_services(cls, query_db: AsyncSession, col: str):
        """
        Get the detail of one standard code approval record.

        :param query_db: ORM session
        :param col: approval record ID
        :return: a ``DataStdCodeApprModel``; an empty model when no row is found
        """
        config = await DataStdDao.get_std_code_appr_by_id(query_db, col)
        if not config:
            return DataStdCodeApprModel()
        return DataStdCodeApprModel(**CamelCaseUtil.transform_result(config))

    @classmethod
    async def edit_std_code_services(cls, request: Request, query_db: AsyncSession, page_object: DataStdCodeModel):
        """
        Update a standard code (no commit is issued here).

        :param request: Request object
        :param query_db: ORM session
        :param page_object: standard code to update; only explicitly set fields are written
        :return: ``CrudResponseModel`` describing the result
        :raises ServiceException: when the standard code does not exist
        """
        edit_col = page_object.model_dump(exclude_unset=True)
        # Check the raw DAO row: get_std_code_by_id_services returns an (empty)
        # model object even when nothing is found, so testing its result could
        # never detect a missing record.
        existing = await DataStdDao.get_std_code_by_id(query_db, page_object.onum)
        if not existing:
            raise ServiceException(message=f'标准代码{page_object.onum}不存在')
        try:
            await DataStdDao.update_std_code(query_db, edit_col)
            return CrudResponseModel(is_success=True, message='编辑标准代码成功')
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def delete_std_code_with_items_appr(
        cls,
        ids: str,
        query_db: AsyncSession,
        current_user: CurrentUserModel
    ):
        """
        Submit delete approvals for standard codes and their code items.

        One approval flow is created per standard code; the code and all of its
        code items share that flow ID.

        :param ids: comma-separated standard code IDs
        :param query_db: ORM session
        :param current_user: user submitting the request
        :return: ``CrudResponseModel`` describing the result
        :raises ServiceException: on any failure (the session is rolled back first)
        """
        try:
            id_list = [i.strip() for i in ids.split(',') if i.strip()]
            if not id_list:
                raise ServiceException(message="请提供有效的标准代码ID列表")

            for std_id in id_list:
                data_std_no = await DataStdDao.get_std_code_by_id(query_db, std_id)
                if not data_std_no:
                    raise ServiceException(message=f"ID 为 {std_id} 的标准代码不存在,无法提交删除审批")

                flow_id = str(uuid.uuid4())
                last_appr = await DataStdDao.get_last_std_code_appr_by_id(query_db, std_id)

                # Approval record for the standard code itself
                std_code_appr = DataStdCodeApprModel(**CamelCaseUtil.transform_result(data_std_no))
                std_code_appr.changeType = "delete"
                std_code_appr.compareId = last_appr.onum
                std_code_appr.oldInstId = data_std_no.onum
                std_code_appr.approStatus = "waiting"
                std_code_appr.cd_val_stat = "9"
                std_code_appr.flowId = flow_id
                std_code_appr.onum = str(uuid.uuid4())
                std_code_appr.create_by = current_user.user.user_name
                std_code_appr.create_time = datetime.now()
                await DataStdDao.add_std_code_appr(query_db, std_code_appr)

                # Load the code items that belong to this standard code
                queryCodeItem = DataStdCodePageQueryModel()
                queryCodeItem.parent_id = data_std_no.onum
                queryCodeItem.class_id = "codeItem"
                queryCodeItem.page_size = 100
                queryCodeItem.page_num = 1
                code_item_list_old = await DataStdDao.get_std_code_list(query_db, queryCodeItem, False)

                for code_item in code_item_list_old:
                    appr_item = DataStdCodeApprModel(**code_item)
                    # Compare against the item's OWN last approval record (the
                    # parent's id was used here before, which pointed every item
                    # at the parent's approval record).
                    last_appr_item = await DataStdDao.get_last_std_code_appr_by_id(query_db, appr_item.onum)
                    appr_item.changeType = "delete"
                    appr_item.parent_id = std_code_appr.onum
                    appr_item.compareId = last_appr_item.onum
                    appr_item.oldInstId = appr_item.onum
                    appr_item.approStatus = "waiting"
                    appr_item.cd_val_stat = "9"
                    appr_item.flowId = flow_id
                    appr_item.onum = str(uuid.uuid4())
                    appr_item.create_by = current_user.user.user_name
                    appr_item.create_time = datetime.now()
                    await DataStdDao.add_std_code_appr(query_db, appr_item)

                # Start the approval flow (one per standard code)
                apply_model = ApplyModel()
                apply_model.businessType = "dataStdCode"
                apply_model.businessId = flow_id
                apply_model.applicant = current_user.user.user_name
                await ApprovalService.apply_services(query_db, apply_model, 'dataStdCode')

            return CrudResponseModel(is_success=True, message="批量提交删除标准代码审批成功!")
        except Exception as e:
            await query_db.rollback()
            raise ServiceException(message=f"提交删除标准代码审批失败: {str(e)}") from e

    @classmethod
    async def delete_std_code_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteDataStdModel):
        """
        Delete standard codes and commit.

        :param request: Request object
        :param query_db: ORM session
        :param page_object: delete request; ``ids`` is a comma-separated ID list
        :return: ``CrudResponseModel`` describing the result
        :raises ServiceException: when ``ids`` is empty
        """
        if not page_object.ids:
            raise ServiceException(message='传入标准代码ID为空')

        col_id_list = page_object.ids.split(',')
        try:
            for col_id in col_id_list:
                col_info = await cls.get_std_code_by_id_services(query_db, col_id)
                # NOTE(review): get_std_code_by_id_services returns an empty model
                # rather than None for a missing row, so this check presumably
                # always passes - confirm whether missing IDs should be rejected.
                if col_info:
                    await DataStdDao.delete_std_code(query_db, col_id)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='删除标准代码成功')
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def add_std_code_with_items_services(cls, request, query_db: AsyncSession, code: DataStdCodeModel, code_item_list: list[DataStdCodeModel]):
        """
        Insert a standard code together with its code items and commit.

        :param request: Request object
        :param query_db: ORM session
        :param code: standard code to insert; receives a new ``onum``
        :param code_item_list: code items to insert under ``code``
        :return: ``CrudResponseModel`` describing the result
        :raises ServiceException: on a duplicate or any other failure (after rollback)
        """
        try:
            # The standard code itself must be unique
            if not await cls.check_code_unique_services(query_db, code):
                raise ServiceException(message=f'新增代码{code.cd_val_cn_mean}失败,{code.cd_no}代码已存在')

            code.onum = str(uuid.uuid4())
            await cls.add_std_code_services(request, query_db, code)

            for code_item in code_item_list:
                # Attach the item to its parent, then check uniqueness under that parent
                code_item.parent_id = code.onum
                if not await cls.check_code_unique_services(query_db, code_item):
                    raise ServiceException(message=f"父级代码{code.cd_no}下的代码项{code_item.cd_no}已存在")
                code_item.onum = str(uuid.uuid4())
                # Items inherit the audit fields of the parent code
                code_item.create_by = code.create_by
                code_item.create_time = code.create_time
                code_item.upd_prsn = code.upd_prsn
                code_item.upd_time = code.upd_time
                await cls.add_std_code_services(request, query_db, code_item)

            await query_db.commit()
            return CrudResponseModel(is_success=True, message='标准代码和代码项新增成功')
        except Exception as e:
            await query_db.rollback()
            raise ServiceException(message=f"新增标准代码失败: {str(e)}") from e

    @classmethod
    async def add_std_code_with_items_appr(
        cls,
        code: DataStdCodeModel,
        code_item_list: list[DataStdCodeModel],
        query_db: AsyncSession,
        current_user: CurrentUserModel
    ):
        """
        Submit an "add" approval for a standard code and its code items.

        The code and all of its items are written to the approval table under
        one shared flow ID, then a single approval application is started.

        :param code: standard code to add; receives a new ``onum`` and audit fields
        :param code_item_list: code items to add under ``code``
        :param query_db: ORM session
        :param current_user: user submitting the request
        :return: ``CrudResponseModel`` describing the result
        :raises ServiceException: when the code or an item already exists
        """
        code.onum = str(uuid.uuid4())
        code.create_by = current_user.user.user_name
        code.create_time = datetime.now()
        code.upd_prsn = current_user.user.user_name
        code.upd_time = datetime.now()

        if not await cls.check_code_unique_services(query_db, code):
            raise ServiceException(message=f'标准代码{code.cd_no}已经存在于正式表中,无法新增')

        # Approval record for the standard code itself
        appr_model = DataStdCodeApprModel(**code.model_dump(exclude_unset=True, by_alias=True))
        appr_model.changeType = "add"
        appr_model.compareId = code.onum
        appr_model.oldInstId = code.onum
        appr_model.cd_val_stat = "9"
        appr_model.approStatus = "waiting"
        appr_model.flowId = str(uuid.uuid4())
        await DataStdDao.add_std_code_appr(query_db, appr_model)

        for item in code_item_list:
            item.onum = str(uuid.uuid4())
            item.create_by = current_user.user.user_name
            item.create_time = datetime.now()
            item.upd_prsn = current_user.user.user_name
            item.upd_time = datetime.now()
            item.src_sys = '公司级' if 'company' == item.cd_type else item.src_sys
            item.parent_id = appr_model.onum

            if not await cls.check_code_unique_services(query_db, item):
                raise ServiceException(message=f'代码项{item.cd_no}已经存在于正式表中,无法新增')

            # Approval record for the item, sharing the parent's flow ID
            item_appr_model = DataStdCodeApprModel(**item.model_dump(exclude_unset=True, by_alias=True))
            item_appr_model.changeType = "add"
            item_appr_model.compareId = item.onum
            item_appr_model.oldInstId = item.onum
            item_appr_model.approStatus = "waiting"
            item_appr_model.flowId = appr_model.flowId
            await DataStdDao.add_std_code_appr(query_db, item_appr_model)

        # Start the approval flow
        apply_model = ApplyModel()
        apply_model.businessType = "dataStdCode"
        apply_model.businessId = appr_model.flowId
        apply_model.applicant = appr_model.create_by
        await ApprovalService.apply_services(query_db, apply_model, 'dataStdCode')

        return CrudResponseModel(is_success=True, message='提交新增标准代码审批成功!')

    @classmethod
    async def edit_std_code_with_items_services(cls, request, query_db: AsyncSession, code: DataStdCodeModel, code_item_list: list[DataStdCodeModel]):
        """
        Update a standard code and its code items, then commit.

        A code item with an ``onum`` is updated; one without is inserted.

        :param request: Request object
        :param query_db: ORM session
        :param code: standard code to update
        :param code_item_list: code items to update or insert under ``code``
        :return: ``CrudResponseModel`` describing the result
        :raises ServiceException: on a duplicate or any other failure (after rollback)
        """
        try:
            if not await cls.check_code_unique_services(query_db, code):
                raise ServiceException(message=f'修改代码{code.cd_val_cn_mean}失败,{code.cd_no}代码不存在或已存在')

            await cls.edit_std_code_services(request, query_db, code)

            for code_item in code_item_list:
                code_item.parent_id = code.onum
                # In both cases the item code must be unique under its parent
                if not await cls.check_code_unique_services(query_db, code_item):
                    raise ServiceException(message=f"父级代码{code.cd_no}下的代码项{code_item.cd_no}已存在")

                if code_item.onum:
                    # Existing item: update
                    code_item.upd_time = code.upd_time
                    code_item.upd_prsn = code.upd_prsn
                    await cls.edit_std_code_services(request, query_db, code_item)
                else:
                    # New item: generate an ID and insert
                    code_item.onum = str(uuid.uuid4())
                    code_item.create_time = code.upd_time
                    code_item.create_by = code.create_by
                    code_item.upd_time = code.upd_time
                    code_item.upd_prsn = code.upd_prsn
                    await cls.add_std_code_services(request, query_db, code_item)

            await query_db.commit()
            return CrudResponseModel(is_success=True, message='标准代码和代码项修改成功')
        except Exception as e:
            await query_db.rollback()
            raise ServiceException(message=f"修改标准代码失败: {str(e)}") from e

    @classmethod
    async def edit_std_code_with_items_appr(
        cls,
        code: DataStdCodeModel,
        code_item_list: list[DataStdCodeModel],
        query_db: AsyncSession,
        current_user: CurrentUserModel
    ):
        """
        Submit an "update" approval for a standard code and its code items.

        Submitted items with an ``onum`` become ``update`` records, items
        without one become ``add`` records, and existing items that are absent
        from the submitted list become ``delete`` records. All records share one
        flow ID.

        :param code: standard code being modified
        :param code_item_list: the complete submitted list of code items
        :param query_db: ORM session
        :param current_user: user submitting the request
        :return: ``CrudResponseModel`` describing the result
        :raises ServiceException: when the code or an item fails the uniqueness check
        """
        # The submitter becomes the creator of the approval record
        code.create_by = current_user.user.user_name
        code.create_time = datetime.now()

        if not await cls.check_code_unique_services(query_db, code):
            raise ServiceException(message=f'标准代码{code.cd_no}不存在或已存在,无法提交修改审批')

        # Approval record for the standard code itself
        appr_model = DataStdCodeApprModel(**code.model_dump(exclude_unset=True, by_alias=True))
        last_appr = await DataStdDao.get_last_std_code_appr_by_id(query_db, code.onum)
        appr_model.changeType = "update"
        appr_model.compareId = last_appr.onum  # compare against the latest approval record
        appr_model.oldInstId = code.onum
        appr_model.cd_val_stat = "9"
        appr_model.approStatus = "waiting"
        appr_model.flowId = str(uuid.uuid4())  # shared by every record of this flow
        appr_model.onum = str(uuid.uuid4())
        await DataStdDao.add_std_code_appr(query_db, appr_model)

        # Existing code items under this code, used to detect removed items
        query_object = DataStdCodePageQueryModel()
        query_object.parent_id = code.onum
        query_object.class_id = "codeItem"
        query_object.page_size = 100
        query_object.page_num = 1
        code_item_list_old = await DataStdDao.get_std_code_list(query_db, query_object, False)
        # Must be collected before new items are given generated IDs below
        new_ids = {item.onum for item in code_item_list if item.onum}

        for item in code_item_list:
            item.parent_id = code.onum
            item.upd_prsn = current_user.user.user_name
            item.upd_time = datetime.now()
            item.src_sys = '公司级' if 'company' == item.cd_type else item.src_sys

            if item.onum:
                # Existing item: "update" record
                if not await cls.check_code_unique_services(query_db, item):
                    raise ServiceException(message=f'父级代码{code.cd_no}下代码项{item.cd_no}重复,无法提交修改审批')

                item_appr_model = DataStdCodeApprModel(**item.model_dump(exclude_unset=True, by_alias=True))
                last_item_appr = await DataStdDao.get_last_std_code_appr_by_id(query_db, item_appr_model.onum)
                item_appr_model.changeType = "update"
                item_appr_model.compareId = last_item_appr.onum
                item_appr_model.parent_id = code.onum
                item_appr_model.oldInstId = item.onum
                item_appr_model.approStatus = "waiting"
                item_appr_model.flowId = appr_model.flowId
                item_appr_model.onum = str(uuid.uuid4())
                await DataStdDao.add_std_code_appr(query_db, item_appr_model)
            else:
                # New item: "add" record
                item.onum = str(uuid.uuid4())
                item.create_by = current_user.user.user_name
                item.create_time = datetime.now()

                if not await cls.check_code_unique_services(query_db, item):
                    raise ServiceException(message=f'父级代码{code.cd_no}下代码项{item.cd_no}重复,无法提交新增审批')

                item_appr_model = DataStdCodeApprModel(**item.model_dump(exclude_unset=True, by_alias=True))
                item_appr_model.changeType = "add"
                item_appr_model.compareId = item.onum
                item_appr_model.oldInstId = item.onum
                item_appr_model.approStatus = "waiting"
                item_appr_model.flowId = appr_model.flowId
                item_appr_model.parent_id = code.onum
                await DataStdDao.add_std_code_appr(query_db, item_appr_model)

        for old_item in code_item_list_old:
            if old_item["onum"] in new_ids:
                continue
            # Existing item missing from the submitted list: "delete" record
            delete_appr_model = DataStdCodeApprModel(**old_item)
            delete_appr_model.changeType = "delete"
            last_appr = await DataStdDao.get_last_std_code_appr_by_id(query_db, old_item["onum"])
            delete_appr_model.compareId = last_appr.onum
            delete_appr_model.oldInstId = old_item["onum"]
            delete_appr_model.approStatus = "waiting"
            delete_appr_model.flowId = appr_model.flowId
            delete_appr_model.onum = str(uuid.uuid4())
            delete_appr_model.parent_id = code.onum
            delete_appr_model.create_by = current_user.user.user_name
            delete_appr_model.create_time = datetime.now()
            await DataStdDao.add_std_code_appr(query_db, delete_appr_model)

        # Start the approval flow. The applicant is the submitting user, as in
        # the add/delete flows; appr_model.upd_prsn is never set in this method.
        apply_model = ApplyModel()
        apply_model.businessType = "dataStdCode"
        apply_model.businessId = appr_model.flowId
        apply_model.applicant = current_user.user.user_name
        await ApprovalService.apply_services(query_db, apply_model, 'dataStdCode')

        return CrudResponseModel(is_success=True, message='提交修改标准代码审批成功!')

    @classmethod
    async def check_code_unique_services(cls, query_db: AsyncSession, page_object: DataStdCodeModel):
        """
        Check that a standard code is unique.

        The lookup uses ``cd_no``, ``class_id``, ``parent_id`` and ``cd_type``;
        a match with the same ``onum`` as ``page_object`` is not a conflict.

        :param query_db: ORM session
        :param page_object: standard code to check
        :return: ``CommonConstant.UNIQUE`` or ``CommonConstant.NOT_UNIQUE``
        """
        # '-1' stands in for "no ID yet", matching the other uniqueness checks
        current_id = '-1' if page_object.onum is None else page_object.onum
        codemodel = DataStdCodeModel()
        codemodel.cd_no = page_object.cd_no
        codemodel.class_id = page_object.class_id
        codemodel.parent_id = page_object.parent_id
        codemodel.cd_type = page_object.cd_type
        data_dict_type = await DataStdDao.get_data_code_by_info(query_db, codemodel)
        if data_dict_type and data_dict_type.onum != current_id:
            return CommonConstant.NOT_UNIQUE
        return CommonConstant.UNIQUE

    # ------------------------------------------------------------ data dictionary ------------------------------------------------------------

    @classmethod
    async def get_std_dict_list_services(
        cls, query_db: AsyncSession, query_object: DataStdDictModel, is_page: bool = False
    ):
        """
        Get the data dictionary list.

        :param query_db: ORM session
        :param query_object: query parameters
        :param is_page: whether to paginate
        :return: the result of ``DataStdDao.get_std_dict_list``
        """
        return await DataStdDao.get_std_dict_list(query_db, query_object, is_page)

    @classmethod
    async def get_std_dict_by_id_services(cls, query_db: AsyncSession, id: str):
        """
        Get one data dictionary entry by ID.

        :param query_db: ORM session
        :param id: data dictionary ID
        :return: a ``DataStdDictModel``; an empty model when no row is found
        """
        col = await DataStdDao.get_std_dict_by_id(query_db, id)
        if not col:
            return DataStdDictModel()
        return DataStdDictModel(**CamelCaseUtil.transform_result(col))

    @classmethod
    async def add_std_dict_services(cls, request: Request, query_db: AsyncSession, page_object: DataStdDictModel):
        """
        Insert a data dictionary entry (no commit is issued here).

        :param request: Request object
        :param query_db: ORM session
        :param page_object: data dictionary entry to insert
        :return: ``CrudResponseModel`` describing the result
        :raises ServiceException: when ``data_dict_no`` already exists
        """
        try:
            if not await cls.check_dict_unique_services(query_db, page_object):
                raise ServiceException(message=f"数据字典{page_object.data_dict_no}已存在")
            # Work on a copy so the caller's object is not modified
            new_page_object = DataStdDictModel(**page_object.model_dump(by_alias=True))
            new_page_object.data_dict_stat = "9"
            new_page_object.onum = str(uuid.uuid4())
            await DataStdDao.add_std_dict(query_db, new_page_object)
            return CrudResponseModel(is_success=True, message='新增数据字典成功')
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def dict_detail_services(cls, query_db: AsyncSession, col: str):
        """
        Get the detail of one data dictionary entry.

        :param query_db: ORM session
        :param col: data dictionary ID
        :return: a ``DataStdDictModel``; an empty model when no row is found
        """
        config = await DataStdDao.get_std_dict_by_id(query_db, col)
        if not config:
            return DataStdDictModel()
        return DataStdDictModel(**CamelCaseUtil.transform_result(config))

    @classmethod
    async def edit_std_dict_services(cls, request: Request, query_db: AsyncSession, page_object: DataStdDictModel):
        """
        Update a data dictionary entry (no commit is issued here).

        :param request: Request object
        :param query_db: ORM session
        :param page_object: entry to update; only explicitly set fields are written
        :return: ``CrudResponseModel`` describing the result
        :raises ServiceException: when the number is taken or the entry does not exist
        """
        if not await cls.check_dict_unique_services(query_db, page_object):
            raise ServiceException(message=f"数据字典编号:{page_object.data_dict_no}已存在")
        edit_col = page_object.model_dump(exclude_unset=True)
        # Check the raw DAO row: get_std_dict_by_id_services returns an (empty)
        # model object even when nothing is found, so testing its result could
        # never detect a missing record.
        existing = await DataStdDao.get_std_dict_by_id(query_db, page_object.onum)
        if not existing:
            raise ServiceException(message=f'数据字典{page_object.onum}不存在')
        try:
            await DataStdDao.update_std_dict(query_db, edit_col)
            return CrudResponseModel(is_success=True, message='编辑数据字典成功')
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def delete_std_dict_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteDataStdModel):
        """
        Delete data dictionary entries and commit.

        :param request: Request object
        :param query_db: ORM session
        :param page_object: delete request; ``ids`` is a comma-separated ID list
        :return: ``CrudResponseModel`` describing the result
        :raises ServiceException: when ``ids`` is empty
        """
        if not page_object.ids:
            raise ServiceException(message='传入数据字典ID为空')

        col_id_list = page_object.ids.split(',')
        try:
            for col_id in col_id_list:
                col_info = await cls.get_std_dict_by_id_services(query_db, col_id)
                # NOTE(review): get_std_dict_by_id_services returns an empty model
                # rather than None for a missing row, so this check presumably
                # always passes - confirm whether missing IDs should be rejected.
                if col_info:
                    await DataStdDao.delete_std_dict(query_db, col_id)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='删除数据字典成功')
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def check_dict_unique_services(cls, query_db: AsyncSession, page_object: DataStdDictModel):
        """
        Check that a data dictionary number is unique.

        A match with the same ``onum`` as ``page_object`` is not a conflict.

        :param query_db: ORM session
        :param page_object: data dictionary entry to check
        :return: ``CommonConstant.UNIQUE`` or ``CommonConstant.NOT_UNIQUE``
        """
        current_id = '-1' if page_object.onum is None else page_object.onum
        codemodel = DataStdDictModel()
        codemodel.data_dict_no = page_object.data_dict_no
        data_dict_type = await DataStdDao.get_data_dict_by_info(query_db, codemodel)
        if data_dict_type and data_dict_type.onum != current_id:
            return CommonConstant.NOT_UNIQUE
        return CommonConstant.UNIQUE

    # ------------------------------------------------------------ data standard catalog ------------------------------------------------------------

    @classmethod
    async def get_catalog_list_services(
        cls, query_db: AsyncSession, query_object: DataCatalogPageQueryModel, user_id: int, is_page: bool = False
    ):
        """
        Get the catalog list as a tree.

        Rows are de-duplicated by ``contentOnum`` (the first row of each group
        supplies the node fields), linked to their parent through
        ``suprContentOnum``, and each node's children are ordered so that the
        one named ``临时`` comes last.

        :param query_db: ORM session
        :param query_object: query parameters
        :param user_id: current user ID, passed through to the DAO
        :param is_page: whether to paginate
        :return: the DAO result object whose ``rows`` holds the root node
                 (an empty list when no node without a parent exists)
        """
        catalog_list_result = await DataStdDao.get_catalog_list(query_db, query_object, user_id, is_page)

        node_fields = (
            'contentOnum', 'contentName', 'contentStat', 'contentIntr', 'contentPic',
            'suprContentOnum', 'leafNodeFlag', 'updPrsn', 'updTime',
        )
        nodes = {}
        for item in catalog_list_result.rows:
            content_onum = item['contentOnum']
            if content_onum in nodes:
                # Later rows of the same catalog add nothing to the node itself
                continue
            node = {field: item[field] for field in node_fields}
            node['children'] = []
            nodes[content_onum] = node

        # Link children to parents; a node without a parent is the root
        root = None
        for node in nodes.values():
            supr = node['suprContentOnum']
            if supr is None:
                root = node
                continue
            parent = nodes.get(supr)
            if parent:
                parent['children'].append(node)

        # Stable sort: False < True, so the '临时' node moves to the end
        for node in nodes.values():
            node['children'].sort(key=lambda child: child['contentName'] == '临时')

        logger.debug('获取数据清单内容:%s', root)
        # Avoid returning [None] when the query produced no root node
        catalog_list_result.rows = [root] if root is not None else []
        return catalog_list_result

    @classmethod
    async def get_catalog_detail_services(cls, query_db: AsyncSession, belt_data_std_content: int):
        """
        Get the detail of one catalog.

        :param query_db: ORM session
        :param belt_data_std_content: catalog ID
        :return: the result of ``DataStdDao.get_catalog_by_id``
        """
        return await DataStdDao.get_catalog_by_id(query_db, belt_data_std_content)

    @classmethod
    async def add_catalog_services(cls, query_db: AsyncSession, request: DataCatalogResponseWithChildren):
        """
        Create a catalog together with its child relations and commit.

        :param query_db: ORM session
        :param request: catalog to create, including ``children``
        :return: ``CrudResponseModel`` whose ``data`` is the DAO result
        :raises ServiceException: on any failure (the session is rolled back first)
        """
        catalog_data1 = {
            'content_name': request.content_name,
            'content_stat': request.content_stat,
            'content_intr': request.content_intr,
            'content_pic': request.content_pic,
            'supr_content_onum': request.supr_content_onum,
            'leaf_node_flag': request.leaf_node_flag,
            'upd_prsn': request.upd_prsn
        }
        # Same catalog fields plus the children converted to plain dicts
        catalog_data2 = {
            **catalog_data1,
            'children': [child.model_dump() for child in request.children]
        }
        try:
            for child in catalog_data2["children"]:
                # Defaults: effective from now until 2999-12-31. These are plain
                # values; stray trailing commas previously stored 1-tuples here.
                child["rela_eff_begn_date"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                child["rela_eff_end_date"] = datetime(year=2999, month=12, day=31, hour=0, minute=0, second=0).strftime("%Y-%m-%d %H:%M:%S")
                child["upd_prsn"] = request.upd_prsn
                child["rela_status"] = "1"
            new_catalog = await DataStdDao.add_catalog_dao(query_db, catalog_data1, catalog_data2)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='新增成功', data=new_catalog)
        except Exception as e:
            await query_db.rollback()
            raise ServiceException(message=f"创建目录时发生错误: {str(e)}") from e

    @classmethod
    async def edit_catalog_leaf_services(cls, query_db: AsyncSession, belt_data_std_content: int, leaf_node_flag: int):
        """
        Update the leaf-node flag of a catalog and commit.

        :param query_db: ORM session
        :param belt_data_std_content: catalog ID
        :param leaf_node_flag: new leaf-node flag
        :return: ``CrudResponseModel`` describing the result
        :raises ServiceException: on any failure (the session is rolled back first)
        """
        catalog_data1 = {
            'belt_data_std_content': belt_data_std_content,
            'leaf_node_flag': leaf_node_flag
        }
        try:
            await DataStdDao.edit_catalog_leaf_dao(query_db, catalog_data1)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='更新成功')
        except Exception as e:
            await query_db.rollback()
            raise ServiceException(message=f"更新目录时发生错误: {str(e)}") from e

    @classmethod
    async def edit_catalog_child_services(cls, query_db: AsyncSession, request: DataCatalogResponseWithChildren):
        """
        Update a catalog together with its child relations and commit.

        :param query_db: ORM session
        :param request: catalog to update, including ``children``
        :return: ``CrudResponseModel`` describing the result
        :raises ServiceException: on any failure (the session is rolled back first)
        """
        catalog_data = {
            'belt_data_std_content': request.belt_data_std_content,
            'content_name': request.content_name,
            'content_stat': request.content_stat,
            'content_intr': request.content_intr,
            'content_pic': request.content_pic,
            'supr_content_onum': request.supr_content_onum,
            'leaf_node_flag': request.leaf_node_flag,
            'upd_prsn': request.upd_prsn,
            'children': [child.model_dump() for child in request.children]
        }
        try:
            for child in catalog_data["children"]:
                # Format supplied dates; otherwise default to now / 2999-12-31
                if child.get("rela_eff_begn_date"):
                    child["rela_eff_begn_date"] = child["rela_eff_begn_date"].strftime("%Y-%m-%d %H:%M:%S")
                else:
                    child["rela_eff_begn_date"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                if child.get("rela_eff_end_date"):
                    child["rela_eff_end_date"] = child["rela_eff_end_date"].strftime("%Y-%m-%d %H:%M:%S")
                else:
                    child["rela_eff_end_date"] = datetime(year=2999, month=12, day=31, hour=0, minute=0, second=0).strftime("%Y-%m-%d %H:%M:%S")
                child["upd_prsn"] = request.upd_prsn
                child["rela_status"] = "1"
            await DataStdDao.edit_catalog_child_dao(query_db, catalog_data)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='更新成功')
        except Exception as e:
            await query_db.rollback()
            raise ServiceException(message=f"更新目录时发生错误: {str(e)}") from e

    @classmethod
    async def delete_catalog_services(cls, query_db: AsyncSession, request: DeleteDataCatalogModel, user_id):
        """
        Delete catalogs and commit.

        :param query_db: ORM session
        :param request: delete request; ``content_onums`` is a comma-separated ID list
        :param user_id: current user ID (not used by this method)
        :return: ``CrudResponseModel`` describing the result
        :raises ServiceException: when no ID is given or a catalog does not exist
        """
        if not request.content_onums:
            raise ServiceException(message='传入目录id为空')

        content_onum_list = request.content_onums.split(',')
        try:
            for belt_data_std_content in content_onum_list:
                catalog = await cls.get_catalog_detail_services(query_db, int(belt_data_std_content))
                if not catalog:
                    raise ServiceException(message=f'目录ID {belt_data_std_content} 不存在')
                await DataStdDao.delete_catalog_dao(query_db, DeleteDataCatalogModel(content_onums=belt_data_std_content))
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='删除成功')
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def get_data_asset_catalog_tree_services(cls, query_db: AsyncSession):
        """
        Get the data asset tree, grouped by source system.

        :param query_db: ORM session
        :return: a list of ``{"dataAssetSysName", "children"}`` dicts, one per
                 source system, in order of first appearance
        """
        rows = await DataStdDao.get_data_asset_catalog_tree(query_db)

        sys_groups = {}
        for src_sys, eng_name, cn_name, ast_no in rows:
            group = sys_groups.setdefault(src_sys, {"dataAssetSysName": src_sys, "children": []})
            group["children"].append({
                "dataAssetCatalogNo": eng_name,
                "dataAssetCatalogName": cn_name,
                "dataAssetCatalogAstno": ast_no
            })
        return list(sys_groups.values())

    @classmethod
    async def moved_catalog_instr_services(cls, query_db: AsyncSession, request: DataCatalogMovedRequest):
        """
        Move a catalog to another parent and commit.

        :param query_db: ORM session
        :param request: move request
        :return: ``CrudResponseModel`` describing the result
        :raises ServiceException: on any failure (the session is rolled back first)
        """
        moved_catalog_data = {
            'belt_data_std_content': request.belt_data_std_content,
            'supr_content_onum': request.supr_content_onum,
            'supr_content_onum_after': request.supr_content_onum_after
        }
        try:
            await DataStdDao.moved_catalog_instr_dao(query_db, moved_catalog_data)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='目录移动成功')
        except Exception as e:
            await query_db.rollback()
            raise ServiceException(message=f"移动目录时发生错误: {str(e)}") from e

    @classmethod
    async def merge_catalog_instr_services(cls, query_db: AsyncSession, request: DataCatalogMergeRequest):
        """
        Merge a catalog into another one and commit.

        :param query_db: ORM session
        :param request: merge request
        :return: ``CrudResponseModel`` describing the result
        :raises ServiceException: on any failure (the session is rolled back first)
        """
        merge_catalog_data = {
            'belt_data_std_content': request.belt_data_std_content,
            'supr_content_onum': request.supr_content_onum,
            'content_onum_after': request.content_onum_after,
            'supr_content_onum_after': request.supr_content_onum_after
        }
        try:
            await DataStdDao.merge_catalog_instr_dao(query_db, merge_catalog_data)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='目录合并成功')
        except Exception as e:
            await query_db.rollback()
            raise ServiceException(message=f"目录合并时发生错误: {str(e)}") from e

    @classmethod
    async def removerel_data_ast_catalog_services(cls, query_db: AsyncSession, request: DataCatalogChild):
        """
        Remove a data asset from a catalog and commit.

        :param query_db: ORM session
        :param request: the catalog/asset relation to remove
        :return: ``CrudResponseModel`` describing the result
        :raises ServiceException: on any failure (the session is rolled back first)
        """
        removerel_catalog_data = {
            'rela_onum': request.rela_onum,
            'belt_data_std_content': request.belt_data_std_content,
            'ast_onum': request.ast_onum,
            'rela_type': request.rela_type,
            'rela_eff_begn_date': request.rela_eff_begn_date,
            'rela_eff_end_date': request.rela_eff_end_date,
            'upd_prsn': request.upd_prsn,
            'rela_status': request.rela_status
        }
        try:
            await DataStdDao.removerel_data_ast_catalog_dao(query_db, removerel_catalog_data)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='资产移除成功')
        except Exception as e:
            await query_db.rollback()
            raise ServiceException(message=f"移除资产时发生错误: {str(e)}") from e

    @classmethod
    async def moverel_data_ast_catalog_services(cls, query_db: AsyncSession, request: DataCatalogMoverelRequest):
        """
        Move a data asset to another catalog and commit.

        :param query_db: ORM session
        :param request: move request for the catalog/asset relation
        :return: ``CrudResponseModel`` describing the result
        :raises ServiceException: on any failure (the session is rolled back first)
        """
        moverel_catalog_data = {
            'rela_onum': request.rela_onum,
            'belt_data_std_content': request.belt_data_std_content,
            'content_onum_after': request.content_onum_after
        }
        try:
            await DataStdDao.moverel_data_ast_catalog_dao(query_db, moverel_catalog_data)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='资产移动成功')
        except Exception as e:
            await query_db.rollback()
            raise ServiceException(message=f"资产移动时发生错误: {str(e)}") from e

    # ------------------------------------------------------------ data standard ------------------------------------------------------------

    @classmethod
    async def check_std_num_unique(cls, query_db: AsyncSession, model: DataStdMainModel):
        """ 校验标准编号是否唯一 """
        id = '-1' if model.onum is None else model.onum
        check_model = DataStdMainModel()
check_model.data_std_no=model.data_std_no existing = await DataStdDao.get_data_main_by_info(query_db, check_model) return CommonConstant.NOT_UNIQUE if existing and existing.onum != id else CommonConstant.UNIQUE @classmethod async def get_std_main_list(cls, query_db: AsyncSession, query_object: DataStdMainModel, is_page: bool = False): return await DataStdDao.get_std_main_list(query_db, query_object, is_page) @classmethod async def get_std_main_appr_list(cls, query_db: AsyncSession, query_object: DataStdMainApprModel, is_page: bool = False): return await DataStdDao.get_std_main_appr_list(query_object.flowId, query_db) @classmethod async def get_std_dict_appr_list(cls, query_db: AsyncSession, query_object: DataStdDictApprModel, is_page: bool = False): return await DataStdDao.get_std_dict_appr_list(query_object.flowId, query_db) @classmethod async def get_std_main_list_all(cls, query_db: AsyncSession, query_object: DataStdMainModel): main_query_result= await DataStdDao.get_std_main_list_all(query_db, query_object) return main_query_result @classmethod async def get_std_main_by_id(cls, query_db: AsyncSession, id: str): mainstd= await DataStdDao.get_std_main_by_id(query_db, id) if mainstd: return DataStdMainModel(**CamelCaseUtil.transform_result(mainstd)) else: return DataStdMainModel(**dict()) @classmethod async def get_std_main_appr_by_id(cls, query_db: AsyncSession, id: str): mainstd= await DataStdDao.get_std_main_appr_by_id(query_db, id) if mainstd: return DataStdMainModel(**CamelCaseUtil.transform_result(mainstd)) else: return DataStdMainModel(**dict()) @classmethod async def get_std_main_change_by_id(cls, query_db: AsyncSession, id: str): mainstd= await DataStdDao.get_std_main_appr_by_id(query_db, id) if mainstd: if mainstd.changeType == "edit": compareMainstd = await DataStdDao.get_std_main_appr_by_id(query_db, mainstd.compareId) # 将对象转成 dict,驼峰转小写风格字段 new_data = CamelCaseUtil.transform_result(mainstd) old_data = CamelCaseUtil.transform_result(compareMainstd) if 
compareMainstd else {} result_dict = {} for key, new_value in new_data.items(): old_value = old_data.get(key) if new_value != old_value and old_value is not None: # 有变化,拼接 "旧值 ==> 新值" result_dict[key] = f"{old_value} ==> {new_value}" else: result_dict[key] = new_value return result_dict # 返回 dict,不是 DataStdMainModel 实例 return DataStdMainModel(**CamelCaseUtil.transform_result(mainstd)) else: return DataStdMainModel(**dict()) @classmethod async def get_std_dict_change_by_id(cls, query_db: AsyncSession, id: str): mainstd= await DataStdDao.get_std_dict_appr_by_id(query_db, id) if mainstd: if mainstd.changeType == "edit": compareMainstd = await DataStdDao.get_std_dict_appr_by_id(query_db, mainstd.compareId) # 将对象转成 dict,驼峰转小写风格字段 new_data = CamelCaseUtil.transform_result(mainstd) old_data = CamelCaseUtil.transform_result(compareMainstd) if compareMainstd else {} result_dict = {} for key, new_value in new_data.items(): old_value = old_data.get(key) if new_value != old_value and old_value is not None: # 有变化,拼接 "旧值 ==> 新值" result_dict[key] = f"{old_value} ==> {new_value}" else: result_dict[key] = new_value return result_dict # 返回 dict,不是 DataStdMainModel 实例 return DataStdDictModel(**CamelCaseUtil.transform_result(mainstd)) else: return DataStdDictModel(**dict()) @classmethod async def add_std_main(cls, query_db: AsyncSession, model: DataStdMainModel): if not await cls.check_std_num_unique(query_db, model): raise ServiceException(message=f"标准编号 {model.data_std_no} 已存在") model.onum=str(uuid.uuid4()) model.std_status="9" model.belt_data_std_content=2 await DataStdDao.add_std_main(query_db, model) return CrudResponseModel(is_success=True, message='新增标准成功') @classmethod async def edit_std_main(cls, query_db: AsyncSession, model: DataStdMainModel): if not await cls.check_std_num_unique(query_db, model): raise ServiceException(message=f"标准编号 {model.data_std_no} 已存在") existing = await cls.get_std_main_by_id(query_db, model.onum) if existing: await 
DataStdDao.update_std_main(query_db, model.model_dump(exclude_unset=True)) return CrudResponseModel(is_success=True, message='编辑标准成功') else: raise ServiceException(message=f'标准 {model.onum} 不存在') @classmethod async def add_std_main_appr(cls, query_db: AsyncSession, model: DataStdMainModel): if not await cls.check_std_num_unique(query_db, model): raise ServiceException(message=f"标准编号 {model.data_std_no} 已存在") model.onum=str(uuid.uuid4()) model.std_status="9" model.belt_data_std_content=2 # 将 DataStdMainModel 转换为 DataStdMainApprModel,保留字段原始名 apprModel = DataStdMainApprModel(**model.model_dump(exclude_unset=True, by_alias=True)) apprModel.changeType="add" apprModel.compareId=model.onum apprModel.oldInstId=model.onum apprModel.approStatus="waiting" apprModel.flowId=str(uuid.uuid4()) await DataStdDao.add_std_main_appr(query_db, apprModel) applyModel = ApplyModel() applyModel.businessType = "dataStdMain" applyModel.businessId = apprModel.flowId applyModel.applicant = apprModel.create_by await ApprovalService.apply_services(query_db, applyModel, 'dataStdMain') return CrudResponseModel(is_success=True, message='新增标准成功') @classmethod async def add_std_dict_appr(cls, query_db: AsyncSession, model: DataStdDictModel): if not await cls.check_dict_unique_services(query_db, model): raise ServiceException(message=f"字典编号 {model.data_dict_no} 已存在") model.onum=str(uuid.uuid4()) model.data_dict_stat="9" # 将 DataStdMainModel 转换为 DataStdMainApprModel,保留字段原始名 apprModel = DataStdDictApprModel(**model.model_dump(exclude_unset=True, by_alias=True)) apprModel.changeType="add" apprModel.compareId=model.onum apprModel.oldInstId=model.onum apprModel.approStatus="waiting" apprModel.flowId=str(uuid.uuid4()) await DataStdDao.add_std_dict_appr(query_db, apprModel) applyModel = ApplyModel() applyModel.businessType = "dataStdDict" applyModel.businessId = apprModel.flowId applyModel.applicant = apprModel.create_by await ApprovalService.apply_services(query_db, applyModel, 'dataStdDict') return 
CrudResponseModel(is_success=True, message='新增标准成功') @classmethod async def edit_std_main_appr(cls, query_db: AsyncSession, model: DataStdMainModel): if not await cls.check_std_num_unique(query_db, model): raise ServiceException(message=f"标准编号 {model.data_std_no} 已存在") model.std_status="9" # 将 DataStdMainModel 转换为 DataStdMainApprModel,保留字段原始名 watingList=await DataStdDao.check_std_main_waiting(model.onum, query_db) if len(watingList)>0 : # 如果 watingList 存在,意味着标准正在审批中 raise ServiceException(message="标准正在审批中,请等待审批完成") lastAppr =await DataStdDao.get_last_std_main_appr_by_id(query_db,model.onum) apprModel = DataStdMainApprModel(**model.model_dump(exclude_unset=True, by_alias=True)) apprModel.changeType="edit" apprModel.onum=str(uuid.uuid4()) apprModel.oldInstId=model.onum apprModel.compareId=lastAppr.onum apprModel.approStatus="waiting" apprModel.flowId=str(uuid.uuid4()) await DataStdDao.add_std_main_appr(query_db, apprModel) applyModel = ApplyModel() applyModel.businessType = "dataStdMain" applyModel.businessId = apprModel.flowId applyModel.applicant = apprModel.create_by await ApprovalService.apply_services(query_db, applyModel, 'dataStdMain') return CrudResponseModel(is_success=True, message='修改标准成功') @classmethod async def edit_std_dict_appr(cls, query_db: AsyncSession, model: DataStdDictModel): if not await cls.check_dict_unique_services(query_db, model): raise ServiceException(message=f"字典编号 {model.c} 已存在") model.data_dict_stat="9" watingList=await DataStdDao.check_std_dict_waiting(model.onum, query_db) if len(watingList)>0 : # 如果 watingList 存在,意味着标准正在审批中 raise ServiceException(message="标准正在审批中,请等待审批完成") lastAppr =await DataStdDao.get_last_std_dict_appr_by_id(query_db,model.onum) apprModel = DataStdDictApprModel(**model.model_dump(exclude_unset=True, by_alias=True)) apprModel.changeType="edit" apprModel.onum=str(uuid.uuid4()) apprModel.oldInstId=model.onum apprModel.compareId=lastAppr.onum apprModel.approStatus="waiting" apprModel.flowId=str(uuid.uuid4()) await 
DataStdDao.add_std_dict_appr(query_db, apprModel) applyModel = ApplyModel() applyModel.businessType = "dataStdDict" applyModel.businessId = apprModel.flowId applyModel.applicant = apprModel.create_by await ApprovalService.apply_services(query_db, applyModel, 'dataStdDict') return CrudResponseModel(is_success=True, message='修改标准成功') @classmethod async def delete_std_main_Appr(cls, query_db: AsyncSession, ids: str): if ids: col_id_list = ids.split(',') try: for col_id in col_id_list: col_info = await cls.get_std_main_by_id(query_db, col_id) if col_info: watingList=await DataStdDao.check_std_main_waiting(col_info.onum,query_db ) if len(watingList)>0 : # 如果 watingList 存在,意味着标准正在审批中 raise ServiceException(message="标准正在审批中,请等待审批完成") apprModel = DataStdMainApprModel(**col_info.model_dump(exclude_unset=True, by_alias=True)) apprModel.changeType="delete" apprModel.onum=str(uuid.uuid4()) apprModel.oldInstId=col_info.onum apprModel.approStatus="waiting" apprModel.flowId=str(uuid.uuid4()) await DataStdDao.add_std_main_appr(query_db, apprModel) applyModel = ApplyModel() applyModel.businessType = "dataStdMain" applyModel.businessId = apprModel.flowId applyModel.applicant = apprModel.create_by await ApprovalService.apply_services(query_db, applyModel, 'dataStdMain') await query_db.commit() return CrudResponseModel(is_success=True, message='提交删除标准审批成功') except Exception as e: await query_db.rollback() raise e else: raise ServiceException(message='传入标准ID为空') @classmethod async def delete_std_dict_Appr(cls, query_db: AsyncSession, ids: str): if ids: col_id_list = ids.split(',') try: for col_id in col_id_list: col_info = await cls.get_std_dict_by_id_services(query_db, col_id) if col_info: watingList=await DataStdDao.check_std_dict_waiting(col_info.onum,query_db ) if len(watingList)>0 : # 如果 watingList 存在,意味着标准正在审批中 raise ServiceException(message="标准正在审批中,请等待审批完成") apprModel = DataStdDictApprModel(**col_info.model_dump(exclude_unset=True, by_alias=True)) apprModel.changeType="delete" 
apprModel.onum=str(uuid.uuid4()) apprModel.oldInstId=col_info.onum apprModel.approStatus="waiting" apprModel.flowId=str(uuid.uuid4()) await DataStdDao.add_std_dict_appr(query_db, apprModel) applyModel = ApplyModel() applyModel.businessType = "dataStdDict" applyModel.businessId = apprModel.flowId applyModel.applicant = apprModel.create_by await ApprovalService.apply_services(query_db, applyModel, 'dataStdDict') await query_db.commit() return CrudResponseModel(is_success=True, message='提交删除字典审批成功') except Exception as e: await query_db.rollback() raise e else: raise ServiceException(message='传入标准ID为空') @classmethod async def delete_std_main(cls, query_db: AsyncSession, ids: str): if ids: col_id_list = ids.split(',') try: for col_id in col_id_list: col_info = await cls.get_std_main_by_id(query_db, col_id) if col_info: # 校验不能删除的系统内置列 await DataStdDao.delete_std_main(query_db, col_id) await query_db.commit() return CrudResponseModel(is_success=True, message='删除标准成功') except Exception as e: await query_db.rollback() raise e else: raise ServiceException(message='传入标准ID为空') @classmethod async def change_std_main_onum(cls, query_db: AsyncSession, ids: str,onum:int): if ids: col_id_list = ids.split(',') try: for col_id in col_id_list: col_info = await cls.get_std_main_by_id(query_db, col_id) if col_info: col_info.belt_data_std_content=onum await DataStdDao.update_std_main(query_db,col_info.model_dump(exclude_unset=True) ) await query_db.commit() return CrudResponseModel(is_success=True, message='修改标准分类成功') except Exception as e: await query_db.rollback() raise e else: raise ServiceException(message='传入标准ID为空') @classmethod async def get_code_std_map_list_services(cls, query_db: AsyncSession, id: str): # 查询 A 表(t_datastd_main) check_model = DataStdMainModel() check_model.cd_id = id main_list = await DataStdDao.get_std_main_list_all(query_db, check_model) dataStdNo=await DataStdDao.get_std_code_by_id(query_db, id) if not main_list: return {"tableData": [], "children": []} # 如果 
A 表没有数据,返回空结构 table_data = [] # 存储表格数据 children = [] # 存储图谱数据 for main in main_list: # 查询 B 表(t_datastd_dict),通过 A 表的 id 匹配 data_std_no dict_model = DataStdDictModel() dict_model.data_std_no = main.get('id') # 使用 get() 方法访问字段 dict_list = await DataStdDao.get_std_dict_list_all(query_db, dict_model) # 组织表格数据 if dict_list: for dict_item in dict_list: table_data.append({ "cdNo": dataStdNo.cd_no, # 使用 get() 方法访问字段 "cdValCnMean": dataStdNo.cd_val_cn_mean, # 使用 get() 方法访问字段 "dataStdNo": main.get('dataStdNo'), # 使用 get() 方法访问字段 "dataStdCnName": main.get('dataStdCnName'), # 使用 get() 方法访问字段 "dataStdNo": main.get('dataStdNo'), # 使用 get() 方法访问字段 "dataDictNo": dict_item.get('dataDictNo'), # 使用 get() 方法访问字段 "dataDictCnName": dict_item.get('dataDictCnName'), # 使用 get() 方法访问字段 "dataDictEngName": dict_item.get('dataDictEngName'), # 使用 get() 方法访问字段 }) else : table_data.append({ "cdNo": dataStdNo.cd_no, # 使用 get() 方法访问字段 "cdValCnMean": dataStdNo.cd_val_cn_mean, # 使用 get() 方法访问字段 "dataStdNo": main.get('dataStdNo'), # 使用 get() 方法访问字段 "dataStdCnName": main.get('dataStdCnName'), # 使用 get() 方法访问字段 "dataStdNo": main.get('dataStdNo'), # 使用 get() 方法访问字段 "dataDictNo": "", # 使用 get() 方法访问字段 "dataDictCnName":"", # 使用 get() 方法访问字段 "dataDictEngName": "", # 使用 get() 方法访问字段 }) # 组织图谱数据(A 表作为父节点) node = { "id": f"node_{main.get('id')}", # 使用 get() 方法访问字段 "name": main.get('dataStdCnName'), # 使用 get() 方法访问字段 "label": main.get('dataStdNo'), # 使用 get() 方法访问字段 "rate": 1.0, "status": "B", "currency": main.get('dataStdNo'), # 使用 get() 方法访问字段 "variableValue": "标准", "variableUp": True, "children": [], } # B 表数据作为子节点 for dict_item in dict_list: node["children"].append({ "id": f"dict_{dict_item.get('id')}", # 使用 get() 方法访问字段 "name": dict_item.get('dataDictCnName'), # 使用 get() 方法访问字段 "label": dict_item.get('dataDictEngName'), # 使用 get() 方法访问字段 "rate": 1.0, "status": "R", "currency": dict_item.get('dataDictNo'), # 使用 get() 方法访问字段 "variableValue": "词典", "variableUp": True, }) children.append(node) return { 
"tableData": table_data, "children": children } @classmethod async def get_code_map_list(cls, query_db: AsyncSession, id: str): check_model = DataStdCodeModel() dataStdNo=await DataStdDao.get_std_code_by_id(query_db, id) cd_type="公司级" if dataStdNo.cd_type=='sys': if dataStdNo.code_map_id: check_model.onum=dataStdNo.code_map_id else: check_model.onum="no" else: check_model.code_map_id = id cd_type="系统级" main_list = await DataStdDao.get_data_code_list_by_info(query_db, check_model) if not main_list: return { "children": []} # 如果 A 表没有数据,返回空结构 table_data = [] # 存储表格数据 children = [] # 存储图谱数据 for main in main_list: # 组织图谱数据(A 表作为父节点) node = { "id": f"node_{main.onum}", # 使用 get() 方法访问字段 "name": main.cd_val_cn_mean, # 使用 get() 方法访问字段 "label": main.cd_no, # 使用 get() 方法访问字段 "rate": 1.0, "status": "B", # "currency": main.get('dataStdNo'), # 使用 get() 方法访问字段 "variableValue": cd_type, "variableUp": True, "children": [], } children.append(node) return { "tableData": table_data, "children": children }