import asyncio
import functools
import io
import json
import uuid
import pandas as pd
from fastapi import UploadFile
from module_admin.entity.vo.meta_vo import MetaPageObject, MetaColObject, SuppleModel, MetaBusinessRelShipObject, \
    MetaProcQueryObject
from module_admin.entity.do.meta_do import MetadataSuppInfo, MetadataFldSuppInfo, MetadataSuppInfoVett, \
    MetadataFldSuppInfoVett, MetadataExtractInfo, MetadataFldTabExtractInfo
from module_admin.dao.meta_dao import MetaDao
from module_admin.dao.datastd_dao import DataStdDao
from datetime import datetime
from module_admin.entity.vo.user_vo import CurrentUserModel
from module_admin.entity.vo.approval_vo import ApplyModel
from sqlalchemy.ext.asyncio import AsyncSession
from exceptions.exception import ServiceException, ServiceWarning
from utils.pwd_util import *
from utils.common_util import *
from utils.log_util import logger
from module_admin.service.login_service import LoginService
from module_admin.service.approval_service import ApprovalService
from module_admin.entity.vo.common_vo import CrudResponseModel
from module_admin.entity.vo.user_vo import CurrentUserModel
from utils.common_util import CamelCaseUtil
from config.env import AppConfig
import requests


