You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

271 lines
11 KiB

from datetime import datetime
from collections import defaultdict
from fastapi.responses import HTMLResponse
from fastapi import APIRouter, Depends, Request
from pydantic_validation_decorator import ValidateFields
from sqlalchemy.ext.asyncio import AsyncSession
from config.enums import BusinessType
from config.get_db import get_db
from module_admin.annotation.log_annotation import Log
from module_admin.aspect.interface_auth import CheckUserInterfaceAuth
from module_admin.service.login_service import LoginService
from module_admin.service.data_ast_content_service import DataCatalogService
from module_admin.entity.vo.data_ast_content_vo import DataCatalogRequest, DataCatalogResponse, DataCatalogPageQueryModel, DeleteDataCatalogModel,DataCatalogResponseWithChildren,DataAssetCatalogTreeResponse,DataCatalogMovedRequest,DataCatalogMergeRequest,DataCatalogChild,DataCatalogMoverelRequest,DataAstIndxRequest,DataAstBookmarkRelaRequest
from module_admin.entity.vo.user_vo import CurrentUserModel
from utils.common_util import bytes2file_response
from utils.log_util import logger
from utils.page_util import PageResponseModel
from utils.response_util import ResponseUtil
dataCatalogController = APIRouter(prefix='/system/data_catalog', dependencies=[Depends(LoginService.get_current_user)])
@dataCatalogController.get(
'/list', response_model=PageResponseModel, dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:list'))]
)
async def get_data_catalog_list(
request: Request,
catalog_page_query: DataCatalogPageQueryModel = Depends(DataCatalogPageQueryModel.as_query),
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
#设置字段
user_id = current_user.user.user_id
# 获取分页数据
catalog_page_query_result = await DataCatalogService.get_catalog_list_services(query_db, catalog_page_query, user_id, is_page=True)
logger.info('获取成功')
return ResponseUtil.success(model_content=catalog_page_query_result)
@dataCatalogController.get(
'/atree',response_model=DataAssetCatalogTreeResponse,dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:atree'))],summary="Data Asset Tree Query"
)
async def get_data_asset_catalog_tree(query_db: AsyncSession = Depends(get_db)):
try:
logger.debug("开始获取数据资产目录树")
catalog_tree_result = await DataCatalogService.get_data_asset_catalog_tree_services(query_db)
logger.info('数据资产树获取成功')
return ResponseUtil.success(data=catalog_tree_result)
except Exception as e:
logger.error(f"数据资产树获取失败: {str(e)}", exc_info=True)
return ResponseUtil.error(msg="数据查询异常,请联系管理员")
@dataCatalogController.post('', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:add'))])
@ValidateFields(validate_model='add_data_catalog')
@Log(title='数据目录管理', business_type=BusinessType.INSERT)
async def add_data_catalog(
request: Request,
add_catalog: DataCatalogResponseWithChildren,
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
# 设置字段
add_catalog.upd_prsn = current_user.user.user_name
# 调用服务层方法
add_result = await DataCatalogService.add_catalog_services(query_db, add_catalog)
logger.info(add_result.message)
# 新增成功后,更新新增数据目录的父亲节点的叶子标志为0
if add_result.is_success:
if add_catalog.supr_content_onum is not None:
supr_content_onum = add_catalog.supr_content_onum
await DataCatalogService.edit_catalog_leaf_services(query_db,supr_content_onum, 0)
else:
logger.error(add_result.message)
# 返回标准化响应
return ResponseUtil.success(
msg=add_result.message
)
@dataCatalogController.put('/edit', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:edit'))])
@ValidateFields(validate_model='edit_data_catalog')
@Log(title='数据目录管理', business_type=BusinessType.UPDATE)
async def edit_data_catalog(
request: Request,
edit_catalog: DataCatalogResponseWithChildren,
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
# 设置审计字段
edit_catalog.upd_prsn = current_user.user.user_name
# 调用服务层方法
edit_result = await DataCatalogService.edit_catalog_child_services(query_db, edit_catalog)
logger.info(edit_result.message)
# 返回标准化响应
return ResponseUtil.success(
msg=edit_result.message
)
@dataCatalogController.put('/moved', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:edit'))])
@ValidateFields(validate_model='moved_data_catalog')
@Log(title='数据目录管理', business_type=BusinessType.UPDATE)
async def moved_data_catalog(
request: Request,
moved_catalog: DataCatalogMovedRequest,
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
# 调用服务层方法
moved_result = await DataCatalogService.moved_catalog_instr_services(query_db, moved_catalog)
logger.info(moved_result.message)
# 返回标准化响应
return ResponseUtil.success(
msg=moved_result.message
)
@dataCatalogController.put('/merge', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:edit'))])
@ValidateFields(validate_model='merge_data_catalog')
@Log(title='数据目录管理', business_type=BusinessType.UPDATE)
async def moved_data_catalog(
request: Request,
merge_catalog: DataCatalogMergeRequest,
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
# 调用服务层方法
merge_result = await DataCatalogService.merge_catalog_instr_services(query_db, merge_catalog)
logger.info(merge_result.message)
# 返回标准化响应
return ResponseUtil.success(
msg=merge_result.message
)
@dataCatalogController.put('/removerel', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:edit'))])
@ValidateFields(validate_model='removerel_data_ast_catalog')
@Log(title='数据目录管理', business_type=BusinessType.UPDATE)
async def removerel_data_ast_catalog(
request: Request,
removerel_catalog: DataCatalogChild,
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
# 调用服务层方法
removerel_result = await DataCatalogService.removerel_data_ast_catalog_services(query_db, removerel_catalog)
logger.info(removerel_result.message)
# 返回标准化响应
return ResponseUtil.success()
@dataCatalogController.put('/moverel', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:edit'))])
@ValidateFields(validate_model='moverel_data_ast_catalog')
@Log(title='数据目录管理', business_type=BusinessType.UPDATE)
async def moverel_data_ast_catalog(
request: Request,
moverel_catalog: DataCatalogMoverelRequest,
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
# 调用服务层方法
moverel_result = await DataCatalogService.moverel_data_ast_catalog_services(query_db, moverel_catalog)
logger.info(moverel_result.message)
# 返回标准化响应
return ResponseUtil.success()
@dataCatalogController.delete('/{content_onums}', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:remove'))])
@Log(title='数据目录管理', business_type=BusinessType.DELETE)
async def delete_data_catalog(request: Request, content_onums: str, query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
delete_catalog = DeleteDataCatalogModel(content_onums=content_onums)
delete_catalog_result = await DataCatalogService.delete_catalog_services(query_db, delete_catalog,user_id=current_user.user.user_id)
logger.info(delete_catalog_result.message)
return ResponseUtil.success(msg=delete_catalog_result.message)
@dataCatalogController.get(
'/{content_onum}', response_model=DataCatalogResponse, dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:query'))]
)
async def query_detail_data_catalog(request: Request, content_onum: int, query_db: AsyncSession = Depends(get_db)):
catalog_detail_result = await DataCatalogService.get_catalog_detail_services(query_db, content_onum)
logger.info(f'获取content_onum为{content_onum}的信息成功')
return ResponseUtil.success(data=catalog_detail_result)
@dataCatalogController.delete(
'/bookmark/{rela_onum}',
dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:edit'))]
)
@Log(title='数据资产收藏管理', business_type=BusinessType.DELETE)
async def delete_ast_book_mark_rela(
request: Request,
rela_onum: int,
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
user_name = current_user.user.user_name
user_id = current_user.user.user_id
print(123456,user_id,type(user_id))
# 创建请求对象
delete_request = DataAstBookmarkRelaRequest(rela_onum=rela_onum)
# 调用服务层方法
delete_result = await DataCatalogService.delete_ast_book_mark_rela_services(query_db, delete_request,user_name,user_id)
logger.info(delete_result.message)
# 返回标准化响应
return ResponseUtil.success(msg=delete_result.message)
@dataCatalogController.post(
'/bookmark',
dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:edit'))]
)
@ValidateFields(validate_model='add_data_ast_bookmark')
@Log(title='数据资产收藏管理', business_type=BusinessType.INSERT)
async def add_ast_book_mark_rela(
request: Request,
add_bookmark: DataAstBookmarkRelaRequest,
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
# 设置字段
add_bookmark.user_id = current_user.user.user_id
user_name = current_user.user.user_name
# 调用服务层方法
print('调用服务层方法',add_bookmark)
add_result = await DataCatalogService.add_ast_book_mark_rela_services(query_db, add_bookmark,user_name)
logger.info(add_result.message)
# 返回标准化响应
return ResponseUtil.success(msg=add_result.message)
@dataCatalogController.get(
'/indx/list',
response_class=HTMLResponse, # 指定返回HTML类型,
dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:list'))]
)
async def get_data_ast_indx_list(
request: Request,
indx_page_query: DataAstIndxRequest = Depends(DataAstIndxRequest),
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
# 获取分页数据
indx_page_query_result = await DataCatalogService.get_data_ast_indx_list_services(query_db, indx_page_query)
logger.info('获取成功')
return indx_page_query_result