from datetime import datetime

from sqlalchemy import (
    Integer,
    and_,
    cast,
    delete,
    desc,
    func,
    insert,
    not_,
    or_,
    select,
    update,
)
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import aliased

from module_admin.entity.do.data_ast_content_do import DataAssetInfo
from module_admin.entity.do.dataast_do import DataAssetInfoAppr
from module_admin.entity.vo.dataast_vo import DataAstApprModel, DataAstInfoModel
from utils.page_util import PageUtil


class DataAstDao:
    """Data-access helpers for data-asset approval rows and published assets.

    Approval rows live in ``DataAssetInfoAppr``; published assets live in
    ``DataAssetInfo``.
    """

    # Attributes copied verbatim from an approval row onto a published asset.
    _COPIED_ASSET_FIELDS = (
        "data_ast_eng_name",
        "data_ast_cn_name",
        "data_ast_type",
        "data_ast_stat",
        "data_ast_desc",
        "data_ast_screen",
        "data_ast_scren_clas",
        "data_ast_cont",
        "data_ast_faq",
        "data_ast_estb_time",
        "data_ast_src",
        "ast_no",
        "data_ast_clas",
    )

    # Columns that must never be overwritten by ON DUPLICATE KEY UPDATE:
    # the primary key and the business key.
    _UPSERT_KEY_COLUMNS = ("data_ast_no", "ast_no")

    @classmethod
    async def _list_appr_by_flow(cls, flowId: str, db: AsyncSession):
        """Return the approval rows of one flow, newest ``create_time`` first.

        The query is handed to ``PageUtil.paginate`` with page number 0, page
        size 0 and the last flag ``False``.
        NOTE(review): presumably that disables paging and returns every
        matching row -- confirm against ``PageUtil``.
        """
        query = (
            select(DataAssetInfoAppr)
            .where(DataAssetInfoAppr.flowId == flowId)
            .order_by(desc(DataAssetInfoAppr.create_time))
        )
        return await PageUtil.paginate(db, query, 0, 0, False)

    @classmethod
    async def get_dataast_appr_list(cls, flowId: str, db: AsyncSession):
        """Return the approval rows whose ``flowId`` matches, newest first."""
        return await cls._list_appr_by_flow(flowId, db)

    @classmethod
    async def get_dataast_appr_list_Flow(cls, flowId: str, db: AsyncSession):
        """Return the approval rows whose ``flowId`` matches, newest first.

        Same query as :meth:`get_dataast_appr_list`; kept as a separate entry
        point for existing callers.
        """
        return await cls._list_appr_by_flow(flowId, db)

    @classmethod
    async def add_dataast_appr(cls, db: AsyncSession, model: DataAstApprModel):
        """Insert a new approval row with the next version number for its asset.

        The version is ``'1'`` when no row exists yet for ``model.ast_no``,
        otherwise the current maximum plus one, stored as a string.

        :param db: active async session; the new row is flushed, not committed.
        :param model: approval payload; ``model_dump()`` supplies the columns.
        :return: the newly added ``DataAssetInfoAppr`` instance.
        """
        # Aggregate numerically: version_no is handled as text, and a textual
        # MAX would rank '9' above '10' and hand out duplicate versions.
        max_version_query = select(
            func.max(cast(DataAssetInfoAppr.version_no, Integer))
        ).where(DataAssetInfoAppr.ast_no == model.ast_no)
        max_version = (await db.execute(max_version_query)).scalar_one_or_none()

        new_version = "1" if max_version is None else str(int(max_version) + 1)

        data = model.model_dump()
        data["version_no"] = new_version
        row = DataAssetInfoAppr(**data)
        db.add(row)
        await db.flush()
        return row

    @classmethod
    async def add_dataast_data(cls, db: AsyncSession, model: DataAstInfoModel):
        """Publish approved asset definitions into ``DataAssetInfo``.

        For every ``ast_no`` the highest-versioned approval row with
        ``approStatus == "succeed"`` is selected; of those, rows with
        ``data_ast_stat == "1"`` and ``changeType == "add"`` are upserted
        into ``DataAssetInfo`` via MySQL ``INSERT ... ON DUPLICATE KEY UPDATE``.

        :param db: active async session; it is committed before returning.
        :param model: not read by this method; kept for interface
            compatibility.
        :return: a fixed success message.
        """
        numeric_version = cast(DataAssetInfoAppr.version_no, Integer)

        # Latest successfully approved version per asset, compared as numbers
        # so that version 10 outranks version 9.
        latest_approved = (
            select(
                DataAssetInfoAppr.ast_no,
                func.max(numeric_version).label("max_version"),
            )
            .filter(DataAssetInfoAppr.approStatus == "succeed")
            .group_by(DataAssetInfoAppr.ast_no)
            .subquery()
        )

        query = (
            select(DataAssetInfoAppr)
            .join(
                latest_approved,
                and_(
                    DataAssetInfoAppr.ast_no == latest_approved.c.ast_no,
                    numeric_version == latest_approved.c.max_version,
                ),
            )
            .filter(
                DataAssetInfoAppr.data_ast_stat == "1",
                DataAssetInfoAppr.changeType == "add",
                DataAssetInfoAppr.approStatus == "succeed",
            )
        )
        approved_records = await db.scalars(query)

        # One timestamp for the whole batch keeps inserted and updated rows
        # consistent with each other.
        now = datetime.now()
        records_to_upsert = [
            {
                **{
                    name: getattr(record, name)
                    for name in cls._COPIED_ASSET_FIELDS
                },
                "data_ast_upd_time": now,
            }
            for record in approved_records
        ]

        if records_to_upsert:
            insert_stmt = mysql_insert(DataAssetInfo).values(records_to_upsert)

            # On key collision overwrite every column with the incoming value,
            # except the primary key and the business key.
            update_dict = {
                column.name: insert_stmt.inserted[column.name]
                for column in DataAssetInfo.__table__.columns
                if column.name not in cls._UPSERT_KEY_COLUMNS
            }
            update_dict["data_ast_upd_time"] = now

            await db.execute(insert_stmt.on_duplicate_key_update(update_dict))

        # NOTE(review): the commit runs whether or not any row was upserted;
        # confirm this matches the intended transaction handling.
        await db.commit()
        return "元数据成功发布到数据资产!"

    @classmethod
    async def update_data_ast_appr(
        cls, db: AsyncSession, update_data: DataAstInfoModel
    ):
        """Update one approval row by primary key and flush the session.

        ``update_data`` may be a plain ``dict`` or an object exposing
        ``model_dump`` (a Pydantic model). The ORM bulk-update form used here
        takes a list of parameter dictionaries, so models are converted first;
        only explicitly set fields are sent, so unset fields are not
        overwritten with their defaults.

        :param db: active async session; changes are flushed, not committed.
        :param update_data: values to write, including the primary key.
        """
        if hasattr(update_data, "model_dump"):
            params = update_data.model_dump(exclude_unset=True)
        else:
            params = update_data
        await db.execute(update(DataAssetInfoAppr), [params])
        await db.flush()

    @classmethod
    async def get_ast_main_appr_list(cls, flowId: str, db: AsyncSession):
        """Return the approval rows whose ``flowId`` matches, newest first.

        Same query as :meth:`get_dataast_appr_list`; kept as a separate entry
        point for existing callers.
        """
        return await cls._list_appr_by_flow(flowId, db)