|
|
|
from datetime import datetime, time
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from module_admin.entity.do.meta_do import MetadataClas, MetadataExtractInfo # ORM 类
|
|
|
|
from module_admin.entity.do.metadata_config_do import MetadataSec, SecuBizConfig, SecuBizPermiConfig, SecuBizConfigRela, \
|
|
|
|
DatasecConfig, \
|
|
|
|
TaskBizConfigRela, TaskBizConfig
|
|
|
|
# ORM 类
|
|
|
|
from typing import List
|
|
|
|
from module_admin.entity.do.metadata_config_do import DataAstContent, DataAstContentRela
|
|
|
|
from sqlalchemy.orm import aliased
|
|
|
|
from sqlalchemy.orm import joinedload
|
|
|
|
from sqlalchemy.sql import true
|
|
|
|
from utils.page_util import PageUtil
|
|
|
|
from module_admin.entity.vo.data_ast_content_vo import DataCatalogPageQueryModel, DeleteDataCatalogModel, \
|
|
|
|
DataCatalogChild
|
|
|
|
from sqlalchemy import delete, select, update, desc, or_, not_, and_
|
|
|
|
|
|
|
|
class MetadataConfigDao:
|
|
|
|
"""
|
|
|
|
标签信息数据库操作层
|
|
|
|
"""
|
|
|
|
|
|
|
|
@classmethod
async def get_clas_detail_by_id(cls, db: AsyncSession, clas_id: int):
    """
    Fetch a single classification (label) row by its sequence number.

    :param db: async ORM session
    :param clas_id: value matched against ``MetadataClas.clas_onum``
    :return: the first matching ``MetadataClas`` row, or ``None`` when nothing matches
    """
    stmt = select(MetadataClas).where(MetadataClas.clas_onum == clas_id)
    rows = await db.execute(stmt)
    return rows.scalars().first()
|
|
|
|
|
|
|
|
@classmethod
async def get_clas_detail_by_info(cls, db: AsyncSession, clas):
    """
    Look up a classification (label) row by the attributes set on ``clas``.

    ``clas_name``, ``clas_eff_flag`` and ``clas_onum`` are each applied as an
    equality filter only when the corresponding attribute is truthy; a falsy
    attribute contributes a constant ``True`` condition instead.

    :param db: async ORM session
    :param clas: object exposing ``clas_name``, ``clas_eff_flag`` and ``clas_onum``
                 (the original note says a MetadataClasModel instance)
    :return: the first matching ``MetadataClas`` row, or ``None``
    """
    name_cond = MetadataClas.clas_name == clas.clas_name if clas.clas_name else True
    flag_cond = MetadataClas.clas_eff_flag == clas.clas_eff_flag if clas.clas_eff_flag else True
    onum_cond = MetadataClas.clas_onum == clas.clas_onum if clas.clas_onum else True

    stmt = select(MetadataClas).where(name_cond, flag_cond, onum_cond)
    return (await db.execute(stmt)).scalars().first()
|
|
|
|
|
|
|
|
@classmethod
async def get_clas_by_onum(cls, db: AsyncSession, clas_onum: int):
    """
    Fetch a classification row by ``clas_onum``.

    The result is read with ``scalar_one_or_none()`` rather than
    ``scalars().first()`` as the neighbouring lookups do.

    :param db: async ORM session
    :param clas_onum: value matched against ``MetadataClas.clas_onum``
    :return: the matching ``MetadataClas`` row, or ``None``
    """
    stmt = select(MetadataClas).where(MetadataClas.clas_onum == clas_onum)
    rows = await db.execute(stmt)
    return rows.scalar_one_or_none()
|
|
|
|
|
|
|
|
@classmethod
async def get_metadata_clas_list(cls, db: AsyncSession, query_object, is_page: bool = False):
    """
    List classification (label) rows, optionally paginated.

    Filters are applied only when the corresponding attribute of
    ``query_object`` is truthy:

    * ``belt_batch_content`` - equality
    * ``clas_name`` - substring match (``LIKE %name%``)
    * ``clas_eff_flag`` - equality
    * ``begin_time`` / ``end_time`` - both must be set; parsed as ``%Y-%m-%d``
      and applied to ``upd_time`` as an inclusive whole-day range

    :param db: async ORM session
    :param query_object: query parameters; also supplies ``page_num`` and ``page_size``
    :param is_page: forwarded to ``PageUtil.paginate``
    :return: whatever ``PageUtil.paginate`` returns for the built query
    """
    batch_cond = (
        MetadataClas.belt_batch_content == query_object.belt_batch_content
        if query_object.belt_batch_content
        else True
    )
    name_cond = MetadataClas.clas_name.like(f"%{query_object.clas_name}%") if query_object.clas_name else True
    flag_cond = MetadataClas.clas_eff_flag == query_object.clas_eff_flag if query_object.clas_eff_flag else True

    if query_object.begin_time and query_object.end_time:
        # time.max (23:59:59.999999) instead of time(23, 59, 59): rows updated in
        # the final second of the end day with a fractional part are no longer
        # excluded. This matches the other list queries in this class.
        range_start = datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time.min)
        range_end = datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time.max)
        time_cond = MetadataClas.upd_time.between(range_start, range_end)
    else:
        time_cond = True

    query = (
        select(MetadataClas)
        .where(batch_cond, name_cond, flag_cond, time_cond)
        .order_by(MetadataClas.clas_onum)
        .distinct()
    )

    return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
|
|
|
|
|
|
|
|
@classmethod
async def add_metadata_clas_dao(cls, db: AsyncSession, clas):
    """
    Insert a new classification (label) row.

    :param db: async ORM session
    :param clas: object exposing ``model_dump()``; the dumped fields are passed
                 as keyword arguments to ``MetadataClas``
    :return: the newly added ``MetadataClas`` instance, after the session flush
    """
    field_values = clas.model_dump()
    new_row = MetadataClas(**field_values)
    db.add(new_row)
    # Only a flush is issued here; this method does not commit.
    await db.flush()
    return new_row
|
|
|
|
|
|
|
|
@classmethod
async def edit_metadata_clas_dao(cls, db: AsyncSession, clas_id: int, update_data: dict):
    """
    Update the classification (label) row identified by ``clas_id``.

    :param db: async ORM session
    :param clas_id: value matched against ``MetadataClas.clas_onum``
    :param update_data: column-name -> new-value mapping applied via ``values(**update_data)``
    :return: None
    """
    stmt = update(MetadataClas).where(MetadataClas.clas_onum == clas_id).values(**update_data)
    await db.execute(stmt)
