You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

752 lines
47 KiB

import io
import json
7 months ago
import uuid
import pandas as pd
from fastapi import UploadFile
from module_admin.entity.vo.meta_vo import MetaPageObject, MetaColObject, SuppleModel, MetaBusinessRelShipObject, \
MetaProcQueryObject
7 months ago
from module_admin.entity.do.meta_do import MetadataSuppInfo, MetadataFldSuppInfo, MetadataSuppInfoVett, \
MetadataFldSuppInfoVett, MetadataExtractInfo, MetadataFldTabExtractInfo
7 months ago
from module_admin.dao.meta_dao import MetaDao
from module_admin.dao.datastd_dao import DataStdDao
7 months ago
from datetime import datetime
from module_admin.entity.vo.user_vo import CurrentUserModel
from module_admin.entity.vo.approval_vo import ApplyModel
from sqlalchemy.ext.asyncio import AsyncSession
from exceptions.exception import ServiceException, ServiceWarning
from utils.pwd_util import *
from utils.common_util import *
from utils.log_util import logger
from module_admin.service.login_service import LoginService
from module_admin.service.approval_service import ApprovalService
from module_admin.entity.vo.common_vo import CrudResponseModel
from module_admin.entity.vo.user_vo import CurrentUserModel
from utils.common_util import CamelCaseUtil
from config.env import AppConfig
import requests
7 months ago
class MetaService:
@classmethod
async def get_meta_list_services(cls, result_db: AsyncSession, query_object: MetaPageObject,
current_user: CurrentUserModel):
"""
获取元信息service
:param result_db: orm对象
:param query_object: 查询参数对象
:param data_scope_sql: 数据权限对应的查询sql语句
:return: 用户列表信息对象
"""
# print("********")
# 0,正常,1已发布,2审核中,3已审核,4已作废
if 'admin' in current_user.roles:
meta_rel_list = await MetaDao.get_meta_rel_list(result_db, query_object)
if meta_rel_list and len(meta_rel_list.rows) > 0:
for table in meta_rel_list.rows:
tab_list = await MetaDao.get_meta_tab_clas(result_db, table['ssysId'], table['mdlName'],
table['tabEngName'])
table['batchTabClas'] = CamelCaseUtil.transform_result(tab_list)
7 months ago
return meta_rel_list
else:
meta_rel_list = await MetaDao.get_meta_rel_list(result_db, query_object)
if meta_rel_list and len(meta_rel_list.rows) > 0:
for table in meta_rel_list.rows:
tab_list = await MetaDao.get_meta_tab_clas(result_db, table['ssysId'], table['mdlName'],
table['tabEngName'])
table['batchTabClas'] = CamelCaseUtil.transform_result(tab_list)
return meta_rel_list
7 months ago
@classmethod
async def get_meta_col_list_services(cls, result_db: AsyncSession, query_object: MetaColObject):
meta_result = await MetaDao.get_meta_col_list(result_db, query_object)
result = CamelCaseUtil.transform_result(meta_result)
for column in result:
fld_list = await MetaDao.get_meta_fld_clas(result_db, column['ssysId'], column['mdlName'],
column['tabEngName'], column['fldEngName'])
column['batchFldClas'] = CamelCaseUtil.transform_result(fld_list)
if column['dataSecLvl'] is None and column['dataDictId'] is not None:
# 获取数据标准的安全等级
data_sec_lvl = await DataStdDao.get_data_sec_lvl_by_dict_id(result_db, column['dataDictId'])
column['dataSecLvl'] = data_sec_lvl
return result
7 months ago
@classmethod
async def get_meta_clas_list_services(cls, result_db: AsyncSession):
result = await MetaDao.get_meta_clas_list(result_db)
return CamelCaseUtil.transform_result(result)
@classmethod
async def meta_supp(cls, result_db: AsyncSession, supple: SuppleModel, current_user: CurrentUserModel):
# 0,暂存 waiting,申请中,pending审核中,succeed,rejected已审核,canceled已取消
hasTable = await MetaDao.get_lastest_meta_data_supp_vett(result_db, supple.ssys_id, supple.mdl_name,
7 months ago
supple.tab_eng_name)
oldTable = await MetaDao.get_supp_table_by_vett(supple.ssys_id, supple.mdl_name, supple.tab_eng_name, result_db)
tableInfo = await MetaDao.get_meta_table(supple.ssys_id, supple.mdl_name, supple.tab_eng_name, result_db)
7 months ago
tableOnum = uuid.uuid4()
businessId = uuid.uuid4()
7 months ago
if hasTable is not None:
if hasTable.apply_status == 'waiting':
raise ServiceException(message=f'所补录对象已存在补录待审核记录,请等待审批完成或撤回申请后,再行补录')
if hasTable.apply_status == 'pending':
raise ServiceException(message=f'所补录对象已存在待审核记录,请等待审批完成后,再行补录')
applyTime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
7 months ago
suppTableInfo = MetadataSuppInfoVett()
suppTableInfo.onum = tableOnum
suppTableInfo.ssys_id = supple.ssys_id
7 months ago
suppTableInfo.mdl_name = supple.mdl_name
suppTableInfo.tab_eng_name = supple.tab_eng_name
suppTableInfo.tab_crrct_name = supple.tab_crrct_name
suppTableInfo.tab_desc = supple.tab_desc
suppTableInfo.pic = supple.pic
suppTableInfo.gov_flag = supple.gov_flag
suppTableInfo.tab_clas = supple.tab_clas
suppTableInfo.rec_subm_prsn = current_user.user.user_name
suppTableInfo.apply_time = applyTime
suppTableInfo.business_id = businessId
7 months ago
suppTableInfo.apply_status = 'waiting'
suppTableInfo.oldTableData = cls.castToTableStr(oldTable, tableInfo)
7 months ago
await MetaDao.insertMetadataSuppInfoVett(suppTableInfo, result_db)
if supple.columnInfo is not None and len(supple.columnInfo) > 0:
for column in supple.columnInfo:
oldColumn = await MetaDao.get_supp_column_by_columnInfo(supple.ssys_id, supple.mdl_name,
supple.tab_eng_name,
column.fld_eng_name, result_db)
data_dict_name = ''
if oldColumn and oldColumn.data_dict_id and oldColumn.data_dict_id != '':
oldDict = await DataStdDao.get_data_dict_by_id(result_db, oldColumn.data_dict_id)
data_dict_name = oldDict.data_dict_cn_name if oldDict else ''
columnInfo = await MetaDao.get_meta_column(supple.ssys_id, supple.mdl_name, supple.tab_eng_name,
column.fld_eng_name, result_db)
7 months ago
suppColumnInfo = MetadataFldSuppInfoVett()
suppColumnInfo.onum = uuid.uuid4()
suppColumnInfo.ssys_id = supple.ssys_id
7 months ago
suppColumnInfo.mdl_name = supple.mdl_name
suppColumnInfo.tab_eng_name = supple.tab_eng_name
suppColumnInfo.fld_eng_name = column.fld_eng_name
suppColumnInfo.fld_crrct_name = column.fld_crrct_name
suppColumnInfo.crrct_pk_flag = column.crrct_pk_flag
suppColumnInfo.fld_desc = column.fld_desc
suppColumnInfo.pic = column.pic
suppColumnInfo.fld_clas = column.fld_clas
suppColumnInfo.fld_null_rate = column.fld_null_rate
suppColumnInfo.data_dict_id = column.data_dict_id
suppColumnInfo.data_sec_lvl = column.data_sec_lvl
7 months ago
suppColumnInfo.rec_stat = column.rec_stat
suppColumnInfo.rec_subm_prsn = current_user.user.user_name
suppColumnInfo.apply_time = applyTime
7 months ago
suppColumnInfo.apply_status = 'waiting'
suppColumnInfo.oldColumnData = cls.castToColumnStr(oldColumn, columnInfo, data_dict_name)
7 months ago
await MetaDao.insertMetadataFldSuppInfoVett(suppColumnInfo, result_db)
applyModel = ApplyModel()
applyModel.businessType = "metaDataInfo"
applyModel.businessId = businessId
7 months ago
applyModel.applicant = current_user.user.user_name
await ApprovalService.apply_services(result_db, applyModel, 'metaDataInfo')
await result_db.commit()
7 months ago
return CrudResponseModel(is_success=True, message='操作成功')
@classmethod
def castToTableStr(cls, table: MetadataSuppInfo, tableInfo: MetadataExtractInfo):
tableDict = dict(
onum=tableInfo.onum,
crrctVerNum=table.crrct_ver_num if table else None,
ssysId=tableInfo.ssys_id,
mdlName=tableInfo.mdl_name,
tabEngName=tableInfo.tab_eng_name,
tabCnName=tableInfo.tab_cn_name,
tabType=tableInfo.tab_type,
tabRecNum=tableInfo.tab_rec_num,
tabCrrctName=table.tab_crrct_name if table else None,
tabDesc=table.tab_desc if table else None,
pic=table.pic if table else None,
govFlag=table.gov_flag if table else None,
recStat=table.rec_stat if table else None,
tabClas=table.tab_clas if table else None,
recSubmPrsn=table.rec_subm_prsn if table else None,
updTime=table.upd_time.isoformat() if table and table.upd_time else None
)
return json.dumps(tableDict)
@classmethod
def castToColumnStr(cls, column: MetadataFldSuppInfo, columnInfo: MetadataFldTabExtractInfo, dictName: str):
columnDict = dict(
onum=columnInfo.onum,
crrctVerNum=column.crrct_ver_num if column and column.crrct_ver_num else None,
ssysId=columnInfo.ssys_id,
mdlName=columnInfo.mdl_name,
tabEngName=columnInfo.tab_eng_name,
fldEngName=columnInfo.fld_eng_name,
fldCnName=columnInfo.fld_cn_name,
fldType=columnInfo.fld_type,
pkFlag=columnInfo.pk_flag,
requireFlag=columnInfo.require_flag,
idxFlag=columnInfo.idx_flag,
fldCrrctName=column.fld_crrct_name if column and column.fld_crrct_name else None,
crrctPkFlag=column.crrct_pk_flag if column and column.crrct_pk_flag else None,
fldDesc=column.fld_desc if column and column.fld_desc else None,
pic=column.pic if column and column.pic else None,
fldClas=column.fld_clas if column and column.fld_clas else None,
fldNullRate=column.fld_null_rate if column and column.fld_null_rate else None,
dataDictId=column.data_dict_id if column and column.data_dict_id else None,
dataSecLvl=column.data_sec_lvl if column and column.data_sec_lvl else None,
dataDictName=dictName,
recStat=column.rec_stat if column and column.rec_stat else None,
updTime=column.upd_time.isoformat() if column and column.upd_time else None
)
return json.dumps(columnDict)
@classmethod
async def get_meta_apply_detail_services(cls, result_db: AsyncSession, businessId: str):
tableDataList = await MetaDao.get_supp_table_vett_by_businessId(businessId, result_db)
tableList = CamelCaseUtil.transform_result(tableDataList)
for tableData in tableList:
clas_list = await MetaDao.get_meta_tab_clas(result_db, tableData['ssysId'], tableData['mdlName'],
tableData['tabEngName'])
tableData['batchTabClas'] = CamelCaseUtil.transform_result(clas_list)
columnData = await MetaDao.get_supp_column_vett_by_tableInfo(result_db, tableData)
column_list = CamelCaseUtil.transform_result(columnData)
for column in column_list:
col_list = await MetaDao.get_meta_fld_clas(result_db, column['ssysId'], column['mdlName'],
column['tabEngName'], column['fldEngName'])
column['batchColClas'] = CamelCaseUtil.transform_result(col_list)
tableData['columnList'] = column_list
return tableList
@classmethod
async def get_table_by_id(cls, result_db: AsyncSession, tableId: int):
table = await MetaDao.get_meta_table_by_id(tableId, result_db)
tab_list = await MetaDao.get_meta_tab_clas(result_db, table['ssysId'], table['mdlName'],
table['tabEngName'])
table['batchTabClas'] = CamelCaseUtil.transform_result(tab_list)
colQuery = MetaColObject()
colQuery.ssys_id = table['ssysId']
colQuery.mdl_name = table['mdlName']
colQuery.tab_name = table['tabEngName']
columnList = await MetaDao.get_meta_col_list(result_db, colQuery)
for column in columnList:
col_list = await MetaDao.get_meta_fld_clas(result_db, column['ssys_id'], column['mdl_name'],
column['tab_eng_name'], column['fld_eng_name'])
column['batchColClas'] = CamelCaseUtil.transform_result(col_list)
table['columnList'] = CamelCaseUtil.transform_result(columnList)
return table
@classmethod
async def getBusinessRelationShip(cls, result_db: AsyncSession, meta_query: MetaBusinessRelShipObject):
currentNodeList = await cls.getRelationByTable(result_db, meta_query.ssys_id, meta_query.mdl_name,
meta_query.tab_eng_name, meta_query.type)
relationList = []
if currentNodeList is not None and len(currentNodeList) > 0:
for currentNode in currentNodeList:
if currentNode['father'] is None:
relation = {"source": {"ssys_id": currentNode['a_ssys_id'],
"mdl_name": currentNode['a_mdl_name'],
"tab_eng_name": currentNode['a_tab_eng_name'],
"fld_eng_name": currentNode['a_fld_eng_name']},
"target": {'ssys_id': currentNode['b_ssys_id'],
'mdl_name': currentNode['b_mdl_name'],
'tab_eng_name': currentNode['b_tab_eng_name'],
'fld_eng_name': currentNode['b_fld_eng_name']},
"endArrow": False}
else:
relation = {"source": {"ssys_id": currentNode['a_ssys_id'],
"mdl_name": currentNode['a_mdl_name'],
"tab_eng_name": currentNode['a_tab_eng_name'],
"fld_eng_name": currentNode['a_fld_eng_name']},
"target": {'ssys_id': currentNode['b_ssys_id'],
'mdl_name': currentNode['b_mdl_name'],
'tab_eng_name': currentNode['b_tab_eng_name'],
'fld_eng_name': currentNode['b_fld_eng_name']},
"endArrow": True} if \
currentNode['father'] == 'A' else {
"source": {'ssys_id': currentNode['b_ssys_id'],
'mdl_name': currentNode['b_mdl_name'],
'tab_eng_name': currentNode['b_tab_eng_name'],
'fld_eng_name': currentNode['b_fld_eng_name']},
"target": {"ssys_id": currentNode['a_ssys_id'],
"mdl_name": currentNode['a_mdl_name'],
"tab_eng_name": currentNode['a_tab_eng_name'],
"fld_eng_name": currentNode['a_fld_eng_name']},
"endArrow": True}
relationList.append(relation)
if meta_query.type == 'er':
if currentNode['a_tab_eng_name'] == meta_query.tab_eng_name:
if currentNode['father'] == 'A':
# b为下游,取b字段的下下游
nextNodeList = (await cls.getRelationByColumn(result_db, currentNode['b_ssys_id'],
currentNode['b_mdl_name'],
currentNode['b_tab_eng_name'],
currentNode['b_fld_eng_name'],
meta_query.type,
'next'))
if nextNodeList and len(nextNodeList) > 0:
for nextNode in nextNodeList:
relation = {"source": {"ssys_id": nextNode['a_ssys_id'],
"mdl_name": nextNode['a_mdl_name'],
"tab_eng_name": nextNode['a_tab_eng_name'],
"fld_eng_name": nextNode['a_fld_eng_name']},
"target": {'ssys_id': nextNode['b_ssys_id'],
'mdl_name': nextNode['b_mdl_name'],
'tab_eng_name': nextNode['b_tab_eng_name'],
'fld_eng_name': nextNode['b_fld_eng_name']},
"endArrow": True} if nextNode['father'] == 'A' else \
{"source": {'ssys_id': nextNode['b_ssys_id'],
'mdl_name': nextNode['b_mdl_name'],
'tab_eng_name': nextNode['b_tab_eng_name'],
'fld_eng_name': nextNode['b_fld_eng_name']},
"target": {"ssys_id": nextNode['a_ssys_id'],
"mdl_name": nextNode['a_mdl_name'],
"tab_eng_name": nextNode['a_tab_eng_name'],
"fld_eng_name": nextNode['a_fld_eng_name']},
"endArrow": True
}
relationList.append(relation)
if currentNode['father'] == 'B':
# b为上游, 取b字段的上上游
preNodeList = await cls.getRelationByColumn(result_db, currentNode['b_ssys_id'],
currentNode['b_mdl_name'],
currentNode['b_tab_eng_name'],
currentNode['b_fld_eng_name'],
meta_query.type,
'pre')
if preNodeList and len(preNodeList) > 0:
for preNode in preNodeList:
relation = {"source": {"ssys_id": preNode['a_ssys_id'],
"mdl_name": preNode['a_mdl_name'],
"tab_eng_name": preNode['a_tab_eng_name'],
"fld_eng_name": preNode['a_fld_eng_name']},
"target": {'ssys_id': preNode['b_ssys_id'],
'mdl_name': preNode['b_mdl_name'],
'tab_eng_name': preNode['b_tab_eng_name'],
'fld_eng_name': preNode['b_fld_eng_name']},
"endArrow": True} if preNode['father'] == 'A' else \
{"source": {'ssys_id': preNode['b_ssys_id'],
'mdl_name': preNode['b_mdl_name'],
'tab_eng_name': preNode['b_tab_eng_name'],
'fld_eng_name': preNode['b_fld_eng_name']},
"target": {"ssys_id": preNode['a_ssys_id'], "mdl_name": preNode['a_mdl_name'],
"tab_eng_name": preNode['a_tab_eng_name'],
"fld_eng_name": preNode['a_fld_eng_name']},
"endArrow": True}
relationList.append(relation)
if currentNode['b_tab_eng_name'] == meta_query.tab_eng_name:
if currentNode['father'] == 'A':
# a 为上游,取a字段的上上游
preNodeList = await cls.getRelationByColumn(result_db, currentNode['a_ssys_id'],
currentNode['a_mdl_name'],
currentNode['a_tab_eng_name'],
currentNode['a_fld_eng_name'],
meta_query.type,
'pre')
if preNodeList and len(preNodeList) > 0:
for preNode in preNodeList:
relation = {"source": {"ssys_id": preNode['a_ssys_id'],
"mdl_name": preNode['a_mdl_name'],
"tab_eng_name": preNode['a_tab_eng_name'],
"fld_eng_name": preNode['a_fld_eng_name']},
"target": {'ssys_id': preNode['b_ssys_id'],
'mdl_name': preNode['b_mdl_name'],
'tab_eng_name': preNode['b_tab_eng_name'],
'fld_eng_name': preNode['b_fld_eng_name']},
"endArrow": True} if \
preNode['father'] == 'A' else {
"source": {'ssys_id': preNode['b_ssys_id'],
'mdl_name': preNode['b_mdl_name'],
'tab_eng_name': preNode['b_tab_eng_name'],
'fld_eng_name': preNode['b_fld_eng_name']},
"target": {"ssys_id": preNode['a_ssys_id'], "mdl_name": preNode['a_mdl_name'],
"tab_eng_name": preNode['a_tab_eng_name'],
"fld_eng_name": preNode['a_fld_eng_name']},
"endArrow": True}
relationList.append(relation)
if currentNode['father'] == 'B':
# a 为下游,取a字段的下下游
nextNodeList = await cls.getRelationByColumn(result_db, currentNode['a_ssys_id'],
currentNode['a_mdl_name'],
currentNode['a_tab_eng_name'],
currentNode['a_fld_eng_name'],
meta_query.type,
'next')
for nextNode in nextNodeList:
relation = {"source": {"ssys_id": nextNode['a_ssys_id'],
"mdl_name": nextNode['a_mdl_name'],
"tab_eng_name": nextNode['a_tab_eng_name'],
"fld_eng_name": nextNode['a_fld_eng_name']},
"target": {'ssys_id': nextNode['b_ssys_id'],
'mdl_name': nextNode['b_mdl_name'],
'tab_eng_name': nextNode['b_tab_eng_name'],
'fld_eng_name': nextNode['b_fld_eng_name']},
"endArrow": True} if \
nextNode['father'] == 'A' else {
"source": {'ssys_id': nextNode['b_ssys_id'],
'mdl_name': nextNode['b_mdl_name'],
'tab_eng_name': nextNode['b_tab_eng_name'],
'fld_eng_name': nextNode['b_fld_eng_name']},
"target": {"ssys_id": nextNode['a_ssys_id'], "mdl_name": nextNode['a_mdl_name'],
"tab_eng_name": nextNode['a_tab_eng_name'],
"fld_eng_name": nextNode['a_fld_eng_name']},
"endArrow": True}
relationList.append(relation)
tableList = []
if len(relationList) > 0:
for relation in relationList:
if len(tableList) > 0:
hasSourceTable = False
hasTargetTable = False
for tab in tableList:
if tab['ssys_id'] == relation['source']['ssys_id'] and tab['mdl_name'] == \
relation['source']['mdl_name'] and tab['tab_eng_name'] == \
relation['source']['tab_eng_name']:
hasSourceTable = True
if tab['ssys_id'] == relation['target']['ssys_id'] and tab['mdl_name'] == \
relation['target']['mdl_name'] and tab['tab_eng_name'] == \
relation['target']['tab_eng_name']:
hasTargetTable = True
if not hasSourceTable:
tableList.append({"ssys_id": relation['source']['ssys_id'],
"mdl_name": relation['source']['mdl_name'],
"tab_eng_name": relation['source']['tab_eng_name']})
if not hasTargetTable:
tableList.append({"ssys_id": relation['target']['ssys_id'],
"mdl_name": relation['target']['mdl_name'],
"tab_eng_name": relation['target']['tab_eng_name']})
else:
tableList.append({"ssys_id": relation['source']['ssys_id'],
"mdl_name": relation['source']['mdl_name'],
"tab_eng_name": relation['source']['tab_eng_name']})
tableList.append({"ssys_id": relation['target']['ssys_id'],
"mdl_name": relation['target']['mdl_name'],
"tab_eng_name": relation['target']['tab_eng_name']})
if len(tableList) > 0:
for table in tableList:
query_object = MetaColObject()
query_object.ssys_id = table['ssys_id']
query_object.mdl_name = table['mdl_name']
query_object.tab_name = table['tab_eng_name']
meta_result = await MetaDao.get_meta_col_name_list(result_db, query_object)
result = CamelCaseUtil.transform_result(meta_result)
tableCnName = await MetaDao.get_meta_table_cn_name(result_db, table['ssys_id'], table['mdl_name'],
table['tab_eng_name'])
table['tab_cn_name'] = tableCnName
table['column'] = result
result = {
"tableList": tableList,
"relation": relationList
}
return result
@classmethod
async def getRelationByTable(cls, result_db: AsyncSession, ssys_id: int, mdl_name: str,
tab_eng_name: str, rel_type: str):
if rel_type == 'op':
currentNodeList = await MetaDao.get_op_relation_by_table(result_db, ssys_id, mdl_name, tab_eng_name)
return currentNodeList
if rel_type == 'er':
currentNodeList = await MetaDao.get_er_relation_by_table(result_db, ssys_id, mdl_name, tab_eng_name)
return currentNodeList
return None
@classmethod
async def getRelationByColumn(cls, result_db: AsyncSession, ssys_id: int, mdl_name: str,
tab_eng_name: str, fld_eng_name: str, rel_type: str, module: str):
col = {
'ssys_id': ssys_id,
'mdl_name': mdl_name,
'tab_eng_name': tab_eng_name,
'fld_eng_name': fld_eng_name
}
if rel_type == 'op':
currentNodeList = await MetaDao.get_op_relation_by_column(result_db, col, module)
return currentNodeList
if rel_type == 'er':
currentNodeList = await MetaDao.get_er_relation_by_column(result_db, col, module)
return currentNodeList
return None
@classmethod
async def getMetaProc(cls, result_db: AsyncSession, meta_query: MetaProcQueryObject):
result = await MetaDao.get_proc_by_table(result_db, meta_query.ssys_id, meta_query.mdl_name,
meta_query.tab_eng_name)
return result
@classmethod
async def getBloodRelationShip(cls, result_db: AsyncSession, procId: int):
bloodRelations = await MetaDao.get_blood_by_procId(result_db, procId)
tableList = []
if bloodRelations is not None and len(bloodRelations) > 0:
for blood in bloodRelations:
if len(tableList) > 0:
exists1 = any(item["ssys_id"] == blood.targetSysId
and item["mdl_name"].lower() == blood.targetMdlName.lower()
and item["tab_eng_name"].lower() == blood.targetTableName.lower()
for item in tableList)
if not exists1:
tableList.append({"ssys_id": blood.targetSysId,
"mdl_name": blood.targetMdlName.lower(),
"tab_eng_name": blood.targetTableName.lower()})
exists2 = any(item["ssys_id"] == blood.sourceSysId
and item["mdl_name"].lower() == blood.sourceMdlName.lower()
and item["tab_eng_name"].lower() == blood.sourceTableName.lower()
for item in tableList)
if not exists2:
tableList.append({"ssys_id": blood.sourceSysId,
"mdl_name": blood.sourceMdlName.lower(),
"tab_eng_name": blood.sourceTableName.lower()})
else:
tableList.append({"ssys_id": blood.targetSysId,
"mdl_name": blood.targetMdlName.lower(),
"tab_eng_name": blood.targetTableName.lower()})
tableList.append({"ssys_id": blood.sourceSysId,
"mdl_name": blood.sourceMdlName.lower(),
"tab_eng_name": blood.sourceTableName.lower()})
if len(tableList) > 0:
for table in tableList:
query_object = MetaColObject()
query_object.ssys_id = table['ssys_id']
query_object.mdl_name = table['mdl_name']
query_object.tab_name = table['tab_eng_name']
meta_result = await MetaDao.get_meta_col_name_list(result_db, query_object)
result1 = CamelCaseUtil.transform_result(meta_result)
tableCnName = await MetaDao.get_meta_table_cn_name(result_db, table['ssys_id'], table['mdl_name'],
table['tab_eng_name'])
table['tab_cn_name'] = tableCnName
table['column'] = result1
result = {
"relation": bloodRelations,
"tableList": tableList
}
return result
@staticmethod
async def get_meta_import_template_services():
"""
获取元数据导入模板service
:return: 元数据导入模板excel的二进制数据
"""
table_header_list = ['系统代码', '模式名称', '对象英文名', '补录对象名称', '补录对象描述', '负责人']
column_header_list = ['系统代码', '模式名称', '对象英文名', '字段英文名', '字段补录名', '补录主键',
'补录字段描述', '引用字典/标准', '安全等级', '负责人']
selector_header_list = ['补录主键', '安全等级']
option_list = [{}, {'补录主键': ['', ''], '安全等级': ['S1', 'S2', 'S3', 'S4']}]
sheet_config1 = dict(
sheet_name="表信息", header_list=table_header_list, selector_header_list=[]
)
sheet_config2 = dict(
sheet_name="字段信息", header_list=column_header_list, selector_header_list=selector_header_list
)
sheet_configs = [sheet_config1, sheet_config2]
binary_data = get_excel_template_with_sheets(
sheet_configs, # 每个Sheet的配置(包含表头、选择器等)
option_list
)
return binary_data
@classmethod
async def batch_import_meta_services(cls,
result_db: AsyncSession,
file: UploadFile,
current_user: CurrentUserModel):
import_err_msg = []
# 调用DS接口通过系统代码查询系统ID
url = f'{AppConfig.ds_server_url}/dolphinscheduler/datasources?pageNo=1&pageSize=100'
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
response = requests.get(url, headers=headers, verify=False)
dataSourceList = []
if response.reason == 'OK':
response_text = response.text
data = json.loads(response_text)
dataSourceList = data["data"]["totalList"]
else:
raise ServiceException(message=f'系统异常,获取数据源失败')
skip_table = []
businessId = uuid.uuid4()
applyTime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
table_header_dict = {
'系统代码': 'ssys_cd',
'模式名称': 'mdl_name',
'对象英文名': 'tab_eng_name',
'补录对象名称': 'tab_crrct_name',
'补录对象描述': 'tab_desc',
'负责人': 'rec_subm_prsn'
}
column_header_dict = {
'系统代码': 'ssys_cd',
'模式名称': 'mdl_name',
'对象英文名': 'tab_eng_name',
'字段英文名': 'fld_eng_name',
'字段补录名': 'fld_crrct_name',
'补录主键': 'crrct_pk_flag',
'补录字段描述': 'fld_desc',
'引用字典/标准': 'data_dict_id',
'安全等级': 'data_sec_lvl',
'负责人': 'rec_subm_prsn'
}
contents = await file.read()
excel_file = pd.ExcelFile(io.BytesIO(contents))
await file.close()
# 获取所有sheet名称
sheet_names = excel_file.sheet_names
# 逐个读取
tableSheet = sheet_names[0]
columnSheet = sheet_names[1]
successCount = 0
# 校验, 1、各必填内容不能为空, 2.系统代码映射的系统是否存在 3.表是否存在 4.字段是否存在5.引用的字典标准是否存在6.标准的系统与表系统是否对应
if tableSheet == '表信息':
# 表信息补录
df = excel_file.parse(sheet_name=tableSheet)
df.rename(columns=table_header_dict, inplace=True)
for index, row in df.iterrows():
noneValid = ''
if row['ssys_cd'] is None or len(row['ssys_cd']) == 0:
noneValid += "sheet[表信息]中行:" + str(index+1) + "中的系统代码不能为空"
if row['mdl_name'] is None or len(row['mdl_name']) == 0:
if len(noneValid) > 0:
noneValid += ",模式名称不能为空"
else:
noneValid += "sheet[表信息]中行:" + str(index+1) + "中的模式名称不能为空"
if row['tab_eng_name'] is None or len(row['tab_eng_name']) == 0:
if len(noneValid) > 0:
noneValid += ",表英文名称不能为空"
else:
noneValid += "sheet[表信息]中行:" + str(index+1) + "中的表英文名不能为空"
if len(noneValid) > 0:
import_err_msg.append(noneValid)
continue
ssysId = next((item["id"] for item in dataSourceList if item["name"] == row['ssys_cd']), None)
if ssysId is None:
import_err_msg.append("行:" + str(index+1) + "中的系统不存在,需重新修正")
continue
hasTable = await MetaDao.get_lastest_meta_data_supp_vett(result_db, ssysId, row['mdl_name'],
row['tab_eng_name'])
if hasTable:
if hasTable.apply_status == 'waiting':
import_err_msg.append(
'sheet[表信息]中行:' + str(index+1) + ' 导入的系统代码:' + row['ssys_cd'] + ',模式名称:' + row['mdl_name'] +
',对象英文名:' + row['tab_eng_name'] +
'已存在补录待审核记录,请等待审批完成或撤回申请后,再行导入')
if hasTable.apply_status == 'pending':
import_err_msg.append(
'sheet[字段信息]中行:' + str(index+1) + ' 导入的系统代码:' + row['ssys_cd'] + ',模式名称:' + row['mdl_name'] +
',对象英文名:' + row['tab_eng_name'] +
'已存在待审核记录,请等待审批完成或撤回申请后,再行导入')
skip_table.append({'ssys_id': ssysId, 'mdl_name': row['mdl_name'],
'tab_eng_name': row['tab_eng_name']})
continue
oldTable = await MetaDao.get_supp_table_by_vett(ssysId, row['mdl_name'],
row['tab_eng_name'], result_db)
tableInfo = await MetaDao.get_meta_table(ssysId, row['mdl_name'],
row['tab_eng_name'], result_db)
if tableInfo is None:
import_err_msg.append("sheet[表信息]中行:"+str(index+1) + "中所对应的表不存在,无法上传补录")
continue
tableOnum = uuid.uuid4()
suppTableInfo = MetadataSuppInfoVett()
suppTableInfo.onum = tableOnum
suppTableInfo.ssys_id = ssysId
suppTableInfo.mdl_name = row['mdl_name']
suppTableInfo.tab_eng_name = row['tab_eng_name']
suppTableInfo.tab_crrct_name = row['tab_crrct_name']
suppTableInfo.tab_desc = row['tab_desc']
suppTableInfo.pic = oldTable.pic if oldTable else None
suppTableInfo.gov_flag = oldTable.gov_flag if oldTable else None
suppTableInfo.tab_clas = oldTable.tab_clas if oldTable else None
suppTableInfo.rec_subm_prsn = row['rec_subm_prsn']
suppTableInfo.apply_time = applyTime
suppTableInfo.apply_status = 'waiting'
suppTableInfo.business_id = businessId
suppTableInfo.oldTableData = cls.castToTableStr(oldTable, tableInfo)
await MetaDao.insertMetadataSuppInfoVett(suppTableInfo, result_db)
successCount += 1
if columnSheet == '字段信息':
# 字段信息补录
df = excel_file.parse(sheet_name=columnSheet)
df.rename(columns=column_header_dict, inplace=True)
for index, row in df.iterrows():
noneValid = ''
if row['ssys_cd'] is None or len(row['ssys_cd']) == 0:
noneValid += "sheet[字段信息]中行:" + str(index+1) + "中的系统代码不能为空"
if row['mdl_name'] is None or len(row['mdl_name']) == 0:
if len(noneValid) > 0:
noneValid += ",模式名称不能为空"
else:
noneValid += "sheet[字段信息]中行:" + str(index+1) + "中的模式名称不能为空"
if row['tab_eng_name'] is None or len(row['tab_eng_name']) == 0:
if len(noneValid) > 0:
noneValid += ",表英文名称不能为空"
else:
noneValid += "sheet[字段信息]中行:" + str(index+1) + "中的表英文名不能为空"
if len(noneValid) > 0:
import_err_msg.append(noneValid)
continue
ssysId = next((item["id"] for item in dataSourceList if item["name"] == row['ssys_cd']), None)
if any(
table['ssys_id'] == ssysId
and table['mdl_name'] == row['mdl_name']
and table['tab_eng_name'] == row['tab_eng_name']
for table in skip_table
):
continue
oldColumn = await MetaDao.get_supp_column_by_columnInfo(ssysId, row['mdl_name'],
row['tab_eng_name'],
row['fld_eng_name'], result_db)
data_dict_name = ''
if oldColumn and oldColumn.data_dict_id and oldColumn.data_dict_id != '':
oldDict = await DataStdDao.get_data_dict_by_id(result_db, oldColumn.data_dict_id)
data_dict_name = oldDict.data_dict_cn_name if oldDict else ''
columnInfo = await MetaDao.get_meta_column(ssysId, row['mdl_name'],
row['tab_eng_name'],
row['fld_eng_name'], result_db)
if columnInfo is None:
import_err_msg.append("sheet[字段信息]中行:"+str(index+1) + "中所对应的字段不存在,无法上传补录")
continue
dataDictId = ''
if row['data_dict_id'] and len(row['data_dict_id']) > 0:
dataDict = await DataStdDao.get_data_dict_by_code(result_db, row['data_dict_id'])
if dataDict is None:
import_err_msg.append("sheet[字段信息]中行:"+str(index+1) + "中所对应的数据字典不存在,无法上传补录")
continue
else:
if int(dataDict.src_sys) != ssysId:
import_err_msg.append("sheet[字段信息]中行:"+str(index+1) + "中所对应的数据字典所属系统与表所属系统不一致,无法上传补录")
continue
else:
dataDictId = dataDict.onum
suppColumnInfo = MetadataFldSuppInfoVett()
suppColumnInfo.onum = uuid.uuid4()
suppColumnInfo.ssys_id = ssysId
suppColumnInfo.mdl_name = row['mdl_name']
suppColumnInfo.tab_eng_name = row['tab_eng_name']
suppColumnInfo.fld_eng_name = row['fld_eng_name']
suppColumnInfo.fld_crrct_name = row['fld_crrct_name']
suppColumnInfo.crrct_pk_flag = True if row['crrct_pk_flag'] and row['crrct_pk_flag'] == '' else False
suppColumnInfo.fld_desc = row['fld_desc']
suppColumnInfo.pic = oldColumn.pic if oldColumn else None
suppColumnInfo.fld_clas = oldColumn.fld_clas if oldColumn else None
suppColumnInfo.fld_null_rate = oldColumn.fld_null_rate if oldColumn else None
suppColumnInfo.data_dict_id = dataDictId if dataDictId != '' else None
suppColumnInfo.data_sec_lvl = row['data_sec_lvl']
suppColumnInfo.rec_stat = oldColumn.rec_stat if oldColumn else None
suppColumnInfo.rec_subm_prsn = row['rec_subm_prsn']
suppColumnInfo.business_id = businessId
suppColumnInfo.apply_time = applyTime
suppColumnInfo.apply_status = 'waiting'
suppColumnInfo.oldColumnData = cls.castToColumnStr(oldColumn, columnInfo, data_dict_name)
await MetaDao.insertMetadataFldSuppInfoVett(suppColumnInfo, result_db)
successCount += 1
if successCount > 0:
applyModel = ApplyModel()
applyModel.businessType = "metaDataInfo"
applyModel.businessId = businessId
applyModel.applicant = current_user.user.user_name
await ApprovalService.apply_services(result_db, applyModel, 'metaDataInfo')
await result_db.commit()
else:
import_err_msg.append("上传的数据均有问题,本次导入0条")
if len(import_err_msg) > 0:
return ";".join(import_err_msg)
else:
return "操作成功"