You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

140 lines
5.0 KiB

from sqlalchemy import delete, select, update, desc,or_,not_,insert,and_,text
from sqlalchemy.ext.asyncio import AsyncSession
from module_admin.entity.do.dataast_do import DataAssetInfoAppr
from module_admin.entity.do.data_ast_content_do import DataAssetInfo
from module_admin.entity.vo.dataast_vo import DataAstApprModel,DataAstInfoModel
from utils.page_util import PageUtil
from sqlalchemy.orm import aliased
from datetime import datetime
class DataAstDao:
    """Database access methods for data-asset approval rows and data-asset rows.

    All methods are classmethods that operate on a caller-supplied
    ``AsyncSession``. Only ``add_dataast_data`` commits or rolls back;
    the other write methods just flush and leave transaction control to
    the caller.
    """

    @classmethod
    async def _list_appr_by_flow(cls, flowId: str, db: AsyncSession):
        """Return approval rows whose ``flowId`` matches, newest first.

        Shared implementation behind the three public ``get_*_appr_list*``
        methods, which were previously identical copies of this query.
        """
        query = (
            select(DataAssetInfoAppr)
            .where(DataAssetInfoAppr.flowId == flowId)
            .order_by(desc(DataAssetInfoAppr.create_time))
        )
        # NOTE(review): the (0, 0, False) arguments presumably disable
        # paging so that every matching row is returned -- confirm against
        # PageUtil.paginate.
        return await PageUtil.paginate(db, query, 0, 0, False)

    @classmethod
    async def get_dataast_appr_list(cls, flowId: str, db: AsyncSession):
        """Return the approval rows for ``flowId`` ordered by ``create_time`` descending."""
        return await cls._list_appr_by_flow(flowId, db)

    @classmethod
    async def get_dataast_appr_list_Flow(cls, flowId: str, db: AsyncSession):
        """Return the approval rows for ``flowId`` ordered by ``create_time`` descending.

        Same query as ``get_dataast_appr_list``; kept as a separate name
        for existing callers.
        """
        return await cls._list_appr_by_flow(flowId, db)

    @classmethod
    async def add_dataast_appr(cls, db: AsyncSession, model: DataAstApprModel):
        """Insert a new approval row with the next version number for its ``ast_no``.

        The new ``version_no`` is ``'1'`` when no row exists yet for
        ``model.ast_no``; otherwise it is the current maximum plus one,
        stored as a string.

        :param db: session used for the lookup and the insert
        :param model: approval data; its ``ast_no`` selects the version series
        :return: the flushed (not committed) ``DataAssetInfoAppr`` instance
        """
        from sqlalchemy import Integer, cast, func

        # Compare versions numerically. version_no is written below as a
        # string, so a plain MAX() on a string column would be lexicographic
        # ('9' > '10') and the version would stop advancing after 9.
        # NOTE(review): assumes version_no holds only numeric text -- the
        # original int() conversion already relied on that.
        query = select(
            func.max(cast(DataAssetInfoAppr.version_no, Integer))
        ).where(DataAssetInfoAppr.ast_no == model.ast_no)
        max_version = (await db.execute(query)).scalar_one_or_none()

        new_version = '1' if max_version is None else str(int(max_version) + 1)

        # Build the ORM object from the model, overriding the version number.
        data = model.model_dump()
        data['version_no'] = new_version
        col = DataAssetInfoAppr(**data)
        db.add(col)
        await db.flush()
        return col

    @classmethod
    async def add_dataast_data(cls, db: AsyncSession, model: DataAstInfoModel):
        """Insert or update a row in ``t_data_ast_info`` and commit.

        Runs an ``INSERT ... ON DUPLICATE KEY UPDATE`` statement with the
        model's fields as bind parameters. ``data_ast_upd_time`` is always
        overwritten with the current local time before execution.

        :param db: session used to execute and commit the statement
        :param model: data-asset values to write
        :return: a fixed success message string
        :raises Exception: any error from execution or commit, re-raised
            after the transaction has been rolled back
        """
        # Upsert statement; all values are passed as bind parameters.
        upsert_sql = text("""
INSERT INTO t_data_ast_info (
data_ast_eng_name,
data_ast_cn_name,
data_ast_type,
data_ast_stat,
data_ast_desc,
data_ast_screen,
data_ast_scren_clas,
data_ast_cont,
data_ast_faq,
data_ast_estb_time,
data_ast_upd_time,
data_ast_src,
ast_no,
data_ast_clas
) VALUES (
:data_ast_eng_name,
:data_ast_cn_name,
:data_ast_type,
:data_ast_stat,
:data_ast_desc,
:data_ast_screen,
:data_ast_scren_clas,
:data_ast_cont,
:data_ast_faq,
:data_ast_estb_time,
:data_ast_upd_time,
:data_ast_src,
:ast_no,
:data_ast_clas
)
ON DUPLICATE KEY UPDATE
data_ast_eng_name = VALUES(data_ast_eng_name),
data_ast_cn_name = VALUES(data_ast_cn_name),
data_ast_type = VALUES(data_ast_type),
data_ast_stat = VALUES(data_ast_stat),
data_ast_desc = VALUES(data_ast_desc),
data_ast_screen = VALUES(data_ast_screen),
data_ast_scren_clas = VALUES(data_ast_scren_clas),
data_ast_cont = VALUES(data_ast_cont),
data_ast_faq = VALUES(data_ast_faq),
data_ast_estb_time = VALUES(data_ast_estb_time),
data_ast_upd_time = VALUES(data_ast_upd_time),
data_ast_src = VALUES(data_ast_src),
data_ast_clas = VALUES(data_ast_clas);
""")
        try:
            model_dict = model.model_dump()
            model_dict['data_ast_upd_time'] = datetime.now()
            # Execute the upsert, then commit the transaction.
            await db.execute(upsert_sql, model_dict)
            await db.commit()
            return "元数据成功发布到数据资产!"
        except Exception:
            # Roll back on any failure; bare raise keeps the original traceback.
            await db.rollback()
            raise

    @classmethod
    async def update_data_ast_appr(cls, db: AsyncSession, update_data: DataAstInfoModel):
        """Update one approval row through SQLAlchemy's bulk UPDATE by primary key.

        :param db: session used to execute and flush the update
        :param update_data: either a plain mapping of column values or a
            Pydantic model; it must include the primary key of the row to
            update
        """
        # The bulk-update parameter list must contain dictionaries, so a
        # Pydantic model is converted first. Only explicitly set fields are
        # sent, so unset fields do not overwrite stored values with defaults.
        if hasattr(update_data, 'model_dump'):
            payload = update_data.model_dump(exclude_unset=True)
        else:
            payload = update_data
        await db.execute(update(DataAssetInfoAppr), [payload])
        await db.flush()

    @classmethod
    async def get_ast_main_appr_list(cls, flowId: str, db: AsyncSession):
        """Return the approval rows for ``flowId`` ordered by ``create_time`` descending.

        Same query as ``get_dataast_appr_list``; kept as a separate name
        for existing callers.
        """
        return await cls._list_appr_by_flow(flowId, db)