|
|
|
|
|
|
|
|
@classmethod
async def delete_metadata_clas_dao(cls, db: AsyncSession, clas_onum_list: list[int]):
    """
    Delete classification (label) rows in bulk.

    :param db: async ORM session
    :param clas_onum_list: ``clas_onum`` values of the rows to delete
    :return: None
    """
    stmt = delete(MetadataClas).where(MetadataClas.clas_onum.in_(clas_onum_list))
    await db.execute(stmt)
|
|
|
|
|
|
|
|
@classmethod
async def get_sec_detail_by_id(cls, db: AsyncSession, onum: str):
    """
    Fetch a single security-level row by its ``onum``.

    :param db: async ORM session
    :param onum: value matched against ``MetadataSec.onum``
    :return: the first matching ``MetadataSec`` row, or ``None``
    """
    stmt = select(MetadataSec).where(MetadataSec.onum == onum)
    rows = await db.execute(stmt)
    return rows.scalars().first()
|
|
|
|
|
|
|
|
@classmethod
async def get_metadata_sec_list(cls, db: AsyncSession, query_object, is_page: bool = False):
    """
    List security-level rows, newest ``upd_time`` first, optionally paginated.

    Filters are applied only when the corresponding attribute of
    ``query_object`` is truthy:

    * ``sec_level_name`` - substring match (``LIKE %name%``)
    * ``sec_eff_flag`` - equality
    * ``begin_time`` / ``end_time`` - both must be set; parsed as ``%Y-%m-%d``
      and applied to ``upd_time`` as an inclusive whole-day range

    :param db: async ORM session
    :param query_object: query parameters; also supplies ``page_num`` and ``page_size``
    :param is_page: forwarded to ``PageUtil.paginate``
    :return: whatever ``PageUtil.paginate`` returns for the built query
    """
    name_cond = (
        MetadataSec.sec_level_name.like(f"%{query_object.sec_level_name}%")
        if query_object.sec_level_name
        else True
    )
    flag_cond = MetadataSec.sec_eff_flag == query_object.sec_eff_flag if query_object.sec_eff_flag else True

    if query_object.begin_time and query_object.end_time:
        # time.max instead of time(23, 59, 59) so that rows stamped with a
        # fractional second inside the last second of the end day are included.
        range_start = datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time.min)
        range_end = datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time.max)
        time_cond = MetadataSec.upd_time.between(range_start, range_end)
    else:
        time_cond = True

    query = (
        select(MetadataSec)
        .where(name_cond, flag_cond, time_cond)
        .order_by(MetadataSec.upd_time.desc())
        .distinct()
    )

    # PageUtil comes from the module-level import; the former function-scope
    # re-import was redundant.
    return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
|
|
|
|
|
|
|
|
@classmethod
async def get_metadata_sec_option_list(cls, db: AsyncSession):
    """
    Return every security-level row whose ``sec_eff_flag`` equals ``'1'``,
    ordered by ``sec_level_summary``.

    :param db: async ORM session
    :return: list of ``MetadataSec`` rows
    """
    stmt = (
        select(MetadataSec)
        .where(MetadataSec.sec_eff_flag == '1')
        .order_by(MetadataSec.sec_level_summary)
    )
    rows = await db.execute(stmt)
    return rows.scalars().all()
|
|
|
|
|
|
|
|
@classmethod
async def add_metadata_sec_dao(cls, db: AsyncSession, sec):
    """
    Insert a new security-level row.

    :param db: async ORM session
    :param sec: object exposing ``model_dump()``; the dumped fields are passed
                as keyword arguments to ``MetadataSec``
    :return: the newly added ``MetadataSec`` instance, after the session flush
    """
    new_row = MetadataSec(**sec.model_dump())
    db.add(new_row)
    # Only a flush is issued here; this method does not commit.
    await db.flush()
    return new_row
|
|
|
|
|
|
|
|
@classmethod
async def get_by_sec_level_summary(cls, query_db: AsyncSession, sec_level_summary: str, exclude_onum: str = None):
    """
    Find a security-level row by its ``sec_level_summary``.

    :param query_db: async ORM session
    :param sec_level_summary: value matched against ``MetadataSec.sec_level_summary``
    :param exclude_onum: when truthy, rows whose ``onum`` equals this value are skipped
    :return: the first matching ``MetadataSec`` row, or ``None``
    """
    conditions = [MetadataSec.sec_level_summary == sec_level_summary]
    if exclude_onum:
        conditions.append(MetadataSec.onum != exclude_onum)

    rows = await query_db.execute(select(MetadataSec).where(*conditions))
    return rows.scalars().first()
|
|
|
|
|
|
|
|
@classmethod
async def edit_metadata_sec_dao(cls, db: AsyncSession, onum: str, update_data: dict):
    """
    Update the security-level row identified by ``onum``.

    :param db: async ORM session
    :param onum: value matched against ``MetadataSec.onum``
    :param update_data: column-name -> new-value mapping applied via ``values(**update_data)``
    :return: None
    """
    stmt = update(MetadataSec).where(MetadataSec.onum == onum).values(**update_data)
    await db.execute(stmt)
|
|
|
|
|
|
|
|
@classmethod
async def delete_metadata_sec_dao(cls, db: AsyncSession, onum_list: list[str]):
    """
    Delete security-level rows in bulk.

    :param db: async ORM session
    :param onum_list: ``onum`` values of the rows to delete
    :return: None
    """
    stmt = delete(MetadataSec).where(MetadataSec.onum.in_(onum_list))
    await db.execute(stmt)
|
|
|
|
|
|
|
|
# ------------- t_secu_biz_config 相关方法 --------------
|
|
|
|
|
|
|
|
@classmethod
async def get_biz_config_detail_by_id(cls, db: AsyncSession, onum: int):
    """
    Fetch a single ``SecuBizConfig`` row by its ``onum``.

    :param db: async ORM session
    :param onum: value matched against ``SecuBizConfig.onum``
    :return: the first matching ``SecuBizConfig`` row, or ``None``
    """
    stmt = select(SecuBizConfig).where(SecuBizConfig.onum == onum)
    rows = await db.execute(stmt)
    return rows.scalars().first()
