You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
244 lines
11 KiB
244 lines
11 KiB
from datetime import datetime, time
|
|
from sqlalchemy import delete, select, update,and_
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from module_admin.entity.do.meta_do import MetadataClas # ORM 类
|
|
from module_admin.entity.do.metadata_config_do import MetadataSec,SecuBizConfig,SecuBizPermiConfig # ORM 类
|
|
from typing import List
|
|
from sqlalchemy.orm import joinedload
|
|
from sqlalchemy.sql import true
|
|
from utils.page_util import PageUtil
|
|
|
|
class MetadataConfigDao:
|
|
"""
|
|
标签信息数据库操作层
|
|
"""
|
|
|
|
@classmethod
|
|
async def get_clas_detail_by_id(cls, db: AsyncSession, clas_id: int):
|
|
"""
|
|
根据标签序号获取标签详细信息
|
|
"""
|
|
result = await db.execute(select(MetadataClas).where(MetadataClas.clas_onum == clas_id))
|
|
return result.scalars().first()
|
|
|
|
@classmethod
|
|
async def get_clas_detail_by_info(cls, db: AsyncSession, clas):
|
|
"""
|
|
根据标签参数获取标签信息(根据 MetadataClasModel 实例)
|
|
"""
|
|
result = await db.execute(
|
|
select(MetadataClas).where(
|
|
MetadataClas.clas_name == clas.clas_name if clas.clas_name else True,
|
|
MetadataClas.clas_eff_flag == clas.clas_eff_flag if clas.clas_eff_flag else True,
|
|
)
|
|
)
|
|
return result.scalars().first()
|
|
|
|
@classmethod
|
|
async def get_metadata_clas_list(cls, db: AsyncSession, query_object, is_page: bool = False):
|
|
"""
|
|
获取标签信息列表(支持分页)
|
|
"""
|
|
query = select(MetadataClas).where(
|
|
MetadataClas.clas_pri_clas.like(f"%{query_object.clas_pri_clas}%") if query_object.clas_pri_clas else True,
|
|
MetadataClas.clas_scd_clas.like(f"%{query_object.clas_scd_clas}%") if query_object.clas_scd_clas else True,
|
|
MetadataClas.clas_thre_clas.like(f"%{query_object.clas_thre_clas}%") if query_object.clas_thre_clas else True,
|
|
MetadataClas.clas_name.like(f"%{query_object.clas_name}%") if query_object.clas_name else True,
|
|
MetadataClas.clas_eff_flag == query_object.clas_eff_flag if query_object.clas_eff_flag else True,
|
|
MetadataClas.upd_time.between(
|
|
datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time(0, 0, 0)),
|
|
datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time(23, 59, 59)),
|
|
)
|
|
if query_object.begin_time and query_object.end_time else True,
|
|
).order_by(MetadataClas.clas_onum).distinct()
|
|
|
|
return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
|
|
|
|
@classmethod
|
|
async def add_metadata_clas_dao(cls, db: AsyncSession, clas):
|
|
"""
|
|
新增标签信息
|
|
"""
|
|
db_clas = MetadataClas(**clas.model_dump())
|
|
db.add(db_clas)
|
|
await db.flush()
|
|
return db_clas
|
|
|
|
@classmethod
|
|
async def edit_metadata_clas_dao(cls, db: AsyncSession, clas_id: int, update_data: dict):
|
|
"""
|
|
修改标签信息
|
|
"""
|
|
await db.execute(
|
|
update(MetadataClas)
|
|
.where(MetadataClas.clas_onum == clas_id)
|
|
.values(**update_data)
|
|
)
|
|
|
|
@classmethod
|
|
async def delete_metadata_clas_dao(cls, db: AsyncSession, clas_onum_list: list[int]):
|
|
"""
|
|
删除标签信息(支持批量)
|
|
"""
|
|
await db.execute(delete(MetadataClas).where(MetadataClas.clas_onum.in_(clas_onum_list)))
|
|
@classmethod
|
|
async def get_sec_detail_by_id(cls, db: AsyncSession, onum: str):
|
|
result = await db.execute(select(MetadataSec).where(MetadataSec.onum == onum))
|
|
return result.scalars().first()
|
|
|
|
@classmethod
|
|
async def get_metadata_sec_list(cls, db: AsyncSession, query_object, is_page: bool = False):
|
|
query = select(MetadataSec).where(
|
|
MetadataSec.sec_level_name.like(f"%{query_object.sec_level_name}%") if query_object.sec_level_name else True,
|
|
MetadataSec.sec_eff_flag == query_object.sec_eff_flag if query_object.sec_eff_flag else True,
|
|
MetadataSec.upd_time.between(
|
|
datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time(0, 0, 0)),
|
|
datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time(23, 59, 59)),
|
|
) if query_object.begin_time and query_object.end_time else True,
|
|
).order_by(MetadataSec.upd_time.desc()).distinct()
|
|
|
|
from utils.page_util import PageUtil
|
|
return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
|
|
|
|
@classmethod
|
|
async def add_metadata_sec_dao(cls, db: AsyncSession, sec):
|
|
db_sec = MetadataSec(**sec.model_dump())
|
|
db.add(db_sec)
|
|
await db.flush()
|
|
return db_sec
|
|
|
|
@classmethod
|
|
async def edit_metadata_sec_dao(cls, db: AsyncSession, onum: str, update_data: dict):
|
|
await db.execute(
|
|
update(MetadataSec).where(MetadataSec.onum == onum).values(**update_data)
|
|
)
|
|
|
|
@classmethod
|
|
async def delete_metadata_sec_dao(cls, db: AsyncSession, onum_list: list[str]):
|
|
await db.execute(delete(MetadataSec).where(MetadataSec.onum.in_(onum_list)))
|
|
# ------------- t_secu_biz_config 相关方法 --------------
|
|
|
|
@classmethod
|
|
async def get_biz_config_detail_by_id(cls, db: AsyncSession, onum: int):
|
|
result = await db.execute(select(SecuBizConfig).where(SecuBizConfig.onum == onum))
|
|
return result.scalars().first()
|
|
|
|
@classmethod
|
|
async def get_biz_config_list(cls, db: AsyncSession, query_object, is_page: bool = False):
|
|
query = select(SecuBizConfig).where(
|
|
SecuBizConfig.risk_lvl.like(f"%{query_object.risk_lvl}%") if query_object.risk_lvl else True,
|
|
SecuBizConfig.isStop == query_object.isStop if query_object.isStop is not None else True,
|
|
SecuBizConfig.create_time.between(
|
|
datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time.min),
|
|
datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time.max),
|
|
) if query_object.begin_time and query_object.end_time else True,
|
|
).order_by(SecuBizConfig.create_time.desc())
|
|
|
|
from utils.page_util import PageUtil
|
|
return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
|
|
|
|
@classmethod
|
|
async def add_biz_config(cls, db: AsyncSession, obj):
|
|
db_obj = SecuBizConfig(**obj.model_dump())
|
|
db.add(db_obj)
|
|
await db.flush()
|
|
return db_obj
|
|
|
|
@classmethod
|
|
async def edit_biz_config(cls, db: AsyncSession, onum: int, update_data: dict):
|
|
await db.execute(
|
|
update(SecuBizConfig).where(SecuBizConfig.onum == onum).values(**update_data)
|
|
)
|
|
|
|
@classmethod
|
|
async def delete_biz_config(cls, db: AsyncSession, onum_list: list[int]):
|
|
await db.execute(delete(SecuBizConfig).where(SecuBizConfig.onum.in_(onum_list)))
|
|
@classmethod
|
|
async def add_biz_permi_config_batch(cls, db: AsyncSession, records: List[SecuBizPermiConfig]):
|
|
orm_objs = []
|
|
for rec in records:
|
|
orm_obj = SecuBizPermiConfig(
|
|
biz_onum=rec.biz_onum,
|
|
obj_type=rec.obj_type,
|
|
obj_value=rec.obj_value,
|
|
obj_name=rec.obj_name,
|
|
isStop=rec.isStop,
|
|
create_by=rec.create_by,
|
|
create_time=rec.create_time,
|
|
update_by=rec.update_by,
|
|
update_time=rec.update_time,
|
|
)
|
|
orm_objs.append(orm_obj)
|
|
db.add_all(orm_objs)
|
|
# ------------- t_secu_biz_permi_config 相关方法 --------------
|
|
|
|
@classmethod
|
|
async def get_biz_permi_config_detail_by_id(cls, db: AsyncSession, onum: int):
|
|
result = await db.execute(select(SecuBizPermiConfig).where(SecuBizPermiConfig.onum == onum))
|
|
return result.scalars().first()
|
|
|
|
# @classmethod
|
|
# async def get_biz_permi_config_list(cls, db: AsyncSession, query_object, is_page: bool = False):
|
|
# query = select(SecuBizPermiConfig).where(
|
|
# SecuBizPermiConfig.obj_value.like(f"%{query_object.obj_value}%") if query_object.obj_value else True,
|
|
# SecuBizPermiConfig.obj_type == query_object.obj_type if query_object.obj_type else True,
|
|
# SecuBizPermiConfig.isStop == query_object.isStop if query_object.isStop is not None else True,
|
|
# SecuBizPermiConfig.create_time.between(
|
|
# datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time.min),
|
|
# datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time.max),
|
|
# ) if query_object.begin_time and query_object.end_time else True,
|
|
# ).order_by(SecuBizPermiConfig.create_time.desc())
|
|
|
|
# from utils.page_util import PageUtil
|
|
# return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
|
|
@classmethod
|
|
async def get_biz_permi_config_list(cls, db: AsyncSession, query_object, is_page: bool = False):
|
|
# 构建基础查询
|
|
query = (
|
|
select(
|
|
SecuBizPermiConfig.onum,
|
|
SecuBizPermiConfig.biz_onum,
|
|
SecuBizPermiConfig.obj_type,
|
|
SecuBizPermiConfig.obj_value,
|
|
SecuBizPermiConfig.obj_name,
|
|
SecuBizPermiConfig.isStop,
|
|
SecuBizPermiConfig.create_by,
|
|
SecuBizPermiConfig.create_time,
|
|
SecuBizPermiConfig.update_by,
|
|
SecuBizPermiConfig.update_time,
|
|
SecuBizConfig.biz_name,
|
|
MetadataSec.sec_level_summary
|
|
)
|
|
.join(SecuBizConfig, SecuBizPermiConfig.biz_onum == SecuBizConfig.onum, isouter=True)
|
|
.join(MetadataSec, SecuBizConfig.risk_lvl == MetadataSec.onum, isouter=True)
|
|
.where(
|
|
SecuBizPermiConfig.obj_value.like(f"%{query_object.obj_value}%") if query_object.obj_value else true(),
|
|
SecuBizPermiConfig.obj_type == query_object.obj_type if query_object.obj_type else true(),
|
|
SecuBizPermiConfig.isStop == query_object.isStop if query_object.isStop is not None else true(),
|
|
SecuBizPermiConfig.create_time.between(
|
|
datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time.min),
|
|
datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time.max),
|
|
) if query_object.begin_time and query_object.end_time else true()
|
|
)
|
|
.order_by(SecuBizPermiConfig.create_time.desc())
|
|
)
|
|
|
|
return await PageUtil.paginate(
|
|
db, query, query_object.page_num, query_object.page_size, is_page
|
|
)
|
|
@classmethod
|
|
async def add_biz_permi_config(cls, db: AsyncSession, obj):
|
|
db_obj = SecuBizPermiConfig(**obj.model_dump())
|
|
db.add(db_obj)
|
|
await db.flush()
|
|
return db_obj
|
|
|
|
@classmethod
|
|
async def edit_biz_permi_config(cls, db: AsyncSession, onum: int, update_data: dict):
|
|
await db.execute(
|
|
update(SecuBizPermiConfig).where(SecuBizPermiConfig.onum == onum).values(**update_data)
|
|
)
|
|
|
|
@classmethod
|
|
async def delete_biz_permi_config(cls, db: AsyncSession, onum_list: list[int]):
|
|
await db.execute(delete(SecuBizPermiConfig).where(SecuBizPermiConfig.onum.in_(onum_list)))
|