import uuid
from datetime import datetime

from sqlalchemy import select, text, cast, Integer, and_, or_, outerjoin, func, join, update, desc
from sqlalchemy.ext.asyncio import AsyncSession

from module_admin.entity.do.datastd_do import DataStdDict
from module_admin.entity.do.meta_do import (
    MetadataExtractInfo,
    MetadataSuppInfo,
    MetadataFldTabExtractInfo,
    MetadataFldSuppInfo,
    MetadataClas,
    MetadataSuppInfoVett,
    MetadataFldSuppInfoVett,
    MetaBatchTabClas,
    MetaBatchFldClas,
    MetaBloodAnalysis,
)
from module_admin.entity.vo.meta_vo import MetaPageObject, MetaColObject
from utils.common_util import CamelCaseUtil
from utils.page_util import PageUtil

# Shared prefix of every ER-relation query: ranks rows of t_batch_er_relation_step1
# per b-side field and keeps only the top-ranked row (rn = 1).
_ER_RELATION_BASE_SQL = (
    "with tmp as (select a1.*,row_number()over(partition by b_ssys_id,b_mdl_name,b_tab_eng_name,"
    "b_fld_eng_name order by concat(op_relation,',',jd_relation) desc) as rn "
    "from t_batch_er_relation_step1 a1)select a1.*,'A' AS father from tmp a1 where a1.rn = 1 "
)

# Shared prefix of the field-relation ("op relation") queries; callers append
# their own table-level or column-level filter after the trailing WHERE clause.
_OP_RELATION_BASE_SQL = (
    "select a1.* ,case when a2.rela_value = '1' then 'A' when a2.rela_value = '2' then 'B' end as father"
    " from (select distinct a_ssys_id ,a_mdl_name ,a_tab_eng_name ,a_fld_eng_name ,b_ssys_id,b_mdl_name,"
    "b_tab_eng_name ,b_fld_eng_name from ( select a_ssys_id ,a_mdl_name ,a_tab_eng_name ,a_fld_eng_name,"
    "b_ssys_id,b_mdl_name ,b_tab_eng_name ,b_fld_eng_name from t_batch_fld_relation "
    "where rela_type in ('jd_relation','op_relation') and rela_value not in ('0') union all "
    "SELECT a_ssys_id ,a_mdl_name ,a_tab_eng_name ,a_fld_eng_name ,b_ssys_id ,b_mdl_name,b_tab_eng_name,"
    "b_fld_eng_name FROM t_batch_fld_relation t ,JSON_TABLE(replace(t.rela_value,'''','\"'),'$[*]' "
    "COLUMNS (value_num DOUBLE PATH '$.value')) AS j group by a_ssys_id ,a_mdl_name ,a_tab_eng_name ,"
    "a_fld_eng_name ,b_ssys_id ,b_mdl_name ,b_tab_eng_name ,b_fld_eng_name "
    "having max(j.value_num) > 0.1) b1 ) a1 left join t_batch_fld_relation a2 "
    "on a1.a_ssys_id = a2.a_ssys_id and a1.a_mdl_name = a2.a_mdl_name "
    "and a1.a_tab_eng_name = a2.a_tab_eng_name and a1.a_fld_eng_name = a2.a_fld_eng_name "
    "and a1.b_ssys_id = a2.b_ssys_id and a1.b_mdl_name = a2.b_mdl_name "
    "and a1.b_tab_eng_name = a2.b_tab_eng_name and a1.b_fld_eng_name = a2.b_fld_eng_name "
    "and a2.rela_type = 'set_flag' left join t_batch_fld_clas a4 on a1.a_ssys_id = a4.ssys_id "
    "and a1.a_mdl_name = a4.mdl_name and a1.a_tab_eng_name = a4.tab_eng_name "
    "and a1.a_fld_eng_name = a4.fld_eng_name and a4.clas_onum = '2001001002' "
    "where coalesce(a4.clas_value,' ') <> '日期时间类'"
)

# Attributes copied from each MetadataFldSuppInfoVett row into the dicts returned
# by MetaDao.get_supp_column_vett_by_tableInfo (order defines the key order).
_FLD_VETT_FIELDS = (
    "onum",
    "crrct_ver_num",
    "ssys_id",
    "mdl_name",
    "tab_eng_name",
    "fld_eng_name",
    "fld_crrct_name",
    "crrct_pk_flag",
    "fld_desc",
    "pic",
    "fld_clas",
    "fld_null_rate",
    "data_dict_id",
    "data_sec_lvl",
    "rec_stat",
    "upd_time",
    "rec_subm_prsn",
    "apply_time",
    "apply_status",
    "oldColumnData",
)


