from datetime import datetime
from collections import defaultdict
from fastapi . responses import HTMLResponse
from fastapi import APIRouter , Depends , Request
from pydantic_validation_decorator import ValidateFields
from sqlalchemy . ext . asyncio import AsyncSession
from config . enums import BusinessType
from config . get_db import get_db
from module_admin . annotation . log_annotation import Log
from module_admin . aspect . interface_auth import CheckUserInterfaceAuth
from module_admin . service . login_service import LoginService
from module_admin . service . data_ast_content_service import DataCatalogService
from module_admin . entity . vo . data_ast_content_vo import DataAstSecuResponse , DataAstSecuRequest , DataCatalogRequest , DataCatalogResponse , DataCatalogPageQueryModel , DeleteDataCatalogModel , DataCatalogResponseWithChildren , DataAssetCatalogTreeResponse , DataCatalogMovedRequest , DataCatalogMergeRequest , DataCatalogChild , DataCatalogMoverelRequest , DataAstIndxRequest , DataAstBookmarkRelaRequest
from module_admin . entity . vo . user_vo import CurrentUserModel
from module_admin . entity . vo . metasecurity_vo import MetaSecurityApiModel
from module_admin . service . metasecurity_service import MetaSecurityService
from utils . common_util import bytes2file_response
from utils . log_util import logger
from utils . page_util import PageResponseModel
from utils . response_util import ResponseUtil
dataCatalogController = APIRouter ( prefix = ' /system/data_catalog ' , dependencies = [ Depends ( LoginService . get_current_user ) ] )
@dataCatalogController . get (
' /list ' , response_model = PageResponseModel , dependencies = [ Depends ( CheckUserInterfaceAuth ( ' system:data_catalog:list ' ) ) ]
)
async def get_data_catalog_list (
request : Request ,
catalog_page_query : DataCatalogPageQueryModel = Depends ( DataCatalogPageQueryModel . as_query ) ,
query_db : AsyncSession = Depends ( get_db ) ,
current_user : CurrentUserModel = Depends ( LoginService . get_current_user ) ,
) :
#设置字段
user_id = current_user . user . user_id
user_name = current_user . user . user_name
# 获取分页数据
catalog_page_query_result = await DataCatalogService . get_catalog_list_services ( query_db , catalog_page_query , user_id , user_name , is_page = True )
logger . info ( ' 获取成功 ' )
return ResponseUtil . success ( model_content = catalog_page_query_result )
@dataCatalogController . get (
' /atree ' , response_model = DataAssetCatalogTreeResponse , dependencies = [ Depends ( CheckUserInterfaceAuth ( ' system:data_catalog:atree ' ) ) ] , summary = " Data Asset Tree Query "
)
async def get_data_asset_catalog_tree ( query_db : AsyncSession = Depends ( get_db ) ) :
try :
logger . debug ( " 开始获取数据资产目录树 " )
catalog_tree_result = await DataCatalogService . get_data_asset_catalog_tree_services ( query_db )
logger . info ( ' 数据资产树获取成功 ' )
return ResponseUtil . success ( data = catalog_tree_result )
except Exception as e :
logger . error ( f " 数据资产树获取失败: { str ( e ) } " , exc_info = True )
return ResponseUtil . error ( msg = " 数据查询异常,请联系管理员 " )
@dataCatalogController . post ( ' ' , dependencies = [ Depends ( CheckUserInterfaceAuth ( ' system:data_catalog:add ' ) ) ] )
@ValidateFields ( validate_model = ' add_data_catalog ' )
@Log ( title = ' 数据目录管理 ' , business_type = BusinessType . INSERT )
async def add_data_catalog (
request : Request ,
add_catalog : DataCatalogResponseWithChildren ,
query_db : AsyncSession = Depends ( get_db ) ,
current_user : CurrentUserModel = Depends ( LoginService . get_current_user ) ,
) :
# 设置字段
add_catalog . upd_prsn = current_user . user . user_name
# 调用服务层方法
add_result = await DataCatalogService . add_catalog_services ( query_db , add_catalog )
logger . info ( add_result . message )
# 新增成功后,更新新增数据目录的父亲节点的叶子标志为0
if add_result . is_success :
if add_catalog . supr_content_onum is not None :
supr_content_onum = add_catalog . supr_content_onum
await DataCatalogService . edit_catalog_leaf_services ( query_db , supr_content_onum , 0 )
else :
logger . error ( add_result . message )
# 返回标准化响应
return ResponseUtil . success (
msg = add_result . message
)
@dataCatalogController . get (
' /getMetaSercuityData ' ,
response_model = DataAstSecuResponse ,
dependencies = [ Depends ( CheckUserInterfaceAuth ( ' system:data_catalog:secu ' ) ) ]
)
@ValidateFields ( validate_model = ' get_secu_data_request ' )
async def getMetaSercuityData (
request : Request ,
dataAstSecuRequest : DataAstSecuRequest = Depends ( DataAstSecuRequest ) ,
query_db : AsyncSession = Depends ( get_db ) ,
current_user : CurrentUserModel = Depends ( LoginService . get_current_user ) ,
) :
# 获取当前用户信息
user_id = current_user . user . user_id
password = current_user . user . password
logger . info ( f " 获取当前用户信息:user_id= { user_id } , password= { password } " )
# 设置字段
apiModel = MetaSecurityApiModel ( )
apiModel . dbRId = dataAstSecuRequest . data_ast_src
apiModel . username = user_id
apiModel . password = password
apiModel . sqlStr = " select * from " + dataAstSecuRequest . data_ast_eng_name
logger . info ( f " 设置 apiModel 参数:dbRId= { apiModel . dbRId } , username= { apiModel . username } , password= { apiModel . password } , sqlStr= { apiModel . sqlStr } " )
# 打印 apiModel 对象
logger . debug ( f " apiModel 对象内容: { apiModel } " )
# 调用服务层方法
config_detail_result = await MetaSecurityService . getMetaSercuitybysql ( request , query_db , apiModel )
logger . info ( f " 调用 MetaSecurityService.getMetaSercuitybysql 方法,返回结果: { config_detail_result } " )
# 记录成功日志
logger . info ( f " 获取 config_id 为 { apiModel } 的信息成功 " )
return ResponseUtil . success ( data = config_detail_result )
@dataCatalogController . put ( ' /edit ' , dependencies = [ Depends ( CheckUserInterfaceAuth ( ' system:data_catalog:edit ' ) ) ] )
@ValidateFields ( validate_model = ' edit_data_catalog ' )
@Log ( title = ' 数据目录管理 ' , business_type = BusinessType . UPDATE )
async def edit_data_catalog (
request : Request ,
edit_catalog : DataCatalogResponseWithChildren ,
query_db : AsyncSession = Depends ( get_db ) ,
current_user : CurrentUserModel = Depends ( LoginService . get_current_user ) ,
) :
# 设置审计字段
edit_catalog . upd_prsn = current_user . user . user_name
# 调用服务层方法
edit_result = await DataCatalogService . edit_catalog_child_services ( query_db , edit_catalog )
logger . info ( edit_result . message )
# 返回标准化响应
return ResponseUtil . success (
msg = edit_result . message
)
@dataCatalogController . put ( ' /moved ' , dependencies = [ Depends ( CheckUserInterfaceAuth ( ' system:data_catalog:edit ' ) ) ] )
@ValidateFields ( validate_model = ' moved_data_catalog ' )
@Log ( title = ' 数据目录管理 ' , business_type = BusinessType . UPDATE )
async def moved_data_catalog (
request : Request ,
moved_catalog : DataCatalogMovedRequest ,
query_db : AsyncSession = Depends ( get_db ) ,
current_user : CurrentUserModel = Depends ( LoginService . get_current_user ) ,
) :
# 调用服务层方法
moved_result = await DataCatalogService . moved_catalog_instr_services ( query_db , moved_catalog )
logger . info ( moved_result . message )
# 返回标准化响应
return ResponseUtil . success (
msg = moved_result . message
)
@dataCatalogController . put ( ' /merge ' , dependencies = [ Depends ( CheckUserInterfaceAuth ( ' system:data_catalog:edit ' ) ) ] )
@ValidateFields ( validate_model = ' merge_data_catalog ' )
@Log ( title = ' 数据目录管理 ' , business_type = BusinessType . UPDATE )
async def moved_data_catalog (
request : Request ,
merge_catalog : DataCatalogMergeRequest ,
query_db : AsyncSession = Depends ( get_db ) ,
current_user : CurrentUserModel = Depends ( LoginService . get_current_user ) ,
) :
# 调用服务层方法
merge_result = await DataCatalogService . merge_catalog_instr_services ( query_db , merge_catalog )
logger . info ( merge_result . message )
# 返回标准化响应
return ResponseUtil . success (
msg = merge_result . message
)
@dataCatalogController . put ( ' /removerel ' , dependencies = [ Depends ( CheckUserInterfaceAuth ( ' system:data_catalog:edit ' ) ) ] )
@ValidateFields ( validate_model = ' removerel_data_ast_catalog ' )
@Log ( title = ' 数据目录管理 ' , business_type = BusinessType . UPDATE )
async def removerel_data_ast_catalog (
request : Request ,
removerel_catalog : DataCatalogChild ,
query_db : AsyncSession = Depends ( get_db ) ,
current_user : CurrentUserModel = Depends ( LoginService . get_current_user ) ,
) :
# 调用服务层方法
removerel_result = await DataCatalogService . removerel_data_ast_catalog_services ( query_db , removerel_catalog )
logger . info ( removerel_result . message )
# 返回标准化响应
return ResponseUtil . success ( )
@dataCatalogController . put ( ' /moverel ' , dependencies = [ Depends ( CheckUserInterfaceAuth ( ' system:data_catalog:edit ' ) ) ] )
@ValidateFields ( validate_model = ' moverel_data_ast_catalog ' )
@Log ( title = ' 数据目录管理 ' , business_type = BusinessType . UPDATE )
async def moverel_data_ast_catalog (
request : Request ,
moverel_catalog : DataCatalogMoverelRequest ,
query_db : AsyncSession = Depends ( get_db ) ,
current_user : CurrentUserModel = Depends ( LoginService . get_current_user ) ,
) :
# 调用服务层方法
moverel_result = await DataCatalogService . moverel_data_ast_catalog_services ( query_db , moverel_catalog )
logger . info ( moverel_result . message )
# 返回标准化响应
return ResponseUtil . success ( )
@dataCatalogController . delete ( ' / {content_onums} ' , dependencies = [ Depends ( CheckUserInterfaceAuth ( ' system:data_catalog:remove ' ) ) ] )
@Log ( title = ' 数据目录管理 ' , business_type = BusinessType . DELETE )
async def delete_data_catalog ( request : Request , content_onums : str , query_db : AsyncSession = Depends ( get_db ) ,
current_user : CurrentUserModel = Depends ( LoginService . get_current_user ) ,
) :
delete_catalog = DeleteDataCatalogModel ( content_onums = content_onums )
delete_catalog_result = await DataCatalogService . delete_catalog_services ( query_db , delete_catalog , user_id = current_user . user . user_id )
logger . info ( delete_catalog_result . message )
return ResponseUtil . success ( msg = delete_catalog_result . message )
@dataCatalogController . get (
' / {content_onum} ' , response_model = DataCatalogResponse , dependencies = [ Depends ( CheckUserInterfaceAuth ( ' system:data_catalog:query ' ) ) ]
)
async def query_detail_data_catalog ( request : Request , content_onum : int , query_db : AsyncSession = Depends ( get_db ) ) :
catalog_detail_result = await DataCatalogService . get_catalog_detail_services ( query_db , content_onum )
logger . info ( f ' 获取content_onum为 { content_onum } 的信息成功 ' )
return ResponseUtil . success ( data = catalog_detail_result )
@dataCatalogController . delete (
' /bookmark/ {rela_onum} ' ,
dependencies = [ Depends ( CheckUserInterfaceAuth ( ' system:data_catalog:edit ' ) ) ]
)
@Log ( title = ' 数据资产收藏管理 ' , business_type = BusinessType . DELETE )
async def delete_ast_book_mark_rela (
request : Request ,
rela_onum : int ,
query_db : AsyncSession = Depends ( get_db ) ,
current_user : CurrentUserModel = Depends ( LoginService . get_current_user ) ,
) :
user_name = current_user . user . user_name
user_id = current_user . user . user_id
# 创建请求对象
delete_request = DataAstBookmarkRelaRequest ( rela_onum = rela_onum )
# 调用服务层方法
delete_result = await DataCatalogService . delete_ast_book_mark_rela_services ( query_db , delete_request , user_name , user_id )
logger . info ( delete_result . message )
# 返回标准化响应
return ResponseUtil . success ( msg = delete_result . message )
@dataCatalogController . post (
' /bookmark ' ,
dependencies = [ Depends ( CheckUserInterfaceAuth ( ' system:data_catalog:edit ' ) ) ]
)
@ValidateFields ( validate_model = ' add_data_ast_bookmark ' )
@Log ( title = ' 数据资产收藏管理 ' , business_type = BusinessType . INSERT )
async def add_ast_book_mark_rela (
request : Request ,
add_bookmark : DataAstBookmarkRelaRequest ,
query_db : AsyncSession = Depends ( get_db ) ,
current_user : CurrentUserModel = Depends ( LoginService . get_current_user ) ,
) :
# 设置字段
add_bookmark . user_id = current_user . user . user_id
user_name = current_user . user . user_name
# 调用服务层方法
add_result = await DataCatalogService . add_ast_book_mark_rela_services ( query_db , add_bookmark , user_name )
logger . info ( add_result . message )
# 返回标准化响应
return ResponseUtil . success ( msg = add_result . message )
@dataCatalogController . get (
' /indx/list ' ,
response_class = HTMLResponse , # 指定返回HTML类型,
dependencies = [ Depends ( CheckUserInterfaceAuth ( ' system:data_catalog:list ' ) ) ]
)
async def get_data_ast_indx_list (
request : Request ,
indx_page_query : DataAstIndxRequest = Depends ( DataAstIndxRequest ) ,
query_db : AsyncSession = Depends ( get_db ) ,
current_user : CurrentUserModel = Depends ( LoginService . get_current_user ) ,
) :
# 获取分页数据
indx_page_query_result = await DataCatalogService . get_data_ast_indx_list_services ( query_db , indx_page_query )
logger . info ( ' 获取成功 ' )
return indx_page_query_result
@dataCatalogController . post ( ' /bookmark/folder ' , dependencies = [ Depends ( CheckUserInterfaceAuth ( ' system:data_catalog:add ' ) ) ] )
@ValidateFields ( validate_model = ' add_bookmark_folder ' )
@Log ( title = ' 收藏目录管理 ' , business_type = BusinessType . INSERT )
async def add_bookmark_folder (
request : Request ,
add_folder : DataCatalogRequest ,
query_db : AsyncSession = Depends ( get_db ) ,
current_user : CurrentUserModel = Depends ( LoginService . get_current_user ) ,
) :
# 设置字段
add_folder . upd_prsn = current_user . user . user_name
add_folder . supr_content_onum = 2 # 固定为"我的收藏"的目录ID
add_folder . content_stat = " 1 " # 设置为有效状态
add_folder . leaf_node_flag = 1 # 默认为叶子节点
# 调用服务层方法
add_result = await DataCatalogService . add_bookmark_folder_services ( query_db , add_folder )
logger . info ( add_result . message )
# 返回标准化响应
return ResponseUtil . success (
msg = add_result . message
)
@dataCatalogController . put ( ' /bookmark/folder ' , dependencies = [ Depends ( CheckUserInterfaceAuth ( ' system:data_catalog:edit ' ) ) ] )
@ValidateFields ( validate_model = ' edit_bookmark_folder ' )
@Log ( title = ' 收藏目录管理 ' , business_type = BusinessType . UPDATE )
async def edit_bookmark_folder (
request : Request ,
edit_folder : DataCatalogRequest ,
query_db : AsyncSession = Depends ( get_db ) ,
current_user : CurrentUserModel = Depends ( LoginService . get_current_user ) ,
) :
# 设置审计字段
edit_folder . upd_prsn = current_user . user . user_name
# 调用服务层方法
edit_result = await DataCatalogService . edit_bookmark_folder_services ( query_db , edit_folder )
logger . info ( edit_result . message )
# 返回标准化响应
return ResponseUtil . success (
msg = edit_result . message
)
@dataCatalogController . delete ( ' /bookmark/folder/ {content_onum} ' , dependencies = [ Depends ( CheckUserInterfaceAuth ( ' system:data_catalog:remove ' ) ) ] )
@Log ( title = ' 收藏目录管理 ' , business_type = BusinessType . DELETE )
async def delete_bookmark_folder (
request : Request ,
content_onum : int ,
query_db : AsyncSession = Depends ( get_db ) ,
current_user : CurrentUserModel = Depends ( LoginService . get_current_user ) ,
) :
# 调用服务层方法
delete_result = await DataCatalogService . delete_bookmark_folder_services (
query_db ,
content_onum ,
current_user . user . user_name ,
current_user . user . user_id
)
logger . info ( delete_result . message )
# 返回标准化响应
return ResponseUtil . success (
msg = delete_result . message
)
@dataCatalogController . get (
' /bookmark/folders ' ,
dependencies = [ Depends ( CheckUserInterfaceAuth ( ' system:data_catalog:list ' ) ) ]
)
async def get_bookmark_folders (
request : Request ,
query_db : AsyncSession = Depends ( get_db ) ,
current_user : CurrentUserModel = Depends ( LoginService . get_current_user ) ,
) :
# 获取当前用户名
user_name = current_user . user . user_name
# 调用服务层方法
folders = await DataCatalogService . get_bookmark_folders_services ( query_db , user_name )
logger . info ( f ' 获取用户 { user_name } 的收藏目录列表成功 ' )
# 返回标准化响应
return ResponseUtil . success ( data = folders )
@dataCatalogController . put ( ' /bookmark/asset/move ' , dependencies = [ Depends ( CheckUserInterfaceAuth ( ' system:data_catalog:edit ' ) ) ] )
@ValidateFields ( validate_model = ' move_bookmark_asset ' )
@Log ( title = ' 收藏资产管理 ' , business_type = BusinessType . UPDATE )
async def move_bookmark_asset (
request : Request ,
moverel_catalog : DataCatalogMoverelRequest ,
query_db : AsyncSession = Depends ( get_db ) ,
current_user : CurrentUserModel = Depends ( LoginService . get_current_user ) ,
) :
# 设置用户信息
moverel_catalog . upd_prsn = current_user . user . user_name
# 调用服务层方法
moverel_result = await DataCatalogService . move_bookmark_asset_services ( query_db , moverel_catalog )
logger . info ( moverel_result . message )
# 返回标准化响应
return ResponseUtil . success ( msg = moverel_result . message )