|
|
|
|
|
|
|
|
@classmethod
async def get_biz_config_list(cls, db: AsyncSession, query_object, is_page: bool = False):
    """
    List ``SecuBizConfig`` rows, newest ``create_time`` first, optionally paginated.

    Filters taken from ``query_object``:

    * ``risk_lvl`` - substring match, applied when truthy
    * ``isStop`` - equality, applied whenever the value is not ``None``
    * ``begin_time`` / ``end_time`` - both must be truthy; parsed as ``%Y-%m-%d``
      and applied to ``create_time`` as an inclusive whole-day range

    :param db: async ORM session
    :param query_object: query parameters; also supplies ``page_num`` and ``page_size``
    :param is_page: forwarded to ``PageUtil.paginate``
    :return: whatever ``PageUtil.paginate`` returns for the built query
    """
    risk_cond = SecuBizConfig.risk_lvl.like(f"%{query_object.risk_lvl}%") if query_object.risk_lvl else True
    stop_cond = SecuBizConfig.isStop == query_object.isStop if query_object.isStop is not None else True

    if query_object.begin_time and query_object.end_time:
        range_start = datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time.min)
        range_end = datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time.max)
        time_cond = SecuBizConfig.create_time.between(range_start, range_end)
    else:
        time_cond = True

    query = (
        select(SecuBizConfig)
        .where(risk_cond, stop_cond, time_cond)
        .order_by(SecuBizConfig.create_time.desc())
    )

    # PageUtil comes from the module-level import, as in the other paginated
    # queries of this class; the former function-scope re-import was redundant.
    return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
|
|
|
|
|
|
|
|
@classmethod
async def add_biz_config(cls, db: AsyncSession, obj):
    """
    Insert a new ``SecuBizConfig`` row.

    :param db: async ORM session
    :param obj: object exposing ``model_dump()``; the dumped fields are passed
                as keyword arguments to ``SecuBizConfig``
    :return: the newly added ``SecuBizConfig`` instance, after the session flush
    """
    new_row = SecuBizConfig(**obj.model_dump())
    db.add(new_row)
    # Only a flush is issued here; this method does not commit.
    await db.flush()
    return new_row
|
|
|
|
|
|
|
|
@classmethod
async def edit_biz_config(cls, db: AsyncSession, onum: int, update_data: dict):
    """
    Update the ``SecuBizConfig`` row identified by ``onum``.

    :param db: async ORM session
    :param onum: value matched against ``SecuBizConfig.onum``
    :param update_data: column-name -> new-value mapping applied via ``values(**update_data)``
    :return: None
    """
    stmt = update(SecuBizConfig).where(SecuBizConfig.onum == onum).values(**update_data)
    await db.execute(stmt)
|
|
|
|
|
|
|
|
@classmethod
async def delete_biz_config(cls, db: AsyncSession, onum_list: list[int]):
    """
    Delete ``SecuBizConfig`` rows in bulk.

    :param db: async ORM session
    :param onum_list: ``onum`` values of the rows to delete
    :return: None
    """
    stmt = delete(SecuBizConfig).where(SecuBizConfig.onum.in_(onum_list))
    await db.execute(stmt)
|
|
|
|
|
|
|
|
@classmethod
async def add_biz_permi_config_batch(cls, db: AsyncSession, records: List[SecuBizPermiConfig]):
    """
    Queue a batch of ``SecuBizPermiConfig`` rows for insertion.

    A fresh ORM instance is built for each record by copying its attributes,
    and all instances are added to the session in one call. No flush is issued
    by this method.

    :param db: async ORM session
    :param records: objects exposing ``biz_onum``, ``obj_type``, ``obj_value``,
                    ``obj_name``, ``isStop``, ``create_by``, ``create_time``,
                    ``update_by`` and ``update_time``
    :return: None
    """
    pending_rows = [
        SecuBizPermiConfig(
            biz_onum=record.biz_onum,
            obj_type=record.obj_type,
            obj_value=record.obj_value,
            obj_name=record.obj_name,
            isStop=record.isStop,
            create_by=record.create_by,
            create_time=record.create_time,
            update_by=record.update_by,
            update_time=record.update_time,
        )
        for record in records
    ]
    db.add_all(pending_rows)
|
|
|
|
|
|
|
|
# ------------- t_secu_biz_permi_config 相关方法 --------------
|
|
|
|
|
|
|
|
@classmethod
async def get_biz_permi_config_detail_by_id(cls, db: AsyncSession, onum: int):
    """
    Fetch a single ``SecuBizPermiConfig`` row by its ``onum``.

    :param db: async ORM session
    :param onum: value matched against ``SecuBizPermiConfig.onum``
    :return: the first matching ``SecuBizPermiConfig`` row, or ``None``
    """
    stmt = select(SecuBizPermiConfig).where(SecuBizPermiConfig.onum == onum)
    rows = await db.execute(stmt)
    return rows.scalars().first()