class MetaDao:
    """Database access helpers for metadata tables, fields, classifications and relations."""

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _rows_to_dicts(result):
        """Convert an executed result into a list of ``{column_name: value}`` dicts."""
        columns = result.keys()
        return [dict(zip(columns, row)) for row in result.fetchall()]

    @staticmethod
    def _table_overview_query():
        """Build the SELECT shared by the table list and table-by-id lookups.

        Selects extract-info columns plus supplementary-info columns and
        LEFT JOINs the supplementary table, the field extract table and the
        field supplementary table on (ssys_id, mdl_name, tab_eng_name).
        """
        return (
            select(
                MetadataExtractInfo.onum.label('extract_onum'),
                MetadataExtractInfo.extract_ver_num,
                MetadataExtractInfo.ver_desc.label('extract_ver_desc'),
                MetadataExtractInfo.ssys_id,
                MetadataExtractInfo.data_whs_name,
                MetadataExtractInfo.mdl_name,
                MetadataExtractInfo.tab_no,
                MetadataExtractInfo.tab_type,
                MetadataExtractInfo.tab_eng_name,
                MetadataExtractInfo.tab_cn_name,
                MetadataExtractInfo.tab_rec_num,
                MetadataExtractInfo.upd_time.label('extract_upd_time'),
                MetadataSuppInfo.onum.label('supp_onum'),
                MetadataSuppInfo.crrct_ver_num.label('supp_crrct_ver_num'),
                MetadataSuppInfo.tab_crrct_name,
                MetadataSuppInfo.tab_desc,
                MetadataSuppInfo.pic,
                MetadataSuppInfo.gov_flag,
                MetadataSuppInfo.rec_stat.label('supp_rec_stat'),
                MetadataSuppInfo.tab_clas,
                MetadataSuppInfo.rec_subm_prsn,
                MetadataSuppInfo.upd_time.label('supp_upd_time'),
            )
            .join(
                MetadataSuppInfo,
                and_(
                    MetadataExtractInfo.ssys_id == MetadataSuppInfo.ssys_id,
                    MetadataExtractInfo.mdl_name == MetadataSuppInfo.mdl_name,
                    MetadataExtractInfo.tab_eng_name == MetadataSuppInfo.tab_eng_name,
                ),
                isouter=True,
            )
            .join(
                MetadataFldTabExtractInfo,
                and_(
                    MetadataExtractInfo.ssys_id == MetadataFldTabExtractInfo.ssys_id,
                    MetadataExtractInfo.mdl_name == MetadataFldTabExtractInfo.mdl_name,
                    MetadataExtractInfo.tab_eng_name == MetadataFldTabExtractInfo.tab_eng_name,
                ),
                isouter=True,
            )
            .join(
                MetadataFldSuppInfo,
                and_(
                    MetadataExtractInfo.ssys_id == MetadataFldSuppInfo.ssys_id,
                    MetadataExtractInfo.mdl_name == MetadataFldSuppInfo.mdl_name,
                    MetadataExtractInfo.tab_eng_name == MetadataFldSuppInfo.tab_eng_name,
                ),
                isouter=True,
            )
        )

    # ----------------------------------------------------------- classification

    @classmethod
    async def get_meta_tab_clas(cls, db: AsyncSession, sysId: int, mdlName: str, tabName: str):
        """Return distinct table-classification rows (joined with their class definition) for one table."""
        stmt = (
            select(
                MetaBatchTabClas.onum,
                MetaBatchTabClas.ssys_id,
                MetaBatchTabClas.data_whs_name,
                MetaBatchTabClas.mdl_name,
                MetaBatchTabClas.tab_no,
                MetaBatchTabClas.tab_eng_name,
                MetaBatchTabClas.clas_onum,
                MetaBatchTabClas.clas_value,
                MetadataClas.clas_tmpl,
                MetadataClas.clas_eff_flag,
                MetadataClas.rec_subm_prsn,
                MetadataClas.clas_name,
                MetadataClas.belt_batch_content,
            )
            .join(MetadataClas, MetaBatchTabClas.clas_onum == MetadataClas.clas_onum)
            .where(
                MetaBatchTabClas.ssys_id == sysId,
                MetaBatchTabClas.mdl_name == mdlName,
                MetaBatchTabClas.tab_eng_name == tabName,
            )
            .distinct()
        )
        return (await db.execute(stmt)).fetchall()

    @classmethod
    async def get_meta_fld_clas(cls, db: AsyncSession, sysId: int, mdlName: str, tabName: str, fldName: str):
        """Return distinct field-classification rows (joined with their class definition) for one field."""
        stmt = (
            select(
                MetaBatchFldClas.onum,
                MetaBatchFldClas.ssys_id,
                MetaBatchFldClas.data_whs_name,
                MetaBatchFldClas.mdl_name,
                MetaBatchFldClas.tab_no,
                MetaBatchFldClas.tab_eng_name,
                MetaBatchFldClas.fld_eng_name,
                MetaBatchFldClas.clas_onum,
                MetaBatchFldClas.clas_value,
                MetadataClas.clas_tmpl,
                MetadataClas.clas_eff_flag,
                MetadataClas.rec_subm_prsn,
                MetadataClas.belt_batch_content,
                MetadataClas.clas_name,
            )
            .join(MetadataClas, MetaBatchFldClas.clas_onum == MetadataClas.clas_onum)
            .where(
                MetaBatchFldClas.ssys_id == sysId,
                MetaBatchFldClas.mdl_name == mdlName,
                MetaBatchFldClas.tab_eng_name == tabName,
                MetaBatchFldClas.fld_eng_name == fldName,
            )
            .distinct()
        )
        return (await db.execute(stmt)).fetchall()

    @classmethod
    async def get_meta_clas_list(cls, db: AsyncSession):
        """Return all MetadataClas entities whose ``clas_eff_flag`` equals 1."""
        stmt = select(MetadataClas).where(MetadataClas.clas_eff_flag == 1).distinct()
        return (await db.execute(stmt)).scalars().all()

    # ------------------------------------------------------------ table queries

    @classmethod
    async def get_meta_rel_list(cls, db: AsyncSession, query_object: MetaPageObject):
        """Return a page of table metadata filtered by the optional criteria in ``query_object``.

        Each filter (ssys_id, mdl_name, tab_name, col_name, tab_type, rec_stat)
        is applied only when its value is truthy; ``tab_name`` and ``col_name``
        are substring (LIKE) matches against both extracted and supplementary
        names. Pagination is delegated to ``PageUtil.paginate``.
        """
        tab_pattern = f'%{query_object.tab_name}%'
        col_pattern = f'%{query_object.col_name}%'

        # A falsy filter value degrades to the literal True (no restriction).
        query = (
            cls._table_overview_query()
            .where(
                (MetadataExtractInfo.ssys_id == query_object.ssys_id) if query_object.ssys_id else True,
                (MetadataExtractInfo.mdl_name == query_object.mdl_name) if query_object.mdl_name else True,
                or_(
                    MetadataExtractInfo.tab_eng_name.like(tab_pattern),
                    MetadataExtractInfo.tab_cn_name.like(tab_pattern),
                    MetadataSuppInfo.tab_eng_name.like(tab_pattern),
                    MetadataSuppInfo.tab_crrct_name.like(tab_pattern),
                ) if query_object.tab_name else True,
                or_(
                    MetadataFldTabExtractInfo.fld_eng_name.like(col_pattern),
                    MetadataFldTabExtractInfo.fld_cn_name.like(col_pattern),
                    MetadataFldSuppInfo.fld_eng_name.like(col_pattern),
                    MetadataFldSuppInfo.fld_crrct_name.like(col_pattern),
                ) if query_object.col_name else True,
                (MetadataExtractInfo.tab_type == query_object.tab_type) if query_object.tab_type else True,
                (MetadataExtractInfo.rec_stat == query_object.rec_stat) if query_object.rec_stat else True,
            )
            .distinct()
        )
        return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, True)

    @classmethod
    async def get_meta_table_by_id(cls, tableId: int, db: AsyncSession):
        """Return the first table-overview row whose extract ``onum`` equals ``tableId``.

        The row is passed through ``CamelCaseUtil.transform_result`` before
        being returned.
        """
        stmt = cls._table_overview_query().where(MetadataExtractInfo.onum == tableId).distinct()
        row = (await db.execute(stmt)).first()
        return CamelCaseUtil.transform_result(row)

    @classmethod
    async def get_meta_table(cls, ssys_id: int, mdlName: str, tableName: str, db: AsyncSession):
        """Return the first MetadataExtractInfo entity for (ssys_id, mdl_name, tab_eng_name), or None."""
        stmt = (
            select(MetadataExtractInfo)
            .where(
                MetadataExtractInfo.ssys_id == ssys_id,
                MetadataExtractInfo.mdl_name == mdlName,
                MetadataExtractInfo.tab_eng_name == tableName,
            )
            .distinct()
        )
        return (await db.execute(stmt)).scalars().first()

    @classmethod
    async def get_meta_table_cn_name(cls, db: AsyncSession, ssys_id: int, mdlName: str, tabEngName: str):
        """Return the table's Chinese name, falling back to the corrected name; None if no row matches."""
        # NOTE(review): the join compares a.tab_eng_name with b.tab_crrct_name;
        # other queries in this class join on tab_eng_name — confirm this is intended.
        sql = text(
            "select case when a.tab_cn_name is null then b.tab_crrct_name"
            " when a.tab_cn_name is not null then a.tab_cn_name end as tab_cn_name"
            " from t_metadata_extract_info a "
            "left join t_metadata_supp_info b "
            "on a.ssys_id = b.ssys_id and a.mdl_name=b.mdl_name and a.tab_eng_name=b.tab_crrct_name "
            "where a.ssys_id = :ssys_id and a.mdl_name = :mdlName and a.tab_eng_name = :tabEngName"
        )
        result = await db.execute(sql, {"ssys_id": ssys_id, "mdlName": mdlName, "tabEngName": tabEngName})
        row = result.first()
        # Guard against an unknown table instead of raising AttributeError on None.
        return row.tab_cn_name if row is not None else None

    # ----------------------------------------------------------- column queries

    @classmethod
    async def get_meta_col_list(cls, db: AsyncSession, query_object: MetaColObject):
        """Return the fields of one table, with supplementary info and dictionary name, as dicts ordered by ``fld_no``."""
        stmt = (
            select(
                MetadataFldTabExtractInfo.onum.label('extract_onum'),
                MetadataFldTabExtractInfo.extract_ver_num,
                MetadataFldTabExtractInfo.ssys_id,
                MetadataFldTabExtractInfo.data_whs_name,
                MetadataFldTabExtractInfo.mdl_name,
                MetadataFldTabExtractInfo.tab_no,
                MetadataFldTabExtractInfo.tab_eng_name,
                MetadataFldTabExtractInfo.fld_no,
                MetadataFldTabExtractInfo.fld_eng_name,
                MetadataFldTabExtractInfo.fld_cn_name,
                MetadataFldTabExtractInfo.fld_type,
                MetadataFldTabExtractInfo.pk_flag,
                MetadataFldTabExtractInfo.require_flag,
                MetadataFldTabExtractInfo.idx_flag,
                MetadataFldTabExtractInfo.upd_time.label('extract_upd_time'),
                MetadataFldSuppInfo.onum.label('supp_onum'),
                MetadataFldSuppInfo.crrct_ver_num,
                MetadataFldSuppInfo.fld_crrct_name,
                MetadataFldSuppInfo.crrct_pk_flag,
                MetadataFldSuppInfo.fld_desc,
                MetadataFldSuppInfo.pic,
                MetadataFldSuppInfo.fld_clas,
                MetadataFldSuppInfo.fld_null_rate,
                MetadataFldSuppInfo.data_dict_id,
                MetadataFldSuppInfo.data_sec_lvl,
                DataStdDict.data_dict_cn_name.label('data_dict_name'),
                MetadataFldSuppInfo.rec_stat.label('supp_rec_stat'),
                MetadataFldSuppInfo.upd_time.label('supp_upd_time'),
            )
            .select_from(MetadataFldTabExtractInfo)
            .join(
                MetadataFldSuppInfo,
                and_(
                    MetadataFldTabExtractInfo.ssys_id == MetadataFldSuppInfo.ssys_id,
                    MetadataFldTabExtractInfo.mdl_name == MetadataFldSuppInfo.mdl_name,
                    MetadataFldTabExtractInfo.tab_eng_name == MetadataFldSuppInfo.tab_eng_name,
                    MetadataFldTabExtractInfo.fld_eng_name == MetadataFldSuppInfo.fld_eng_name,
                ),
                isouter=True,
            )
            .join(
                DataStdDict,
                and_(DataStdDict.onum == MetadataFldSuppInfo.data_dict_id),
                isouter=True,
            )
            .where(
                MetadataFldTabExtractInfo.ssys_id == query_object.ssys_id,
                MetadataFldTabExtractInfo.mdl_name == query_object.mdl_name,
                MetadataFldTabExtractInfo.tab_eng_name == query_object.tab_name,
            )
            .distinct()
            .order_by(MetadataFldTabExtractInfo.fld_no)
        )
        rows = (await db.execute(stmt)).all()
        return [row._asdict() for row in rows]

    @classmethod
    async def get_meta_col_name_list(cls, db: AsyncSession, query_object: MetaColObject):
        """Return basic extracted field attributes of one table as dicts ordered by ``fld_no``."""
        stmt = (
            select(
                MetadataFldTabExtractInfo.ssys_id,
                MetadataFldTabExtractInfo.mdl_name,
                MetadataFldTabExtractInfo.tab_eng_name,
                MetadataFldTabExtractInfo.fld_eng_name,
                MetadataFldTabExtractInfo.fld_cn_name,
                MetadataFldTabExtractInfo.fld_type,
                MetadataFldTabExtractInfo.pk_flag,
                MetadataFldTabExtractInfo.fld_no,
            )
            .where(
                MetadataFldTabExtractInfo.ssys_id == query_object.ssys_id,
                MetadataFldTabExtractInfo.mdl_name == query_object.mdl_name,
                MetadataFldTabExtractInfo.tab_eng_name == query_object.tab_name,
            )
            .distinct()
            .order_by(MetadataFldTabExtractInfo.fld_no)
        )
        rows = (await db.execute(stmt)).all()
        return [row._asdict() for row in rows]

    @classmethod
    async def get_meta_column(cls, ssys_id: int, mdlName: str, tabEngName: str, fldEngName: str, db: AsyncSession):
        """Return the first MetadataFldTabExtractInfo entity for the given field, or None."""
        stmt = (
            select(MetadataFldTabExtractInfo)
            .where(
                MetadataFldTabExtractInfo.ssys_id == ssys_id,
                MetadataFldTabExtractInfo.mdl_name == mdlName,
                MetadataFldTabExtractInfo.tab_eng_name == tabEngName,
                MetadataFldTabExtractInfo.fld_eng_name == fldEngName,
            )
            .distinct()
            .order_by(MetadataFldTabExtractInfo.fld_no)
        )
        return (await db.execute(stmt)).scalars().first()

    # ------------------------------------------- table supplementary info (vett)

    @classmethod
    async def get_lastest_meta_data_supp_vett(cls, db: AsyncSession, ssys_id: int, mdl_name: str, tab_eng_name: str):
        """Return the MetadataSuppInfoVett entity with the newest ``apply_time`` for one table, or None."""
        stmt = (
            select(MetadataSuppInfoVett)
            .where(
                MetadataSuppInfoVett.ssys_id == ssys_id,
                MetadataSuppInfoVett.mdl_name == mdl_name,
                MetadataSuppInfoVett.tab_eng_name == tab_eng_name,
            )
            .order_by(desc(MetadataSuppInfoVett.apply_time))
            .distinct()
            .limit(1)
        )
        return (await db.execute(stmt)).scalars().first()

    @classmethod
    async def insertMetadataSuppInfoVett(cls, table: MetadataSuppInfoVett, db: AsyncSession):
        """Add ``table`` to the session, flush it, and return the same object."""
        db.add(table)
        await db.flush()
        return table

    @classmethod
    async def updateMetadataSuppInfoVett(cls, onum: str, operateType: str, db: AsyncSession):
        """Set ``apply_status`` and ``upd_time`` on the MetadataSuppInfoVett row identified by ``onum``."""
        # NOTE(review): upd_time is sent as a formatted string here, whereas
        # updateMetadataFldSuppInfoVett sends a datetime — confirm column types.
        values = dict(
            onum=onum,
            apply_status=operateType,
            upd_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        )
        await db.execute(update(MetadataSuppInfoVett), [values])

    @classmethod
    async def get_supp_table_vett_by_id(cls, suppId: str, db: AsyncSession):
        """Return the MetadataSuppInfoVett entity whose ``onum`` equals ``suppId``, or None."""
        stmt = select(MetadataSuppInfoVett).where(MetadataSuppInfoVett.onum == suppId).distinct()
        return (await db.execute(stmt)).scalars().first()

    # -------------------------------------------------- table supplementary info

    @classmethod
    async def insertMetadataSuppInfo(cls, table: MetadataSuppInfoVett, db: AsyncSession):
        """Create a MetadataSuppInfo row from a vett record, flush it, and return the new entity."""
        suppTable = MetadataSuppInfo()
        suppTable.onum = uuid.uuid4()
        suppTable.ssys_id = table.ssys_id
        suppTable.mdl_name = table.mdl_name
        suppTable.tab_eng_name = table.tab_eng_name
        suppTable.tab_crrct_name = table.tab_crrct_name
        suppTable.tab_desc = table.tab_desc
        suppTable.pic = table.pic
        suppTable.gov_flag = table.gov_flag
        suppTable.rec_stat = table.rec_stat
        suppTable.tab_clas = table.tab_clas
        suppTable.rec_subm_prsn = table.rec_subm_prsn
        suppTable.upd_time = table.upd_time
        db.add(suppTable)
        await db.flush()
        return suppTable

    @classmethod
    async def updateMetadataSuppInfo(cls, onum: str, table: MetadataSuppInfoVett, db: AsyncSession):
        """Overwrite the MetadataSuppInfo row identified by ``onum`` with values from a vett record.

        ``upd_time`` falls back to the current time when the vett record has none.
        """
        values = dict(
            onum=onum,
            tab_crrct_name=table.tab_crrct_name,
            tab_desc=table.tab_desc,
            pic=table.pic,
            gov_flag=table.gov_flag,
            rec_stat=table.rec_stat,
            tab_clas=table.tab_clas,
            rec_subm_prsn=table.rec_subm_prsn,
            upd_time=table.upd_time if table.upd_time else datetime.now(),
        )
        await db.execute(update(MetadataSuppInfo), [values])

    @classmethod
    async def get_supp_table_by_vett(cls, ssys_id: int, mdlName: str, tableName: str, db: AsyncSession):
        """Return the first MetadataSuppInfo entity for (ssys_id, mdl_name, tab_eng_name), or None."""
        stmt = (
            select(MetadataSuppInfo)
            .where(
                MetadataSuppInfo.ssys_id == ssys_id,
                MetadataSuppInfo.mdl_name == mdlName,
                MetadataSuppInfo.tab_eng_name == tableName,
            )
            .distinct()
        )
        return (await db.execute(stmt)).scalars().first()

    # ------------------------------------------- field supplementary info (vett)

    @classmethod
    async def get_supp_column_vett_by_tableInfo(cls, db: AsyncSession, tableInfo: MetadataSuppInfoVett):
        """Return field vett records matching ``tableInfo`` (same table and ``apply_time``) as dicts.

        Each dict carries the vett attributes plus ``data_dict_name`` from the
        LEFT JOINed DataStdDict row (None when there is no matching dictionary).
        """
        stmt = (
            select(
                MetadataFldSuppInfoVett,
                DataStdDict.data_dict_cn_name.label("data_dict_name"),
            )
            .join(
                DataStdDict,
                MetadataFldSuppInfoVett.data_dict_id == DataStdDict.onum,
                isouter=True,  # keep vett rows that have no dictionary entry
            )
            .where(
                MetadataFldSuppInfoVett.ssys_id == tableInfo.ssys_id,
                MetadataFldSuppInfoVett.mdl_name == tableInfo.mdl_name,
                MetadataFldSuppInfoVett.tab_eng_name == tableInfo.tab_eng_name,
                MetadataFldSuppInfoVett.apply_time == tableInfo.apply_time,
            )
        )
        rows = (await db.execute(stmt)).all()

        result = []
        for vett_info, dict_name in rows:
            item = {field: getattr(vett_info, field) for field in _FLD_VETT_FIELDS}
            item["data_dict_name"] = dict_name
            result.append(item)
        return result

    @classmethod
    async def insertMetadataFldSuppInfoVett(cls, column: MetadataFldSuppInfoVett, db: AsyncSession):
        """Add ``column`` to the session, flush it, and return the same object."""
        db.add(column)
        await db.flush()
        return column

    @classmethod
    async def updateMetadataFldSuppInfoVett(cls, onum: str, operateType: str, db: AsyncSession):
        """Set ``apply_status`` and ``upd_time`` on the MetadataFldSuppInfoVett row identified by ``onum``."""
        values = dict(
            onum=onum,
            apply_status=operateType,
            upd_time=datetime.now(),
        )
        await db.execute(update(MetadataFldSuppInfoVett), [values])

    @classmethod
    async def get_meta_col_supp_vett(cls, table: MetadataSuppInfoVett, db: AsyncSession):
        """Return the field vett records of ``table`` that share the newest ``apply_time``."""
        # Bound parameters replace the former string-concatenated SQL, which was
        # injectable and failed with TypeError for a non-string ssys_id.
        max_time_sql = text(
            "select max(apply_time) from t_metadata_fld_supp_info_vett "
            "where ssys_id = :ssys_id and mdl_name = :mdl_name and tab_eng_name = :tab_eng_name"
        )
        max_time = (
            await db.execute(
                max_time_sql,
                {
                    "ssys_id": table.ssys_id,
                    "mdl_name": table.mdl_name,
                    "tab_eng_name": table.tab_eng_name,
                },
            )
        ).scalar()

        stmt = (
            select(MetadataFldSuppInfoVett)
            .where(
                MetadataFldSuppInfoVett.ssys_id == table.ssys_id,
                MetadataFldSuppInfoVett.mdl_name == table.mdl_name,
                MetadataFldSuppInfoVett.tab_eng_name == table.tab_eng_name,
                MetadataFldSuppInfoVett.apply_time == max_time,
            )
            .distinct()
        )
        return (await db.execute(stmt)).scalars().all()

    # -------------------------------------------------- field supplementary info

    @classmethod
    async def updateMetadataFldSuppInfo(cls, onum: str, column: MetadataFldSuppInfoVett, db: AsyncSession):
        """Overwrite the MetadataFldSuppInfo row identified by ``onum`` with values from a vett record.

        ``upd_time`` falls back to the current time when the vett record has none.
        """
        values = dict(
            onum=onum,
            fld_crrct_name=column.fld_crrct_name,
            crrct_pk_flag=column.crrct_pk_flag,
            fld_desc=column.fld_desc,
            pic=column.pic,
            fld_clas=column.fld_clas,
            fld_null_rate=column.fld_null_rate,
            data_dict_id=column.data_dict_id,
            data_sec_lvl=column.data_sec_lvl,
            rec_stat=column.rec_stat,
            upd_time=column.upd_time if column.upd_time else datetime.now(),
        )
        await db.execute(update(MetadataFldSuppInfo), [values])

    @classmethod
    async def insertMetadataFldSuppInfo(cls, column: MetadataFldSuppInfoVett, db: AsyncSession):
        """Create a MetadataFldSuppInfo row from a vett record, flush it, and return the new entity."""
        suppColumn = MetadataFldSuppInfo()
        suppColumn.onum = uuid.uuid4()
        suppColumn.ssys_id = column.ssys_id
        suppColumn.mdl_name = column.mdl_name
        suppColumn.tab_eng_name = column.tab_eng_name
        suppColumn.fld_eng_name = column.fld_eng_name
        suppColumn.fld_crrct_name = column.fld_crrct_name
        suppColumn.crrct_pk_flag = column.crrct_pk_flag
        suppColumn.fld_desc = column.fld_desc
        suppColumn.pic = column.pic
        suppColumn.fld_clas = column.fld_clas
        suppColumn.fld_null_rate = column.fld_null_rate
        suppColumn.data_dict_id = column.data_dict_id
        suppColumn.data_sec_lvl = column.data_sec_lvl
        suppColumn.rec_stat = column.rec_stat
        suppColumn.upd_time = column.upd_time
        db.add(suppColumn)
        await db.flush()
        return suppColumn

    @classmethod
    async def get_supp_column_by_vett(cls, column: MetadataFldSuppInfoVett, db: AsyncSession):
        """Return the first MetadataFldSuppInfo entity matching the field named by ``column``, or None."""
        stmt = (
            select(MetadataFldSuppInfo)
            .where(
                MetadataFldSuppInfo.ssys_id == column.ssys_id,
                MetadataFldSuppInfo.mdl_name == column.mdl_name,
                MetadataFldSuppInfo.tab_eng_name == column.tab_eng_name,
                MetadataFldSuppInfo.fld_eng_name == column.fld_eng_name,
            )
            .distinct()
        )
        return (await db.execute(stmt)).scalars().first()

    @classmethod
    async def get_supp_column_by_columnInfo(cls, ssys_id: int, mdlName: str, tabEngName: str, fldEngName: str,
                                            db: AsyncSession):
        """Return the first MetadataFldSuppInfo entity for the given field identifiers, or None."""
        stmt = (
            select(MetadataFldSuppInfo)
            .where(
                MetadataFldSuppInfo.ssys_id == ssys_id,
                MetadataFldSuppInfo.mdl_name == mdlName,
                MetadataFldSuppInfo.tab_eng_name == tabEngName,
                MetadataFldSuppInfo.fld_eng_name == fldEngName,
            )
            .distinct()
        )
        return (await db.execute(stmt)).scalars().first()

    # ---------------------------------------------------------------- relations

    @classmethod
    async def get_er_relation_by_table(cls, result_db: AsyncSession, ssys_id: int, mdl_name: str, tab_eng_name: str):
        """Return ER-relation rows where the table appears on the a-side, followed by those on the b-side.

        Rows are returned as ``{column_name: value}`` dicts.
        """
        # NOTE(review): both queries label the row 'A' AS father, including the
        # b-side one — confirm whether the b-side query should report 'B'.
        a_side_sql = text(
            _ER_RELATION_BASE_SQL
            + " and a1.a_ssys_id = :ssys_id and a1.a_mdl_name = :mdlName and a1.a_tab_eng_name = :tabName"
        )
        b_side_sql = text(
            _ER_RELATION_BASE_SQL
            + " and a1.b_ssys_id = :ssys_id and a1.b_mdl_name = :mdlName and a1.b_tab_eng_name = :tabName"
        )
        params = {"ssys_id": ssys_id, "mdlName": mdl_name, "tabName": tab_eng_name}

        a_side = await result_db.execute(a_side_sql, params)
        b_side = await result_db.execute(b_side_sql, params)

        # Each result is converted with its own column names.
        return cls._rows_to_dicts(a_side) + cls._rows_to_dicts(b_side)

    @classmethod
    async def get_op_relation_by_table(cls, result_db: AsyncSession, ssys_id: int, mdl_name: str, tab_eng_name: str):
        """Return field-relation rows in which the table appears on either side, as dicts."""
        sql = text(
            _OP_RELATION_BASE_SQL
            + " and ((a1.a_ssys_id = :ssys_id and a1.a_mdl_name = :mdlName and a1.a_tab_eng_name = :tabName) "
            " or (a1.b_ssys_id = :ssys_id and a1.b_mdl_name = :mdlName and a1.b_tab_eng_name = :tabName))"
        )
        result = await result_db.execute(sql, {"ssys_id": ssys_id, "mdlName": mdl_name, "tabName": tab_eng_name})
        return cls._rows_to_dicts(result)

    @classmethod
    async def get_er_relation_by_column(cls, result_db: AsyncSession, column: dict, module: str):
        """Return ER-relation rows for one field, as dicts.

        When ``module`` is ``'pre'`` the field is matched against the b-side
        columns; for any other value it is matched against the a-side columns.
        ``column`` must provide ``ssys_id``, ``mdl_name``, ``tab_eng_name`` and
        ``fld_eng_name`` keys.
        """
        if module == 'pre':
            side_filter = (
                "and a1.b_ssys_id = :ssys_id and a1.b_mdl_name = :mdlName "
                "and a1.b_tab_eng_name = :tabName and a1.b_fld_eng_name = :fldName "
            )
        else:
            side_filter = (
                "and a1.a_ssys_id = :ssys_id and a1.a_mdl_name = :mdlName "
                "and a1.a_tab_eng_name = :tabName and a1.a_fld_eng_name = :fldName "
            )
        sql = text(_ER_RELATION_BASE_SQL + side_filter)
        result = await result_db.execute(
            sql,
            {
                "ssys_id": column['ssys_id'],
                "mdlName": column['mdl_name'],
                "tabName": column['tab_eng_name'],
                "fldName": column['fld_eng_name'],
            },
        )
        return cls._rows_to_dicts(result)

    @classmethod
    async def get_op_relation_by_column(cls, result_db: AsyncSession, column: dict, module: str):
        """Return field-relation rows for one field, as dicts.

        The field may match on the a-side or the b-side; the required
        ``rela_value`` of the joined 'set_flag' row differs per side and is
        swapped depending on whether ``module`` is ``'pre'``.
        """
        sql = text(
            _OP_RELATION_BASE_SQL
            + "and ((a1.a_ssys_id = :ssys_id and a1.a_mdl_name = :mdlName and a1.a_tab_eng_name = :tabName "
            " and a1.a_fld_eng_name = :fldName and a2.rela_value = :rela_value) "
            " or (a1.b_ssys_id = :ssys_id and a1.b_mdl_name = :mdlName and a1.b_tab_eng_name = :tabName "
            " and a1.b_fld_eng_name = :fldName and a2.rela_value = :rela_value2))"
        )
        is_pre = module == 'pre'
        result = await result_db.execute(
            sql,
            {
                "ssys_id": column['ssys_id'],
                "mdlName": column['mdl_name'],
                "tabName": column['tab_eng_name'],
                "fldName": column['fld_eng_name'],
                "rela_value": '2' if is_pre else '1',
                "rela_value2": '1' if is_pre else '2',
            },
        )
        return cls._rows_to_dicts(result)

    # ------------------------------------------------------------------ lineage

    @classmethod
    async def get_proc_by_table(cls, result_db: AsyncSession, ssys_id: int, mdl_name: str, tab_eng_name: str):
        """Return ``onum``/``proc_text`` of lineage records whose blood analysis targets the given table."""
        sql = text(
            "select onum, proc_text from t_metadata_data_lineage_info"
            " where onum in ("
            "select proId from meta_blood_analysis "
            "where targetSysId = :ssys_id and targetMdlName = :mdlName and targetTableName= :tableName)"
        )
        result = await result_db.execute(sql, {"ssys_id": ssys_id, "mdlName": mdl_name, "tableName": tab_eng_name})
        return cls._rows_to_dicts(result)

    @classmethod
    async def get_blood_by_procId(cls, result_db: AsyncSession, procId: int):
        """Return all MetaBloodAnalysis entities whose ``proId`` equals ``procId``."""
        stmt = select(MetaBloodAnalysis).where(MetaBloodAnalysis.proId == procId).distinct()
        return (await result_db.execute(stmt)).scalars().all()