from sqlalchemy import delete, select, update, desc,or_,not_,insert,and_,text from sqlalchemy.ext.asyncio import AsyncSession from module_admin.entity.do.dataast_do import DataAssetInfoAppr from module_admin.entity.do.data_ast_content_do import DataAssetInfo from module_admin.entity.vo.dataast_vo import DataAstApprModel,DataAstInfoModel from utils.page_util import PageUtil from sqlalchemy.orm import aliased from datetime import datetime class DataAstDao: @classmethod async def get_dataast_appr_list(cls, flowId:str,db: AsyncSession): filters = [] filters.append(DataAssetInfoAppr.flowId == flowId) query = select(DataAssetInfoAppr).where(*filters).order_by(desc(DataAssetInfoAppr.create_time)) return await PageUtil.paginate(db, query, 0, 0, False) @classmethod async def get_dataast_appr_list_Flow(cls, flowId:str,db: AsyncSession): filters = [] filters.append(DataAssetInfoAppr.flowId == flowId) query = select(DataAssetInfoAppr).where(*filters).order_by(desc(DataAssetInfoAppr.create_time)) return await PageUtil.paginate(db, query, 0, 0, False) @classmethod async def add_dataast_appr(cls, db: AsyncSession, model: DataAstApprModel): # 从model中获取ast_no ast_no = model.ast_no # 查询当前ast_no对应的最大版本号 from sqlalchemy import select, func query = select(func.max(DataAssetInfoAppr.version_no)).where( DataAssetInfoAppr.ast_no == ast_no ) result = await db.execute(query) max_version = result.scalar_one_or_none() # 计算新的版本号 new_version = '1' if max_version is None else str(int(max_version) + 1) # 创建对象时设置版本号 data = model.model_dump() data['version_no'] = new_version col = DataAssetInfoAppr(**data) db.add(col) await db.flush() return col @classmethod async def add_dataast_data(cls, db: AsyncSession, model: DataAstInfoModel): try: # 构建 SQL 语句 upsert_sql = text(""" INSERT INTO t_data_ast_info ( data_ast_eng_name, data_ast_cn_name, data_ast_type, data_ast_stat, data_ast_desc, data_ast_screen, data_ast_scren_clas, data_ast_cont, data_ast_faq, data_ast_estb_time, data_ast_upd_time, data_ast_src, ast_no, data_ast_clas ) VALUES ( :data_ast_eng_name, :data_ast_cn_name, :data_ast_type, :data_ast_stat, :data_ast_desc, :data_ast_screen, :data_ast_scren_clas, :data_ast_cont, :data_ast_faq, :data_ast_estb_time, :data_ast_upd_time, :data_ast_src, :ast_no, :data_ast_clas ) ON DUPLICATE KEY UPDATE data_ast_eng_name = VALUES(data_ast_eng_name), data_ast_cn_name = VALUES(data_ast_cn_name), data_ast_type = VALUES(data_ast_type), data_ast_stat = VALUES(data_ast_stat), data_ast_desc = VALUES(data_ast_desc), data_ast_screen = VALUES(data_ast_screen), data_ast_scren_clas = VALUES(data_ast_scren_clas), data_ast_cont = VALUES(data_ast_cont), data_ast_faq = VALUES(data_ast_faq), data_ast_estb_time = VALUES(data_ast_estb_time), data_ast_upd_time = VALUES(data_ast_upd_time), data_ast_src = VALUES(data_ast_src), data_ast_clas = VALUES(data_ast_clas); """) model_dict = model.model_dump() model_dict['data_ast_upd_time'] = datetime.now() # 执行 UPSERT 操作 await db.execute(upsert_sql, model_dict) # 提交事务 await db.commit() return "元数据成功发布到数据资产!" except Exception as e: # 出现异常时回滚事务 await db.rollback() raise e # async def add_dataast_data(cls, db: AsyncSession, model: DataAstInfoModel): # try: # # 动态SQL:更新目标表中已存在的记录 # update_sql = text(""" # UPDATE t_data_ast_info target # JOIN ( # SELECT # appr.data_ast_eng_name, # appr.data_ast_cn_name, # appr.data_ast_type, # appr.data_ast_stat, # appr.data_ast_desc, # appr.data_ast_screen, # appr.data_ast_scren_clas, # appr.data_ast_cont, # appr.data_ast_faq, # appr.data_ast_estb_time, # NOW() AS data_ast_upd_time, # appr.data_ast_src, # appr.ast_no, # appr.data_ast_clas # FROM t_data_ast_info_appr appr # INNER JOIN ( # SELECT # ast_no, # MAX(version_no) AS max_version # FROM t_data_ast_info_appr # WHERE approStatus = 'succeed' # GROUP BY ast_no # ) latest ON appr.ast_no = latest.ast_no AND appr.version_no = latest.max_version # WHERE # appr.data_ast_stat = '1' # AND appr.changeType = 'add' # AND appr.approStatus = 'succeed' # ) source ON target.ast_no = source.ast_no # SET # target.data_ast_eng_name = source.data_ast_eng_name, # target.data_ast_cn_name = source.data_ast_cn_name, # target.data_ast_type = source.data_ast_type, # target.data_ast_stat = source.data_ast_stat, # target.data_ast_desc = source.data_ast_desc, # target.data_ast_screen = source.data_ast_screen, # target.data_ast_scren_clas = source.data_ast_scren_clas, # target.data_ast_cont = source.data_ast_cont, # target.data_ast_faq = source.data_ast_faq, # target.data_ast_estb_time = source.data_ast_estb_time, # target.data_ast_upd_time = source.data_ast_upd_time, # target.data_ast_src = source.data_ast_src, # target.data_ast_clas = source.data_ast_clas; # """) # # 动态SQL:插入目标表中不存在的新记录 # insert_sql = text(""" # INSERT INTO t_data_ast_info ( # data_ast_eng_name, # data_ast_cn_name, # data_ast_type, # data_ast_stat, # data_ast_desc, # data_ast_screen, # data_ast_scren_clas, # data_ast_cont, # data_ast_faq, # data_ast_estb_time, # data_ast_upd_time, # data_ast_src, # ast_no, # data_ast_clas # ) # SELECT # source.data_ast_eng_name, # source.data_ast_cn_name, # source.data_ast_type, # source.data_ast_stat, # source.data_ast_desc, # source.data_ast_screen, # source.data_ast_scren_clas, # source.data_ast_cont, # source.data_ast_faq, # source.data_ast_estb_time, # source.data_ast_upd_time, # source.data_ast_src, # source.ast_no, # source.data_ast_clas # FROM ( # SELECT # appr.data_ast_eng_name, # appr.data_ast_cn_name, # appr.data_ast_type, # appr.data_ast_stat, # appr.data_ast_desc, # appr.data_ast_screen, # appr.data_ast_scren_clas, # appr.data_ast_cont, # appr.data_ast_faq, # appr.data_ast_estb_time, # NOW() AS data_ast_upd_time, # appr.data_ast_src, # appr.ast_no, # appr.data_ast_clas # FROM t_data_ast_info_appr appr # INNER JOIN ( # SELECT # ast_no, # MAX(version_no) AS max_version # FROM t_data_ast_info_appr # WHERE approStatus = 'succeed' # GROUP BY ast_no # ) latest ON appr.ast_no = latest.ast_no AND appr.version_no = latest.max_version # WHERE # appr.data_ast_stat = '1' # AND appr.changeType = 'add' # AND appr.approStatus = 'succeed' # ) source # LEFT JOIN t_data_ast_info target ON source.ast_no = target.ast_no # WHERE target.ast_no IS NULL; # """) # # 执行更新SQL # await db.execute(update_sql) # # 执行插入SQL # await db.execute(insert_sql) # # 提交事务 # await db.commit() # return "元数据成功发布到数据资产!" # except Exception as e: # # 回滚事务 # await db.rollback() # raise e @classmethod async def update_data_ast_appr(cls, db: AsyncSession, update_data: DataAstInfoModel): await db.execute(update(DataAssetInfoAppr), [update_data]) await db.flush() @classmethod async def get_ast_main_appr_list(cls, flowId:str,db: AsyncSession): filters = [] filters.append(DataAssetInfoAppr.flowId == flowId) query = select(DataAssetInfoAppr).where(*filters).order_by(desc(DataAssetInfoAppr.create_time)) return await PageUtil.paginate(db, query, 0, 0, False)