class MetaService:
    """
    Service layer for metadata: listing tables/columns, submitting supplement
    ("补录") applications for approval, relationship/lineage graphs and Excel
    batch import.
    """

    # Approval states that block a new supplement application for the same table.
    _BLOCKING_APPLY_STATUS = ('waiting', 'pending')

    # Seconds to wait for the DolphinScheduler data-source API before giving up.
    _DS_REQUEST_TIMEOUT = 30

    # Sheet names produced by get_meta_import_template_services.
    _TABLE_SHEET_NAME = '表信息'
    _COLUMN_SHEET_NAME = '字段信息'

    # Excel header -> attribute name, for the table sheet.
    _TABLE_HEADER_MAP = {
        '系统代码': 'ssys_cd',
        '模式名称': 'mdl_name',
        '对象英文名': 'tab_eng_name',
        '补录对象名称': 'tab_crrct_name',
        '补录对象描述': 'tab_desc',
        '负责人': 'rec_subm_prsn'
    }

    # Excel header -> attribute name, for the column sheet.
    _COLUMN_HEADER_MAP = {
        '系统代码': 'ssys_cd',
        '模式名称': 'mdl_name',
        '对象英文名': 'tab_eng_name',
        '字段英文名': 'fld_eng_name',
        '字段补录名': 'fld_crrct_name',
        '补录主键': 'crrct_pk_flag',
        '补录字段描述': 'fld_desc',
        '引用字典/标准': 'data_dict_id',
        '安全等级': 'data_sec_lvl',
        '负责人': 'rec_subm_prsn'
    }

    @classmethod
    async def get_meta_list_services(cls, result_db: AsyncSession, query_object: MetaPageObject,
                                     current_user: CurrentUserModel):
        """
        Return the paged metadata table list, each row enriched with its
        classification tags under ``batchTabClas``.

        :param result_db: async ORM session
        :param query_object: paging / filter parameters
        :param current_user: the calling user; kept for interface compatibility.
            The original admin and non-admin branches ran identical code, so
            the role is no longer inspected.
        :return: the page object returned by ``MetaDao.get_meta_rel_list``
        """
        meta_rel_list = await MetaDao.get_meta_rel_list(result_db, query_object)
        if meta_rel_list and len(meta_rel_list.rows) > 0:
            for table in meta_rel_list.rows:
                tab_list = await MetaDao.get_meta_tab_clas(result_db, table['ssysId'], table['mdlName'],
                                                           table['tabEngName'])
                table['batchTabClas'] = CamelCaseUtil.transform_result(tab_list)
        return meta_rel_list

    @classmethod
    async def get_meta_col_list_services(cls, result_db: AsyncSession, query_object: MetaColObject):
        """
        Return the columns matching ``query_object`` (camelCase keys), each with
        its classification tags under ``batchFldClas``.

        When a column has no security level of its own but references a data
        dictionary, the level is taken from that dictionary entry.
        """
        meta_result = await MetaDao.get_meta_col_list(result_db, query_object)
        result = CamelCaseUtil.transform_result(meta_result)
        for column in result:
            fld_list = await MetaDao.get_meta_fld_clas(result_db, column['ssysId'], column['mdlName'],
                                                       column['tabEngName'], column['fldEngName'])
            column['batchFldClas'] = CamelCaseUtil.transform_result(fld_list)
            if column['dataSecLvl'] is None and column['dataDictId'] is not None:
                # Fall back to the security level of the referenced data standard.
                data_sec_lvl = await DataStdDao.get_data_sec_lvl_by_dict_id(result_db, column['dataDictId'])
                column['dataSecLvl'] = data_sec_lvl
        return result

    @classmethod
    async def get_meta_clas_list_services(cls, result_db: AsyncSession):
        """Return every metadata classification, with camelCase keys."""
        result = await MetaDao.get_meta_clas_list(result_db)
        return CamelCaseUtil.transform_result(result)

    @staticmethod
    async def _old_dict_name(result_db: AsyncSession, old_column):
        """Return the Chinese name of the dictionary referenced by ``old_column``, or ''."""
        if old_column and old_column.data_dict_id and old_column.data_dict_id != '':
            old_dict = await DataStdDao.get_data_dict_by_id(result_db, old_column.data_dict_id)
            return old_dict.data_dict_cn_name if old_dict else ''
        return ''

    @classmethod
    async def _submit_meta_apply(cls, result_db: AsyncSession, business_id, applicant: str):
        """Create the approval application for ``business_id`` and commit the session."""
        applyModel = ApplyModel()
        applyModel.businessType = "metaDataInfo"
        applyModel.businessId = business_id
        applyModel.applicant = applicant
        await ApprovalService.apply_services(result_db, applyModel, 'metaDataInfo')
        await result_db.commit()

    @classmethod
    async def meta_supp(cls, result_db: AsyncSession, supple: SuppleModel, current_user: CurrentUserModel):
        """
        Submit a supplement application for one table and, optionally, its columns.

        Writes one pending-approval record for the table and one per supplied
        column, each carrying a JSON snapshot of the current data, then opens an
        approval application and commits.

        :param result_db: async ORM session
        :param supple: the supplement payload (table fields plus ``columnInfo``)
        :param current_user: the submitting user
        :return: ``CrudResponseModel`` signalling success
        :raises ServiceException: if an application for the table is already
            waiting/pending, or the table / a column does not exist in the
            extracted metadata
        """
        # apply_status values per the original comment: 0 draft, waiting = applied,
        # pending = under review, succeed / rejected = reviewed, canceled = cancelled.
        hasTable = await MetaDao.get_lastest_meta_data_supp_vett(result_db, supple.ssys_id, supple.mdl_name,
                                                                 supple.tab_eng_name)
        oldTable = await MetaDao.get_supp_table_by_vett(supple.ssys_id, supple.mdl_name, supple.tab_eng_name,
                                                        result_db)
        tableInfo = await MetaDao.get_meta_table(supple.ssys_id, supple.mdl_name, supple.tab_eng_name, result_db)
        if hasTable is not None:
            if hasTable.apply_status == 'waiting':
                raise ServiceException(message='所补录对象已存在补录待审核记录,请等待审批完成或撤回申请后,再行补录')
            if hasTable.apply_status == 'pending':
                raise ServiceException(message='所补录对象已存在待审核记录,请等待审批完成后,再行补录')
        if tableInfo is None:
            # castToTableStr dereferences tableInfo; fail with a readable message instead.
            raise ServiceException(message='所补录对象对应的表不存在,无法补录')

        tableOnum = uuid.uuid4()
        businessId = uuid.uuid4()
        applyTime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        suppTableInfo = MetadataSuppInfoVett()
        suppTableInfo.onum = tableOnum
        suppTableInfo.ssys_id = supple.ssys_id
        suppTableInfo.mdl_name = supple.mdl_name
        suppTableInfo.tab_eng_name = supple.tab_eng_name
        suppTableInfo.tab_crrct_name = supple.tab_crrct_name
        suppTableInfo.tab_desc = supple.tab_desc
        suppTableInfo.pic = supple.pic
        suppTableInfo.gov_flag = supple.gov_flag
        suppTableInfo.tab_clas = supple.tab_clas
        suppTableInfo.rec_subm_prsn = current_user.user.user_name
        suppTableInfo.apply_time = applyTime
        suppTableInfo.business_id = businessId
        suppTableInfo.apply_status = 'waiting'
        suppTableInfo.oldTableData = cls.castToTableStr(oldTable, tableInfo)
        await MetaDao.insertMetadataSuppInfoVett(suppTableInfo, result_db)

        for column in supple.columnInfo or []:
            oldColumn = await MetaDao.get_supp_column_by_columnInfo(supple.ssys_id, supple.mdl_name,
                                                                    supple.tab_eng_name, column.fld_eng_name,
                                                                    result_db)
            data_dict_name = await cls._old_dict_name(result_db, oldColumn)
            columnInfo = await MetaDao.get_meta_column(supple.ssys_id, supple.mdl_name, supple.tab_eng_name,
                                                       column.fld_eng_name, result_db)
            if columnInfo is None:
                # castToColumnStr dereferences columnInfo; nothing has been committed yet.
                raise ServiceException(message=f'字段{column.fld_eng_name}不存在,无法补录')

            suppColumnInfo = MetadataFldSuppInfoVett()
            suppColumnInfo.onum = uuid.uuid4()
            suppColumnInfo.ssys_id = supple.ssys_id
            suppColumnInfo.mdl_name = supple.mdl_name
            suppColumnInfo.tab_eng_name = supple.tab_eng_name
            suppColumnInfo.fld_eng_name = column.fld_eng_name
            suppColumnInfo.fld_crrct_name = column.fld_crrct_name
            suppColumnInfo.crrct_pk_flag = column.crrct_pk_flag
            suppColumnInfo.fld_desc = column.fld_desc
            suppColumnInfo.pic = column.pic
            suppColumnInfo.fld_clas = column.fld_clas
            suppColumnInfo.fld_null_rate = column.fld_null_rate
            suppColumnInfo.data_dict_id = column.data_dict_id
            suppColumnInfo.data_sec_lvl = column.data_sec_lvl
            suppColumnInfo.rec_stat = column.rec_stat
            suppColumnInfo.rec_subm_prsn = current_user.user.user_name
            suppColumnInfo.apply_time = applyTime
            suppColumnInfo.apply_status = 'waiting'
            # NOTE(review): unlike batch_import_meta_services, business_id is not set on
            # column records here - left as in the original; confirm whether intended.
            suppColumnInfo.oldColumnData = cls.castToColumnStr(oldColumn, columnInfo, data_dict_name)
            await MetaDao.insertMetadataFldSuppInfoVett(suppColumnInfo, result_db)

        await cls._submit_meta_apply(result_db, businessId, current_user.user.user_name)
        return CrudResponseModel(is_success=True, message='操作成功')

    @classmethod
    def castToTableStr(cls, table: MetadataSuppInfo, tableInfo: MetadataExtractInfo):
        """
        Serialise the current state of a table to a JSON string.

        :param table: the existing supplement record, or a falsy value if none;
            its fields become ``None`` in that case
        :param tableInfo: the extracted table metadata (must not be ``None``)
        :return: JSON string with camelCase keys
        """
        tableDict = dict(
            onum=tableInfo.onum,
            crrctVerNum=table.crrct_ver_num if table else None,
            ssysId=tableInfo.ssys_id,
            mdlName=tableInfo.mdl_name,
            tabEngName=tableInfo.tab_eng_name,
            tabCnName=tableInfo.tab_cn_name,
            tabType=tableInfo.tab_type,
            tabRecNum=tableInfo.tab_rec_num,
            tabCrrctName=table.tab_crrct_name if table else None,
            tabDesc=table.tab_desc if table else None,
            pic=table.pic if table else None,
            govFlag=table.gov_flag if table else None,
            recStat=table.rec_stat if table else None,
            tabClas=table.tab_clas if table else None,
            recSubmPrsn=table.rec_subm_prsn if table else None,
            updTime=table.upd_time.isoformat() if table and table.upd_time else None
        )
        return json.dumps(tableDict)

    @classmethod
    def castToColumnStr(cls, column: MetadataFldSuppInfo, columnInfo: MetadataFldTabExtractInfo, dictName: str):
        """
        Serialise the current state of a column to a JSON string.

        :param column: the existing supplement record, or a falsy value if none
        :param columnInfo: the extracted column metadata (must not be ``None``)
        :param dictName: display name of the referenced data dictionary
        :return: JSON string with camelCase keys
        """

        def supp(attr):
            # Any falsy supplement value (missing record, '', 0, False) is stored as
            # None, exactly as the original inline expressions did.
            if not column:
                return None
            return getattr(column, attr) or None

        columnDict = dict(
            onum=columnInfo.onum,
            crrctVerNum=supp('crrct_ver_num'),
            ssysId=columnInfo.ssys_id,
            mdlName=columnInfo.mdl_name,
            tabEngName=columnInfo.tab_eng_name,
            fldEngName=columnInfo.fld_eng_name,
            fldCnName=columnInfo.fld_cn_name,
            fldType=columnInfo.fld_type,
            pkFlag=columnInfo.pk_flag,
            requireFlag=columnInfo.require_flag,
            idxFlag=columnInfo.idx_flag,
            fldCrrctName=supp('fld_crrct_name'),
            crrctPkFlag=supp('crrct_pk_flag'),
            fldDesc=supp('fld_desc'),
            pic=supp('pic'),
            fldClas=supp('fld_clas'),
            fldNullRate=supp('fld_null_rate'),
            dataDictId=supp('data_dict_id'),
            dataSecLvl=supp('data_sec_lvl'),
            dataDictName=dictName,
            recStat=supp('rec_stat'),
            updTime=column.upd_time.isoformat() if column and column.upd_time else None
        )
        return json.dumps(columnDict)

    @classmethod
    async def get_meta_apply_detail_services(cls, result_db: AsyncSession, businessId: str):
        """
        Return the table records of one application, each with its
        classification tags (``batchTabClas``) and its column records
        (``columnList``, each carrying ``batchColClas``).
        """
        tableDataList = await MetaDao.get_supp_table_vett_by_businessId(businessId, result_db)
        tableList = CamelCaseUtil.transform_result(tableDataList)
        for tableData in tableList:
            clas_list = await MetaDao.get_meta_tab_clas(result_db, tableData['ssysId'], tableData['mdlName'],
                                                        tableData['tabEngName'])
            tableData['batchTabClas'] = CamelCaseUtil.transform_result(clas_list)
            columnData = await MetaDao.get_supp_column_vett_by_tableInfo(result_db, tableData)
            column_list = CamelCaseUtil.transform_result(columnData)
            for column in column_list:
                col_list = await MetaDao.get_meta_fld_clas(result_db, column['ssysId'], column['mdlName'],
                                                           column['tabEngName'], column['fldEngName'])
                column['batchColClas'] = CamelCaseUtil.transform_result(col_list)
            tableData['columnList'] = column_list
        return tableList

    @classmethod
    async def get_table_by_id(cls, result_db: AsyncSession, tableId: int):
        """
        Return one table with its classification tags (``batchTabClas``) and
        its columns (``columnList``, each carrying ``batchColClas``).
        """
        table = await MetaDao.get_meta_table_by_id(tableId, result_db)
        tab_list = await MetaDao.get_meta_tab_clas(result_db, table['ssysId'], table['mdlName'],
                                                   table['tabEngName'])
        table['batchTabClas'] = CamelCaseUtil.transform_result(tab_list)
        colQuery = MetaColObject()
        colQuery.ssys_id = table['ssysId']
        colQuery.mdl_name = table['mdlName']
        colQuery.tab_name = table['tabEngName']
        columnList = await MetaDao.get_meta_col_list(result_db, colQuery)
        for column in columnList:
            # Rows are still snake_case here; they are converted after enrichment.
            col_list = await MetaDao.get_meta_fld_clas(result_db, column['ssys_id'], column['mdl_name'],
                                                       column['tab_eng_name'], column['fld_eng_name'])
            column['batchColClas'] = CamelCaseUtil.transform_result(col_list)
        table['columnList'] = CamelCaseUtil.transform_result(columnList)
        return table

    @staticmethod
    def _relation_endpoint(node, side: str):
        """Build the endpoint dict for the 'a' or 'b' side of a relation row."""
        return {
            'ssys_id': node[f'{side}_ssys_id'],
            'mdl_name': node[f'{side}_mdl_name'],
            'tab_eng_name': node[f'{side}_tab_eng_name'],
            'fld_eng_name': node[f'{side}_fld_eng_name']
        }

    @classmethod
    def _directed_relation(cls, node):
        """Build an arrowed edge: a -> b when ``father`` is 'A', otherwise b -> a."""
        source_side, target_side = ('a', 'b') if node['father'] == 'A' else ('b', 'a')
        return {
            'source': cls._relation_endpoint(node, source_side),
            'target': cls._relation_endpoint(node, target_side),
            'endArrow': True
        }

    @staticmethod
    def _unique_tables(candidates):
        """Turn (ssys_id, mdl_name, tab_eng_name) tuples into unique table dicts, keeping order."""
        seen = set()
        tables = []
        for ssys_id, mdl_name, tab_eng_name in candidates:
            key = (ssys_id, mdl_name, tab_eng_name)
            if key in seen:
                continue
            seen.add(key)
            tables.append({"ssys_id": ssys_id, "mdl_name": mdl_name, "tab_eng_name": tab_eng_name})
        return tables

    @staticmethod
    async def _attach_table_details(result_db: AsyncSession, tables):
        """Add ``tab_cn_name`` and ``column`` (column-name list) to every table dict in place."""
        for table in tables:
            query_object = MetaColObject()
            query_object.ssys_id = table['ssys_id']
            query_object.mdl_name = table['mdl_name']
            query_object.tab_name = table['tab_eng_name']
            meta_result = await MetaDao.get_meta_col_name_list(result_db, query_object)
            columns = CamelCaseUtil.transform_result(meta_result)
            tableCnName = await MetaDao.get_meta_table_cn_name(result_db, table['ssys_id'], table['mdl_name'],
                                                               table['tab_eng_name'])
            table['tab_cn_name'] = tableCnName
            table['column'] = columns

    @classmethod
    async def getBusinessRelationShip(cls, result_db: AsyncSession, meta_query: MetaBusinessRelShipObject):
        """
        Build the relationship graph around one table.

        Every relation row of the table becomes an edge. For ``type == 'er'``
        the graph is extended by one more hop through the field on the far
        side of each directed edge.

        :return: ``{"tableList": [...], "relation": [...]}`` where every table
            carries its Chinese name and column list
        """
        currentNodeList = await cls.getRelationByTable(result_db, meta_query.ssys_id, meta_query.mdl_name,
                                                       meta_query.tab_eng_name, meta_query.type)
        relationList = []
        for currentNode in currentNodeList or []:
            father = currentNode['father']
            if father is None:
                # No parent side recorded: draw a -> b without an arrow head.
                relation = {
                    'source': cls._relation_endpoint(currentNode, 'a'),
                    'target': cls._relation_endpoint(currentNode, 'b'),
                    'endArrow': False
                }
            else:
                relation = cls._directed_relation(currentNode)
            relationList.append(relation)

            if meta_query.type != 'er' or father not in ('A', 'B'):
                continue
            for own_side, other_side in (('a', 'b'), ('b', 'a')):
                if currentNode[f'{own_side}_tab_eng_name'] != meta_query.tab_eng_name:
                    continue
                # If the queried table is the parent, the other side is downstream, so
                # follow its downstream ('next'); otherwise follow its upstream ('pre').
                module = 'next' if father == own_side.upper() else 'pre'
                neighbours = await cls.getRelationByColumn(result_db,
                                                           currentNode[f'{other_side}_ssys_id'],
                                                           currentNode[f'{other_side}_mdl_name'],
                                                           currentNode[f'{other_side}_tab_eng_name'],
                                                           currentNode[f'{other_side}_fld_eng_name'],
                                                           meta_query.type, module)
                # "or []" guards every branch; the original left one branch unguarded.
                relationList.extend(cls._directed_relation(node) for node in neighbours or [])

        endpoints = []
        for relation in relationList:
            for end in (relation['source'], relation['target']):
                endpoints.append((end['ssys_id'], end['mdl_name'], end['tab_eng_name']))
        tableList = cls._unique_tables(endpoints)
        await cls._attach_table_details(result_db, tableList)
        return {
            "tableList": tableList,
            "relation": relationList
        }

    @classmethod
    async def getRelationByTable(cls, result_db: AsyncSession, ssys_id: int, mdl_name: str, tab_eng_name: str,
                                 rel_type: str):
        """Return the 'op' or 'er' relation rows of a table, or ``None`` for any other ``rel_type``."""
        if rel_type == 'op':
            return await MetaDao.get_op_relation_by_table(result_db, ssys_id, mdl_name, tab_eng_name)
        if rel_type == 'er':
            return await MetaDao.get_er_relation_by_table(result_db, ssys_id, mdl_name, tab_eng_name)
        return None

    @classmethod
    async def getRelationByColumn(cls, result_db: AsyncSession, ssys_id: int, mdl_name: str, tab_eng_name: str,
                                  fld_eng_name: str, rel_type: str, module: str):
        """
        Return the 'op' or 'er' relation rows of a column, or ``None`` for any
        other ``rel_type``. ``module`` is passed through to the DAO; callers in
        this class use 'pre' and 'next'.
        """
        col = {
            'ssys_id': ssys_id,
            'mdl_name': mdl_name,
            'tab_eng_name': tab_eng_name,
            'fld_eng_name': fld_eng_name
        }
        if rel_type == 'op':
            return await MetaDao.get_op_relation_by_column(result_db, col, module)
        if rel_type == 'er':
            return await MetaDao.get_er_relation_by_column(result_db, col, module)
        return None

    @classmethod
    async def getMetaProc(cls, result_db: AsyncSession, meta_query: MetaProcQueryObject):
        """Return the procedures associated with the queried table."""
        result = await MetaDao.get_proc_by_table(result_db, meta_query.ssys_id, meta_query.mdl_name,
                                                 meta_query.tab_eng_name)
        return result

    @classmethod
    async def getBloodRelationShip(cls, result_db: AsyncSession, procId: int):
        """
        Return the lineage of one procedure.

        :return: ``{"relation": <lineage rows>, "tableList": [...]}``; tables
            are matched case-insensitively on schema and table name, stored
            lower-cased, and carry their Chinese name and column list
        """
        bloodRelations = await MetaDao.get_blood_by_procId(result_db, procId)
        endpoints = []
        for blood in bloodRelations or []:
            # Target first, then source - same order as the original list building.
            endpoints.append((blood.targetSysId, blood.targetMdlName.lower(), blood.targetTableName.lower()))
            endpoints.append((blood.sourceSysId, blood.sourceMdlName.lower(), blood.sourceTableName.lower()))
        tableList = cls._unique_tables(endpoints)
        await cls._attach_table_details(result_db, tableList)
        return {
            "relation": bloodRelations,
            "tableList": tableList
        }

    @staticmethod
    async def get_meta_import_template_services():
        """
        Build the metadata import template.

        :return: binary content of an Excel workbook with a table sheet and a
            column sheet; the column sheet has drop-downs for the primary-key
            flag and the security level
        """
        table_header_list = ['系统代码', '模式名称', '对象英文名', '补录对象名称', '补录对象描述', '负责人']
        column_header_list = ['系统代码', '模式名称', '对象英文名', '字段英文名', '字段补录名', '补录主键',
                              '补录字段描述', '引用字典/标准', '安全等级', '负责人']
        selector_header_list = ['补录主键', '安全等级']
        # One option dict per sheet, in sheet order.
        option_list = [{}, {'补录主键': ['是', '否'], '安全等级': ['S1', 'S2', 'S3', 'S4']}]
        sheet_configs = [
            dict(sheet_name="表信息", header_list=table_header_list, selector_header_list=[]),
            dict(sheet_name="字段信息", header_list=column_header_list,
                 selector_header_list=selector_header_list),
        ]
        binary_data = get_excel_template_with_sheets(sheet_configs, option_list)
        return binary_data

    @staticmethod
    def _cell_value(row, key: str):
        """
        Return a normalised cell value: ``None`` for a missing column, an empty
        cell (pandas yields NaN) or a blank string; stripped text otherwise.
        """
        value = row.get(key)
        if value is None or pd.isna(value):
            return None
        if isinstance(value, str):
            value = value.strip()
            return value or None
        return value

    @staticmethod
    def _missing_required_message(sheet_label: str, row_no: int, ssys_cd, mdl_name, tab_eng_name):
        """Return the "must not be empty" message for a row, or '' when nothing is missing."""
        # (value, wording when first in the message, wording when appended)
        checks = (
            (ssys_cd, '系统代码', '系统代码'),
            (mdl_name, '模式名称', '模式名称'),
            (tab_eng_name, '表英文名', '表英文名称'),
        )
        message = ''
        for value, leading, trailing in checks:
            if value is not None:
                continue
            if message:
                message += f',{trailing}不能为空'
            else:
                message = f'sheet[{sheet_label}]中行:{row_no}中的{leading}不能为空'
        return message

    @classmethod
    async def _fetch_data_sources(cls, current_user: CurrentUserModel):
        """
        Fetch the data-source list from DolphinScheduler.

        :raises ServiceException: on a network error, a non-200 response or an
            unexpected payload
        """
        url = f'{AppConfig.ds_server_url}/dolphinscheduler/datasources?pageNo=1&pageSize=100'
        headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
        # NOTE(review): verify=False disables TLS certificate checks while the user's
        # password travels in a header. Kept as in the original; confirm this endpoint
        # is only reachable on a trusted network.
        fetch = functools.partial(requests.get, url, headers=headers, verify=False,
                                  timeout=cls._DS_REQUEST_TIMEOUT)
        try:
            # requests is blocking; run it in a worker thread so the event loop stays free.
            response = await asyncio.get_running_loop().run_in_executor(None, fetch)
        except requests.RequestException as err:
            logger.error(f'获取数据源失败: {err}')
            raise ServiceException(message='系统异常,获取数据源失败') from err
        if response.status_code != 200:
            raise ServiceException(message='系统异常,获取数据源失败')
        try:
            return json.loads(response.text)["data"]["totalList"] or []
        except (ValueError, KeyError, TypeError) as err:
            raise ServiceException(message='系统异常,获取数据源失败') from err

    @classmethod
    async def _import_table_sheet(cls, result_db: AsyncSession, df, data_sources, business_id, apply_time,
                                  errors: list, skip_table: set):
        """
        Validate and stage every row of the table sheet.

        Problems are appended to ``errors``; tables blocked by an open
        application are added to ``skip_table``.

        :return: number of table records staged
        """
        imported = 0
        for index, row in df.iterrows():
            row_no = index + 1
            ssys_cd = cls._cell_value(row, 'ssys_cd')
            mdl_name = cls._cell_value(row, 'mdl_name')
            tab_eng_name = cls._cell_value(row, 'tab_eng_name')
            noneValid = cls._missing_required_message(cls._TABLE_SHEET_NAME, row_no, ssys_cd, mdl_name,
                                                      tab_eng_name)
            if noneValid:
                errors.append(noneValid)
                continue
            ssysId = next((item["id"] for item in data_sources if item["name"] == ssys_cd), None)
            if ssysId is None:
                errors.append(f'行:{row_no}中的系统不存在,需重新修正')
                continue
            hasTable = await MetaDao.get_lastest_meta_data_supp_vett(result_db, ssysId, mdl_name, tab_eng_name)
            # Only an open application blocks the import, consistent with meta_supp.
            # NOTE(review): the flattened original is ambiguous about whether finished
            # applications also blocked the row - confirm against the DAO query.
            if hasTable and hasTable.apply_status in cls._BLOCKING_APPLY_STATUS:
                if hasTable.apply_status == 'waiting':
                    reason = '已存在补录待审核记录,请等待审批完成或撤回申请后,再行导入'
                else:
                    reason = '已存在待审核记录,请等待审批完成或撤回申请后,再行导入'
                errors.append(f'sheet[表信息]中行:{row_no} 导入的系统代码:{ssys_cd},模式名称:{mdl_name}'
                              f',对象英文名:{tab_eng_name}{reason}')
                skip_table.add((ssysId, mdl_name, tab_eng_name))
                continue
            oldTable = await MetaDao.get_supp_table_by_vett(ssysId, mdl_name, tab_eng_name, result_db)
            tableInfo = await MetaDao.get_meta_table(ssysId, mdl_name, tab_eng_name, result_db)
            if tableInfo is None:
                errors.append(f'sheet[表信息]中行:{row_no}中所对应的表不存在,无法上传补录')
                continue

            suppTableInfo = MetadataSuppInfoVett()
            suppTableInfo.onum = uuid.uuid4()
            suppTableInfo.ssys_id = ssysId
            suppTableInfo.mdl_name = mdl_name
            suppTableInfo.tab_eng_name = tab_eng_name
            suppTableInfo.tab_crrct_name = cls._cell_value(row, 'tab_crrct_name')
            suppTableInfo.tab_desc = cls._cell_value(row, 'tab_desc')
            # Fields the template does not carry are inherited from the existing record.
            suppTableInfo.pic = oldTable.pic if oldTable else None
            suppTableInfo.gov_flag = oldTable.gov_flag if oldTable else None
            suppTableInfo.tab_clas = oldTable.tab_clas if oldTable else None
            suppTableInfo.rec_subm_prsn = cls._cell_value(row, 'rec_subm_prsn')
            suppTableInfo.apply_time = apply_time
            suppTableInfo.apply_status = 'waiting'
            suppTableInfo.business_id = business_id
            suppTableInfo.oldTableData = cls.castToTableStr(oldTable, tableInfo)
            await MetaDao.insertMetadataSuppInfoVett(suppTableInfo, result_db)
            imported += 1
        return imported

    @classmethod
    async def _import_column_sheet(cls, result_db: AsyncSession, df, data_sources, business_id, apply_time,
                                   errors: list, skip_table: set):
        """
        Validate and stage every row of the column sheet.

        Rows belonging to a table in ``skip_table`` are ignored silently, since
        the table sheet already reported the reason.

        :return: number of column records staged
        """
        imported = 0
        for index, row in df.iterrows():
            row_no = index + 1
            ssys_cd = cls._cell_value(row, 'ssys_cd')
            mdl_name = cls._cell_value(row, 'mdl_name')
            tab_eng_name = cls._cell_value(row, 'tab_eng_name')
            fld_eng_name = cls._cell_value(row, 'fld_eng_name')
            noneValid = cls._missing_required_message(cls._COLUMN_SHEET_NAME, row_no, ssys_cd, mdl_name,
                                                      tab_eng_name)
            if noneValid:
                errors.append(noneValid)
                continue
            ssysId = next((item["id"] for item in data_sources if item["name"] == ssys_cd), None)
            if ssysId is None:
                errors.append(f'sheet[字段信息]中行:{row_no}中的系统不存在,需重新修正')
                continue
            if (ssysId, mdl_name, tab_eng_name) in skip_table:
                continue
            oldColumn = await MetaDao.get_supp_column_by_columnInfo(ssysId, mdl_name, tab_eng_name,
                                                                    fld_eng_name, result_db)
            data_dict_name = await cls._old_dict_name(result_db, oldColumn)
            columnInfo = await MetaDao.get_meta_column(ssysId, mdl_name, tab_eng_name, fld_eng_name, result_db)
            if columnInfo is None:
                errors.append(f'sheet[字段信息]中行:{row_no}中所对应的字段不存在,无法上传补录')
                continue

            dataDictId = None
            dict_code = cls._cell_value(row, 'data_dict_id')
            if dict_code is not None:
                dataDict = await DataStdDao.get_data_dict_by_code(result_db, dict_code)
                if dataDict is None:
                    errors.append(f'sheet[字段信息]中行:{row_no}中所对应的数据字典不存在,无法上传补录')
                    continue
                try:
                    dict_sys_id = int(dataDict.src_sys)
                except (TypeError, ValueError):
                    # No usable source system on the dictionary counts as a mismatch.
                    dict_sys_id = None
                if dict_sys_id != ssysId:
                    errors.append(f'sheet[字段信息]中行:{row_no}中所对应的数据字典所属系统与表所属系统不一致,无法上传补录')
                    continue
                dataDictId = dataDict.onum

            suppColumnInfo = MetadataFldSuppInfoVett()
            suppColumnInfo.onum = uuid.uuid4()
            suppColumnInfo.ssys_id = ssysId
            suppColumnInfo.mdl_name = mdl_name
            suppColumnInfo.tab_eng_name = tab_eng_name
            suppColumnInfo.fld_eng_name = fld_eng_name
            suppColumnInfo.fld_crrct_name = cls._cell_value(row, 'fld_crrct_name')
            suppColumnInfo.crrct_pk_flag = cls._cell_value(row, 'crrct_pk_flag') == '是'
            suppColumnInfo.fld_desc = cls._cell_value(row, 'fld_desc')
            # Fields the template does not carry are inherited from the existing record.
            suppColumnInfo.pic = oldColumn.pic if oldColumn else None
            suppColumnInfo.fld_clas = oldColumn.fld_clas if oldColumn else None
            suppColumnInfo.fld_null_rate = oldColumn.fld_null_rate if oldColumn else None
            suppColumnInfo.data_dict_id = dataDictId if dataDictId != '' else None
            suppColumnInfo.data_sec_lvl = cls._cell_value(row, 'data_sec_lvl')
            suppColumnInfo.rec_stat = oldColumn.rec_stat if oldColumn else None
            suppColumnInfo.rec_subm_prsn = cls._cell_value(row, 'rec_subm_prsn')
            suppColumnInfo.business_id = business_id
            suppColumnInfo.apply_time = apply_time
            suppColumnInfo.apply_status = 'waiting'
            suppColumnInfo.oldColumnData = cls.castToColumnStr(oldColumn, columnInfo, data_dict_name)
            await MetaDao.insertMetadataFldSuppInfoVett(suppColumnInfo, result_db)
            imported += 1
        return imported

    @classmethod
    async def batch_import_meta_services(cls, result_db: AsyncSession, file: UploadFile,
                                         current_user: CurrentUserModel):
        """
        Import supplement data for tables and columns from an uploaded workbook.

        The first sheet must be named '表信息' and the second '字段信息' to be
        processed. Each row is validated (required cells, system code, table or
        column existence, referenced dictionary and its owning system). Valid
        rows are staged under one shared business id; if at least one row was
        staged, an approval application is created and the session committed.

        :param result_db: async ORM session
        :param file: the uploaded Excel file
        :param current_user: the importing user (also used to authenticate
            against the data-source API)
        :return: '操作成功' when there were no problems, otherwise the problem
            messages joined by ';'
        :raises ServiceException: if the data-source list cannot be fetched
        """
        import_err_msg = []
        # System codes in the workbook are resolved to ids via the DS data-source list.
        dataSourceList = await cls._fetch_data_sources(current_user)
        skip_table = set()
        businessId = uuid.uuid4()
        applyTime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        try:
            contents = await file.read()
        finally:
            await file.close()
        excel_file = pd.ExcelFile(io.BytesIO(contents))
        sheet_names = excel_file.sheet_names
        # Workbooks with fewer than two sheets must not raise IndexError.
        tableSheet = sheet_names[0] if len(sheet_names) > 0 else None
        columnSheet = sheet_names[1] if len(sheet_names) > 1 else None

        successCount = 0
        if tableSheet == cls._TABLE_SHEET_NAME:
            # dtype=str keeps codes and names as text; empty cells still come back as NaN.
            df = excel_file.parse(sheet_name=tableSheet, dtype=str)
            df = df.rename(columns=cls._TABLE_HEADER_MAP)
            successCount += await cls._import_table_sheet(result_db, df, dataSourceList, businessId, applyTime,
                                                          import_err_msg, skip_table)
        if columnSheet == cls._COLUMN_SHEET_NAME:
            df = excel_file.parse(sheet_name=columnSheet, dtype=str)
            df = df.rename(columns=cls._COLUMN_HEADER_MAP)
            successCount += await cls._import_column_sheet(result_db, df, dataSourceList, businessId, applyTime,
                                                           import_err_msg, skip_table)

        if successCount > 0:
            await cls._submit_meta_apply(result_db, businessId, current_user.user.user_name)
        else:
            import_err_msg.append("上传的数据均有问题,本次导入0条")
        if len(import_err_msg) > 0:
            return ";".join(import_err_msg)
        return "操作成功"