|
|
|
|
|
|
|
|
# @classmethod
|
|
|
|
# async def get_biz_permi_config_list(cls, db: AsyncSession, query_object, is_page: bool = False):
|
|
|
|
# query = select(SecuBizPermiConfig).where(
|
|
|
|
# SecuBizPermiConfig.obj_value.like(f"%{query_object.obj_value}%") if query_object.obj_value else True,
|
|
|
|
# SecuBizPermiConfig.obj_type == query_object.obj_type if query_object.obj_type else True,
|
|
|
|
# SecuBizPermiConfig.isStop == query_object.isStop if query_object.isStop is not None else True,
|
|
|
|
# SecuBizPermiConfig.create_time.between(
|
|
|
|
# datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time.min),
|
|
|
|
# datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time.max),
|
|
|
|
# ) if query_object.begin_time and query_object.end_time else True,
|
|
|
|
# ).order_by(SecuBizPermiConfig.create_time.desc())
|
|
|
|
|
|
|
|
# from utils.page_util import PageUtil
|
|
|
|
# return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
|
|
|
|
@classmethod
async def get_biz_permi_config_list(cls, db: AsyncSession, query_object, is_page: bool = False):
    """
    List permission-config rows with the related business name and security
    level summary, newest ``create_time`` first, optionally paginated.

    ``SecuBizPermiConfig`` is left-outer-joined to ``SecuBizConfig`` (on
    ``biz_onum == onum``) and from there to ``MetadataSec`` (on
    ``risk_lvl == onum``).

    Filters taken from ``query_object``:

    * ``obj_value`` / ``obj_name`` - substring match, applied when truthy
    * ``obj_type`` - equality, applied when truthy
    * ``isStop`` - equality, applied whenever the value is not ``None``
    * ``begin_time`` / ``end_time`` - both must be truthy; parsed as ``%Y-%m-%d``
      and applied to ``create_time`` as an inclusive whole-day range

    :param db: async ORM session
    :param query_object: query parameters; also supplies ``page_num`` and ``page_size``
    :param is_page: forwarded to ``PageUtil.paginate``
    :return: whatever ``PageUtil.paginate`` returns for the built query
    """
    selected_columns = (
        SecuBizPermiConfig.onum,
        SecuBizPermiConfig.biz_onum,
        SecuBizPermiConfig.obj_type,
        SecuBizPermiConfig.obj_value,
        SecuBizPermiConfig.obj_name,
        SecuBizPermiConfig.isStop,
        SecuBizPermiConfig.create_by,
        SecuBizPermiConfig.create_time,
        SecuBizPermiConfig.update_by,
        SecuBizPermiConfig.update_time,
        SecuBizConfig.biz_name,
        MetadataSec.sec_level_summary,
    )

    # Each filter degrades to SQL TRUE when its parameter is not supplied.
    value_cond = (
        SecuBizPermiConfig.obj_value.like(f"%{query_object.obj_value}%") if query_object.obj_value else true()
    )
    name_cond = (
        SecuBizPermiConfig.obj_name.like(f"%{query_object.obj_name}%") if query_object.obj_name else true()
    )
    type_cond = SecuBizPermiConfig.obj_type == query_object.obj_type if query_object.obj_type else true()
    stop_cond = SecuBizPermiConfig.isStop == query_object.isStop if query_object.isStop is not None else true()

    if query_object.begin_time and query_object.end_time:
        range_start = datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time.min)
        range_end = datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time.max)
        time_cond = SecuBizPermiConfig.create_time.between(range_start, range_end)
    else:
        time_cond = true()

    query = (
        select(*selected_columns)
        .join(SecuBizConfig, SecuBizPermiConfig.biz_onum == SecuBizConfig.onum, isouter=True)
        .join(MetadataSec, SecuBizConfig.risk_lvl == MetadataSec.onum, isouter=True)
        .where(value_cond, name_cond, type_cond, stop_cond, time_cond)
        .order_by(SecuBizPermiConfig.create_time.desc())
    )

    return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
|
|
|
|
|
|
|
|
@classmethod
async def add_biz_permi_config(cls, db: AsyncSession, obj):
    """
    Insert a new ``SecuBizPermiConfig`` row.

    :param db: async ORM session
    :param obj: object exposing ``model_dump()``; the dumped fields are passed
                as keyword arguments to ``SecuBizPermiConfig``
    :return: the newly added ``SecuBizPermiConfig`` instance, after the session flush
    """
    new_row = SecuBizPermiConfig(**obj.model_dump())
    db.add(new_row)
    # Only a flush is issued here; this method does not commit.
    await db.flush()
    return new_row
|
|
|
|
|
|
|
|
@classmethod
async def edit_biz_permi_config(cls, db: AsyncSession, onum: int, update_data: dict):
    """
    Update the ``SecuBizPermiConfig`` row identified by ``onum``.

    :param db: async ORM session
    :param onum: value matched against ``SecuBizPermiConfig.onum``
    :param update_data: column-name -> new-value mapping applied via ``values(**update_data)``
    :return: None
    """
    stmt = update(SecuBizPermiConfig).where(SecuBizPermiConfig.onum == onum).values(**update_data)
    await db.execute(stmt)
|
|
|
|
|
|
|
|
@classmethod
async def delete_biz_permi_config(cls, db: AsyncSession, onum_list: list[int]):
    """
    Delete ``SecuBizPermiConfig`` rows in bulk.

    :param db: async ORM session
    :param onum_list: ``onum`` values of the rows to delete
    :return: None
    """
    stmt = delete(SecuBizPermiConfig).where(SecuBizPermiConfig.onum.in_(onum_list))
    await db.execute(stmt)
|
|
|
|
|
|
|
|
# ----------------------------------biz_config_rela------------------------------
|
|
|
|
@classmethod
async def delete_biz_rela_dao(cls, db: AsyncSession, biz_onum: int):
    """
    Delete every ``SecuBizConfigRela`` row belonging to one business config.

    :param db: async ORM session
    :param biz_onum: value matched against ``SecuBizConfigRela.biz_onum``
    :return: None
    """
    stmt = delete(SecuBizConfigRela).where(SecuBizConfigRela.biz_onum == biz_onum)
    await db.execute(stmt)
|
|
|
|
|
|
|
|
@classmethod
async def delete_biz_rela_batch(cls, db: AsyncSession, bonum_list: list[int]):
    """
    Delete ``SecuBizConfigRela`` rows for several business configs at once.

    :param db: async ORM session
    :param bonum_list: ``biz_onum`` values whose relation rows are deleted
    :return: None
    """
    stmt = delete(SecuBizConfigRela).where(SecuBizConfigRela.biz_onum.in_(bonum_list))
    await db.execute(stmt)
|
|
|
|
|
|
|
|
@classmethod
async def add_batch_biz_rela_dao(cls, db: AsyncSession, rela_list: List[SecuBizConfigRela]):
    """
    Add a batch of ``SecuBizConfigRela`` instances to the session and flush.

    :param db: async ORM session
    :param rela_list: ORM instances to insert
    :return: None
    """
    db.add_all(rela_list)
    # Only a flush is issued here; this method does not commit.
    await db.flush()
|
|
|
|
|
|
|
|
@classmethod
async def get_biz_rela_by_biz_id(cls, db: AsyncSession, biz_onum: int):
    """
    Return the ``MetadataExtractInfo`` rows linked to one business config.

    ``MetadataExtractInfo`` is inner-joined to ``SecuBizConfigRela`` on
    ``MetadataExtractInfo.onum == SecuBizConfigRela.tab_onum``.

    :param db: async ORM session
    :param biz_onum: value matched against ``SecuBizConfigRela.biz_onum``
    :return: list of ``MetadataExtractInfo`` rows
    """
    stmt = (
        select(MetadataExtractInfo)
        .join(SecuBizConfigRela, MetadataExtractInfo.onum == SecuBizConfigRela.tab_onum)
        .where(SecuBizConfigRela.biz_onum == biz_onum)
    )
    rows = await db.execute(stmt)
    return rows.scalars().all()
