You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

106 lines
4.5 KiB

from sqlalchemy import delete, select, update,and_
from sqlalchemy.ext.asyncio import AsyncSession
from module_admin.entity.do.metadata_config_do import BatchBusiLabelConfig,BatchDataopLabelConfig,BatchDatatypeLabelConfig
from utils.page_util import PageUtil
class BatchLabelConfigDAO:
# region === BatchBusiLabelConfig ===
@classmethod
async def get_busi_detail_by_id(cls, db: AsyncSession, onum: int):
result = await db.execute(select(BatchBusiLabelConfig).where(BatchBusiLabelConfig.onum == onum))
return result.scalars().first()
@classmethod
async def get_busi_list(cls, db: AsyncSession, query_object, is_page: bool = False):
query = select(BatchBusiLabelConfig).where(
BatchBusiLabelConfig.regex_name.like(f"%{query_object.regex_name}%") if query_object.regex_name else True,
BatchBusiLabelConfig.mdl_name == query_object.mdl_name if query_object.mdl_name else True,
).order_by(BatchBusiLabelConfig.upd_time.desc())
return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
@classmethod
async def add_busi(cls, db: AsyncSession, obj):
db_obj = BatchBusiLabelConfig(**obj.model_dump())
db.add(db_obj)
await db.flush()
return db_obj
@classmethod
async def edit_busi(cls, db: AsyncSession, onum: int, update_data: dict):
await db.execute(update(BatchBusiLabelConfig).where(BatchBusiLabelConfig.onum == onum).values(**update_data))
@classmethod
async def delete_busi(cls, db: AsyncSession, onum_list: list[int]):
await db.execute(delete(BatchBusiLabelConfig).where(BatchBusiLabelConfig.onum.in_(onum_list)))
# endregion
# region === BatchDataopLabelConfig ===
@classmethod
async def get_dataop_detail_by_id(cls, db: AsyncSession, onum: int):
result = await db.execute(select(BatchDataopLabelConfig).where(BatchDataopLabelConfig.onum == onum))
return result.scalars().first()
@classmethod
async def get_dataop_list(cls, db: AsyncSession, query_object, is_page: bool = False):
query = select(BatchDataopLabelConfig).where(
BatchDataopLabelConfig.optype.like(f"%{query_object.optype}%") if query_object.optype else True,
BatchDataopLabelConfig.mdl_name == query_object.mdl_name if query_object.mdl_name else True,
)
return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
@classmethod
async def add_dataop(cls, db: AsyncSession, obj):
db_obj = BatchDataopLabelConfig(**obj.model_dump())
db.add(db_obj)
await db.flush()
return db_obj
@classmethod
async def edit_dataop(cls, db: AsyncSession, onum: int, update_data: dict):
await db.execute(update(BatchDataopLabelConfig).where(BatchDataopLabelConfig.onum == onum).values(**update_data))
@classmethod
async def delete_dataop(cls, db: AsyncSession, onum_list: list[int]):
await db.execute(delete(BatchDataopLabelConfig).where(BatchDataopLabelConfig.onum.in_(onum_list)))
# endregion
# region === BatchDatatypeLabelConfig ===
@classmethod
async def get_datatype_detail_by_id(cls, db: AsyncSession, onum: int):
result = await db.execute(select(BatchDatatypeLabelConfig).where(BatchDatatypeLabelConfig.onum == onum))
return result.scalars().first()
@classmethod
async def get_datatype_list(cls, db: AsyncSession, query_object, is_page: bool = False):
query = select(BatchDatatypeLabelConfig).where(
BatchDatatypeLabelConfig.datatype.like(f"%{query_object.datatype}%") if query_object.datatype else True,
BatchDatatypeLabelConfig.mdl_name == query_object.mdl_name if query_object.mdl_name else True,
).order_by(BatchDatatypeLabelConfig.upd_time.desc())
return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
@classmethod
async def add_datatype(cls, db: AsyncSession, obj):
db_obj = BatchDatatypeLabelConfig(**obj.model_dump())
db.add(db_obj)
await db.flush()
return db_obj
@classmethod
async def edit_datatype(cls, db: AsyncSession, onum: int, update_data: dict):
await db.execute(update(BatchDatatypeLabelConfig).where(BatchDatatypeLabelConfig.onum == onum).values(**update_data))
@classmethod
async def delete_datatype(cls, db: AsyncSession, onum_list: list[int]):
await db.execute(delete(BatchDatatypeLabelConfig).where(BatchDatatypeLabelConfig.onum.in_(onum_list)))
# endregion