from sqlalchemy import select, update, delete, insert from sqlalchemy.ext.asyncio import AsyncSession from module_admin.entity.do.data_ast_content_do import DataAssetInfo from module_admin.entity.vo.data_asset_vo import DataAssetItemModel, DataAssetPageQueryModel from utils.page_util import PageUtil from typing import List, Dict, Any class DataAssetDao: """ 数据资产信息模块数据库操作层 """ @classmethod async def get_data_asset_list(cls, db: AsyncSession, query_object: DataAssetPageQueryModel, is_page: bool = False): """ 根据查询参数获取数据资产信息列表 :param db: orm对象 :param query_object: 查询参数对象 :param is_page: 是否开启分页 :return: 数据资产信息列表对象 """ query = ( select(DataAssetInfo) .where( DataAssetInfo.ast_no == query_object.ast_no if query_object.ast_no else True, DataAssetInfo.data_ast_eng_name.like(f'%{query_object.data_ast_eng_name}%') if query_object.data_ast_eng_name else True, DataAssetInfo.data_ast_cn_name.like(f'%{query_object.data_ast_cn_name}%') if query_object.data_ast_cn_name else True, DataAssetInfo.data_ast_type == query_object.data_ast_type if query_object.data_ast_type else True, DataAssetInfo.data_ast_stat == query_object.data_ast_stat if query_object.data_ast_stat else True, DataAssetInfo.data_ast_src == query_object.data_ast_src if query_object.data_ast_src else True, ) .order_by(DataAssetInfo.data_ast_estb_time.desc()) ) data_asset_list = await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page) return data_asset_list @classmethod async def get_data_asset_by_ast_no(cls, db: AsyncSession, ast_no: str): """ 根据资产编号获取数据资产信息 :param db: orm对象 :param ast_no: 资产编号 :return: 数据资产信息对象 """ data_asset = ( await db.execute(select(DataAssetInfo).where(DataAssetInfo.ast_no == ast_no)) ).scalars().first() return data_asset @classmethod async def insert_data_asset(cls, db: AsyncSession, data_asset: Dict[str, Any]): """ 插入数据资产信息 :param db: orm对象 :param data_asset: 数据资产信息字典 :return: 插入的数据资产对象 """ db_data_asset = DataAssetInfo(**data_asset) db.add(db_data_asset) await db.flush() return db_data_asset @classmethod async def update_data_asset(cls, db: AsyncSession, ast_no: str, data_asset: Dict[str, Any]): """ 更新数据资产信息 :param db: orm对象 :param ast_no: 资产编号 :param data_asset: 数据资产信息字典 :return: 更新结果 """ result = await db.execute( update(DataAssetInfo) .where(DataAssetInfo.ast_no == ast_no) .values(**data_asset) ) return result.rowcount @classmethod async def delete_data_asset(cls, db: AsyncSession, ast_no: str): """ 删除数据资产信息 :param db: orm对象 :param ast_no: 资产编号 :return: 删除结果 """ result = await db.execute( delete(DataAssetInfo) .where(DataAssetInfo.ast_no == ast_no) ) return result.rowcount @classmethod async def get_data_asset_sources(cls, db: AsyncSession): """ 获取所有数据资产来源列表 :param db: orm对象 :return: 数据资产来源列表 """ # 使用distinct查询不重复的数据资产来源 result = await db.execute( select(DataAssetInfo.data_ast_src) .distinct() .where(DataAssetInfo.data_ast_src.is_not(None)) .order_by(DataAssetInfo.data_ast_src) ) sources = result.scalars().all() return sources @classmethod async def search_data_assets(cls, db: AsyncSession, search_params: dict, is_page: bool = False, page_num: int = 1, page_size: int = 10): """ 综合查询数据资产信息 :param db: orm对象 :param search_params: 查询参数字典 :param is_page: 是否分页 :param page_num: 页码 :param page_size: 每页大小 :return: 查询结果 """ query = select(DataAssetInfo) # 构建查询条件 conditions = [] # 名称查询(支持英文名称和中文名称) if search_params.get('name'): name_keyword = f"%{search_params['name']}%" conditions.append( (DataAssetInfo.data_ast_eng_name.like(name_keyword)) | (DataAssetInfo.data_ast_cn_name.like(name_keyword)) ) # 数据资产类型 if search_params.get('data_ast_type'): conditions.append(DataAssetInfo.data_ast_type == search_params['data_ast_type']) # 资产应用场景 if search_params.get('data_ast_screen'): conditions.append(DataAssetInfo.data_ast_screen == search_params['data_ast_screen']) # 数据资产标签 if search_params.get('data_ast_clas'): conditions.append(DataAssetInfo.data_ast_clas.like(f"%{search_params['data_ast_clas']}%")) # 数据资产来源 if search_params.get('data_ast_src'): conditions.append(DataAssetInfo.data_ast_src == search_params['data_ast_src']) # 将所有条件添加到查询 for condition in conditions: query = query.where(condition) # 排序 query = query.order_by(DataAssetInfo.data_ast_estb_time.desc()) # 分页查询 result = await PageUtil.paginate(db, query, page_num, page_size, is_page) return result