from datetime import datetime from collections import defaultdict from fastapi.responses import HTMLResponse from fastapi import APIRouter, Depends, Request from pydantic_validation_decorator import ValidateFields from sqlalchemy.ext.asyncio import AsyncSession from config.enums import BusinessType from config.get_db import get_db from module_admin.annotation.log_annotation import Log from module_admin.aspect.interface_auth import CheckUserInterfaceAuth from module_admin.service.login_service import LoginService from module_admin.service.data_ast_content_service import DataCatalogService from module_admin.entity.vo.data_ast_content_vo import DataAstSecuResponse, DataAstSecuRequest,DataCatalogRequest, DataCatalogResponse, DataCatalogPageQueryModel, DeleteDataCatalogModel,DataCatalogResponseWithChildren,DataAssetCatalogTreeResponse,DataCatalogMovedRequest,DataCatalogMergeRequest,DataCatalogChild,DataCatalogMoverelRequest,DataAstIndxRequest,DataAstBookmarkRelaRequest from module_admin.entity.vo.user_vo import CurrentUserModel from module_admin.entity.vo.metasecurity_vo import MetaSecurityApiModel from module_admin.service.metasecurity_service import MetaSecurityService from utils.common_util import bytes2file_response from utils.log_util import logger from utils.page_util import PageResponseModel from utils.response_util import ResponseUtil dataCatalogController = APIRouter(prefix='/system/data_catalog', dependencies=[Depends(LoginService.get_current_user)]) @dataCatalogController.get( '/list', response_model=PageResponseModel, dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:list'))] ) async def get_data_catalog_list( request: Request, catalog_page_query: DataCatalogPageQueryModel = Depends(DataCatalogPageQueryModel.as_query), query_db: AsyncSession = Depends(get_db), current_user: CurrentUserModel = Depends(LoginService.get_current_user), ): #设置字段 user_id = current_user.user.user_id user_name = current_user.user.user_name # 获取分页数据 catalog_page_query_result = await DataCatalogService.get_catalog_list_services(query_db, catalog_page_query, user_id, user_name, is_page=True) logger.info('获取成功') return ResponseUtil.success(model_content=catalog_page_query_result) @dataCatalogController.get( '/atree',response_model=DataAssetCatalogTreeResponse,dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:atree'))],summary="Data Asset Tree Query" ) async def get_data_asset_catalog_tree(query_db: AsyncSession = Depends(get_db)): try: logger.debug("开始获取数据资产目录树") catalog_tree_result = await DataCatalogService.get_data_asset_catalog_tree_services(query_db) logger.info('数据资产树获取成功') return ResponseUtil.success(data=catalog_tree_result) except Exception as e: logger.error(f"数据资产树获取失败: {str(e)}", exc_info=True) return ResponseUtil.error(msg="数据查询异常,请联系管理员") @dataCatalogController.post('', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:add'))]) @ValidateFields(validate_model='add_data_catalog') @Log(title='数据目录管理', business_type=BusinessType.INSERT) async def add_data_catalog( request: Request, add_catalog: DataCatalogResponseWithChildren, query_db: AsyncSession = Depends(get_db), current_user: CurrentUserModel = Depends(LoginService.get_current_user), ): # 设置字段 add_catalog.upd_prsn = current_user.user.user_name # 调用服务层方法 add_result = await DataCatalogService.add_catalog_services(query_db, add_catalog) logger.info(add_result.message) # 新增成功后,更新新增数据目录的父亲节点的叶子标志为0 if add_result.is_success: if add_catalog.supr_content_onum is not None: supr_content_onum = add_catalog.supr_content_onum await DataCatalogService.edit_catalog_leaf_services(query_db,supr_content_onum, 0) else: logger.error(add_result.message) # 返回标准化响应 return ResponseUtil.success( msg=add_result.message ) @dataCatalogController.get( '/getMetaSercuityData', response_model=DataAstSecuResponse, dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:secu'))] ) @ValidateFields(validate_model='get_secu_data_request') async def getMetaSercuityData( request: Request, dataAstSecuRequest: DataAstSecuRequest=Depends(DataAstSecuRequest), query_db: AsyncSession = Depends(get_db), current_user: CurrentUserModel = Depends(LoginService.get_current_user), ): # 获取当前用户信息 user_name = current_user.user.user_name password = current_user.user.password logger.info(f"获取当前用户信息:user_id={user_name}, password={password}") # 设置字段 apiModel = MetaSecurityApiModel() apiModel.dbRCode = dataAstSecuRequest.data_ast_src apiModel.username = user_name apiModel.password = password apiModel.sqlStr = "select * from " + dataAstSecuRequest.data_ast_eng_name # logger.info(f"设置 apiModel 参数:dbRId={apiModel.dbRId}, username={apiModel.username}, password={apiModel.password}, sqlStr={apiModel.sqlStr}") # 打印 apiModel 对象 # logger.debug(f"apiModel 对象内容:{apiModel}") # 调用服务层方法 config_detail_result = await MetaSecurityService.getMetaSercuitybysql(request, query_db, apiModel) logger.info(f"调用 MetaSecurityService.getMetaSercuitybysql 方法,返回结果:{config_detail_result}") # 记录成功日志 logger.info(f"获取 config_id 为 {apiModel} 的信息成功") return ResponseUtil.success(data=config_detail_result) @dataCatalogController.put('/edit', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:edit'))]) @ValidateFields(validate_model='edit_data_catalog') @Log(title='数据目录管理', business_type=BusinessType.UPDATE) async def edit_data_catalog( request: Request, edit_catalog: DataCatalogResponseWithChildren, query_db: AsyncSession = Depends(get_db), current_user: CurrentUserModel = Depends(LoginService.get_current_user), ): # 设置审计字段 edit_catalog.upd_prsn = current_user.user.user_name # 调用服务层方法 edit_result = await DataCatalogService.edit_catalog_child_services(query_db, edit_catalog) logger.info(edit_result.message) # 返回标准化响应 return ResponseUtil.success( msg=edit_result.message ) @dataCatalogController.put('/moved', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:edit'))]) @ValidateFields(validate_model='moved_data_catalog') @Log(title='数据目录管理', business_type=BusinessType.UPDATE) async def moved_data_catalog( request: Request, moved_catalog: DataCatalogMovedRequest, query_db: AsyncSession = Depends(get_db), current_user: CurrentUserModel = Depends(LoginService.get_current_user), ): # 调用服务层方法 moved_result = await DataCatalogService.moved_catalog_instr_services(query_db, moved_catalog) logger.info(moved_result.message) # 返回标准化响应 return ResponseUtil.success( msg=moved_result.message ) @dataCatalogController.put('/merge', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:edit'))]) @ValidateFields(validate_model='merge_data_catalog') @Log(title='数据目录管理', business_type=BusinessType.UPDATE) async def moved_data_catalog( request: Request, merge_catalog: DataCatalogMergeRequest, query_db: AsyncSession = Depends(get_db), current_user: CurrentUserModel = Depends(LoginService.get_current_user), ): # 调用服务层方法 merge_result = await DataCatalogService.merge_catalog_instr_services(query_db, merge_catalog) logger.info(merge_result.message) # 返回标准化响应 return ResponseUtil.success( msg=merge_result.message ) @dataCatalogController.put('/removerel', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:edit'))]) @ValidateFields(validate_model='removerel_data_ast_catalog') @Log(title='数据目录管理', business_type=BusinessType.UPDATE) async def removerel_data_ast_catalog( request: Request, removerel_catalog: DataCatalogChild, query_db: AsyncSession = Depends(get_db), current_user: CurrentUserModel = Depends(LoginService.get_current_user), ): # 调用服务层方法 removerel_result = await DataCatalogService.removerel_data_ast_catalog_services(query_db, removerel_catalog) logger.info(removerel_result.message) # 返回标准化响应 return ResponseUtil.success() @dataCatalogController.put('/moverel', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:edit'))]) @ValidateFields(validate_model='moverel_data_ast_catalog') @Log(title='数据目录管理', business_type=BusinessType.UPDATE) async def moverel_data_ast_catalog( request: Request, moverel_catalog: DataCatalogMoverelRequest, query_db: AsyncSession = Depends(get_db), current_user: CurrentUserModel = Depends(LoginService.get_current_user), ): # 调用服务层方法 moverel_result = await DataCatalogService.moverel_data_ast_catalog_services(query_db, moverel_catalog) logger.info(moverel_result.message) # 返回标准化响应 return ResponseUtil.success() @dataCatalogController.delete('/{content_onums}', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:remove'))]) @Log(title='数据目录管理', business_type=BusinessType.DELETE) async def delete_data_catalog(request: Request, content_onums: str, query_db: AsyncSession = Depends(get_db), current_user: CurrentUserModel = Depends(LoginService.get_current_user), ): delete_catalog = DeleteDataCatalogModel(content_onums=content_onums) delete_catalog_result = await DataCatalogService.delete_catalog_services(query_db, delete_catalog,user_id=current_user.user.user_id) logger.info(delete_catalog_result.message) return ResponseUtil.success(msg=delete_catalog_result.message) @dataCatalogController.get( '/{content_onum}', response_model=DataCatalogResponse, dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:query'))] ) async def query_detail_data_catalog(request: Request, content_onum: int, query_db: AsyncSession = Depends(get_db)): catalog_detail_result = await DataCatalogService.get_catalog_detail_services(query_db, content_onum) logger.info(f'获取content_onum为{content_onum}的信息成功') return ResponseUtil.success(data=catalog_detail_result) @dataCatalogController.delete( '/bookmark/{rela_onum}', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:edit'))] ) @Log(title='数据资产收藏管理', business_type=BusinessType.DELETE) async def delete_ast_book_mark_rela( request: Request, rela_onum: int, query_db: AsyncSession = Depends(get_db), current_user: CurrentUserModel = Depends(LoginService.get_current_user), ): user_name = current_user.user.user_name user_id = current_user.user.user_id # 创建请求对象 delete_request = DataAstBookmarkRelaRequest(rela_onum=rela_onum) # 调用服务层方法 delete_result = await DataCatalogService.delete_ast_book_mark_rela_services(query_db, delete_request,user_name,user_id) logger.info(delete_result.message) # 返回标准化响应 return ResponseUtil.success(msg=delete_result.message) @dataCatalogController.post( '/bookmark', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:edit'))] ) @ValidateFields(validate_model='add_data_ast_bookmark') @Log(title='数据资产收藏管理', business_type=BusinessType.INSERT) async def add_ast_book_mark_rela( request: Request, add_bookmark: DataAstBookmarkRelaRequest, query_db: AsyncSession = Depends(get_db), current_user: CurrentUserModel = Depends(LoginService.get_current_user), ): # 设置字段 add_bookmark.user_id = current_user.user.user_id user_name = current_user.user.user_name # 调用服务层方法 add_result = await DataCatalogService.add_ast_book_mark_rela_services(query_db, add_bookmark,user_name) logger.info(add_result.message) # 返回标准化响应 return ResponseUtil.success(msg=add_result.message) @dataCatalogController.get( '/indx/list', response_class=HTMLResponse, # 指定返回HTML类型, dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:list'))] ) async def get_data_ast_indx_list( request: Request, indx_page_query: DataAstIndxRequest = Depends(DataAstIndxRequest), query_db: AsyncSession = Depends(get_db), current_user: CurrentUserModel = Depends(LoginService.get_current_user), ): # 获取分页数据 indx_page_query_result = await DataCatalogService.get_data_ast_indx_list_services(query_db, indx_page_query) logger.info('获取成功') return indx_page_query_result @dataCatalogController.post('/bookmark/folder', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:add'))]) @ValidateFields(validate_model='add_bookmark_folder') @Log(title='收藏目录管理', business_type=BusinessType.INSERT) async def add_bookmark_folder( request: Request, add_folder: DataCatalogRequest, query_db: AsyncSession = Depends(get_db), current_user: CurrentUserModel = Depends(LoginService.get_current_user), ): # 设置字段 add_folder.upd_prsn = current_user.user.user_name add_folder.supr_content_onum = 2 # 固定为"我的收藏"的目录ID add_folder.content_stat = "1" # 设置为有效状态 add_folder.leaf_node_flag = 1 # 默认为叶子节点 # 调用服务层方法 add_result = await DataCatalogService.add_bookmark_folder_services(query_db, add_folder) logger.info(add_result.message) # 返回标准化响应 return ResponseUtil.success( msg=add_result.message ) @dataCatalogController.put('/bookmark/folder', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:edit'))]) @ValidateFields(validate_model='edit_bookmark_folder') @Log(title='收藏目录管理', business_type=BusinessType.UPDATE) async def edit_bookmark_folder( request: Request, edit_folder: DataCatalogRequest, query_db: AsyncSession = Depends(get_db), current_user: CurrentUserModel = Depends(LoginService.get_current_user), ): # 设置审计字段 edit_folder.upd_prsn = current_user.user.user_name # 调用服务层方法 edit_result = await DataCatalogService.edit_bookmark_folder_services(query_db, edit_folder) logger.info(edit_result.message) # 返回标准化响应 return ResponseUtil.success( msg=edit_result.message ) @dataCatalogController.delete('/bookmark/folder/{content_onum}', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:remove'))]) @Log(title='收藏目录管理', business_type=BusinessType.DELETE) async def delete_bookmark_folder( request: Request, content_onum: int, query_db: AsyncSession = Depends(get_db), current_user: CurrentUserModel = Depends(LoginService.get_current_user), ): # 调用服务层方法 delete_result = await DataCatalogService.delete_bookmark_folder_services( query_db, content_onum, current_user.user.user_name, current_user.user.user_id ) logger.info(delete_result.message) # 返回标准化响应 return ResponseUtil.success( msg=delete_result.message ) @dataCatalogController.get( '/bookmark/folders', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:list'))] ) async def get_bookmark_folders( request: Request, query_db: AsyncSession = Depends(get_db), current_user: CurrentUserModel = Depends(LoginService.get_current_user), ): # 获取当前用户名 user_name = current_user.user.user_name # 调用服务层方法 folders = await DataCatalogService.get_bookmark_folders_services(query_db, user_name) logger.info(f'获取用户 {user_name} 的收藏目录列表成功') # 返回标准化响应 return ResponseUtil.success(data=folders) @dataCatalogController.put('/bookmark/asset/move', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:edit'))]) @ValidateFields(validate_model='move_bookmark_asset') @Log(title='收藏资产管理', business_type=BusinessType.UPDATE) async def move_bookmark_asset( request: Request, moverel_catalog: DataCatalogMoverelRequest, query_db: AsyncSession = Depends(get_db), current_user: CurrentUserModel = Depends(LoginService.get_current_user), ): # 设置用户信息 moverel_catalog.upd_prsn = current_user.user.user_name # 调用服务层方法 moverel_result = await DataCatalogService.move_bookmark_asset_services(query_db, moverel_catalog) logger.info(moverel_result.message) # 返回标准化响应 return ResponseUtil.success(msg=moverel_result.message)