You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

262 lines
12 KiB

from datetime import datetime, time
3 days ago
from sqlalchemy import delete, select, update,and_
from sqlalchemy.ext.asyncio import AsyncSession
5 hours ago
from module_admin.entity.do.meta_do import MetadataClas,MetadataExtractInfo # ORM 类
from module_admin.entity.do.metadata_config_do import MetadataSec,SecuBizConfig,SecuBizPermiConfig,SecuBizConfigRela # ORM 类
3 days ago
from typing import List
5 hours ago
3 days ago
from sqlalchemy.orm import joinedload
from sqlalchemy.sql import true
from utils.page_util import PageUtil
class MetadataConfigDao:
"""
标签信息数据库操作层
"""
@classmethod
async def get_clas_detail_by_id(cls, db: AsyncSession, clas_id: int):
"""
根据标签序号获取标签详细信息
"""
result = await db.execute(select(MetadataClas).where(MetadataClas.clas_onum == clas_id))
return result.scalars().first()
@classmethod
async def get_clas_detail_by_info(cls, db: AsyncSession, clas):
"""
根据标签参数获取标签信息根据 MetadataClasModel 实例
"""
result = await db.execute(
select(MetadataClas).where(
MetadataClas.clas_name == clas.clas_name if clas.clas_name else True,
MetadataClas.clas_eff_flag == clas.clas_eff_flag if clas.clas_eff_flag else True,
)
)
return result.scalars().first()
@classmethod
async def get_metadata_clas_list(cls, db: AsyncSession, query_object, is_page: bool = False):
"""
获取标签信息列表支持分页
"""
query = select(MetadataClas).where(
MetadataClas.clas_pri_clas.like(f"%{query_object.clas_pri_clas}%") if query_object.clas_pri_clas else True,
MetadataClas.clas_scd_clas.like(f"%{query_object.clas_scd_clas}%") if query_object.clas_scd_clas else True,
MetadataClas.clas_thre_clas.like(f"%{query_object.clas_thre_clas}%") if query_object.clas_thre_clas else True,
MetadataClas.clas_name.like(f"%{query_object.clas_name}%") if query_object.clas_name else True,
MetadataClas.clas_eff_flag == query_object.clas_eff_flag if query_object.clas_eff_flag else True,
MetadataClas.upd_time.between(
datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time(0, 0, 0)),
datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time(23, 59, 59)),
)
if query_object.begin_time and query_object.end_time else True,
).order_by(MetadataClas.clas_onum).distinct()
return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
@classmethod
async def add_metadata_clas_dao(cls, db: AsyncSession, clas):
"""
新增标签信息
"""
db_clas = MetadataClas(**clas.model_dump())
db.add(db_clas)
await db.flush()
return db_clas
@classmethod
async def edit_metadata_clas_dao(cls, db: AsyncSession, clas_id: int, update_data: dict):
"""
修改标签信息
"""
await db.execute(
update(MetadataClas)
.where(MetadataClas.clas_onum == clas_id)
.values(**update_data)
)
@classmethod
async def delete_metadata_clas_dao(cls, db: AsyncSession, clas_onum_list: list[int]):
"""
删除标签信息支持批量
"""
await db.execute(delete(MetadataClas).where(MetadataClas.clas_onum.in_(clas_onum_list)))
@classmethod
async def get_sec_detail_by_id(cls, db: AsyncSession, onum: str):
result = await db.execute(select(MetadataSec).where(MetadataSec.onum == onum))
return result.scalars().first()
@classmethod
async def get_metadata_sec_list(cls, db: AsyncSession, query_object, is_page: bool = False):
query = select(MetadataSec).where(
MetadataSec.sec_level_name.like(f"%{query_object.sec_level_name}%") if query_object.sec_level_name else True,
MetadataSec.sec_eff_flag == query_object.sec_eff_flag if query_object.sec_eff_flag else True,
MetadataSec.upd_time.between(
datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time(0, 0, 0)),
datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time(23, 59, 59)),
) if query_object.begin_time and query_object.end_time else True,
).order_by(MetadataSec.upd_time.desc()).distinct()
from utils.page_util import PageUtil
return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
@classmethod
async def add_metadata_sec_dao(cls, db: AsyncSession, sec):
db_sec = MetadataSec(**sec.model_dump())
db.add(db_sec)
await db.flush()
return db_sec
@classmethod
async def edit_metadata_sec_dao(cls, db: AsyncSession, onum: str, update_data: dict):
await db.execute(
update(MetadataSec).where(MetadataSec.onum == onum).values(**update_data)
)
@classmethod
async def delete_metadata_sec_dao(cls, db: AsyncSession, onum_list: list[str]):
3 days ago
await db.execute(delete(MetadataSec).where(MetadataSec.onum.in_(onum_list)))
# ------------- t_secu_biz_config 相关方法 --------------
@classmethod
async def get_biz_config_detail_by_id(cls, db: AsyncSession, onum: int):
result = await db.execute(select(SecuBizConfig).where(SecuBizConfig.onum == onum))
return result.scalars().first()
@classmethod
async def get_biz_config_list(cls, db: AsyncSession, query_object, is_page: bool = False):
query = select(SecuBizConfig).where(
SecuBizConfig.risk_lvl.like(f"%{query_object.risk_lvl}%") if query_object.risk_lvl else True,
SecuBizConfig.isStop == query_object.isStop if query_object.isStop is not None else True,
SecuBizConfig.create_time.between(
datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time.min),
datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time.max),
) if query_object.begin_time and query_object.end_time else True,
).order_by(SecuBizConfig.create_time.desc())
from utils.page_util import PageUtil
return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
@classmethod
async def add_biz_config(cls, db: AsyncSession, obj):
db_obj = SecuBizConfig(**obj.model_dump())
db.add(db_obj)
await db.flush()
return db_obj
@classmethod
async def edit_biz_config(cls, db: AsyncSession, onum: int, update_data: dict):
await db.execute(
update(SecuBizConfig).where(SecuBizConfig.onum == onum).values(**update_data)
)
@classmethod
async def delete_biz_config(cls, db: AsyncSession, onum_list: list[int]):
await db.execute(delete(SecuBizConfig).where(SecuBizConfig.onum.in_(onum_list)))
@classmethod
async def add_biz_permi_config_batch(cls, db: AsyncSession, records: List[SecuBizPermiConfig]):
orm_objs = []
for rec in records:
orm_obj = SecuBizPermiConfig(
biz_onum=rec.biz_onum,
obj_type=rec.obj_type,
obj_value=rec.obj_value,
obj_name=rec.obj_name,
isStop=rec.isStop,
create_by=rec.create_by,
create_time=rec.create_time,
update_by=rec.update_by,
update_time=rec.update_time,
)
orm_objs.append(orm_obj)
db.add_all(orm_objs)
# ------------- t_secu_biz_permi_config 相关方法 --------------
@classmethod
async def get_biz_permi_config_detail_by_id(cls, db: AsyncSession, onum: int):
result = await db.execute(select(SecuBizPermiConfig).where(SecuBizPermiConfig.onum == onum))
return result.scalars().first()
# @classmethod
# async def get_biz_permi_config_list(cls, db: AsyncSession, query_object, is_page: bool = False):
# query = select(SecuBizPermiConfig).where(
# SecuBizPermiConfig.obj_value.like(f"%{query_object.obj_value}%") if query_object.obj_value else True,
# SecuBizPermiConfig.obj_type == query_object.obj_type if query_object.obj_type else True,
# SecuBizPermiConfig.isStop == query_object.isStop if query_object.isStop is not None else True,
# SecuBizPermiConfig.create_time.between(
# datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time.min),
# datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time.max),
# ) if query_object.begin_time and query_object.end_time else True,
# ).order_by(SecuBizPermiConfig.create_time.desc())
# from utils.page_util import PageUtil
# return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
@classmethod
async def get_biz_permi_config_list(cls, db: AsyncSession, query_object, is_page: bool = False):
# 构建基础查询
query = (
select(
SecuBizPermiConfig.onum,
SecuBizPermiConfig.biz_onum,
SecuBizPermiConfig.obj_type,
SecuBizPermiConfig.obj_value,
SecuBizPermiConfig.obj_name,
SecuBizPermiConfig.isStop,
SecuBizPermiConfig.create_by,
SecuBizPermiConfig.create_time,
SecuBizPermiConfig.update_by,
SecuBizPermiConfig.update_time,
SecuBizConfig.biz_name,
MetadataSec.sec_level_summary
)
.join(SecuBizConfig, SecuBizPermiConfig.biz_onum == SecuBizConfig.onum, isouter=True)
.join(MetadataSec, SecuBizConfig.risk_lvl == MetadataSec.onum, isouter=True)
.where(
SecuBizPermiConfig.obj_value.like(f"%{query_object.obj_value}%") if query_object.obj_value else true(),
3 days ago
SecuBizPermiConfig.obj_name.like(f"%{query_object.obj_name}%") if query_object.obj_name else true(),
3 days ago
SecuBizPermiConfig.obj_type == query_object.obj_type if query_object.obj_type else true(),
SecuBizPermiConfig.isStop == query_object.isStop if query_object.isStop is not None else true(),
SecuBizPermiConfig.create_time.between(
datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time.min),
datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time.max),
) if query_object.begin_time and query_object.end_time else true()
)
.order_by(SecuBizPermiConfig.create_time.desc())
)
return await PageUtil.paginate(
db, query, query_object.page_num, query_object.page_size, is_page
)
@classmethod
async def add_biz_permi_config(cls, db: AsyncSession, obj):
db_obj = SecuBizPermiConfig(**obj.model_dump())
db.add(db_obj)
await db.flush()
return db_obj
@classmethod
async def edit_biz_permi_config(cls, db: AsyncSession, onum: int, update_data: dict):
await db.execute(
update(SecuBizPermiConfig).where(SecuBizPermiConfig.onum == onum).values(**update_data)
)
@classmethod
async def delete_biz_permi_config(cls, db: AsyncSession, onum_list: list[int]):
5 hours ago
await db.execute(delete(SecuBizPermiConfig).where(SecuBizPermiConfig.onum.in_(onum_list)))
# ----------------------------------biz_confg_rela------------------------------
@classmethod
async def delete_biz_rela_dao(cls, db: AsyncSession, biz_onum: int):
await db.execute(delete(SecuBizConfigRela).where(SecuBizConfigRela.biz_onum == biz_onum) )
@classmethod
async def add_batch_biz_rela_dao(cls, db: AsyncSession, rela_list: List[SecuBizConfigRela]):
db.add_all(rela_list)
await db.flush()
@classmethod
async def get_biz_rela_by_biz_id(cls, db: AsyncSession, biz_onum: int):
result = await db.execute(
select(MetadataExtractInfo)
.join(SecuBizConfigRela, MetadataExtractInfo.onum == SecuBizConfigRela.tab_onum)
.where(SecuBizConfigRela.biz_onum == biz_onum)
)
return result.scalars().all()