from datetime import datetime, time from sqlalchemy import delete, select, update,and_ from sqlalchemy.ext.asyncio import AsyncSession from module_admin.entity.do.meta_do import MetadataClas # ORM 类 from module_admin.entity.do.metadata_config_do import MetadataSec,SecuBizConfig,SecuBizPermiConfig # ORM 类 from typing import List from sqlalchemy.orm import joinedload from sqlalchemy.sql import true from utils.page_util import PageUtil class MetadataConfigDao: """ 标签信息数据库操作层 """ @classmethod async def get_clas_detail_by_id(cls, db: AsyncSession, clas_id: int): """ 根据标签序号获取标签详细信息 """ result = await db.execute(select(MetadataClas).where(MetadataClas.clas_onum == clas_id)) return result.scalars().first() @classmethod async def get_clas_detail_by_info(cls, db: AsyncSession, clas): """ 根据标签参数获取标签信息(根据 MetadataClasModel 实例) """ result = await db.execute( select(MetadataClas).where( MetadataClas.clas_name == clas.clas_name if clas.clas_name else True, MetadataClas.clas_eff_flag == clas.clas_eff_flag if clas.clas_eff_flag else True, ) ) return result.scalars().first() @classmethod async def get_metadata_clas_list(cls, db: AsyncSession, query_object, is_page: bool = False): """ 获取标签信息列表(支持分页) """ query = select(MetadataClas).where( MetadataClas.clas_pri_clas.like(f"%{query_object.clas_pri_clas}%") if query_object.clas_pri_clas else True, MetadataClas.clas_scd_clas.like(f"%{query_object.clas_scd_clas}%") if query_object.clas_scd_clas else True, MetadataClas.clas_thre_clas.like(f"%{query_object.clas_thre_clas}%") if query_object.clas_thre_clas else True, MetadataClas.clas_name.like(f"%{query_object.clas_name}%") if query_object.clas_name else True, MetadataClas.clas_eff_flag == query_object.clas_eff_flag if query_object.clas_eff_flag else True, MetadataClas.upd_time.between( datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time(0, 0, 0)), datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time(23, 59, 59)), ) if query_object.begin_time and query_object.end_time else True, ).order_by(MetadataClas.clas_onum).distinct() return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page) @classmethod async def add_metadata_clas_dao(cls, db: AsyncSession, clas): """ 新增标签信息 """ db_clas = MetadataClas(**clas.model_dump()) db.add(db_clas) await db.flush() return db_clas @classmethod async def edit_metadata_clas_dao(cls, db: AsyncSession, clas_id: int, update_data: dict): """ 修改标签信息 """ await db.execute( update(MetadataClas) .where(MetadataClas.clas_onum == clas_id) .values(**update_data) ) @classmethod async def delete_metadata_clas_dao(cls, db: AsyncSession, clas_onum_list: list[int]): """ 删除标签信息(支持批量) """ await db.execute(delete(MetadataClas).where(MetadataClas.clas_onum.in_(clas_onum_list))) @classmethod async def get_sec_detail_by_id(cls, db: AsyncSession, onum: str): result = await db.execute(select(MetadataSec).where(MetadataSec.onum == onum)) return result.scalars().first() @classmethod async def get_metadata_sec_list(cls, db: AsyncSession, query_object, is_page: bool = False): query = select(MetadataSec).where( MetadataSec.sec_level_name.like(f"%{query_object.sec_level_name}%") if query_object.sec_level_name else True, MetadataSec.sec_eff_flag == query_object.sec_eff_flag if query_object.sec_eff_flag else True, MetadataSec.upd_time.between( datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time(0, 0, 0)), datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time(23, 59, 59)), ) if query_object.begin_time and query_object.end_time else True, ).order_by(MetadataSec.upd_time.desc()).distinct() from utils.page_util import PageUtil return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page) @classmethod async def add_metadata_sec_dao(cls, db: AsyncSession, sec): db_sec = MetadataSec(**sec.model_dump()) db.add(db_sec) await db.flush() return db_sec @classmethod async def edit_metadata_sec_dao(cls, db: AsyncSession, onum: str, update_data: dict): await db.execute( update(MetadataSec).where(MetadataSec.onum == onum).values(**update_data) ) @classmethod async def delete_metadata_sec_dao(cls, db: AsyncSession, onum_list: list[str]): await db.execute(delete(MetadataSec).where(MetadataSec.onum.in_(onum_list))) # ------------- t_secu_biz_config 相关方法 -------------- @classmethod async def get_biz_config_detail_by_id(cls, db: AsyncSession, onum: int): result = await db.execute(select(SecuBizConfig).where(SecuBizConfig.onum == onum)) return result.scalars().first() @classmethod async def get_biz_config_list(cls, db: AsyncSession, query_object, is_page: bool = False): query = select(SecuBizConfig).where( SecuBizConfig.risk_lvl.like(f"%{query_object.risk_lvl}%") if query_object.risk_lvl else True, SecuBizConfig.isStop == query_object.isStop if query_object.isStop is not None else True, SecuBizConfig.create_time.between( datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time.min), datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time.max), ) if query_object.begin_time and query_object.end_time else True, ).order_by(SecuBizConfig.create_time.desc()) from utils.page_util import PageUtil return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page) @classmethod async def add_biz_config(cls, db: AsyncSession, obj): db_obj = SecuBizConfig(**obj.model_dump()) db.add(db_obj) await db.flush() return db_obj @classmethod async def edit_biz_config(cls, db: AsyncSession, onum: int, update_data: dict): await db.execute( update(SecuBizConfig).where(SecuBizConfig.onum == onum).values(**update_data) ) @classmethod async def delete_biz_config(cls, db: AsyncSession, onum_list: list[int]): await db.execute(delete(SecuBizConfig).where(SecuBizConfig.onum.in_(onum_list))) @classmethod async def add_biz_permi_config_batch(cls, db: AsyncSession, records: List[SecuBizPermiConfig]): orm_objs = [] for rec in records: orm_obj = SecuBizPermiConfig( biz_onum=rec.biz_onum, obj_type=rec.obj_type, obj_value=rec.obj_value, obj_name=rec.obj_name, isStop=rec.isStop, create_by=rec.create_by, create_time=rec.create_time, update_by=rec.update_by, update_time=rec.update_time, ) orm_objs.append(orm_obj) db.add_all(orm_objs) # ------------- t_secu_biz_permi_config 相关方法 -------------- @classmethod async def get_biz_permi_config_detail_by_id(cls, db: AsyncSession, onum: int): result = await db.execute(select(SecuBizPermiConfig).where(SecuBizPermiConfig.onum == onum)) return result.scalars().first() # @classmethod # async def get_biz_permi_config_list(cls, db: AsyncSession, query_object, is_page: bool = False): # query = select(SecuBizPermiConfig).where( # SecuBizPermiConfig.obj_value.like(f"%{query_object.obj_value}%") if query_object.obj_value else True, # SecuBizPermiConfig.obj_type == query_object.obj_type if query_object.obj_type else True, # SecuBizPermiConfig.isStop == query_object.isStop if query_object.isStop is not None else True, # SecuBizPermiConfig.create_time.between( # datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time.min), # datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time.max), # ) if query_object.begin_time and query_object.end_time else True, # ).order_by(SecuBizPermiConfig.create_time.desc()) # from utils.page_util import PageUtil # return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page) @classmethod async def get_biz_permi_config_list(cls, db: AsyncSession, query_object, is_page: bool = False): # 构建基础查询 query = ( select( SecuBizPermiConfig.onum, SecuBizPermiConfig.biz_onum, SecuBizPermiConfig.obj_type, SecuBizPermiConfig.obj_value, SecuBizPermiConfig.obj_name, SecuBizPermiConfig.isStop, SecuBizPermiConfig.create_by, SecuBizPermiConfig.create_time, SecuBizPermiConfig.update_by, SecuBizPermiConfig.update_time, SecuBizConfig.biz_name, MetadataSec.sec_level_summary ) .join(SecuBizConfig, SecuBizPermiConfig.biz_onum == SecuBizConfig.onum, isouter=True) .join(MetadataSec, SecuBizConfig.risk_lvl == MetadataSec.onum, isouter=True) .where( SecuBizPermiConfig.obj_value.like(f"%{query_object.obj_value}%") if query_object.obj_value else true(), SecuBizPermiConfig.obj_type == query_object.obj_type if query_object.obj_type else true(), SecuBizPermiConfig.isStop == query_object.isStop if query_object.isStop is not None else true(), SecuBizPermiConfig.create_time.between( datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time.min), datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time.max), ) if query_object.begin_time and query_object.end_time else true() ) .order_by(SecuBizPermiConfig.create_time.desc()) ) return await PageUtil.paginate( db, query, query_object.page_num, query_object.page_size, is_page ) @classmethod async def add_biz_permi_config(cls, db: AsyncSession, obj): db_obj = SecuBizPermiConfig(**obj.model_dump()) db.add(db_obj) await db.flush() return db_obj @classmethod async def edit_biz_permi_config(cls, db: AsyncSession, onum: int, update_data: dict): await db.execute( update(SecuBizPermiConfig).where(SecuBizPermiConfig.onum == onum).values(**update_data) ) @classmethod async def delete_biz_permi_config(cls, db: AsyncSession, onum_list: list[int]): await db.execute(delete(SecuBizPermiConfig).where(SecuBizPermiConfig.onum.in_(onum_list)))