from fastapi import APIRouter, Depends, Form, Request from sqlalchemy.ext.asyncio import AsyncSession from config.get_db import get_db from module_admin.aspect.interface_auth import CheckUserInterfaceAuth from module_admin.service.login_service import LoginService from module_admin.service.data_asset_service import DataAssetService from module_admin.entity.vo.data_asset_vo import DataAssetBatchModel, DataAssetPageQueryModel, DataAssetResultModel, DataAssetSearchModel from utils.log_util import logger from utils.page_util import PageResponseModel from utils.response_util import ResponseUtil from typing import List from config.enums import BusinessType from utils.common_util import bytes2file_response from module_admin.annotation.log_annotation import Log dataAssetController = APIRouter(prefix='/system/dataAsset') @dataAssetController.get('/list', response_model=PageResponseModel) async def get_data_asset_list( request: Request, data_asset_page_query: DataAssetPageQueryModel = Depends(DataAssetPageQueryModel.as_query), query_db: AsyncSession = Depends(get_db), ): """ 获取数据资产信息列表 :param request: 请求对象 :param data_asset_page_query: 查询参数 :param query_db: 数据库会话 :return: 数据资产信息列表 """ # 获取分页数据 data_asset_page_query_result = await DataAssetService.get_data_asset_list_services( query_db, data_asset_page_query, is_page=True ) logger.info('获取数据资产信息列表成功') return ResponseUtil.success(model_content=data_asset_page_query_result) @dataAssetController.post('/batch') async def batch_process_data_asset( request: Request, batch_data: DataAssetBatchModel, query_db: AsyncSession = Depends(get_db), ): """ 批量处理数据资产信息(新增/修改/删除) :param request: 请求对象 :param batch_data: 批量处理数据 :param query_db: 数据库会话 :return: 处理结果 """ try: # 批量处理数据资产 result = await DataAssetService.batch_process_data_asset_services(query_db, batch_data) # 记录处理结果 logger.info(f'批量处理数据资产成功,成功: {result.success_count},失败: {result.failed_count}') # 返回处理结果 return ResponseUtil.success(data=result) except Exception as e: logger.error(f'批量处理数据资产失败: {str(e)}') return ResponseUtil.error(msg=f"处理失败: {str(e)}") @dataAssetController.get('/sources', response_model=List[str]) async def get_data_asset_sources( request: Request, query_db: AsyncSession = Depends(get_db), ): """ 获取所有数据资产来源列表 :param request: 请求对象 :param query_db: 数据库会话 :return: 数据资产来源列表 """ sources = await DataAssetService.get_data_asset_sources_services(query_db) logger.info('获取数据资产来源列表成功') return ResponseUtil.success(data=sources) @dataAssetController.get('/search', response_model=PageResponseModel) async def search_data_assets( request: Request, search_params: DataAssetSearchModel = Depends(DataAssetSearchModel.as_query), query_db: AsyncSession = Depends(get_db), ): """ 综合查询数据资产信息 :param request: 请求对象 :param search_params: 查询参数 :param query_db: 数据库会话 :return: 查询结果 """ # 将查询参数转换为字典,剔除None值 params_dict = {k: v for k, v in search_params.model_dump().items() if v is not None and k not in ['page_num', 'page_size']} # 执行查询 search_result = await DataAssetService.search_data_assets_services( query_db, params_dict, search_params.page_num, search_params.page_size ) logger.info('综合查询数据资产信息成功') return ResponseUtil.success(model_content=search_result) # @dataAssetController.get('/search', response_model=PageResponseModel) # async def search_data_assets( # request: Request, # search_params: DataAssetSearchModel = Depends(DataAssetSearchModel.as_query), # query_db: AsyncSession = Depends(get_db), # ): @dataAssetController.post("/export") @Log(title="数据资产", business_type=BusinessType.EXPORT) async def export_data_assets( request: Request, data_asset_query: DataAssetPageQueryModel = Form(), query_db: AsyncSession = Depends(get_db), ): """ 导出数据资产信息 :param request: 请求对象 :param data_asset_query: 查询参数 :param query_db: 数据库会话 :return: Excel文件流 """ data_assets = await DataAssetService.get_data_asset_list_services( query_db, data_asset_query, is_page=False ) excel_data = await DataAssetService.export_data_asset_list_services(data_assets) logger.info('导出成功') return ResponseUtil.streaming(data=bytes2file_response(excel_data))