You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
66 lines
2.3 KiB
66 lines
2.3 KiB
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.future import select
|
|
from sqlalchemy import update, delete
|
|
from datetime import datetime, time
|
|
from typing import List
|
|
|
|
from module_admin.entity.do.metadata_config_do import DatasecConfig # ORM 类
|
|
from utils.page_util import PageUtil
|
|
|
|
|
|
class DatasecConfigDao:
|
|
"""
|
|
数据安全参数配置表 DAO
|
|
"""
|
|
|
|
@classmethod
|
|
async def get_detail_by_id(cls, db: AsyncSession, onum: int):
|
|
"""
|
|
根据主键获取详情
|
|
"""
|
|
result = await db.execute(select(DatasecConfig).where(DatasecConfig.onum == onum))
|
|
return result.scalars().first()
|
|
|
|
@classmethod
|
|
async def get_list(cls, db: AsyncSession, query_object, is_page: bool = False):
|
|
"""
|
|
获取数据列表(支持分页)
|
|
"""
|
|
query = select(DatasecConfig).where(
|
|
DatasecConfig.metatask_name.like(f"%{query_object.metatask_name}%") if query_object.metatask_name else True,
|
|
DatasecConfig.status == query_object.status if query_object.status else True,
|
|
DatasecConfig.create_time.between(
|
|
datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time(0, 0, 0)),
|
|
datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time(23, 59, 59)),
|
|
) if query_object.begin_time and query_object.end_time else True
|
|
).order_by(DatasecConfig.onum)
|
|
|
|
return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
|
|
|
|
@classmethod
|
|
async def add(cls, db: AsyncSession, model_obj):
|
|
"""
|
|
新增任务配置
|
|
"""
|
|
db_obj = DatasecConfig(**model_obj.model_dump())
|
|
db.add(db_obj)
|
|
await db.flush()
|
|
return db_obj
|
|
|
|
@classmethod
|
|
async def edit(cls, db: AsyncSession, onum: int, update_dict: dict):
|
|
"""
|
|
编辑任务配置
|
|
"""
|
|
await db.execute(
|
|
update(DatasecConfig)
|
|
.where(DatasecConfig.onum == onum)
|
|
.values(**update_dict)
|
|
)
|
|
|
|
@classmethod
|
|
async def delete(cls, db: AsyncSession, onum_list: List[int]):
|
|
"""
|
|
批量删除任务配置
|
|
"""
|
|
await db.execute(delete(DatasecConfig).where(DatasecConfig.onum.in_(onum_list)))
|
|
|