from sqlalchemy import delete, select, update, desc, or_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import aliased

from module_admin.entity.do.datastd_do import DataStdCode, DataStdDict
from module_admin.entity.vo.datastd_vo import DataStdCodeModel, DataStdDictModel
from utils.page_util import PageUtil


class DataStdDao:
    """Database access helpers for data-standard codes and data-standard dictionaries."""

    @staticmethod
    def _eq_filters(*pairs):
        """Return ``column == value`` for each ``(column, value)`` pair whose value is truthy."""
        return [column == value for column, value in pairs if value]

    # ------------------------------------------------------------------
    # Data standard codes
    # ------------------------------------------------------------------

    @classmethod
    async def get_std_code_list(cls, db: AsyncSession, query_object: DataStdCodeModel, is_page: bool = False):
        """
        List DataStdCode rows, with fuzzy matching and optional pagination.

        :param db: async ORM session
        :param query_object: query parameters; empty/falsy fields are ignored
        :param is_page: whether pagination is enabled (forwarded to PageUtil.paginate)
        :return: whatever PageUtil.paginate returns for the built query
        """
        filters = []
        if query_object.code_name:
            filters.append(DataStdCode.code_name.like(f"%{query_object.code_name}%"))
        if query_object.code_num:
            # The code_num value acts as a keyword: it may match the name or the number.
            keyword = f"%{query_object.code_num}%"
            filters.append(
                or_(
                    DataStdCode.code_name.like(keyword),
                    DataStdCode.code_num.like(keyword),
                )
            )
        filters.extend(
            cls._eq_filters(
                (DataStdCode.code_status, query_object.code_status),
                (DataStdCode.sys_id, query_object.sys_id),
                (DataStdCode.code_type, query_object.code_type),
                (DataStdCode.class_id, query_object.class_id),
            )
        )
        if query_object.class_id == 'codeItem' and not query_object.parent_id:
            # Code items are only listed under a parent: with no parent_id the
            # query is forced to return nothing (constant-false predicate).
            filters.append(False)
        if query_object.parent_id:
            filters.append(DataStdCode.parent_id == query_object.parent_id)

        query = (
            select(DataStdCode)
            .where(*filters)
            .order_by(desc(DataStdCode.create_time))  # newest first
        )
        return await PageUtil.paginate(
            db, query, query_object.page_num, query_object.page_size, is_page
        )

    @classmethod
    async def get_std_code_map_list(cls, db: AsyncSession, query_object: DataStdCodeModel, is_page: bool = False):
        """
        List code items together with their parent code and their mapped code.

        DataStdCode is self-joined four times:
        ``c1`` is the code item itself, ``c2`` its parent (via ``parent_id``),
        ``c3`` the code it maps to (via ``code_map_id``) and ``c4`` the parent
        of that mapped code.

        :param db: async ORM session
        :param query_object: query parameters (parent_id, code_type, paging fields)
        :param is_page: whether pagination is enabled (forwarded to PageUtil.paginate)
        :return: whatever PageUtil.paginate returns for the built query
        """
        c1 = aliased(DataStdCode)  # the code item
        c2 = aliased(DataStdCode)  # parent of the code item
        c3 = aliased(DataStdCode)  # code the item is mapped to
        c4 = aliased(DataStdCode)  # parent of the mapped code

        filters = []
        if query_object.parent_id:
            if query_object.code_type == 'sys':
                # System-level code: parent_id refers to the item's own parent.
                filters.append(c1.parent_id == query_object.parent_id)
            else:
                # Any other code type: parent_id refers to the mapped code's parent.
                filters.append(c3.parent_id == query_object.parent_id)
        # Only rows that are code items are used as the driving table.
        filters.append(c1.class_id == 'codeItem')

        query = (
            select(
                c2.sys_id.label("target_sys_id"),  # parent: system id
                c2.sys_name.label("target_sys_name"),  # parent: system name
                c2.code_num.label("target_code_num"),  # parent: code number
                c2.code_name.label("target_code_name"),  # parent: code name
                c1.code_num.label("target_code_num_item"),  # item: code number
                c1.code_name.label("target_code_name_item"),  # item: code name
                c4.code_num.label("resource_code_num"),  # mapped code's parent: number
                c4.code_name.label("resource_code_name"),  # mapped code's parent: name
                c3.code_num.label("resource_code_num_item"),  # mapped code: number
                c3.code_name.label("resource_code_name_item"),  # mapped code: name
            )
            .select_from(c1)
            # Outer joins so items without a parent or a mapping are still returned.
            .join(c2, c1.parent_id == c2.id, isouter=True)
            .join(c3, c1.code_map_id == c3.id, isouter=True)
            .join(c4, c3.parent_id == c4.id, isouter=True)
            .where(*filters)
            .order_by(desc(c1.create_time))  # newest first
        )
        return await PageUtil.paginate(
            db, query, query_object.page_num, query_object.page_size, is_page
        )

    @classmethod
    async def get_data_code_list_by_info(cls, db: AsyncSession, query_object: DataStdCodeModel):
        """
        Return every DataStdCode row that exactly matches the populated fields.

        Fields compared: code_name, code_status, sys_id, code_type, parent_id,
        class_id. Falsy fields are not used as filters.

        :param db: async ORM session
        :param query_object: fields to match on
        :return: list of matching DataStdCode entities
        """
        filters = cls._eq_filters(
            (DataStdCode.code_name, query_object.code_name),
            (DataStdCode.code_status, query_object.code_status),
            (DataStdCode.sys_id, query_object.sys_id),
            (DataStdCode.code_type, query_object.code_type),
            (DataStdCode.parent_id, query_object.parent_id),
            (DataStdCode.class_id, query_object.class_id),
        )
        result = await db.execute(select(DataStdCode).where(*filters))
        return result.scalars().all()

    @classmethod
    async def get_data_code_by_info(cls, db: AsyncSession, query_object: DataStdCodeModel):
        """
        Return the first DataStdCode row that exactly matches the populated fields.

        Fields compared: code_name, code_num, code_status, sys_id, code_type,
        parent_id, class_id. Falsy fields are not used as filters.

        :param db: async ORM session
        :param query_object: fields to match on
        :return: the first matching DataStdCode entity, or None
        """
        filters = cls._eq_filters(
            (DataStdCode.code_name, query_object.code_name),
            (DataStdCode.code_num, query_object.code_num),
            (DataStdCode.code_status, query_object.code_status),
            (DataStdCode.sys_id, query_object.sys_id),
            (DataStdCode.code_type, query_object.code_type),
            (DataStdCode.parent_id, query_object.parent_id),
            (DataStdCode.class_id, query_object.class_id),
        )
        result = await db.execute(select(DataStdCode).where(*filters))
        return result.scalars().first()

    @classmethod
    async def get_std_code_by_id(cls, db: AsyncSession, Id: str):
        """
        Fetch a single DataStdCode row by primary key.

        :param db: async ORM session
        :param Id: value compared against DataStdCode.id
        :return: the DataStdCode entity, or None when no row matches
        """
        result = await db.execute(select(DataStdCode).where(DataStdCode.id == Id))
        return result.scalars().first()

    @classmethod
    async def add_std_code(cls, db: AsyncSession, model: DataStdCodeModel):
        """
        Insert a new DataStdCode built from the model's dumped fields.

        The session is flushed but not committed here.

        :param db: async ORM session
        :param model: source of the column values (``model.model_dump()``)
        :return: the newly added DataStdCode entity
        """
        entity = DataStdCode(**model.model_dump())
        db.add(entity)
        await db.flush()
        return entity

    @classmethod
    async def delete_std_code(cls, db: AsyncSession, Id: str):
        """
        Delete the DataStdCode row whose id equals ``Id``.

        :param db: async ORM session
        :param Id: value compared against DataStdCode.id
        """
        await db.execute(delete(DataStdCode).where(DataStdCode.id == Id))

    @classmethod
    async def update_std_code(cls, db: AsyncSession, update_data: DataStdCodeModel):
        """
        Update a DataStdCode row using ``update_data`` as the parameter set, then flush.

        NOTE(review): SQLAlchemy's ORM bulk UPDATE by primary key expects each
        parameter set to be a mapping that includes the primary key. Presumably
        callers pass a dict-like payload despite the annotation; confirm.

        :param db: async ORM session
        :param update_data: parameter set handed to ``update(DataStdCode)``
        """
        await db.execute(update(DataStdCode), [update_data])
        await db.flush()

    @classmethod
    async def check_code_num_exists(cls, query_db: AsyncSession, code_num: str) -> bool:
        """
        Check whether any standard code or code item already uses ``code_num``.

        Both standard codes and code items are rows of DataStdCode, so a single
        lookup on the mapped entity covers both. Only existence matters, so the
        query is limited to one row and duplicates do not raise.

        :param query_db: async ORM session
        :param code_num: code number to look for
        :return: True when at least one row has this code_num, otherwise False
        """
        result = await query_db.execute(
            select(DataStdCode.id).where(DataStdCode.code_num == code_num).limit(1)
        )
        return result.first() is not None

    # ------------------------------------------------------------------
    # Data dictionary
    # ------------------------------------------------------------------

    @classmethod
    async def get_std_dict_list(cls, db: AsyncSession, query_object: DataStdDictModel, is_page: bool = False):
        """
        List DataStdDict rows, with fuzzy matching and optional pagination.

        :param db: async ORM session
        :param query_object: query parameters; empty/falsy fields are ignored
        :param is_page: whether pagination is enabled (forwarded to PageUtil.paginate)
        :return: whatever PageUtil.paginate returns for the built query
        """
        filters = []
        if query_object.dict_name:
            filters.append(DataStdDict.dict_name.like(f"%{query_object.dict_name}%"))
        if query_object.dict_code:
            filters.append(DataStdDict.dict_code.like(f"%{query_object.dict_code}%"))
        filters.extend(
            cls._eq_filters(
                (DataStdDict.dict_level, query_object.dict_level),
                (DataStdDict.sys_id, query_object.sys_id),
                (DataStdDict.dict_type, query_object.dict_type),
                (DataStdDict.dict_status, query_object.dict_status),
            )
        )

        query = (
            select(DataStdDict)
            .where(*filters)
            .order_by(desc(DataStdDict.create_time))  # newest first
        )
        return await PageUtil.paginate(
            db, query, query_object.page_num, query_object.page_size, is_page
        )

    @classmethod
    async def add_std_dict(cls, db: AsyncSession, model: DataStdDictModel):
        """
        Insert a new DataStdDict built from the model's dumped fields.

        The session is flushed but not committed here.

        :param db: async ORM session
        :param model: source of the column values (``model.model_dump()``)
        :return: the newly added DataStdDict entity
        """
        entity = DataStdDict(**model.model_dump())
        db.add(entity)
        await db.flush()
        return entity

    @classmethod
    async def delete_std_dict(cls, db: AsyncSession, Id: str):
        """
        Delete the DataStdDict row whose id equals ``Id``.

        :param db: async ORM session
        :param Id: value compared against DataStdDict.id
        """
        await db.execute(delete(DataStdDict).where(DataStdDict.id == Id))

    @classmethod
    async def update_std_dict(cls, db: AsyncSession, update_data: DataStdDictModel):
        """
        Update a DataStdDict row using ``update_data`` as the parameter set, then flush.

        NOTE(review): SQLAlchemy's ORM bulk UPDATE by primary key expects each
        parameter set to be a mapping that includes the primary key. Presumably
        callers pass a dict-like payload despite the annotation; confirm.

        :param db: async ORM session
        :param update_data: parameter set handed to ``update(DataStdDict)``
        """
        await db.execute(update(DataStdDict), [update_data])
        await db.flush()

    @classmethod
    async def get_std_dict_by_id(cls, db: AsyncSession, Id: str):
        """
        Fetch a single DataStdDict row by primary key.

        :param db: async ORM session
        :param Id: value compared against DataStdDict.id
        :return: the DataStdDict entity, or None when no row matches
        """
        result = await db.execute(select(DataStdDict).where(DataStdDict.id == Id))
        return result.scalars().first()

    @classmethod
    async def get_data_dict_by_info(cls, db: AsyncSession, query_object: DataStdDictModel):
        """
        Return the first DataStdDict row that exactly matches the populated fields.

        Fields compared: dict_name, dict_num, dict_status, sys_id, dict_type.
        Falsy fields are not used as filters.

        NOTE(review): this lookup uses ``dict_num`` while get_std_dict_list
        filters on ``dict_code``; confirm both attributes exist on the entity
        and the model.

        :param db: async ORM session
        :param query_object: fields to match on
        :return: the first matching DataStdDict entity, or None
        """
        filters = cls._eq_filters(
            (DataStdDict.dict_name, query_object.dict_name),
            (DataStdDict.dict_num, query_object.dict_num),
            (DataStdDict.dict_status, query_object.dict_status),
            (DataStdDict.sys_id, query_object.sys_id),
            (DataStdDict.dict_type, query_object.dict_type),
        )
        result = await db.execute(select(DataStdDict).where(*filters))
        return result.scalars().first()