|
|
|
|
|
|
|
|
@classmethod
async def get_detail_by_id(cls, db: AsyncSession, onum: int):
    """
    Fetch a single ``DatasecConfig`` row by its ``onum``.

    :param db: async ORM session
    :param onum: value matched against ``DatasecConfig.onum``
    :return: the first matching ``DatasecConfig`` row, or ``None``
    """
    stmt = select(DatasecConfig).where(DatasecConfig.onum == onum)
    rows = await db.execute(stmt)
    return rows.scalars().first()
|
|
|
|
|
|
|
|
@classmethod
async def get_list(cls, db: AsyncSession, query_object, is_page: bool = False):
    """
    List ``DatasecConfig`` rows ordered by ``onum``, optionally paginated.

    Filters are applied only when the corresponding attribute of
    ``query_object`` is truthy:

    * ``metatask_name`` - substring match (``LIKE %name%``)
    * ``status`` - equality
    * ``begin_time`` / ``end_time`` - both must be set; parsed as ``%Y-%m-%d``
      and applied to ``create_time`` as an inclusive whole-day range

    :param db: async ORM session
    :param query_object: query parameters; also supplies ``page_num`` and ``page_size``
    :param is_page: forwarded to ``PageUtil.paginate``
    :return: whatever ``PageUtil.paginate`` returns for the built query
    """
    name_cond = (
        DatasecConfig.metatask_name.like(f"%{query_object.metatask_name}%")
        if query_object.metatask_name
        else True
    )
    status_cond = DatasecConfig.status == query_object.status if query_object.status else True

    if query_object.begin_time and query_object.end_time:
        # time.max instead of time(23, 59, 59) so that rows stamped with a
        # fractional second inside the last second of the end day are included.
        range_start = datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time.min)
        range_end = datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time.max)
        time_cond = DatasecConfig.create_time.between(range_start, range_end)
    else:
        time_cond = True

    query = (
        select(DatasecConfig)
        .where(name_cond, status_cond, time_cond)
        .order_by(DatasecConfig.onum)
    )

    return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
|
|
|
|
|
|
|
|
@classmethod
async def add(cls, db: AsyncSession, model_obj):
    """
    Insert a new ``DatasecConfig`` (task configuration) row.

    :param db: async ORM session
    :param model_obj: object exposing ``model_dump()``; the dumped fields are
                      passed as keyword arguments to ``DatasecConfig``
    :return: the newly added ``DatasecConfig`` instance, after the session flush
    """
    new_row = DatasecConfig(**model_obj.model_dump())
    db.add(new_row)
    # Only a flush is issued here; this method does not commit.
    await db.flush()
    return new_row
|
|
|
|
|
|
|
|
@classmethod
async def edit(cls, db: AsyncSession, onum: int, update_dict: dict):
    """
    Update the ``DatasecConfig`` (task configuration) row identified by ``onum``.

    :param db: async ORM session
    :param onum: value matched against ``DatasecConfig.onum``
    :param update_dict: column-name -> new-value mapping applied via ``values(**update_dict)``
    :return: None
    """
    stmt = update(DatasecConfig).where(DatasecConfig.onum == onum).values(**update_dict)
    await db.execute(stmt)
|
|
|
|
|
|
|
|
@classmethod
async def delete(cls, db: AsyncSession, onum_list: List[int]):
    """
    Delete ``DatasecConfig`` (task configuration) rows in bulk.

    :param db: async ORM session
    :param onum_list: ``onum`` values of the rows to delete
    :return: None
    """
    stmt = delete(DatasecConfig).where(DatasecConfig.onum.in_(onum_list))
    await db.execute(stmt)
|
|
|
|
|
|
|
|
# ----------------------------------元数据标签调度------------------------------
|
|
|
|
@classmethod
async def get_task_biz_config_list(cls, db: AsyncSession, query_object, is_page: bool = False):
    """
    List ``TaskBizConfig`` rows, newest ``create_time`` first, optionally paginated.

    Filters taken from ``query_object``:

    * ``risk_lvl`` - substring match, applied when truthy
    * ``isStop`` - equality, applied whenever the value is not ``None``
    * ``begin_time`` / ``end_time`` - both must be truthy; parsed as ``%Y-%m-%d``
      and applied to ``create_time`` as an inclusive whole-day range

    :param db: async ORM session
    :param query_object: query parameters; also supplies ``page_num`` and ``page_size``
    :param is_page: forwarded to ``PageUtil.paginate``
    :return: whatever ``PageUtil.paginate`` returns for the built query
    """
    risk_cond = TaskBizConfig.risk_lvl.like(f"%{query_object.risk_lvl}%") if query_object.risk_lvl else True
    stop_cond = TaskBizConfig.isStop == query_object.isStop if query_object.isStop is not None else True

    if query_object.begin_time and query_object.end_time:
        range_start = datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time.min)
        range_end = datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time.max)
        time_cond = TaskBizConfig.create_time.between(range_start, range_end)
    else:
        time_cond = True

    query = (
        select(TaskBizConfig)
        .where(risk_cond, stop_cond, time_cond)
        .order_by(TaskBizConfig.create_time.desc())
    )

    return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
|
|
|
|
|
|
|
|
@classmethod
async def add_task_biz_config(cls, db: AsyncSession, obj):
    """
    Insert a new ``TaskBizConfig`` row.

    :param db: async ORM session
    :param obj: object exposing ``model_dump()``; the dumped fields are passed
                as keyword arguments to ``TaskBizConfig``
    :return: the newly added ``TaskBizConfig`` instance, after the session flush
    """
    new_row = TaskBizConfig(**obj.model_dump())
    db.add(new_row)
    # Only a flush is issued here; this method does not commit.
    await db.flush()
    return new_row
|
|
|
|
|
|
|
|
@classmethod
async def edit_task_biz_config(cls, db: AsyncSession, onum: int, update_data: dict):
    """
    Update the ``TaskBizConfig`` row identified by ``onum``.

    :param db: async ORM session
    :param onum: value matched against ``TaskBizConfig.onum``
    :param update_data: column-name -> new-value mapping applied via ``values(**update_data)``
    :return: None
    """
    stmt = update(TaskBizConfig).where(TaskBizConfig.onum == onum).values(**update_data)
    await db.execute(stmt)
|
|
|
|
|
|
|
|
@classmethod
async def delete_task_biz_config(cls, db: AsyncSession, onum_list: List[int]):
    """
    Delete ``TaskBizConfig`` rows in bulk.

    :param db: async ORM session
    :param onum_list: ``onum`` values of the rows to delete
    :return: None
    """
    stmt = delete(TaskBizConfig).where(TaskBizConfig.onum.in_(onum_list))
    await db.execute(stmt)
