You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

116 lines
5.0 KiB

from datetime import datetime, time
from sqlalchemy import delete, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from module_admin.entity.do.meta_do import MetadataClas # ORM 类
from module_admin.entity.do.metadata_config_do import MetadataSec # ORM 类
class MetadataConfigDao:
"""
标签信息数据库操作层
"""
@classmethod
async def get_clas_detail_by_id(cls, db: AsyncSession, clas_id: int):
"""
根据标签序号获取标签详细信息
"""
result = await db.execute(select(MetadataClas).where(MetadataClas.clas_onum == clas_id))
return result.scalars().first()
@classmethod
async def get_clas_detail_by_info(cls, db: AsyncSession, clas):
"""
根据标签参数获取标签信息根据 MetadataClasModel 实例
"""
result = await db.execute(
select(MetadataClas).where(
MetadataClas.clas_name == clas.clas_name if clas.clas_name else True,
MetadataClas.clas_eff_flag == clas.clas_eff_flag if clas.clas_eff_flag else True,
)
)
return result.scalars().first()
@classmethod
async def get_metadata_clas_list(cls, db: AsyncSession, query_object, is_page: bool = False):
"""
获取标签信息列表支持分页
"""
query = select(MetadataClas).where(
MetadataClas.clas_pri_clas.like(f"%{query_object.clas_pri_clas}%") if query_object.clas_pri_clas else True,
MetadataClas.clas_scd_clas.like(f"%{query_object.clas_scd_clas}%") if query_object.clas_scd_clas else True,
MetadataClas.clas_thre_clas.like(f"%{query_object.clas_thre_clas}%") if query_object.clas_thre_clas else True,
MetadataClas.clas_name.like(f"%{query_object.clas_name}%") if query_object.clas_name else True,
MetadataClas.clas_eff_flag == query_object.clas_eff_flag if query_object.clas_eff_flag else True,
MetadataClas.upd_time.between(
datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time(0, 0, 0)),
datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time(23, 59, 59)),
)
if query_object.begin_time and query_object.end_time else True,
).order_by(MetadataClas.clas_onum).distinct()
from utils.page_util import PageUtil
return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
@classmethod
async def add_metadata_clas_dao(cls, db: AsyncSession, clas):
"""
新增标签信息
"""
db_clas = MetadataClas(**clas.model_dump())
db.add(db_clas)
await db.flush()
return db_clas
@classmethod
async def edit_metadata_clas_dao(cls, db: AsyncSession, clas_id: int, update_data: dict):
"""
修改标签信息
"""
await db.execute(
update(MetadataClas)
.where(MetadataClas.clas_onum == clas_id)
.values(**update_data)
)
@classmethod
async def delete_metadata_clas_dao(cls, db: AsyncSession, clas_onum_list: list[int]):
"""
删除标签信息支持批量
"""
await db.execute(delete(MetadataClas).where(MetadataClas.clas_onum.in_(clas_onum_list)))
@classmethod
async def get_sec_detail_by_id(cls, db: AsyncSession, onum: str):
result = await db.execute(select(MetadataSec).where(MetadataSec.onum == onum))
return result.scalars().first()
@classmethod
async def get_metadata_sec_list(cls, db: AsyncSession, query_object, is_page: bool = False):
query = select(MetadataSec).where(
MetadataSec.sec_level_name.like(f"%{query_object.sec_level_name}%") if query_object.sec_level_name else True,
MetadataSec.sec_eff_flag == query_object.sec_eff_flag if query_object.sec_eff_flag else True,
MetadataSec.upd_time.between(
datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time(0, 0, 0)),
datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time(23, 59, 59)),
) if query_object.begin_time and query_object.end_time else True,
).order_by(MetadataSec.upd_time.desc()).distinct()
from utils.page_util import PageUtil
return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
@classmethod
async def add_metadata_sec_dao(cls, db: AsyncSession, sec):
db_sec = MetadataSec(**sec.model_dump())
db.add(db_sec)
await db.flush()
return db_sec
@classmethod
async def edit_metadata_sec_dao(cls, db: AsyncSession, onum: str, update_data: dict):
await db.execute(
update(MetadataSec).where(MetadataSec.onum == onum).values(**update_data)
)
@classmethod
async def delete_metadata_sec_dao(cls, db: AsyncSession, onum_list: list[str]):
await db.execute(delete(MetadataSec).where(MetadataSec.onum.in_(onum_list)))