from datetime import datetime

from sqlalchemy import case
from sqlalchemy.orm import aliased
from sqlalchemy import delete, func, not_, select, update, or_, and_, desc
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import IntegrityError

from module_admin.entity.do.data_ast_content_do import (
    DataAstContent,
    DataAstContentRela,
    DataAstInfo,
    DataAstBookmarkRela,
    DataAstIndx,
)
from module_admin.entity.do.user_do import SysUser
from module_admin.entity.vo.data_ast_content_vo import (
    DataCatalogPageQueryModel,
    DeleteDataCatalogModel,
    DataCatalogChild,
    DataAstBookmarkRelaRequest,
    DataAstIndxRequest,
    DataAstIndxResponse,
)
from utils.page_util import PageUtil
from utils.log_util import logger


class DataCatalogDAO:
    """
    Database access layer for the data catalog management module.
    """

    # Catalog ID whose relations are filtered per user (by upd_prsn) in the list
    # query and removed when a bookmark is cancelled. It appears to be the
    # per-user bookmark folder -- TODO confirm against the catalog seed data.
    _BOOKMARK_CONTENT_ONUM = '2'

    @classmethod
    async def get_catalog_by_id(cls, db: AsyncSession, content_onum: int):
        """
        Fetch one active catalog by its ID.

        :param db: ORM session
        :param content_onum: catalog ID
        :return: the first DataAstContent with that ID and content_stat == 1, or None
        """
        result = await db.execute(
            select(DataAstContent).where(
                DataAstContent.content_onum == content_onum,
                DataAstContent.content_stat == 1,
            )
        )
        return result.scalars().first()

    @classmethod
    async def get_catalog_detail_by_info(cls, db: AsyncSession, catalog: DataCatalogPageQueryModel):
        """
        Fetch the first active catalog matching the optional filters in ``catalog``.

        Only truthy filter values (content_name, content_stat, content_pic) are
        applied; ``content_stat == 1`` is always applied.

        :param db: ORM session
        :param catalog: query parameters
        :return: the first matching DataAstContent, or None
        """
        conditions = []
        if catalog.content_name:
            conditions.append(DataAstContent.content_name == catalog.content_name)
        if catalog.content_stat:
            conditions.append(DataAstContent.content_stat == catalog.content_stat)
        if catalog.content_pic:
            conditions.append(DataAstContent.content_pic == catalog.content_pic)
        conditions.append(DataAstContent.content_stat == 1)

        result = await db.execute(select(DataAstContent).where(*conditions))
        return result.scalars().first()

    @classmethod
    async def get_catalog_list(
        cls,
        db: AsyncSession,
        query_object: DataCatalogPageQueryModel,
        user_id: int,
        is_page: bool = False,
    ):
        """
        List active catalogs together with their related assets and bookmark state.

        :param db: async ORM session
        :param query_object: paging/query parameters (page_num, page_size, upd_prsn are read)
        :param user_id: user whose bookmarks determine bookmark_flag / bookmark_orde
        :param is_page: whether to paginate
        :return: whatever PageUtil.paginate returns for the built query
        """
        t2 = aliased(DataAstInfo, name='t2')
        t3 = aliased(DataAstBookmarkRela, name='t3')

        # Active relations visible to the caller: rows of the bookmark catalog
        # only when they were written by query_object.upd_prsn, plus every
        # active relation of any other catalog.
        # The criteria are passed as separate where() arguments on purpose:
        # Python's `and` between column expressions silently keeps only one
        # operand instead of producing SQL AND.
        subquery_t1 = (
            select(DataAstContentRela)
            .where(
                DataAstContentRela.upd_prsn == query_object.upd_prsn,
                DataAstContentRela.content_onum == cls._BOOKMARK_CONTENT_ONUM,
                DataAstContentRela.rela_status == '1',
            )
            .union_all(
                select(DataAstContentRela).where(
                    DataAstContentRela.content_onum != cls._BOOKMARK_CONTENT_ONUM,
                    DataAstContentRela.rela_status == '1',
                )
            )
        ).alias('subquery_t1')

        query = (
            select(
                DataAstContent.content_onum,
                DataAstContent.content_name,
                DataAstContent.content_stat,
                DataAstContent.content_intr,
                DataAstContent.content_pic,
                DataAstContent.supr_content_onum,
                DataAstContent.leaf_node_flag,
                DataAstContent.upd_prsn,
                DataAstContent.upd_time,
                subquery_t1.c.rela_onum,
                subquery_t1.c.ast_onum,
                subquery_t1.c.rela_type,
                subquery_t1.c.rela_eff_begn_date,
                subquery_t1.c.rela_eff_end_date,
                subquery_t1.c.upd_prsn,
                t2.data_ast_no,
                t2.data_ast_eng_name,
                t2.data_ast_cn_name,
                t2.data_ast_type,
                t2.data_ast_stat,
                t2.data_ast_desc,
                t2.data_ast_clas,
                t2.data_ast_cont,
                t2.data_ast_faq,
                t2.data_ast_estb_time,
                t2.data_ast_upd_time,
                t2.data_ast_src,
                t2.ast_no,
                t3.bookmark_orde,
                # 1 when the user has a bookmark row for the asset, else 0.
                case(
                    (t3.rela_onum.isnot(None), 1),
                    else_=0
                ).label('bookmark_flag')
            )
            .distinct()
            .select_from(DataAstContent)
            .outerjoin(subquery_t1, DataAstContent.content_onum == subquery_t1.c.content_onum)
            .outerjoin(t2, subquery_t1.c.ast_onum == t2.ast_no)
            .outerjoin(t3, and_(
                subquery_t1.c.ast_onum == t3.data_ast_no,
                t3.user_id == user_id
            ))
            .where(DataAstContent.content_stat == 1)
            .order_by(DataAstContent.content_onum)
        )

        data_ast_list = await PageUtil.paginate(
            db,
            query,
            page_num=query_object.page_num,
            page_size=query_object.page_size,
            is_page=is_page
        )
        return data_ast_list

    @classmethod
    async def add_catalog_dao(cls, db: AsyncSession, catalog1: dict, catalog2: dict):
        """
        Insert a catalog and its child asset relations.

        :param db: ORM session
        :param catalog1: column values for the new DataAstContent row
        :param catalog2: dict whose optional 'children' list holds DataCatalogChild
            instances or plain dicts describing DataAstContentRela rows
        :return: the newly added DataAstContent instance
        :raises TypeError: if a child is neither a DataCatalogChild nor a dict
        """
        db_catalog = DataAstContent(**catalog1)
        db.add(db_catalog)
        # Flush first so db_catalog.content_onum can be read for the children
        # (presumably it is generated by the database).
        await db.flush()

        for child in catalog2.get('children', []):
            if isinstance(child, DataCatalogChild):
                child_dict = child.model_dump()
            elif isinstance(child, dict):
                child_dict = child
            else:
                raise TypeError("不支持的子关系数据类型")

            # Copy before injecting the parent ID so the caller's dict is untouched.
            processed_child = dict(child_dict)
            processed_child['content_onum'] = db_catalog.content_onum
            db.add(DataAstContentRela(**processed_child))

        await db.flush()
        return db_catalog

    @classmethod
    async def edit_catalog_leaf_dao(cls, db: AsyncSession, catalog: dict):
        """
        Update only the leaf_node_flag of one catalog.

        :param db: ORM session
        :param catalog: dict with 'content_onum' and 'leaf_node_flag'
        :return: None
        """
        stmt = (
            update(DataAstContent)
            .where(DataAstContent.content_onum == catalog['content_onum'])
            .values(leaf_node_flag=catalog['leaf_node_flag'])
        )
        await db.execute(stmt)

    @classmethod
    async def _update_catalog_fields(cls, db: AsyncSession, catalog: dict):
        """Overwrite the editable columns of one catalog row and stamp upd_time."""
        stmt = (
            update(DataAstContent)
            .where(DataAstContent.content_onum == catalog['content_onum'])
            .values(
                content_name=catalog['content_name'],
                content_stat=catalog['content_stat'],
                content_intr=catalog['content_intr'],
                content_pic=catalog['content_pic'],
                supr_content_onum=catalog['supr_content_onum'],
                leaf_node_flag=catalog['leaf_node_flag'],
                upd_prsn=catalog['upd_prsn'],
                upd_time=datetime.now()
            )
        )
        await db.execute(stmt)

    @classmethod
    async def edit_catalog_child_dao(cls, db: AsyncSession, catalog: dict):
        """
        Update a catalog and upsert its child asset relations.

        Children carrying a truthy 'rela_onum' are updated in place; the others
        are inserted under this catalog (their dict gets 'content_onum' set).

        :param db: ORM session
        :param catalog: catalog column values plus an optional 'children' list of dicts
        :return: None
        """
        content_onum = catalog['content_onum']
        await cls._update_catalog_fields(db, catalog)

        for child in catalog.get('children', []):
            rela_onum = child.get('rela_onum')
            if rela_onum:
                await db.execute(
                    update(DataAstContentRela)
                    .where(DataAstContentRela.rela_onum == rela_onum)
                    .values(
                        content_onum=child.get('content_onum'),
                        ast_onum=child.get('ast_onum'),
                        rela_type=child.get('rela_type'),
                        rela_eff_begn_date=child.get('rela_eff_begn_date'),
                        rela_eff_end_date=child.get('rela_eff_end_date'),
                        upd_prsn=child.get('upd_prsn'))
                )
            else:
                child['content_onum'] = content_onum
                db.add(DataAstContentRela(**child))

        await db.flush()
        # The leaf-flag refresh reads only DataAstContent, so running it once
        # after all relation changes is equivalent to running it per child.
        await cls.update_leaf_node_flag(db)

    @classmethod
    async def edit_catalog_dao(cls, db: AsyncSession, catalog1: dict):
        """
        Update a catalog row, then refresh leaf-node flags.

        :param db: ORM session
        :param catalog1: catalog column values, including 'content_onum'
        :return: None
        """
        await cls._update_catalog_fields(db, catalog1)
        await cls.update_leaf_node_flag(db)

    @classmethod
    async def delete_catalog_dao(cls, db: AsyncSession, catalog: DeleteDataCatalogModel):
        """
        Soft-delete catalogs: set content_stat = 0 and deactivate their relations.

        :param db: ORM session
        :param catalog: model whose content_onums is a comma-separated ID string
        :return: None
        """
        # Drop blank entries so '' or a trailing comma never reaches the IN list.
        content_onums = [
            item.strip() for item in catalog.content_onums.split(',') if item.strip()
        ]
        logger.info(f"Updating DataAstContent with supr_content_onum in {content_onums}")

        await db.execute(
            update(DataAstContentRela)
            .where(DataAstContentRela.content_onum.in_(content_onums))
            .values(
                rela_status=0,
                rela_eff_end_date=datetime.now()
            )
        )
        await db.execute(
            update(DataAstContent)
            .where(DataAstContent.content_onum.in_(content_onums))
            .values(
                content_stat=0,
                upd_time=datetime.now()
            )
        )

        # Log success only after the flag refresh has actually run.
        await cls.update_leaf_node_flag(db)
        logger.info("更新叶子节点标志成功")

    @classmethod
    async def get_data_asset_catalog_tree(cls, db: AsyncSession):
        """
        List active assets that are not attached to any active catalog relation.

        :param db: async ORM session
        :return: distinct rows of (data_ast_src, data_ast_eng_name, data_ast_cn_name, ast_no)
        """
        attached_assets = (
            select(DataAstContentRela.ast_onum)
            .where(
                DataAstContentRela.rela_status == 1,
                # A NULL inside a NOT IN list makes the predicate never true,
                # which would empty the whole result; exclude NULLs explicitly.
                DataAstContentRela.ast_onum.isnot(None),
            )
        )
        query = (
            select(
                DataAstInfo.data_ast_src,
                DataAstInfo.data_ast_eng_name,
                DataAstInfo.data_ast_cn_name,
                DataAstInfo.ast_no
            )
            .distinct()
            .select_from(DataAstInfo)
            .where(
                DataAstInfo.data_ast_stat == 1,
                not_(DataAstInfo.ast_no.in_(attached_assets))
            )
        )
        result = await db.execute(query)
        return result.fetchall()

    @classmethod
    async def moved_catalog_instr_dao(cls, db: AsyncSession, moved_catalog_data: dict):
        """
        Move a catalog under a different parent catalog.

        :param db: ORM session
        :param moved_catalog_data: dict with 'content_onum', the current
            'supr_content_onum' and the target 'supr_content_onum_after'
        :return: None
        """
        stmt = (
            update(DataAstContent)
            .where(
                DataAstContent.content_onum == moved_catalog_data['content_onum'],
                DataAstContent.supr_content_onum == moved_catalog_data['supr_content_onum'],
            )
            .values(
                supr_content_onum=moved_catalog_data['supr_content_onum_after'],
                upd_time=datetime.now()
            )
        )
        await db.execute(stmt)
        await cls.update_leaf_node_flag(db)

    @classmethod
    async def merge_catalog_instr_dao(cls, db: AsyncSession, merge_catalog_data: dict):
        """
        Merge one catalog into another.

        Active relations of the source catalog are re-pointed to
        'content_onum_after', then the source catalog is marked content_stat = '0'.

        :param db: ORM session
        :param merge_catalog_data: dict with 'content_onum', 'supr_content_onum'
            and 'content_onum_after'
        :return: None
        """
        # Separate where() criteria are ANDed in SQL; Python's `and` would
        # silently drop the rela_status filter.
        move_relations = (
            update(DataAstContentRela)
            .where(
                DataAstContentRela.content_onum == merge_catalog_data['content_onum'],
                DataAstContentRela.rela_status == 1,
            )
            .values(
                content_onum=merge_catalog_data['content_onum_after'],
                rela_eff_begn_date=datetime.now()
            )
        )
        await db.execute(move_relations)

        retire_source = (
            update(DataAstContent)
            .where(
                DataAstContent.content_onum == merge_catalog_data['content_onum'],
                DataAstContent.supr_content_onum == merge_catalog_data['supr_content_onum'],
            )
            .values(content_stat='0')
        )
        await db.execute(retire_source)
        await cls.update_leaf_node_flag(db)

    @classmethod
    async def removerel_data_ast_catalog_dao(cls, db: AsyncSession, removerel_catalog_data: dict):
        """
        Set the status of one catalog-asset relation.

        :param db: ORM session
        :param removerel_catalog_data: dict with 'rela_onum', 'content_onum'
            and the new 'rela_status'
        :return: None
        """
        stmt = (
            update(DataAstContentRela)
            .where(
                DataAstContentRela.rela_onum == removerel_catalog_data['rela_onum'],
                DataAstContentRela.content_onum == removerel_catalog_data['content_onum'],
            )
            .values(rela_status=removerel_catalog_data['rela_status'])
        )
        await db.execute(stmt)
        await cls.update_leaf_node_flag(db)

    @classmethod
    async def moverel_data_ast_catalog_dao(cls, db: AsyncSession, moverel_catalog_data: dict):
        """
        Move one catalog-asset relation to another catalog.

        :param db: ORM session
        :param moverel_catalog_data: dict with 'rela_onum', the current
            'content_onum' and the target 'content_onum_after'
        :return: None
        """
        stmt = (
            update(DataAstContentRela)
            .where(
                DataAstContentRela.rela_onum == moverel_catalog_data['rela_onum'],
                DataAstContentRela.content_onum == moverel_catalog_data['content_onum'],
            )
            .values(
                content_onum=moverel_catalog_data['content_onum_after'],
                rela_eff_end_date=datetime.now()
            )
        )
        await db.execute(stmt)
        await cls.update_leaf_node_flag(db)

    @classmethod
    async def update_leaf_node_flag(cls, db: AsyncSession):
        """
        Set leaf_node_flag = 1 on active catalogs that have no active child catalog.

        Only rows currently flagged 0 are touched; nothing is ever reset to 0 here.

        :param db: ORM session
        :return: None
        """
        t2 = aliased(DataAstContent, name='t2')

        # The candidate IDs are wrapped in a named derived table before being
        # used in IN (...), so the UPDATE does not select directly from the
        # table it is updating.
        subquery = (
            select(DataAstContent.content_onum)
            .where(
                DataAstContent.content_stat == '1',
                DataAstContent.leaf_node_flag == 0,
                not_(
                    select(1)
                    .select_from(t2)
                    .where(
                        t2.supr_content_onum == DataAstContent.content_onum,
                        t2.content_stat == '1'
                    )
                    .exists()
                )
            )
        ).alias('temp')

        stmt = (
            update(DataAstContent)
            .where(DataAstContent.content_onum.in_(subquery))
            .values(leaf_node_flag=1, upd_time=datetime.now())
        )
        await db.execute(stmt)

    @classmethod
    async def delete_ast_book_mark_rela_dao(
        cls,
        db: AsyncSession,
        catalog: DataAstBookmarkRelaRequest,
        user_name: str,
        user_id: str,
    ):
        """
        Cancel a bookmark: delete the user's bookmark-catalog relation and bookmark row.

        Commits on success and rolls back on any error.

        :param db: ORM session
        :param catalog: bookmark request; read as a mapping via catalog['rela_onum']
        :param user_name: value matched against DataAstContentRela.upd_prsn
        :param user_id: value matched against DataAstBookmarkRela.user_id
        :return: {"is_success": True} or {"is_success": False, "message": ...}
        """
        try:
            # Resolve the affected asset numbers once, before deleting anything.
            # Evaluating this lookup again after the first DELETE could return
            # nothing (the relation row it reads may be the one just removed),
            # which would leave the bookmark row behind.
            # NOTE(review): DataAstBookmarkRela is filtered by user_id but not
            # joined to DataAstContentRela, so this only checks that the user
            # has at least one bookmark -- confirm whether a join on the asset
            # number was intended.
            lookup = await db.execute(
                select(DataAstContentRela.ast_onum)
                .where(
                    DataAstContentRela.rela_onum == catalog['rela_onum'],
                    DataAstBookmarkRela.user_id == user_id,
                    DataAstContentRela.rela_status == '1'
                )
            )
            ast_onums = list({value for value in lookup.scalars().all() if value is not None})

            if ast_onums:
                await db.execute(
                    delete(DataAstContentRela)
                    .where(
                        DataAstContentRela.upd_prsn == user_name,
                        DataAstContentRela.content_onum == cls._BOOKMARK_CONTENT_ONUM,
                        DataAstContentRela.ast_onum.in_(ast_onums),
                        DataAstContentRela.rela_status == 1
                    )
                )
                await db.execute(
                    delete(DataAstBookmarkRela)
                    .where(
                        DataAstBookmarkRela.data_ast_no.in_(ast_onums),
                        DataAstBookmarkRela.user_id == user_id
                    )
                )

            await db.commit()
            logger.info("成功删除收藏关系")
            return {"is_success": True}
        except IntegrityError as ie:
            await db.rollback()
            logger.error(f"删除收藏关系失败(完整性错误): {str(ie)}")
            return {"is_success": False, "message": "数据库完整性错误"}
        except Exception as e:
            await db.rollback()
            logger.error(f"删除收藏关系失败: {str(e)}")
            return {"is_success": False, "message": "未知错误"}

    @classmethod
    async def delete_ast_book_mark_rela_by_content_onum(cls, db: AsyncSession, content_onum: int, user_id: int):
        """
        Delete a user's bookmark rows for every asset related to one catalog, then commit.

        :param db: ORM session
        :param content_onum: catalog ID
        :param user_id: user ID
        :return: None
        """
        t1 = aliased(DataAstContentRela, name='t1')
        t2 = aliased(DataAstInfo, name='t2')
        t3 = aliased(DataAstBookmarkRela, name='t3')

        # Bookmark row IDs reachable from the catalog via relation -> asset -> bookmark.
        cte = (
            select(t3.rela_onum)
            .select_from(DataAstContent)
            .outerjoin(t1, DataAstContent.content_onum == t1.content_onum)
            .outerjoin(t2, t1.ast_onum == t2.ast_no)
            .outerjoin(t3, and_(
                t2.data_ast_no == t3.data_ast_no,
                t3.user_id == user_id
            ))
            .where(DataAstContent.content_onum == content_onum)
        ).cte('cte')

        stmt = (
            delete(DataAstBookmarkRela)
            .where(DataAstBookmarkRela.rela_onum.in_(select(cte.c.rela_onum)))
        )
        await db.execute(stmt)
        await db.commit()
        logger.info(" 删除收藏关系,操作成功")

    @classmethod
    async def add_ast_book_mark_rela_dao(cls, db: AsyncSession, catalog: DataAstBookmarkRelaRequest, user_name: str):
        """
        Bookmark an asset for a user.

        Adds a DataAstBookmarkRela row (ordered after the user's current maximum
        bookmark_orde) and a DataAstContentRela row, then flushes. The session is
        rolled back on any error.

        :param db: ORM session
        :param catalog: bookmark request; read as a mapping (user_id, data_ast_no,
            content_onum, rela_type, rela_eff_begn_date, rela_eff_end_date)
        :param user_name: stored as upd_prsn on the new relation
        :return: dict with "is_success" and "message"
        """
        try:
            logger.info(f"开始处理收藏请求,用户ID: {catalog['user_id']}, 资产编号: {catalog['data_ast_no']}")

            # Resolve the asset's ast_no to a concrete value up front, so it can
            # be logged and stored as data rather than as an unexecuted SELECT.
            ast_no_result = await db.execute(
                select(DataAstInfo.ast_no)
                .where(DataAstInfo.data_ast_no == catalog['data_ast_no'])
            )
            para_ast_no = ast_no_result.scalars().first()
            if para_ast_no is None:
                logger.warning(f"资产 {catalog['data_ast_no']} 不存在")
                return {"is_success": False, "message": "资产不存在"}

            # 1. Reject duplicates.
            logger.info("检查是否已存在相同收藏...")
            exists = await db.execute(
                select(DataAstBookmarkRela)
                .where(
                    DataAstBookmarkRela.user_id == catalog['user_id'],
                    DataAstBookmarkRela.data_ast_no == para_ast_no
                )
            )
            if exists.scalars().first() is not None:
                logger.warning(f"用户 {catalog['user_id']} 已收藏资产 {para_ast_no}")
                return {"is_success": False, "message": "该资产已被收藏"}

            # 2. Current maximum order number for this user (0 when none).
            logger.info("获取当前最大顺序号...")
            max_order = await db.execute(
                select(func.max(DataAstBookmarkRela.bookmark_orde))
                .where(DataAstBookmarkRela.user_id == catalog['user_id'])
            )
            max_bookmark_orde = max_order.scalar() or 0
            logger.info(f"当前最大顺序号: {max_bookmark_orde}")

            # 3. New bookmark goes after the current maximum.
            logger.info("设置新顺序号...")
            catalog_dict = {
                'user_id': catalog["user_id"],
                'data_ast_no': para_ast_no,
                'bookmark_orde': max_bookmark_orde + 1,
                'bookmark_time': datetime.now()
            }

            # 4. Bookmark row.
            logger.info("创建并保存新记录...")
            db.add(DataAstBookmarkRela(**catalog_dict))

            # Matching catalog relation for the bookmarked asset.
            logger.info("添加收藏关系内容...")
            new_rela = DataAstContentRela(
                content_onum=catalog["content_onum"],
                ast_onum=para_ast_no,
                rela_type=catalog["rela_type"],
                rela_eff_begn_date=catalog["rela_eff_begn_date"],
                rela_eff_end_date=catalog["rela_eff_end_date"],
                upd_prsn=user_name,
                rela_status="1"
            )
            db.add(new_rela)

            # Only a flush happens here; no commit is issued in this method.
            logger.info("提交事务...")
            await db.flush()

            logger.info(f"用户 {catalog['user_id']} 收藏资产 {para_ast_no} 成功")
            return {"is_success": True, "message": "收藏成功"}

        except Exception as e:
            logger.error(f"收藏操作失败: {str(e)}", exc_info=True)
            await db.rollback()
            return {"is_success": False, "message": "收藏操作失败"}

    @classmethod
    async def get_data_ast_indx_list(cls, db: AsyncSession, query_object: DataAstIndxRequest):
        """
        List asset indicators matching the optional filters in ``query_object``.

        ast_no and indx_no are matched exactly; indx_name is matched as a
        substring (LIKE %name%). Falsy filter values are ignored.

        :param db: async ORM session
        :param query_object: indicator query parameters
        :return: list of row mappings with ast_no, indx_no, indx_name, indx_val
        """
        conditions = []
        if query_object.ast_no:
            conditions.append(DataAstIndx.ast_no == query_object.ast_no)
        if query_object.indx_no:
            conditions.append(DataAstIndx.indx_no == query_object.indx_no)
        if query_object.indx_name:
            conditions.append(DataAstIndx.indx_name.like(f"%{query_object.indx_name}%"))

        query = (
            select(
                DataAstIndx.ast_no,
                DataAstIndx.indx_no,
                DataAstIndx.indx_name,
                DataAstIndx.indx_val
            )
            .where(*conditions)
        )
        result = await db.execute(query)
        return result.mappings().all()