|
|
|
|
|
|
|
|
@classmethod
async def get_task_biz_config_detail_by_id(cls, db: AsyncSession, onum: int):
    """
    Fetch a single ``TaskBizConfig`` row by its ``onum``.

    :param db: async ORM session
    :param onum: value matched against ``TaskBizConfig.onum``
    :return: the first matching ``TaskBizConfig`` row, or ``None``
    """
    stmt = select(TaskBizConfig).where(TaskBizConfig.onum == onum)
    rows = await db.execute(stmt)
    return rows.scalars().first()
|
|
|
|
|
|
|
|
# ✅ 使用 TaskBizConfigRela 替代原来的 SecuBizConfigRela
|
|
|
|
@classmethod
async def delete_task_rela_dao(cls, db: AsyncSession, biz_onum: int):
    """
    Delete every ``TaskBizConfigRela`` row belonging to one task business config.

    :param db: async ORM session
    :param biz_onum: value matched against ``TaskBizConfigRela.biz_onum``
    :return: None
    """
    stmt = delete(TaskBizConfigRela).where(TaskBizConfigRela.biz_onum == biz_onum)
    await db.execute(stmt)
|
|
|
|
|
|
|
|
@classmethod
async def delete_task_rela_batch(cls, db: AsyncSession, bonum_list: List[int]):
    """
    Delete ``TaskBizConfigRela`` rows for several task business configs at once.

    :param db: async ORM session
    :param bonum_list: ``biz_onum`` values whose relation rows are deleted
    :return: None
    """
    stmt = delete(TaskBizConfigRela).where(TaskBizConfigRela.biz_onum.in_(bonum_list))
    await db.execute(stmt)
|
|
|
|
|
|
|
|
@classmethod
async def add_batch_task_rela_dao(cls, db: AsyncSession, rela_list: List[TaskBizConfigRela]):
    """
    Add a batch of ``TaskBizConfigRela`` instances to the session and flush.

    :param db: async ORM session
    :param rela_list: ORM instances to insert
    :return: None
    """
    db.add_all(rela_list)
    # Only a flush is issued here; this method does not commit.
    await db.flush()
|
|
|
|
|
|
|
|
@classmethod
async def get_task_rela_by_biz_id(cls, db: AsyncSession, biz_onum: int):
    """
    Return the ``MetadataExtractInfo`` rows linked to one task business config.

    ``MetadataExtractInfo`` is inner-joined to ``TaskBizConfigRela`` on
    ``MetadataExtractInfo.onum == TaskBizConfigRela.tab_onum``.

    :param db: async ORM session
    :param biz_onum: value matched against ``TaskBizConfigRela.biz_onum``
    :return: list of ``MetadataExtractInfo`` rows
    """
    stmt = (
        select(MetadataExtractInfo)
        .join(TaskBizConfigRela, MetadataExtractInfo.onum == TaskBizConfigRela.tab_onum)
        .where(TaskBizConfigRela.biz_onum == biz_onum)
    )
    rows = await db.execute(stmt)
    return rows.scalars().all()
|
|
|
|
# 数据标准目录
|
|
|
|
|
|
|
|
@classmethod
async def get_catalog_by_id(cls, db: AsyncSession, content_onum: int):
    """
    Fetch one catalog row by its id, restricted to rows with ``content_stat == 1``.

    :param db: async ORM session
    :param content_onum: catalog id, matched against ``DataAstContent.content_onum``
    :return: the first matching ``DataAstContent`` row, or ``None``
    """
    stmt = select(DataAstContent).where(
        DataAstContent.content_onum == content_onum,
        DataAstContent.content_stat == 1,
    )
    rows = await db.execute(stmt)
    return rows.scalars().first()
|
|
|
|
|
|
|
|
@classmethod
async def get_catalog_detail_by_info(cls, db: AsyncSession, catalog: DataCatalogPageQueryModel):
    """
    Look up a catalog row by the attributes set on ``catalog``.

    ``content_name``, ``content_stat`` and ``content_pic`` are each applied as
    an equality filter only when truthy. In addition the query always requires
    ``content_stat == 1``.

    :param db: async ORM session
    :param catalog: catalog query parameters
    :return: the first matching ``DataAstContent`` row, or ``None``
    """
    name_cond = DataAstContent.content_name == catalog.content_name if catalog.content_name else True
    stat_cond = DataAstContent.content_stat == catalog.content_stat if catalog.content_stat else True
    pic_cond = DataAstContent.content_pic == catalog.content_pic if catalog.content_pic else True

    stmt = select(DataAstContent).where(
        name_cond,
        stat_cond,
        pic_cond,
        DataAstContent.content_stat == 1,
    )
    rows = await db.execute(stmt)
    return rows.scalars().first()
|
|
|
|
|
|
|
|
@classmethod
async def update_leaf_node_flag(cls, db: AsyncSession):
    """
    Recompute ``leaf_node_flag`` for catalog rows that have no active child.

    Every ``DataAstContent`` row with ``content_stat == '1'`` and
    ``leaf_node_flag == 0`` for which no other row with ``content_stat == '1'``
    names it as ``supr_content_onum`` gets ``leaf_node_flag = 1`` and a fresh
    ``upd_time``. This method only ever sets the flag to 1; it never resets a
    row back to 0.

    :param db: async ORM session
    :return: None
    """
    # Second reference to the same table, used for the correlated child lookup.
    t2 = aliased(DataAstContent, name='t2')
    subquery = (
        select(DataAstContent.content_onum)
        .where(
            DataAstContent.content_stat == '1',
            DataAstContent.leaf_node_flag == 0,
            # NOT EXISTS (an active row whose parent is this row)
            not_(
                select(1)
                .select_from(t2)  # query the aliased table
                .where(
                    t2.supr_content_onum == DataAstContent.content_onum,
                    t2.content_stat == '1'
                )
                .exists()
            )
        )
    ).alias('temp')
    # NOTE(review): the select is wrapped as a named derived table ('temp'),
    # presumably because some databases refuse to read the UPDATE target table
    # directly in a subquery - confirm against the target database.

    stmt = (
        update(DataAstContent)
        .where(DataAstContent.content_onum.in_(subquery))
        .values(leaf_node_flag=1, upd_time=datetime.now())
    )
    await db.execute(stmt)
