You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

147 lines
5.0 KiB

2 months ago
from fastapi import APIRouter, Depends, Form, Request
from sqlalchemy.ext.asyncio import AsyncSession
from config.get_db import get_db
from module_admin.aspect.interface_auth import CheckUserInterfaceAuth
from module_admin.service.login_service import LoginService
from module_admin.service.data_asset_service import DataAssetService
from module_admin.entity.vo.data_asset_vo import DataAssetBatchModel, DataAssetPageQueryModel, DataAssetResultModel, DataAssetSearchModel
from utils.log_util import logger
from utils.page_util import PageResponseModel
from utils.response_util import ResponseUtil
from typing import List
from config.enums import BusinessType
from utils.common_util import bytes2file_response
from module_admin.annotation.log_annotation import Log
dataAssetController = APIRouter(prefix='/system/dataAsset')
@dataAssetController.get('/list', response_model=PageResponseModel)
async def get_data_asset_list(
request: Request,
data_asset_page_query: DataAssetPageQueryModel = Depends(DataAssetPageQueryModel.as_query),
query_db: AsyncSession = Depends(get_db),
):
"""
获取数据资产信息列表
:param request: 请求对象
:param data_asset_page_query: 查询参数
:param query_db: 数据库会话
:return: 数据资产信息列表
"""
# 获取分页数据
data_asset_page_query_result = await DataAssetService.get_data_asset_list_services(
query_db, data_asset_page_query, is_page=True
)
logger.info('获取数据资产信息列表成功')
return ResponseUtil.success(model_content=data_asset_page_query_result)
@dataAssetController.post('/batch')
async def batch_process_data_asset(
request: Request,
batch_data: DataAssetBatchModel,
query_db: AsyncSession = Depends(get_db),
):
"""
批量处理数据资产信息新增/修改/删除
:param request: 请求对象
:param batch_data: 批量处理数据
:param query_db: 数据库会话
:return: 处理结果
"""
try:
# 批量处理数据资产
result = await DataAssetService.batch_process_data_asset_services(query_db, batch_data)
# 记录处理结果
logger.info(f'批量处理数据资产成功,成功: {result.success_count},失败: {result.failed_count}')
# 返回处理结果
return ResponseUtil.success(data=result)
except Exception as e:
logger.error(f'批量处理数据资产失败: {str(e)}')
return ResponseUtil.error(msg=f"处理失败: {str(e)}")
@dataAssetController.get('/sources', response_model=List[str])
async def get_data_asset_sources(
request: Request,
query_db: AsyncSession = Depends(get_db),
):
"""
获取所有数据资产来源列表
:param request: 请求对象
:param query_db: 数据库会话
:return: 数据资产来源列表
"""
sources = await DataAssetService.get_data_asset_sources_services(query_db)
logger.info('获取数据资产来源列表成功')
return ResponseUtil.success(data=sources)
@dataAssetController.get('/search', response_model=PageResponseModel)
async def search_data_assets(
request: Request,
search_params: DataAssetSearchModel = Depends(DataAssetSearchModel.as_query),
query_db: AsyncSession = Depends(get_db),
):
"""
综合查询数据资产信息
:param request: 请求对象
:param search_params: 查询参数
:param query_db: 数据库会话
:return: 查询结果
"""
# 将查询参数转换为字典,剔除None值
params_dict = {k: v for k, v in search_params.model_dump().items()
if v is not None and k not in ['page_num', 'page_size']}
# 执行查询
search_result = await DataAssetService.search_data_assets_services(
query_db,
params_dict,
search_params.page_num,
search_params.page_size
)
logger.info('综合查询数据资产信息成功')
return ResponseUtil.success(model_content=search_result)
# @dataAssetController.get('/search', response_model=PageResponseModel)
# async def search_data_assets(
# request: Request,
# search_params: DataAssetSearchModel = Depends(DataAssetSearchModel.as_query),
# query_db: AsyncSession = Depends(get_db),
# ):
@dataAssetController.post("/export")
@Log(title="数据资产", business_type=BusinessType.EXPORT)
async def export_data_assets(
request: Request,
data_asset_query: DataAssetPageQueryModel = Form(),
query_db: AsyncSession = Depends(get_db),
):
"""
导出数据资产信息
:param request: 请求对象
:param data_asset_query: 查询参数
:param query_db: 数据库会话
:return: Excel文件流
"""
data_assets = await DataAssetService.get_data_asset_list_services(
query_db, data_asset_query, is_page=False
)
excel_data = await DataAssetService.export_data_asset_list_services(data_assets)
logger.info('导出成功')
return ResponseUtil.streaming(data=bytes2file_response(excel_data))