You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
155 lines
5.7 KiB
155 lines
5.7 KiB
2 days ago
|
from sqlalchemy import delete, select, update, desc,or_,not_,insert,and_,func
|
||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
from module_admin.entity.do.dataast_do import DataAssetInfoAppr
|
||
|
from module_admin.entity.do.data_ast_content_do import DataAssetInfo
|
||
|
from module_admin.entity.vo.dataast_vo import DataAstApprModel,DataAstInfoModel
|
||
|
from utils.page_util import PageUtil
|
||
|
from sqlalchemy.orm import aliased
|
||
|
from datetime import datetime
|
||
|
|
||
|
|
||
|
|
||
|
class DataAstDao:
|
||
|
@classmethod
|
||
|
async def get_dataast_appr_list(cls, flowId:str,db: AsyncSession):
|
||
|
filters = []
|
||
|
filters.append(DataAssetInfoAppr.flowId == flowId)
|
||
|
query = select(DataAssetInfoAppr).where(*filters).order_by(desc(DataAssetInfoAppr.create_time))
|
||
|
return await PageUtil.paginate(db, query, 0, 0, False)
|
||
|
@classmethod
|
||
|
async def get_dataast_appr_list_Flow(cls, flowId:str,db: AsyncSession):
|
||
|
filters = []
|
||
|
filters.append(DataAssetInfoAppr.flowId == flowId)
|
||
|
query = select(DataAssetInfoAppr).where(*filters).order_by(desc(DataAssetInfoAppr.create_time))
|
||
|
return await PageUtil.paginate(db, query, 0, 0, False)
|
||
|
|
||
|
|
||
|
|
||
|
@classmethod
|
||
|
async def add_dataast_appr(cls, db: AsyncSession, model: DataAstApprModel):
|
||
|
# 从model中获取ast_no
|
||
|
ast_no = model.ast_no
|
||
|
|
||
|
# 查询当前ast_no对应的最大版本号
|
||
|
from sqlalchemy import select, func
|
||
|
|
||
|
query = select(func.max(DataAssetInfoAppr.version_no)).where(
|
||
|
DataAssetInfoAppr.ast_no == ast_no
|
||
|
)
|
||
|
|
||
|
result = await db.execute(query)
|
||
|
max_version = result.scalar_one_or_none()
|
||
|
|
||
|
# 计算新的版本号
|
||
|
new_version = '1' if max_version is None else str(int(max_version) + 1)
|
||
|
|
||
|
# 创建对象时设置版本号
|
||
|
data = model.model_dump()
|
||
|
data['version_no'] = new_version
|
||
|
|
||
|
col = DataAssetInfoAppr(**data)
|
||
|
db.add(col)
|
||
|
await db.flush()
|
||
|
|
||
|
return col
|
||
|
|
||
|
|
||
|
|
||
|
@classmethod
|
||
|
async def add_dataast_data(cls, db: AsyncSession, model: DataAstInfoModel):
|
||
|
# 子查询:获取每个 ast_no 对应的最新已成功审批版本
|
||
|
subquery = (
|
||
|
select(
|
||
|
DataAssetInfoAppr.ast_no,
|
||
|
func.max(DataAssetInfoAppr.version_no).label("max_version")
|
||
|
)
|
||
|
.filter(
|
||
|
DataAssetInfoAppr.approStatus == "succeed"
|
||
|
)
|
||
|
.group_by(DataAssetInfoAppr.ast_no)
|
||
|
.subquery()
|
||
|
)
|
||
|
|
||
|
# 主查询:筛选符合条件的记录并映射到目标模型
|
||
|
query = (
|
||
|
select(DataAssetInfoAppr)
|
||
|
.join(
|
||
|
subquery,
|
||
|
and_(
|
||
|
DataAssetInfoAppr.ast_no == subquery.c.ast_no,
|
||
|
DataAssetInfoAppr.version_no == subquery.c.max_version
|
||
|
)
|
||
|
)
|
||
|
.filter(
|
||
|
DataAssetInfoAppr.data_ast_stat == "1",
|
||
|
DataAssetInfoAppr.changeType == "add",
|
||
|
DataAssetInfoAppr.approStatus == "succeed"
|
||
|
)
|
||
|
)
|
||
|
|
||
|
# 执行查询获取符合条件的记录
|
||
|
approved_records = await db.scalars(query)
|
||
|
|
||
|
# 收集需要处理的记录数据
|
||
|
records_to_upsert = []
|
||
|
for record in approved_records:
|
||
|
# 从源记录中提取需要的字段
|
||
|
asset_data = {
|
||
|
"data_ast_eng_name": record.data_ast_eng_name,
|
||
|
"data_ast_cn_name": record.data_ast_cn_name,
|
||
|
"data_ast_type": record.data_ast_type,
|
||
|
"data_ast_stat": record.data_ast_stat,
|
||
|
"data_ast_desc": record.data_ast_desc,
|
||
|
"data_ast_screen": record.data_ast_screen,
|
||
|
"data_ast_scren_clas": record.data_ast_scren_clas,
|
||
|
"data_ast_cont": record.data_ast_cont,
|
||
|
"data_ast_faq": record.data_ast_faq,
|
||
|
"data_ast_estb_time": record.data_ast_estb_time,
|
||
|
"data_ast_upd_time": datetime.now(), # 使用当前时间
|
||
|
"data_ast_src": record.data_ast_src,
|
||
|
"ast_no": record.ast_no,
|
||
|
"data_ast_clas": record.data_ast_clas
|
||
|
}
|
||
|
records_to_upsert.append(asset_data)
|
||
|
|
||
|
# 使用 bulk_insert_mappings 执行批量插入/更新
|
||
|
if records_to_upsert:
|
||
|
from sqlalchemy.dialects.mysql import insert
|
||
|
|
||
|
insert_stmt = insert(DataAssetInfo).values(records_to_upsert)
|
||
|
|
||
|
# 定义 ON DUPLICATE KEY UPDATE 子句
|
||
|
update_dict = {
|
||
|
c.name: insert_stmt.inserted[c.name]
|
||
|
for c in DataAssetInfo.__table__.columns
|
||
|
if c.name != 'data_ast_no' and c.name != 'ast_no' # 排除主键和业务主键
|
||
|
}
|
||
|
# 手动设置 upd_time 为当前时间
|
||
|
update_dict['data_ast_upd_time'] = datetime.now()
|
||
|
|
||
|
upsert_stmt = insert_stmt.on_duplicate_key_update(update_dict)
|
||
|
|
||
|
# 执行插入/更新语句
|
||
|
await db.execute(upsert_stmt)
|
||
|
|
||
|
# 提交会话使更改生效
|
||
|
await db.commit()
|
||
|
|
||
|
return "元数据成功发布到数据资产!"
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
@classmethod
|
||
|
async def update_data_ast_appr(cls, db: AsyncSession, update_data: DataAstInfoModel):
|
||
|
|
||
|
await db.execute(update(DataAssetInfoAppr), [update_data])
|
||
|
await db.flush()
|
||
|
|
||
|
|
||
|
@classmethod
|
||
|
async def get_ast_main_appr_list(cls, flowId:str,db: AsyncSession):
|
||
|
filters = []
|
||
|
filters.append(DataAssetInfoAppr.flowId == flowId)
|
||
|
query = select(DataAssetInfoAppr).where(*filters).order_by(desc(DataAssetInfoAppr.create_time))
|
||
|
return await PageUtil.paginate(db, query, 0, 0, False)
|