|
|
|
|
|
|
|
|
@classmethod
async def add_catalog_dao(cls, db: AsyncSession, catalog1: dict, catalog2: dict):
    """
    Insert a catalog row together with its child relation rows.

    :param db: async ORM session
    :param catalog1: keyword arguments used to build the ``DataAstContent`` row
    :param catalog2: dict whose optional ``'children'`` entry lists the relations
                     to create; each item is a ``DataCatalogChild`` or a plain dict
    :return: the newly added ``DataAstContent`` instance
    :raises TypeError: when a child is neither a ``DataCatalogChild`` nor a dict
    """
    parent = DataAstContent(**catalog1)
    db.add(parent)
    # Flushed before the children are built, which read parent.content_onum.
    await db.flush()

    for entry in catalog2.get('children', []):
        # Normalise each child to a private dict so the caller's data is not mutated.
        if isinstance(entry, DataCatalogChild):
            rela_fields = entry.model_dump()
        elif isinstance(entry, dict):
            rela_fields = dict(entry)
        else:
            raise TypeError("不支持的子关系数据类型")

        # The relation always points at the catalog row created above.
        rela_fields['content_onum'] = parent.content_onum
        db.add(DataAstContentRela(**rela_fields))
        await db.flush()

    return parent
|
|
|
|
|
|
|
|
@classmethod
async def edit_catalog_leaf_dao(cls, db: AsyncSession, catalog: dict):
    """
    Set ``leaf_node_flag`` on a single catalog row.

    :param db: async ORM session
    :param catalog: dict providing ``'content_onum'`` (row to update) and
                    ``'leaf_node_flag'`` (new value)
    :return: None
    """
    target_onum = catalog['content_onum']
    stmt = (
        update(DataAstContent)
        .where(DataAstContent.content_onum == target_onum)
        .values(leaf_node_flag=catalog['leaf_node_flag'])
    )
    await db.execute(stmt)
|
|
|
|
|
|
|
|
@classmethod
async def edit_catalog_child_dao(cls, db: AsyncSession, catalog: dict):
    """
    Update a catalog row and create or update its child relation rows.

    :param db: async ORM session
    :param catalog: dict with the catalog fields to write (``content_onum``,
                    ``content_name``, ``content_stat``, ``content_intr``,
                    ``content_pic``, ``supr_content_onum``, ``leaf_node_flag``,
                    ``upd_prsn``) and an optional ``'children'`` list of dicts
    :return: None
    """
    content_onum = catalog['content_onum']

    stmt = (
        update(DataAstContent)
        .where(DataAstContent.content_onum == content_onum)
        .values(
            content_name=catalog['content_name'],
            content_stat=catalog['content_stat'],
            content_intr=catalog['content_intr'],
            content_pic=catalog['content_pic'],
            supr_content_onum=catalog['supr_content_onum'],
            leaf_node_flag=catalog['leaf_node_flag'],
            upd_prsn=catalog['upd_prsn'],
            upd_time=datetime.now()
        ))

    await db.execute(stmt)

    # Process child relations: a child carrying a truthy 'rela_onum' is updated
    # in place, any other child is inserted as a new relation row.
    for child in catalog.get('children', []):
        rela_onum = child.get('rela_onum')
        if rela_onum:
            st = (
                update(DataAstContentRela)
                .where(DataAstContentRela.rela_onum == rela_onum)
                .values(
                    content_onum=child.get('content_onum'),
                    ast_onum=child.get('ast_onum'),
                    rela_type=child.get('rela_type'),
                    rela_eff_begn_date=child.get('rela_eff_begn_date'),
                    rela_eff_end_date=child.get('rela_eff_end_date'),
                    upd_prsn=child.get('upd_prsn'))
            )

            await db.execute(st)
            await cls.update_leaf_node_flag(db)
        else:
            # New relation: bind it to this catalog row. Note this writes into
            # the caller's child dict.
            child['content_onum'] = content_onum
            db_child = DataAstContentRela(**child)
            db.add(db_child)
            await db.flush()
            # NOTE(review): assumed to belong to this else-branch, mirroring the
            # if-branch above - confirm against the original indentation.
            await cls.update_leaf_node_flag(db)
|
|
|
|
|
|
|
|
@classmethod
async def delete_catalog_dao(cls, db: AsyncSession, catalog: DeleteDataCatalogModel):
    """
    Soft-delete catalogs and their relations.

    No rows are physically removed: relation rows get ``rela_status = 0`` and
    ``rela_eff_end_date = now``; catalog rows get ``content_stat = 0`` (voided)
    and ``upd_time = now``. Leaf flags are recomputed afterwards.

    :param db: async ORM session
    :param catalog: object whose ``content_onums`` is a comma-separated string of catalog ids
    :return: None
    """
    target_onums = catalog.content_onums.split(',')

    void_relations = (
        update(DataAstContentRela)
        .where(DataAstContentRela.content_onum.in_(target_onums))
        .values(rela_status=0, rela_eff_end_date=datetime.now())
    )
    await db.execute(void_relations)

    void_catalogs = (
        update(DataAstContent)
        .where(DataAstContent.content_onum.in_(target_onums))
        .values(content_stat=0, upd_time=datetime.now())
    )
    await db.execute(void_catalogs)

    await cls.update_leaf_node_flag(db)
|
|
|
|
|
|
|
|
@classmethod
async def moved_catalog_instr_dao(cls, db: AsyncSession, moved_catalog_data: dict):
    """
    Move a catalog under a different parent.

    The row is matched on both its id and its current parent, then
    ``supr_content_onum`` is set to the new parent and ``upd_time`` to now.
    Leaf flags are recomputed afterwards.

    :param db: async ORM session
    :param moved_catalog_data: dict providing ``'content_onum'``,
                               ``'supr_content_onum'`` (current parent) and
                               ``'supr_content_onum_after'`` (new parent)
    :return: None
    """
    matches_row = DataAstContent.content_onum == moved_catalog_data['content_onum']
    matches_parent = DataAstContent.supr_content_onum == moved_catalog_data['supr_content_onum']

    stmt = (
        update(DataAstContent)
        .where(matches_row, matches_parent)
        .values(
            supr_content_onum=moved_catalog_data['supr_content_onum_after'],
            upd_time=datetime.now(),
        )
    )
    await db.execute(stmt)
    await cls.update_leaf_node_flag(db)
