|
|
|
from sqlalchemy import delete, select, update, desc,or_,not_,insert,and_,text
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from module_admin.entity.do.dataast_do import DataAssetInfoAppr
|
|
|
|
from module_admin.entity.do.data_ast_content_do import DataAssetInfo
|
|
|
|
from module_admin.entity.vo.dataast_vo import DataAstApprModel,DataAstInfoModel
|
|
|
|
from utils.page_util import PageUtil
|
|
|
|
from sqlalchemy.orm import aliased
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DataAstDao:
    """Data-access helpers for data-asset approval rows and published assets.

    Approval rows are read/written through the ``DataAssetInfoAppr`` ORM
    entity; published assets are upserted into ``t_data_ast_info`` with raw
    SQL.
    """

    # MySQL-style upsert into the published-asset table.  Every inserted
    # column except ``ast_no`` is overwritten when the row already exists.
    # NOTE(review): this relies on a unique/primary key on the table
    # (presumably ``ast_no``) -- confirm against the schema.
    # NOTE(review): ``VALUES(col)`` inside ON DUPLICATE KEY UPDATE is
    # deprecated in recent MySQL 8.0 releases in favour of row aliases; it is
    # kept here because the alias syntax is not available on older servers.
    _UPSERT_DATA_AST_INFO_SQL = """
        INSERT INTO t_data_ast_info (
            data_ast_eng_name,
            data_ast_cn_name,
            data_ast_type,
            data_ast_stat,
            data_ast_desc,
            data_ast_screen,
            data_ast_scren_clas,
            data_ast_cont,
            data_ast_faq,
            data_ast_estb_time,
            data_ast_upd_time,
            data_ast_src,
            ast_no,
            data_ast_clas
        ) VALUES (
            :data_ast_eng_name,
            :data_ast_cn_name,
            :data_ast_type,
            :data_ast_stat,
            :data_ast_desc,
            :data_ast_screen,
            :data_ast_scren_clas,
            :data_ast_cont,
            :data_ast_faq,
            :data_ast_estb_time,
            :data_ast_upd_time,
            :data_ast_src,
            :ast_no,
            :data_ast_clas
        )
        ON DUPLICATE KEY UPDATE
            data_ast_eng_name = VALUES(data_ast_eng_name),
            data_ast_cn_name = VALUES(data_ast_cn_name),
            data_ast_type = VALUES(data_ast_type),
            data_ast_stat = VALUES(data_ast_stat),
            data_ast_desc = VALUES(data_ast_desc),
            data_ast_screen = VALUES(data_ast_screen),
            data_ast_scren_clas = VALUES(data_ast_scren_clas),
            data_ast_cont = VALUES(data_ast_cont),
            data_ast_faq = VALUES(data_ast_faq),
            data_ast_estb_time = VALUES(data_ast_estb_time),
            data_ast_upd_time = VALUES(data_ast_upd_time),
            data_ast_src = VALUES(data_ast_src),
            data_ast_clas = VALUES(data_ast_clas);
    """

    @classmethod
    async def _list_appr_by_flow_id(cls, flowId: str, db: AsyncSession):
        """Return the approval rows of one flow, newest ``create_time`` first.

        Shared implementation behind the three public list methods, which all
        issue exactly the same query.

        :param flowId: value matched against ``DataAssetInfoAppr.flowId``
        :param db: async ORM session
        :return: whatever ``PageUtil.paginate`` returns for this query
        """
        query = (
            select(DataAssetInfoAppr)
            .where(DataAssetInfoAppr.flowId == flowId)
            .order_by(desc(DataAssetInfoAppr.create_time))
        )
        # NOTE(review): the trailing (0, 0, False) presumably means
        # "page_num=0, page_size=0, paging disabled", i.e. return every
        # matching row -- confirm against PageUtil.paginate.
        return await PageUtil.paginate(db, query, 0, 0, False)

    @classmethod
    async def get_dataast_appr_list(cls, flowId: str, db: AsyncSession):
        """List approval rows whose ``flowId`` equals *flowId*, newest first.

        :param flowId: approval-flow identifier to filter on
        :param db: async ORM session
        :return: result of ``PageUtil.paginate`` for the query
        """
        return await cls._list_appr_by_flow_id(flowId, db)

    @classmethod
    async def get_dataast_appr_list_Flow(cls, flowId: str, db: AsyncSession):
        """List approval rows whose ``flowId`` equals *flowId*, newest first.

        Behaves exactly like :meth:`get_dataast_appr_list`; both names are
        kept so existing callers continue to work.

        :param flowId: approval-flow identifier to filter on
        :param db: async ORM session
        :return: result of ``PageUtil.paginate`` for the query
        """
        return await cls._list_appr_by_flow_id(flowId, db)

    @classmethod
    async def add_dataast_appr(cls, db: AsyncSession, model: DataAstApprModel):
        """Insert a new approval row with the next version number for its asset.

        The new ``version_no`` is ``'1'`` when no row exists yet for
        ``model.ast_no``; otherwise it is the current maximum plus one,
        stored as a string.  The row is added and flushed but NOT committed.

        :param db: async ORM session
        :param model: approval payload; its ``model_dump()`` supplies the
            constructor arguments of ``DataAssetInfoAppr``
        :return: the flushed ``DataAssetInfoAppr`` instance
        """
        # Function-scope import: these names are not imported at file level.
        from sqlalchemy import Integer, cast, func, select

        # version_no is written as a string below, so the column is
        # presumably textual (TODO confirm).  MAX() over text compares
        # lexicographically ('9' > '10'), which would freeze the version at
        # 10 once that value is reached -- hence the numeric cast inside the
        # aggregate.  The cast is a no-op for a numeric column.
        latest_version_query = select(
            func.max(cast(DataAssetInfoAppr.version_no, Integer))
        ).where(DataAssetInfoAppr.ast_no == model.ast_no)
        latest_version = (await db.execute(latest_version_query)).scalar_one_or_none()

        if latest_version is None:
            next_version = '1'
        else:
            next_version = str(int(latest_version) + 1)

        # Build the entity from the payload, overriding any caller-supplied
        # version with the computed one.
        values = model.model_dump()
        values['version_no'] = next_version

        record = DataAssetInfoAppr(**values)
        db.add(record)
        await db.flush()
        return record

    @classmethod
    async def add_dataast_data(cls, db: AsyncSession, model: DataAstInfoModel):
        """Upsert one data asset into ``t_data_ast_info`` and commit.

        ``data_ast_upd_time`` is always replaced with the current local time
        (naive ``datetime.now()``) before the statement runs.  Unlike the
        other write methods of this class, this one commits the session on
        success and rolls it back on any ``Exception``.

        :param db: async ORM session
        :param model: asset payload; ``model_dump()`` must provide every bind
            parameter named in the upsert statement
        :return: a fixed success message
        :raises Exception: whatever the execute/commit raised, after rollback
        """
        try:
            params = model.model_dump()
            params['data_ast_upd_time'] = datetime.now()

            await db.execute(text(cls._UPSERT_DATA_AST_INFO_SQL), params)
            await db.commit()
            return "元数据成功发布到数据资产!"
        except Exception:
            # Undo the partial transaction, then re-raise the original
            # exception unchanged (bare raise keeps its traceback intact).
            await db.rollback()
            raise

    @classmethod
    async def update_data_ast_appr(cls, db: AsyncSession, update_data: DataAstInfoModel):
        """Update one approval row by primary key and flush (no commit).

        Uses SQLAlchemy's ORM "bulk UPDATE by primary key" form, which only
        accepts dictionaries.  *update_data* may therefore be either a plain
        ``dict`` (passed through untouched) or a Pydantic model, which is
        converted with ``model_dump(exclude_unset=True)`` so that fields the
        caller never set are not written.

        :param db: async ORM session
        :param update_data: mapping (or model) holding the primary key of the
            ``DataAssetInfoAppr`` row plus the columns to change
        """
        if hasattr(update_data, 'model_dump'):
            # NOTE(review): assumes the model's field names match the ORM
            # attribute names of DataAssetInfoAppr -- confirm.
            update_data = update_data.model_dump(exclude_unset=True)

        await db.execute(update(DataAssetInfoAppr), [update_data])
        await db.flush()

    @classmethod
    async def get_ast_main_appr_list(cls, flowId: str, db: AsyncSession):
        """List approval rows whose ``flowId`` equals *flowId*, newest first.

        Issues the same query as :meth:`get_dataast_appr_list`.

        :param flowId: approval-flow identifier to filter on
        :param db: async ORM session
        :return: result of ``PageUtil.paginate`` for the query
        """
        return await cls._list_appr_by_flow_id(flowId, db)
|