from sqlalchemy import delete, desc, false, or_, select, update
from sqlalchemy.ext.asyncio import AsyncSession

from module_admin.entity.do.datastd_do import DataStdCode, DataStdCodeItem
from module_admin.entity.vo.datastd_vo import DataStdCodeItemModel, DataStdCodeModel
from utils.page_util import PageUtil


class DataStdDao:
    """
    Database access for the data standard module (standard codes and code items).

    NOTE(review): the ``get_*_by_id`` methods filter on ``<Entity>.Id`` while the
    ``delete_*`` methods filter on ``<Entity>.id``. Presumably only one spelling
    matches the mapped column; confirm against the entity definitions.
    """

    @staticmethod
    def _to_update_mapping(update_data):
        """
        Return ``update_data`` in the dict form required by a bulk UPDATE.

        ``db.execute(update(Entity), [params])`` only accepts dicts/tuples as
        parameter sets, so a Pydantic model is dumped to a dict first.
        Dict input is returned unchanged.

        :param update_data: dict, or an object exposing ``model_dump``
        :return: mapping to pass as one parameter set
        """
        if hasattr(update_data, "model_dump"):
            # Only send fields the caller explicitly set, so unset fields are
            # not overwritten with their defaults.
            return update_data.model_dump(exclude_unset=True)
        return update_data

    @classmethod
    async def get_std_code_list(cls, db: AsyncSession, query_object: DataStdCodeModel, is_page: bool = False):
        """
        Query DataStdCode rows with optional filters, newest first.

        :param db: async ORM session
        :param query_object: query parameters (code_name is matched with LIKE,
            code_status / sys_id / code_type by equality; falsy values are ignored)
        :param is_page: forwarded to ``PageUtil.paginate`` to enable paging
        :return: whatever ``PageUtil.paginate`` returns for the query
        """
        filters = []
        if query_object.code_name:
            filters.append(DataStdCode.code_name.like(f"%{query_object.code_name}%"))
        if query_object.code_status:
            filters.append(DataStdCode.code_status == query_object.code_status)
        if query_object.sys_id:
            filters.append(DataStdCode.sys_id == query_object.sys_id)
        if query_object.code_type:
            filters.append(DataStdCode.code_type == query_object.code_type)

        query = (
            select(DataStdCode)
            .where(*filters)
            .order_by(desc(DataStdCode.create_time))  # newest first
        )

        return await PageUtil.paginate(
            db, query, query_object.page_num, query_object.page_size, is_page
        )

    @classmethod
    async def get_std_code_item_list(cls, db: AsyncSession, query_object: DataStdCodeItemModel, is_page: bool = False):
        """
        Query DataStdCodeItem rows with optional filters, newest first.

        :param db: async ORM session
        :param query_object: query parameters; ``code_name`` is matched with LIKE
            against both ``code_name`` and ``code_num``; ``code_status``,
            ``code_mean`` and ``parent_id`` are matched by equality
        :param is_page: forwarded to ``PageUtil.paginate`` to enable paging
        :return: whatever ``PageUtil.paginate`` returns for the query
        """
        filters = []
        if query_object.code_name:
            pattern = f"%{query_object.code_name}%"
            # Must be a SQL OR: Python's `or` would call bool() on the first
            # LIKE clause, which SQLAlchemy rejects with a TypeError.
            filters.append(
                or_(
                    DataStdCodeItem.code_name.like(pattern),
                    DataStdCodeItem.code_num.like(pattern),
                )
            )
        if query_object.code_status:
            filters.append(DataStdCodeItem.code_status == query_object.code_status)
        if query_object.code_mean:
            filters.append(DataStdCodeItem.code_mean == query_object.code_mean)
        if query_object.parent_id:
            filters.append(DataStdCodeItem.parent_id == query_object.parent_id)
        else:
            # Without a parent_id the query is forced to match no rows.
            filters.append(false())

        query = (
            select(DataStdCodeItem)
            .where(*filters)
            .order_by(desc(DataStdCodeItem.create_time))  # newest first
        )

        return await PageUtil.paginate(
            db, query, query_object.page_num, query_object.page_size, is_page
        )

    @classmethod
    async def get_data_code_list(cls, db: AsyncSession, query_object: DataStdCodeModel):
        """
        Return all DataStdCode rows matching the given fields exactly.

        :param db: async ORM session
        :param query_object: code_name / code_status / sys_id / code_type are
            matched by equality; falsy values are ignored
        :return: list of matching DataStdCode entities
        """
        filters = []
        if query_object.code_name:
            filters.append(DataStdCode.code_name == query_object.code_name)
        if query_object.code_status:
            filters.append(DataStdCode.code_status == query_object.code_status)
        if query_object.sys_id:
            filters.append(DataStdCode.sys_id == query_object.sys_id)
        if query_object.code_type:
            filters.append(DataStdCode.code_type == query_object.code_type)

        result = await db.execute(select(DataStdCode).where(*filters))
        return result.scalars().all()

    @classmethod
    async def get_data_code_item_list(cls, db: AsyncSession, query_object: DataStdCodeItemModel):
        """
        Return all DataStdCodeItem rows matching the given fields exactly.

        :param db: async ORM session
        :param query_object: code_name / code_status are matched by equality;
            falsy values are ignored
        :return: list of matching DataStdCodeItem entities
        """
        filters = []
        if query_object.code_name:
            filters.append(DataStdCodeItem.code_name == query_object.code_name)
        if query_object.code_status:
            filters.append(DataStdCodeItem.code_status == query_object.code_status)

        result = await db.execute(select(DataStdCodeItem).where(*filters))
        return result.scalars().all()

    @classmethod
    async def get_std_code_by_id(cls, db: AsyncSession, Id: str):
        """
        Fetch a single DataStdCode by identifier.

        :param db: async ORM session
        :param Id: identifier compared against ``DataStdCode.Id``
        :return: the first matching entity, or None when nothing matches
        """
        # NOTE(review): delete_std_code filters on `DataStdCode.id` (lowercase);
        # confirm which attribute the entity actually defines.
        result = await db.execute(select(DataStdCode).where(DataStdCode.Id == Id))
        return result.scalars().first()

    @classmethod
    async def get_std_code_item_by_id(cls, db: AsyncSession, Id: str):
        """
        Fetch a single DataStdCodeItem by identifier.

        :param db: async ORM session
        :param Id: identifier compared against ``DataStdCodeItem.Id``
        :return: the first matching entity, or None when nothing matches
        """
        # NOTE(review): delete_std_code_Item filters on `DataStdCodeItem.id`
        # (lowercase); confirm which attribute the entity actually defines.
        result = await db.execute(select(DataStdCodeItem).where(DataStdCodeItem.Id == Id))
        return result.scalars().first()

    @classmethod
    async def add_std_code(cls, db: AsyncSession, model: DataStdCodeModel):
        """
        Add a DataStdCode built from the model's fields and flush the session.

        :param db: async ORM session
        :param model: source of the entity's field values (via ``model_dump()``)
        :return: the newly added entity
        """
        entity = DataStdCode(**model.model_dump())
        db.add(entity)
        await db.flush()
        return entity

    @classmethod
    async def add_std_code_item(cls, db: AsyncSession, model: DataStdCodeItemModel):
        """
        Add a DataStdCodeItem built from the model's fields and flush the session.

        :param db: async ORM session
        :param model: source of the entity's field values (via ``model_dump()``)
        :return: the newly added entity
        """
        entity = DataStdCodeItem(**model.model_dump())
        db.add(entity)
        await db.flush()
        return entity

    @classmethod
    async def delete_std_code(cls, db: AsyncSession, Id: str) -> None:
        """
        Delete the DataStdCode rows whose ``id`` equals ``Id``.

        :param db: async ORM session
        :param Id: identifier compared against ``DataStdCode.id``
        """
        await db.execute(delete(DataStdCode).where(DataStdCode.id == Id))

    @classmethod
    async def delete_std_code_Item(cls, db: AsyncSession, Id: str) -> None:
        """
        Delete the DataStdCodeItem rows whose ``id`` equals ``Id``.

        :param db: async ORM session
        :param Id: identifier compared against ``DataStdCodeItem.id``
        """
        await db.execute(delete(DataStdCodeItem).where(DataStdCodeItem.id == Id))

    @classmethod
    async def update_std_code(cls, db: AsyncSession, update_data: DataStdCodeModel) -> None:
        """
        Update one DataStdCode row via a bulk UPDATE, then flush.

        :param db: async ORM session
        :param update_data: dict of column values, or a Pydantic model that is
            dumped to one; presumably it must carry the primary key so the
            ORM can locate the row (confirm against the entity)
        """
        await db.execute(update(DataStdCode), [cls._to_update_mapping(update_data)])
        await db.flush()

    @classmethod
    async def update_std_code_Item(cls, db: AsyncSession, update_data: DataStdCodeItemModel) -> None:
        """
        Update one DataStdCodeItem row via a bulk UPDATE, then flush.

        :param db: async ORM session
        :param update_data: dict of column values, or a Pydantic model that is
            dumped to one; presumably it must carry the primary key so the
            ORM can locate the row (confirm against the entity)
        """
        await db.execute(update(DataStdCodeItem), [cls._to_update_mapping(update_data)])
        await db.flush()