|
|
|
|
|
|
|
|
@classmethod
async def merge_catalog_instr_dao(cls, db: AsyncSession, merge_catalog_data: dict):
    """
    Merge one catalog into another.

    1. Active relation rows (``rela_status == 1``) of the source catalog are
       re-pointed to the target catalog and get ``rela_eff_begn_date = now``.
    2. The source catalog row (matched on id and parent) gets ``content_stat = '0'``.
    3. Leaf flags are recomputed.

    :param db: async ORM session
    :param merge_catalog_data: dict providing ``'content_onum'`` (source),
                               ``'supr_content_onum'`` (source's parent) and
                               ``'content_onum_after'`` (target)
    :return: None
    """
    # Both conditions are passed as separate where() criteria so they are
    # ANDed in SQL. The previous code joined them with Python's `and`, which
    # evaluates the first expression's truthiness in Python and silently
    # dropped the rela_status filter from the generated statement.
    repoint_relations = (
        update(DataAstContentRela)
        .where(
            DataAstContentRela.content_onum == merge_catalog_data['content_onum'],
            DataAstContentRela.rela_status == 1,
        )
        .values(
            content_onum=merge_catalog_data['content_onum_after'],
            rela_eff_begn_date=datetime.now()
        )
    )
    await db.execute(repoint_relations)

    void_source = (
        update(DataAstContent)
        .where(
            DataAstContent.content_onum == merge_catalog_data['content_onum'],
            DataAstContent.supr_content_onum == merge_catalog_data['supr_content_onum'],
        )
        .values(content_stat='0')
    )
    await db.execute(void_source)

    await cls.update_leaf_node_flag(db)
|
|
|
|
|
|
|
|
@classmethod
async def removerel_data_ast_catalog_dao(cls, db: AsyncSession, removerel_catalog_data: dict):
    """
    Change the status of one catalog/asset relation row.

    The row is matched on both ``rela_onum`` and ``content_onum``; its
    ``rela_status`` is set to the supplied value. Leaf flags are recomputed
    afterwards.

    :param db: async ORM session
    :param removerel_catalog_data: dict providing ``'rela_onum'``,
                                   ``'content_onum'`` and ``'rela_status'``
    :return: None
    """
    matches_relation = DataAstContentRela.rela_onum == removerel_catalog_data['rela_onum']
    matches_catalog = DataAstContentRela.content_onum == removerel_catalog_data['content_onum']

    stmt = (
        update(DataAstContentRela)
        .where(matches_relation, matches_catalog)
        .values(rela_status=removerel_catalog_data['rela_status'])
    )
    await db.execute(stmt)
    await cls.update_leaf_node_flag(db)
|
|
|
|
|
|
|
|
@classmethod
async def moverel_data_ast_catalog_dao(cls, db: AsyncSession, moverel_catalog_data: dict):
    """
    Move one catalog/asset relation row to another catalog.

    The row is matched on both ``rela_onum`` and ``content_onum``; its
    ``content_onum`` is set to the new catalog and ``rela_eff_end_date`` to
    now. Leaf flags are recomputed afterwards.

    :param db: async ORM session
    :param moverel_catalog_data: dict providing ``'rela_onum'``,
                                 ``'content_onum'`` (current catalog) and
                                 ``'content_onum_after'`` (new catalog)
    :return: None
    """
    matches_relation = DataAstContentRela.rela_onum == moverel_catalog_data['rela_onum']
    matches_catalog = DataAstContentRela.content_onum == moverel_catalog_data['content_onum']

    stmt = (
        update(DataAstContentRela)
        .where(matches_relation, matches_catalog)
        .values(
            content_onum=moverel_catalog_data['content_onum_after'],
            rela_eff_end_date=datetime.now(),
        )
    )
    await db.execute(stmt)
    await cls.update_leaf_node_flag(db)
|
|
|
|
|
|
|
|
@classmethod
async def get_catalog_list(cls, db: AsyncSession, query_object: DataCatalogPageQueryModel, user_id: int,
                           is_page: bool = False):
    """
    List active catalogs together with their active asset relations.

    Catalog rows with ``content_stat == 1`` are left-outer-joined to a UNION ALL
    of two relation sets, both limited to ``rela_status == '1'``:

    * relations under ``content_onum == '2'`` whose ``upd_prsn`` equals
      ``query_object.upd_prsn``
    * relations under any other ``content_onum``

    NOTE(review): catalog ``'2'`` looks like a per-user catalog whose relations
    are only visible to the user who wrote them - confirm with the caller.

    :param db: async ORM session
    :param query_object: paging parameters (``page_num``, ``page_size``) and ``upd_prsn``
    :param user_id: accepted for interface compatibility; not used by the query
    :param is_page: forwarded to ``PageUtil.paginate``
    :return: whatever ``PageUtil.paginate`` returns for the built query
    """
    # Conditions are passed as separate where() criteria so they are ANDed in
    # SQL. The previous code combined them with Python's `and`, which is
    # evaluated in Python: the first branch lost its rela_status filter and the
    # second branch lost its content_onum != '2' filter, so every user's
    # catalog-'2' relations were returned.
    user_scoped_rela = select(DataAstContentRela).where(
        DataAstContentRela.upd_prsn == query_object.upd_prsn,
        DataAstContentRela.content_onum == '2',
        DataAstContentRela.rela_status == '1',
    )
    shared_rela = select(DataAstContentRela).where(
        DataAstContentRela.content_onum != '2',
        DataAstContentRela.rela_status == '1',
    )
    # The union gets a unique alias so its columns can be referenced via .c below.
    subquery_t1 = user_scoped_rela.union_all(shared_rela).alias('subquery_t1')

    query = (
        select(
            DataAstContent.content_onum,
            DataAstContent.content_name,
            DataAstContent.content_stat,
            DataAstContent.content_intr,
            DataAstContent.content_pic,
            DataAstContent.supr_content_onum,
            DataAstContent.leaf_node_flag,
            DataAstContent.upd_prsn,
            DataAstContent.upd_time,

            # Relation columns are taken explicitly from the aliased union.
            subquery_t1.c.rela_onum,
            subquery_t1.c.ast_onum,
            subquery_t1.c.rela_type,
            subquery_t1.c.rela_eff_begn_date,
            subquery_t1.c.rela_eff_end_date,
            subquery_t1.c.upd_prsn,
        )
        .distinct()
        .select_from(DataAstContent)
        .outerjoin(subquery_t1, DataAstContent.content_onum == subquery_t1.c.content_onum)
        .where(DataAstContent.content_stat == 1)
        .order_by(DataAstContent.content_onum)
    )

    data_ast_list = await PageUtil.paginate(
        db,
        query,
        page_num=query_object.page_num,
        page_size=query_object.page_size,
        is_page=is_page
    )

    return data_ast_list
|