|
|
@ -1,4 +1,4 @@ |
|
|
|
from sqlalchemy import delete, select, update, desc,or_,not_,insert,and_,func |
|
|
|
from sqlalchemy import delete, select, update, desc,or_,not_,insert,and_,text |
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession |
|
|
|
from module_admin.entity.do.dataast_do import DataAssetInfoAppr |
|
|
|
from module_admin.entity.do.data_ast_content_do import DataAssetInfo |
|
|
@ -55,87 +55,208 @@ class DataAstDao: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
async def add_dataast_data(cls, db: AsyncSession, model: DataAstInfoModel): |
|
|
|
# 子查询:获取每个 ast_no 对应的最新已成功审批版本 |
|
|
|
subquery = ( |
|
|
|
select( |
|
|
|
DataAssetInfoAppr.ast_no, |
|
|
|
func.max(DataAssetInfoAppr.version_no).label("max_version") |
|
|
|
) |
|
|
|
.filter( |
|
|
|
DataAssetInfoAppr.approStatus == "succeed" |
|
|
|
) |
|
|
|
.group_by(DataAssetInfoAppr.ast_no) |
|
|
|
.subquery() |
|
|
|
) |
|
|
|
|
|
|
|
# 主查询:筛选符合条件的记录并映射到目标模型 |
|
|
|
query = ( |
|
|
|
select(DataAssetInfoAppr) |
|
|
|
.join( |
|
|
|
subquery, |
|
|
|
and_( |
|
|
|
DataAssetInfoAppr.ast_no == subquery.c.ast_no, |
|
|
|
DataAssetInfoAppr.version_no == subquery.c.max_version |
|
|
|
try: |
|
|
|
# 构建 SQL 语句 |
|
|
|
upsert_sql = text(""" |
|
|
|
INSERT INTO t_data_ast_info ( |
|
|
|
data_ast_eng_name, |
|
|
|
data_ast_cn_name, |
|
|
|
data_ast_type, |
|
|
|
data_ast_stat, |
|
|
|
data_ast_desc, |
|
|
|
data_ast_screen, |
|
|
|
data_ast_scren_clas, |
|
|
|
data_ast_cont, |
|
|
|
data_ast_faq, |
|
|
|
data_ast_estb_time, |
|
|
|
data_ast_upd_time, |
|
|
|
data_ast_src, |
|
|
|
ast_no, |
|
|
|
data_ast_clas |
|
|
|
) VALUES ( |
|
|
|
:data_ast_eng_name, |
|
|
|
:data_ast_cn_name, |
|
|
|
:data_ast_type, |
|
|
|
:data_ast_stat, |
|
|
|
:data_ast_desc, |
|
|
|
:data_ast_screen, |
|
|
|
:data_ast_scren_clas, |
|
|
|
:data_ast_cont, |
|
|
|
:data_ast_faq, |
|
|
|
:data_ast_estb_time, |
|
|
|
:data_ast_upd_time, |
|
|
|
:data_ast_src, |
|
|
|
:ast_no, |
|
|
|
:data_ast_clas |
|
|
|
) |
|
|
|
) |
|
|
|
.filter( |
|
|
|
DataAssetInfoAppr.data_ast_stat == "1", |
|
|
|
DataAssetInfoAppr.changeType == "add", |
|
|
|
DataAssetInfoAppr.approStatus == "succeed" |
|
|
|
) |
|
|
|
) |
|
|
|
ON DUPLICATE KEY UPDATE |
|
|
|
data_ast_eng_name = VALUES(data_ast_eng_name), |
|
|
|
data_ast_cn_name = VALUES(data_ast_cn_name), |
|
|
|
data_ast_type = VALUES(data_ast_type), |
|
|
|
data_ast_stat = VALUES(data_ast_stat), |
|
|
|
data_ast_desc = VALUES(data_ast_desc), |
|
|
|
data_ast_screen = VALUES(data_ast_screen), |
|
|
|
data_ast_scren_clas = VALUES(data_ast_scren_clas), |
|
|
|
data_ast_cont = VALUES(data_ast_cont), |
|
|
|
data_ast_faq = VALUES(data_ast_faq), |
|
|
|
data_ast_estb_time = VALUES(data_ast_estb_time), |
|
|
|
data_ast_upd_time = VALUES(data_ast_upd_time), |
|
|
|
data_ast_src = VALUES(data_ast_src), |
|
|
|
data_ast_clas = VALUES(data_ast_clas); |
|
|
|
""") |
|
|
|
|
|
|
|
# 执行查询获取符合条件的记录 |
|
|
|
approved_records = await db.scalars(query) |
|
|
|
|
|
|
|
# 收集需要处理的记录数据 |
|
|
|
records_to_upsert = [] |
|
|
|
for record in approved_records: |
|
|
|
# 从源记录中提取需要的字段 |
|
|
|
asset_data = { |
|
|
|
"data_ast_eng_name": record.data_ast_eng_name, |
|
|
|
"data_ast_cn_name": record.data_ast_cn_name, |
|
|
|
"data_ast_type": record.data_ast_type, |
|
|
|
"data_ast_stat": record.data_ast_stat, |
|
|
|
"data_ast_desc": record.data_ast_desc, |
|
|
|
"data_ast_screen": record.data_ast_screen, |
|
|
|
"data_ast_scren_clas": record.data_ast_scren_clas, |
|
|
|
"data_ast_cont": record.data_ast_cont, |
|
|
|
"data_ast_faq": record.data_ast_faq, |
|
|
|
"data_ast_estb_time": record.data_ast_estb_time, |
|
|
|
"data_ast_upd_time": datetime.now(), # 使用当前时间 |
|
|
|
"data_ast_src": record.data_ast_src, |
|
|
|
"ast_no": record.ast_no, |
|
|
|
"data_ast_clas": record.data_ast_clas |
|
|
|
} |
|
|
|
records_to_upsert.append(asset_data) |
|
|
|
|
|
|
|
# 使用 bulk_insert_mappings 执行批量插入/更新 |
|
|
|
if records_to_upsert: |
|
|
|
from sqlalchemy.dialects.mysql import insert |
|
|
|
|
|
|
|
insert_stmt = insert(DataAssetInfo).values(records_to_upsert) |
|
|
|
|
|
|
|
# 定义 ON DUPLICATE KEY UPDATE 子句 |
|
|
|
update_dict = { |
|
|
|
c.name: insert_stmt.inserted[c.name] |
|
|
|
for c in DataAssetInfo.__table__.columns |
|
|
|
if c.name != 'data_ast_no' and c.name != 'ast_no' # 排除主键和业务主键 |
|
|
|
} |
|
|
|
# 手动设置 upd_time 为当前时间 |
|
|
|
update_dict['data_ast_upd_time'] = datetime.now() |
|
|
|
|
|
|
|
upsert_stmt = insert_stmt.on_duplicate_key_update(update_dict) |
|
|
|
model_dict = model.model_dump() |
|
|
|
model_dict['data_ast_upd_time'] = datetime.now() |
|
|
|
|
|
|
|
# 执行 UPSERT 操作 |
|
|
|
await db.execute(upsert_sql, model_dict) |
|
|
|
|
|
|
|
# 提交事务 |
|
|
|
await db.commit() |
|
|
|
|
|
|
|
return "元数据成功发布到数据资产!" |
|
|
|
|
|
|
|
except Exception as e: |
|
|
|
# 出现异常时回滚事务 |
|
|
|
await db.rollback() |
|
|
|
raise e |
|
|
|
# async def add_dataast_data(cls, db: AsyncSession, model: DataAstInfoModel): |
|
|
|
# try: |
|
|
|
# # 动态SQL:更新目标表中已存在的记录 |
|
|
|
# update_sql = text(""" |
|
|
|
# UPDATE t_data_ast_info target |
|
|
|
# JOIN ( |
|
|
|
# SELECT |
|
|
|
# appr.data_ast_eng_name, |
|
|
|
# appr.data_ast_cn_name, |
|
|
|
# appr.data_ast_type, |
|
|
|
# appr.data_ast_stat, |
|
|
|
# appr.data_ast_desc, |
|
|
|
# appr.data_ast_screen, |
|
|
|
# appr.data_ast_scren_clas, |
|
|
|
# appr.data_ast_cont, |
|
|
|
# appr.data_ast_faq, |
|
|
|
# appr.data_ast_estb_time, |
|
|
|
# NOW() AS data_ast_upd_time, |
|
|
|
# appr.data_ast_src, |
|
|
|
# appr.ast_no, |
|
|
|
# appr.data_ast_clas |
|
|
|
# FROM t_data_ast_info_appr appr |
|
|
|
# INNER JOIN ( |
|
|
|
# SELECT |
|
|
|
# ast_no, |
|
|
|
# MAX(version_no) AS max_version |
|
|
|
# FROM t_data_ast_info_appr |
|
|
|
# WHERE approStatus = 'succeed' |
|
|
|
# GROUP BY ast_no |
|
|
|
# ) latest ON appr.ast_no = latest.ast_no AND appr.version_no = latest.max_version |
|
|
|
# WHERE |
|
|
|
# appr.data_ast_stat = '1' |
|
|
|
# AND appr.changeType = 'add' |
|
|
|
# AND appr.approStatus = 'succeed' |
|
|
|
# ) source ON target.ast_no = source.ast_no |
|
|
|
# SET |
|
|
|
# target.data_ast_eng_name = source.data_ast_eng_name, |
|
|
|
# target.data_ast_cn_name = source.data_ast_cn_name, |
|
|
|
# target.data_ast_type = source.data_ast_type, |
|
|
|
# target.data_ast_stat = source.data_ast_stat, |
|
|
|
# target.data_ast_desc = source.data_ast_desc, |
|
|
|
# target.data_ast_screen = source.data_ast_screen, |
|
|
|
# target.data_ast_scren_clas = source.data_ast_scren_clas, |
|
|
|
# target.data_ast_cont = source.data_ast_cont, |
|
|
|
# target.data_ast_faq = source.data_ast_faq, |
|
|
|
# target.data_ast_estb_time = source.data_ast_estb_time, |
|
|
|
# target.data_ast_upd_time = source.data_ast_upd_time, |
|
|
|
# target.data_ast_src = source.data_ast_src, |
|
|
|
# target.data_ast_clas = source.data_ast_clas; |
|
|
|
# """) |
|
|
|
|
|
|
|
# # 动态SQL:插入目标表中不存在的新记录 |
|
|
|
# insert_sql = text(""" |
|
|
|
# INSERT INTO t_data_ast_info ( |
|
|
|
# data_ast_eng_name, |
|
|
|
# data_ast_cn_name, |
|
|
|
# data_ast_type, |
|
|
|
# data_ast_stat, |
|
|
|
# data_ast_desc, |
|
|
|
# data_ast_screen, |
|
|
|
# data_ast_scren_clas, |
|
|
|
# data_ast_cont, |
|
|
|
# data_ast_faq, |
|
|
|
# data_ast_estb_time, |
|
|
|
# data_ast_upd_time, |
|
|
|
# data_ast_src, |
|
|
|
# ast_no, |
|
|
|
# data_ast_clas |
|
|
|
# ) |
|
|
|
# SELECT |
|
|
|
# source.data_ast_eng_name, |
|
|
|
# source.data_ast_cn_name, |
|
|
|
# source.data_ast_type, |
|
|
|
# source.data_ast_stat, |
|
|
|
# source.data_ast_desc, |
|
|
|
# source.data_ast_screen, |
|
|
|
# source.data_ast_scren_clas, |
|
|
|
# source.data_ast_cont, |
|
|
|
# source.data_ast_faq, |
|
|
|
# source.data_ast_estb_time, |
|
|
|
# source.data_ast_upd_time, |
|
|
|
# source.data_ast_src, |
|
|
|
# source.ast_no, |
|
|
|
# source.data_ast_clas |
|
|
|
# FROM ( |
|
|
|
# SELECT |
|
|
|
# appr.data_ast_eng_name, |
|
|
|
# appr.data_ast_cn_name, |
|
|
|
# appr.data_ast_type, |
|
|
|
# appr.data_ast_stat, |
|
|
|
# appr.data_ast_desc, |
|
|
|
# appr.data_ast_screen, |
|
|
|
# appr.data_ast_scren_clas, |
|
|
|
# appr.data_ast_cont, |
|
|
|
# appr.data_ast_faq, |
|
|
|
# appr.data_ast_estb_time, |
|
|
|
# NOW() AS data_ast_upd_time, |
|
|
|
# appr.data_ast_src, |
|
|
|
# appr.ast_no, |
|
|
|
# appr.data_ast_clas |
|
|
|
# FROM t_data_ast_info_appr appr |
|
|
|
# INNER JOIN ( |
|
|
|
# SELECT |
|
|
|
# ast_no, |
|
|
|
# MAX(version_no) AS max_version |
|
|
|
# FROM t_data_ast_info_appr |
|
|
|
# WHERE approStatus = 'succeed' |
|
|
|
# GROUP BY ast_no |
|
|
|
# ) latest ON appr.ast_no = latest.ast_no AND appr.version_no = latest.max_version |
|
|
|
# WHERE |
|
|
|
# appr.data_ast_stat = '1' |
|
|
|
# AND appr.changeType = 'add' |
|
|
|
# AND appr.approStatus = 'succeed' |
|
|
|
# ) source |
|
|
|
# LEFT JOIN t_data_ast_info target ON source.ast_no = target.ast_no |
|
|
|
# WHERE target.ast_no IS NULL; |
|
|
|
# """) |
|
|
|
|
|
|
|
# # 执行更新SQL |
|
|
|
# await db.execute(update_sql) |
|
|
|
|
|
|
|
# # 执行插入SQL |
|
|
|
# await db.execute(insert_sql) |
|
|
|
|
|
|
|
# # 提交事务 |
|
|
|
# await db.commit() |
|
|
|
|
|
|
|
# 执行插入/更新语句 |
|
|
|
await db.execute(upsert_stmt) |
|
|
|
|
|
|
|
# 提交会话使更改生效 |
|
|
|
await db.commit() |
|
|
|
# return "元数据成功发布到数据资产!" |
|
|
|
|
|
|
|
return "元数据成功发布到数据资产!" |
|
|
|
# except Exception as e: |
|
|
|
# # 回滚事务 |
|
|
|
# await db.rollback() |
|
|
|
# raise e |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|