22 changed files with 2951 additions and 308 deletions
@ -0,0 +1,271 @@ |
|||||
|
from datetime import datetime |
||||
|
from collections import defaultdict |
||||
|
from fastapi.responses import HTMLResponse |
||||
|
from fastapi import APIRouter, Depends, Request |
||||
|
from pydantic_validation_decorator import ValidateFields |
||||
|
from sqlalchemy.ext.asyncio import AsyncSession |
||||
|
from config.enums import BusinessType |
||||
|
from config.get_db import get_db |
||||
|
from module_admin.annotation.log_annotation import Log |
||||
|
from module_admin.aspect.interface_auth import CheckUserInterfaceAuth |
||||
|
from module_admin.service.login_service import LoginService |
||||
|
from module_admin.service.data_ast_content_service import DataCatalogService |
||||
|
from module_admin.entity.vo.data_ast_content_vo import DataCatalogRequest, DataCatalogResponse, DataCatalogPageQueryModel, DeleteDataCatalogModel,DataCatalogResponseWithChildren,DataAssetCatalogTreeResponse,DataCatalogMovedRequest,DataCatalogMergeRequest,DataCatalogChild,DataCatalogMoverelRequest,DataAstIndxRequest,DataAstBookmarkRelaRequest |
||||
|
from module_admin.entity.vo.user_vo import CurrentUserModel |
||||
|
from utils.common_util import bytes2file_response |
||||
|
from utils.log_util import logger |
||||
|
from utils.page_util import PageResponseModel |
||||
|
from utils.response_util import ResponseUtil |
||||
|
|
||||
|
|
||||
|
dataCatalogController = APIRouter(prefix='/system/data_catalog', dependencies=[Depends(LoginService.get_current_user)]) |
||||
|
|
||||
|
|
||||
|
@dataCatalogController.get( |
||||
|
'/list', response_model=PageResponseModel, dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:list'))] |
||||
|
) |
||||
|
async def get_data_catalog_list( |
||||
|
request: Request, |
||||
|
catalog_page_query: DataCatalogPageQueryModel = Depends(DataCatalogPageQueryModel.as_query), |
||||
|
query_db: AsyncSession = Depends(get_db), |
||||
|
current_user: CurrentUserModel = Depends(LoginService.get_current_user), |
||||
|
): |
||||
|
|
||||
|
#设置字段 |
||||
|
user_id = current_user.user.user_id |
||||
|
# 获取分页数据 |
||||
|
catalog_page_query_result = await DataCatalogService.get_catalog_list_services(query_db, catalog_page_query, user_id, is_page=True) |
||||
|
logger.info('获取成功') |
||||
|
|
||||
|
|
||||
|
|
||||
|
return ResponseUtil.success(model_content=catalog_page_query_result) |
||||
|
|
||||
|
@dataCatalogController.get( |
||||
|
'/atree',response_model=DataAssetCatalogTreeResponse,dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:atree'))],summary="Data Asset Tree Query" |
||||
|
) |
||||
|
async def get_data_asset_catalog_tree(query_db: AsyncSession = Depends(get_db)): |
||||
|
try: |
||||
|
logger.debug("开始获取数据资产目录树") |
||||
|
catalog_tree_result = await DataCatalogService.get_data_asset_catalog_tree_services(query_db) |
||||
|
logger.info('数据资产树获取成功') |
||||
|
return ResponseUtil.success(data=catalog_tree_result) |
||||
|
except Exception as e: |
||||
|
logger.error(f"数据资产树获取失败: {str(e)}", exc_info=True) |
||||
|
return ResponseUtil.error(msg="数据查询异常,请联系管理员") |
||||
|
|
||||
|
@dataCatalogController.post('', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:add'))]) |
||||
|
@ValidateFields(validate_model='add_data_catalog') |
||||
|
@Log(title='数据目录管理', business_type=BusinessType.INSERT) |
||||
|
async def add_data_catalog( |
||||
|
request: Request, |
||||
|
add_catalog: DataCatalogResponseWithChildren, |
||||
|
query_db: AsyncSession = Depends(get_db), |
||||
|
current_user: CurrentUserModel = Depends(LoginService.get_current_user), |
||||
|
): |
||||
|
# 设置字段 |
||||
|
add_catalog.upd_prsn = current_user.user.user_name |
||||
|
|
||||
|
# 调用服务层方法 |
||||
|
add_result = await DataCatalogService.add_catalog_services(query_db, add_catalog) |
||||
|
logger.info(add_result.message) |
||||
|
|
||||
|
# 新增成功后,更新新增数据目录的父亲节点的叶子标志为0 |
||||
|
if add_result.is_success: |
||||
|
if add_catalog.supr_content_onum is not None: |
||||
|
supr_content_onum = add_catalog.supr_content_onum |
||||
|
await DataCatalogService.edit_catalog_leaf_services(query_db,supr_content_onum, 0) |
||||
|
else: |
||||
|
logger.error(add_result.message) |
||||
|
# 返回标准化响应 |
||||
|
return ResponseUtil.success( |
||||
|
msg=add_result.message |
||||
|
) |
||||
|
|
||||
|
@dataCatalogController.put('/edit', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:edit'))]) |
||||
|
@ValidateFields(validate_model='edit_data_catalog') |
||||
|
@Log(title='数据目录管理', business_type=BusinessType.UPDATE) |
||||
|
async def edit_data_catalog( |
||||
|
request: Request, |
||||
|
edit_catalog: DataCatalogResponseWithChildren, |
||||
|
query_db: AsyncSession = Depends(get_db), |
||||
|
current_user: CurrentUserModel = Depends(LoginService.get_current_user), |
||||
|
): |
||||
|
# 设置审计字段 |
||||
|
edit_catalog.upd_prsn = current_user.user.user_name |
||||
|
|
||||
|
|
||||
|
# 调用服务层方法 |
||||
|
edit_result = await DataCatalogService.edit_catalog_child_services(query_db, edit_catalog) |
||||
|
logger.info(edit_result.message) |
||||
|
|
||||
|
# 返回标准化响应 |
||||
|
return ResponseUtil.success( |
||||
|
msg=edit_result.message |
||||
|
) |
||||
|
|
||||
|
|
||||
|
@dataCatalogController.put('/moved', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:edit'))]) |
||||
|
@ValidateFields(validate_model='moved_data_catalog') |
||||
|
@Log(title='数据目录管理', business_type=BusinessType.UPDATE) |
||||
|
async def moved_data_catalog( |
||||
|
request: Request, |
||||
|
moved_catalog: DataCatalogMovedRequest, |
||||
|
query_db: AsyncSession = Depends(get_db), |
||||
|
current_user: CurrentUserModel = Depends(LoginService.get_current_user), |
||||
|
): |
||||
|
# 调用服务层方法 |
||||
|
moved_result = await DataCatalogService.moved_catalog_instr_services(query_db, moved_catalog) |
||||
|
logger.info(moved_result.message) |
||||
|
|
||||
|
# 返回标准化响应 |
||||
|
return ResponseUtil.success( |
||||
|
msg=moved_result.message |
||||
|
) |
||||
|
|
||||
|
|
||||
|
@dataCatalogController.put('/merge', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:edit'))]) |
||||
|
@ValidateFields(validate_model='merge_data_catalog') |
||||
|
@Log(title='数据目录管理', business_type=BusinessType.UPDATE) |
||||
|
async def moved_data_catalog( |
||||
|
request: Request, |
||||
|
merge_catalog: DataCatalogMergeRequest, |
||||
|
query_db: AsyncSession = Depends(get_db), |
||||
|
current_user: CurrentUserModel = Depends(LoginService.get_current_user), |
||||
|
): |
||||
|
# 调用服务层方法 |
||||
|
merge_result = await DataCatalogService.merge_catalog_instr_services(query_db, merge_catalog) |
||||
|
logger.info(merge_result.message) |
||||
|
|
||||
|
# 返回标准化响应 |
||||
|
return ResponseUtil.success( |
||||
|
msg=merge_result.message |
||||
|
) |
||||
|
|
||||
|
|
||||
|
@dataCatalogController.put('/removerel', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:edit'))]) |
||||
|
@ValidateFields(validate_model='removerel_data_ast_catalog') |
||||
|
@Log(title='数据目录管理', business_type=BusinessType.UPDATE) |
||||
|
async def removerel_data_ast_catalog( |
||||
|
request: Request, |
||||
|
removerel_catalog: DataCatalogChild, |
||||
|
query_db: AsyncSession = Depends(get_db), |
||||
|
current_user: CurrentUserModel = Depends(LoginService.get_current_user), |
||||
|
): |
||||
|
# 调用服务层方法 |
||||
|
removerel_result = await DataCatalogService.removerel_data_ast_catalog_services(query_db, removerel_catalog) |
||||
|
logger.info(removerel_result.message) |
||||
|
|
||||
|
# 返回标准化响应 |
||||
|
return ResponseUtil.success() |
||||
|
|
||||
|
@dataCatalogController.put('/moverel', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:edit'))]) |
||||
|
@ValidateFields(validate_model='moverel_data_ast_catalog') |
||||
|
@Log(title='数据目录管理', business_type=BusinessType.UPDATE) |
||||
|
async def moverel_data_ast_catalog( |
||||
|
request: Request, |
||||
|
moverel_catalog: DataCatalogMoverelRequest, |
||||
|
query_db: AsyncSession = Depends(get_db), |
||||
|
current_user: CurrentUserModel = Depends(LoginService.get_current_user), |
||||
|
): |
||||
|
# 调用服务层方法 |
||||
|
moverel_result = await DataCatalogService.moverel_data_ast_catalog_services(query_db, moverel_catalog) |
||||
|
logger.info(moverel_result.message) |
||||
|
|
||||
|
# 返回标准化响应 |
||||
|
return ResponseUtil.success() |
||||
|
|
||||
|
|
||||
|
|
||||
|
@dataCatalogController.delete('/{content_onums}', dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:remove'))]) |
||||
|
@Log(title='数据目录管理', business_type=BusinessType.DELETE) |
||||
|
async def delete_data_catalog(request: Request, content_onums: str, query_db: AsyncSession = Depends(get_db), |
||||
|
current_user: CurrentUserModel = Depends(LoginService.get_current_user), |
||||
|
): |
||||
|
delete_catalog = DeleteDataCatalogModel(content_onums=content_onums) |
||||
|
delete_catalog_result = await DataCatalogService.delete_catalog_services(query_db, delete_catalog,user_id=current_user.user.user_id) |
||||
|
logger.info(delete_catalog_result.message) |
||||
|
|
||||
|
return ResponseUtil.success(msg=delete_catalog_result.message) |
||||
|
|
||||
|
|
||||
|
@dataCatalogController.get( |
||||
|
'/{content_onum}', response_model=DataCatalogResponse, dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:query'))] |
||||
|
) |
||||
|
async def query_detail_data_catalog(request: Request, content_onum: int, query_db: AsyncSession = Depends(get_db)): |
||||
|
catalog_detail_result = await DataCatalogService.get_catalog_detail_services(query_db, content_onum) |
||||
|
logger.info(f'获取content_onum为{content_onum}的信息成功') |
||||
|
|
||||
|
return ResponseUtil.success(data=catalog_detail_result) |
||||
|
|
||||
|
|
||||
|
|
||||
|
|
||||
|
|
||||
|
@dataCatalogController.delete( |
||||
|
'/bookmark/{rela_onum}', |
||||
|
dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:edit'))] |
||||
|
) |
||||
|
@Log(title='数据资产收藏管理', business_type=BusinessType.DELETE) |
||||
|
async def delete_ast_book_mark_rela( |
||||
|
request: Request, |
||||
|
rela_onum: int, |
||||
|
query_db: AsyncSession = Depends(get_db), |
||||
|
current_user: CurrentUserModel = Depends(LoginService.get_current_user), |
||||
|
): |
||||
|
user_name = current_user.user.user_name |
||||
|
user_id = current_user.user.user_id |
||||
|
print(123456,user_id,type(user_id)) |
||||
|
# 创建请求对象 |
||||
|
delete_request = DataAstBookmarkRelaRequest(rela_onum=rela_onum) |
||||
|
|
||||
|
# 调用服务层方法 |
||||
|
delete_result = await DataCatalogService.delete_ast_book_mark_rela_services(query_db, delete_request,user_name,user_id) |
||||
|
logger.info(delete_result.message) |
||||
|
|
||||
|
# 返回标准化响应 |
||||
|
return ResponseUtil.success(msg=delete_result.message) |
||||
|
|
||||
|
|
||||
|
@dataCatalogController.post( |
||||
|
'/bookmark', |
||||
|
dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:edit'))] |
||||
|
) |
||||
|
@ValidateFields(validate_model='add_data_ast_bookmark') |
||||
|
@Log(title='数据资产收藏管理', business_type=BusinessType.INSERT) |
||||
|
async def add_ast_book_mark_rela( |
||||
|
request: Request, |
||||
|
add_bookmark: DataAstBookmarkRelaRequest, |
||||
|
query_db: AsyncSession = Depends(get_db), |
||||
|
current_user: CurrentUserModel = Depends(LoginService.get_current_user), |
||||
|
): |
||||
|
# 设置字段 |
||||
|
add_bookmark.user_id = current_user.user.user_id |
||||
|
user_name = current_user.user.user_name |
||||
|
|
||||
|
# 调用服务层方法 |
||||
|
print('调用服务层方法',add_bookmark) |
||||
|
add_result = await DataCatalogService.add_ast_book_mark_rela_services(query_db, add_bookmark,user_name) |
||||
|
logger.info(add_result.message) |
||||
|
|
||||
|
# 返回标准化响应 |
||||
|
return ResponseUtil.success(msg=add_result.message) |
||||
|
|
||||
|
|
||||
|
|
||||
|
@dataCatalogController.get( |
||||
|
'/indx/list', |
||||
|
response_class=HTMLResponse, # 指定返回HTML类型, |
||||
|
dependencies=[Depends(CheckUserInterfaceAuth('system:data_catalog:list'))] |
||||
|
) |
||||
|
async def get_data_ast_indx_list( |
||||
|
request: Request, |
||||
|
indx_page_query: DataAstIndxRequest = Depends(DataAstIndxRequest), |
||||
|
query_db: AsyncSession = Depends(get_db), |
||||
|
current_user: CurrentUserModel = Depends(LoginService.get_current_user), |
||||
|
): |
||||
|
# 获取分页数据 |
||||
|
indx_page_query_result = await DataCatalogService.get_data_ast_indx_list_services(query_db, indx_page_query) |
||||
|
logger.info('获取成功') |
||||
|
|
||||
|
return indx_page_query_result |
@ -0,0 +1,54 @@ |
|||||
|
# # data_ast_content_controller.py |
||||
|
# from fastapi import APIRouter, Depends, Request |
||||
|
# from pydantic_validation_decorator import ValidateFields |
||||
|
# from sqlalchemy.ext.asyncio import AsyncSession |
||||
|
# from config.get_db import get_db |
||||
|
# from module_admin.aspect.interface_auth import CheckUserInterfaceAuth |
||||
|
# from module_admin.service.login_service import LoginService |
||||
|
# from module_admin.service.data_ast_content_service import DataAstInfoService |
||||
|
# from module_admin.entity.vo.data_ast_content_vo import DataAstInfoRequest, DataAstInfoResponse, DataAstInfoPageQueryModel |
||||
|
# from module_admin.entity.vo.user_vo import CurrentUserModel |
||||
|
# from utils.response_util import ResponseUtil |
||||
|
|
||||
|
# dataAstInfoController = APIRouter(prefix='/system/data_ast_info', dependencies=[Depends(LoginService.get_current_user)]) |
||||
|
|
||||
|
# @dataAstInfoController.post('', dependencies=[Depends(CheckUserInterfaceAuth('system:data_ast_info:add'))]) |
||||
|
# @ValidateFields(validate_model='add_data_ast_info') |
||||
|
# async def add_data_ast_info( |
||||
|
# request: Request, |
||||
|
# add_data_ast_info: DataAstInfoRequest, |
||||
|
# query_db: AsyncSession = Depends(get_db), |
||||
|
# current_user: CurrentUserModel = Depends(LoginService.get_current_user), |
||||
|
# ): |
||||
|
# add_result = await DataAstInfoService.add_data_ast_info_services(query_db, add_data_ast_info) |
||||
|
# logger.info(add_result.message) |
||||
|
# return ResponseUtil.success( |
||||
|
# msg=add_result.message, |
||||
|
# data=add_result.data |
||||
|
# ) |
||||
|
|
||||
|
# @dataAstInfoController.post('/batch', dependencies=[Depends(CheckUserInterfaceAuth('system:data_ast_info:add'))]) |
||||
|
# @ValidateFields(validate_model='add_data_ast_info_batch') |
||||
|
# async def add_data_ast_info_batch( |
||||
|
# request: Request, |
||||
|
# add_data_ast_info_list: List[DataAstInfoRequest], |
||||
|
# query_db: AsyncSession = Depends(get_db), |
||||
|
# current_user: CurrentUserModel = Depends(LoginService.get_current_user), |
||||
|
# ): |
||||
|
# add_result = await DataAstInfoService.add_data_ast_info_batch_services(query_db, add_data_ast_info_list) |
||||
|
# logger.info(add_result.message) |
||||
|
# return ResponseUtil.success( |
||||
|
# msg=add_result.message, |
||||
|
# data=add_result.data |
||||
|
# ) |
||||
|
|
||||
|
# @dataAstInfoController.get('/list', dependencies=[Depends(CheckUserInterfaceAuth('system:data_ast_info:list'))]) |
||||
|
# async def get_data_ast_info_list( |
||||
|
# request: Request, |
||||
|
# data_ast_info_page_query: DataAstInfoPageQueryModel = Depends(DataAstInfoPageQueryModel.as_query), |
||||
|
# query_db: AsyncSession = Depends(get_db), |
||||
|
# current_user: CurrentUserModel = Depends(LoginService.get_current_user), |
||||
|
# ): |
||||
|
# data_ast_info_list_result = await DataAstInfoService.get_data_ast_info_list_services(query_db, data_ast_info_page_query, is_page=True) |
||||
|
# logger.info('获取成功') |
||||
|
# return ResponseUtil.success(model_content=data_ast_info_list_result) |
@ -0,0 +1,794 @@ |
|||||
|
from datetime import datetime |
||||
|
from sqlalchemy import case |
||||
|
from sqlalchemy.orm import aliased |
||||
|
from sqlalchemy import delete, func, not_, select, update, or_, and_, desc |
||||
|
from sqlalchemy.ext.asyncio import AsyncSession |
||||
|
from sqlalchemy.exc import IntegrityError |
||||
|
from module_admin.entity.do.data_ast_content_do import DataAstContent,DataAstContentRela,DataAstInfo,DataAstBookmarkRela,DataAstIndx |
||||
|
from module_admin.entity.do.user_do import SysUser |
||||
|
from module_admin.entity.vo.data_ast_content_vo import DataCatalogPageQueryModel, DeleteDataCatalogModel,DataCatalogChild,DataAstBookmarkRelaRequest,DataAstIndxRequest,DataAstIndxResponse |
||||
|
from utils.page_util import PageUtil |
||||
|
from utils.log_util import logger |
||||
|
|
||||
|
|
||||
|
class DataCatalogDAO: |
||||
|
""" |
||||
|
数据目录管理模块数据库操作层 |
||||
|
""" |
||||
|
|
||||
|
@classmethod |
||||
|
async def get_catalog_by_id(cls, db: AsyncSession, content_onum: int): |
||||
|
""" |
||||
|
根据目录ID获取目录详细信息 |
||||
|
|
||||
|
:param db: orm对象 |
||||
|
:param content_onum: 目录ID |
||||
|
:return: 目录信息对象 |
||||
|
""" |
||||
|
catalog_info = ( |
||||
|
(await db.execute(select(DataAstContent).where(DataAstContent.content_onum == content_onum , DataAstContent.content_stat == 1))) |
||||
|
.scalars() |
||||
|
.first() |
||||
|
) |
||||
|
|
||||
|
return catalog_info |
||||
|
|
||||
|
@classmethod |
||||
|
async def get_catalog_detail_by_info(cls, db: AsyncSession, catalog: DataCatalogPageQueryModel): |
||||
|
""" |
||||
|
根据目录参数获取目录信息 |
||||
|
|
||||
|
:param db: orm对象 |
||||
|
:param catalog: 目录参数对象 |
||||
|
:return: 目录信息对象 |
||||
|
""" |
||||
|
catalog_info = ( |
||||
|
( |
||||
|
await db.execute( |
||||
|
select(DataAstContent).where( |
||||
|
DataAstContent.content_name == catalog.content_name if catalog.content_name else True, |
||||
|
DataAstContent.content_stat == catalog.content_stat if catalog.content_stat else True, |
||||
|
DataAstContent.content_pic == catalog.content_pic if catalog.content_pic else True, |
||||
|
DataAstContent.content_stat == 1, |
||||
|
) |
||||
|
) |
||||
|
) |
||||
|
.scalars() |
||||
|
.first() |
||||
|
) |
||||
|
|
||||
|
return catalog_info |
||||
|
|
||||
|
|
||||
|
# @classmethod |
||||
|
# async def get_catalog_list(cls, db: AsyncSession, query_object: DataCatalogPageQueryModel, user_id: int, is_page: bool = False): |
||||
|
# """ |
||||
|
# 根据查询参数获取数据资产目录列表 |
||||
|
|
||||
|
# :param db: 异步会话对象 |
||||
|
# :param query_object: 分页查询参数对象 |
||||
|
# :param is_page: 是否分页 |
||||
|
# :return: 数据资产目录分页列表 |
||||
|
# """ |
||||
|
# # 创建别名对象 |
||||
|
# t1 = aliased(DataAstContentRela, name='t1') |
||||
|
# t2 = aliased(DataAstInfo, name='t2') |
||||
|
# t3 = aliased(DataAstBookmarkRela, name='t3') |
||||
|
|
||||
|
# query = ( |
||||
|
# select( |
||||
|
# DataAstContent.content_onum, |
||||
|
# DataAstContent.content_name, |
||||
|
# DataAstContent.content_stat, |
||||
|
# DataAstContent.content_intr, |
||||
|
# DataAstContent.content_pic, |
||||
|
# DataAstContent.supr_content_onum, |
||||
|
# DataAstContent.leaf_node_flag, |
||||
|
# DataAstContent.upd_prsn, |
||||
|
# DataAstContent.upd_time, |
||||
|
|
||||
|
# t1.rela_onum, |
||||
|
# t1.ast_onum, |
||||
|
# t1.rela_type, |
||||
|
# t1.rela_eff_begn_date, |
||||
|
# t1.rela_eff_end_date, |
||||
|
# t1.upd_prsn, |
||||
|
|
||||
|
# t2.data_ast_no, |
||||
|
# t2.data_ast_eng_name, |
||||
|
# t2.data_ast_cn_name, |
||||
|
# t2.data_ast_type, |
||||
|
# t2.data_ast_stat, |
||||
|
# t2.data_ast_desc, |
||||
|
# t2.data_ast_clas, |
||||
|
# t2.data_ast_cont, |
||||
|
# t2.data_ast_faq, |
||||
|
# t2.data_ast_estb_time, |
||||
|
# t2.data_ast_upd_time, |
||||
|
# t2.data_ast_src, |
||||
|
# t2.ast_no, |
||||
|
# t3.bookmark_orde, |
||||
|
# case( |
||||
|
# (t3.rela_onum.isnot(None), 1), |
||||
|
# else_=0 |
||||
|
# ).label('bookmark_flag') |
||||
|
# ) |
||||
|
# .distinct() |
||||
|
# .select_from(DataAstContent) |
||||
|
# .outerjoin(t1, DataAstContent.content_onum == t1.content_onum) |
||||
|
# .outerjoin(t2, t1.ast_onum == t2.ast_no) |
||||
|
# .outerjoin(t3, and_( |
||||
|
# # t1.rela_onum == t3.rela_onum, |
||||
|
# t2.data_ast_no == t3.data_ast_no, |
||||
|
# t3.user_id == user_id # admin用户的ID,后续以传参的形式过来 |
||||
|
# )) |
||||
|
# .where(DataAstContent.content_stat == 1) |
||||
|
# .order_by(DataAstContent.content_onum) |
||||
|
# ) |
||||
|
|
||||
|
# # 使用分页工具进行查询 |
||||
|
# data_ast_list = await PageUtil.paginate( |
||||
|
# db, |
||||
|
# query, |
||||
|
# page_num=query_object.page_num, |
||||
|
# page_size=query_object.page_size, |
||||
|
# is_page=is_page |
||||
|
# ) |
||||
|
|
||||
|
# return data_ast_list |
||||
|
|
||||
|
|
||||
|
@classmethod |
||||
|
async def get_catalog_list(cls, db: AsyncSession, query_object: DataCatalogPageQueryModel, user_id: int, is_page: bool = False): |
||||
|
""" |
||||
|
根据查询参数获取数据资产目录列表 |
||||
|
|
||||
|
:param db: 异步会话对象 |
||||
|
:param query_object: 分页查询参数对象 |
||||
|
:param is_page: 是否分页 |
||||
|
:return: 数据资产目录分页列表 |
||||
|
""" |
||||
|
# 创建别名对象 |
||||
|
t1 = aliased(DataAstContentRela, name='t1') |
||||
|
t2 = aliased(DataAstInfo, name='t2') |
||||
|
t3 = aliased(DataAstBookmarkRela, name='t3') |
||||
|
|
||||
|
# 修改子查询部分 |
||||
|
subquery_t1 = ( |
||||
|
select(DataAstContentRela) |
||||
|
.where(DataAstContentRela.upd_prsn == query_object.upd_prsn, DataAstContentRela.content_onum == '2' and DataAstContentRela.rela_status == '1') |
||||
|
.union_all( |
||||
|
select(DataAstContentRela) |
||||
|
.where(DataAstContentRela.content_onum != '2' and DataAstContentRela.rela_status == '1') |
||||
|
) |
||||
|
).alias('subquery_t1') # 为子查询分配唯一别名 |
||||
|
|
||||
|
query = ( |
||||
|
select( |
||||
|
DataAstContent.content_onum, |
||||
|
DataAstContent.content_name, |
||||
|
DataAstContent.content_stat, |
||||
|
DataAstContent.content_intr, |
||||
|
DataAstContent.content_pic, |
||||
|
DataAstContent.supr_content_onum, |
||||
|
DataAstContent.leaf_node_flag, |
||||
|
DataAstContent.upd_prsn, |
||||
|
DataAstContent.upd_time, |
||||
|
|
||||
|
subquery_t1.c.rela_onum, # 明确指定子查询的字段 |
||||
|
subquery_t1.c.ast_onum, |
||||
|
subquery_t1.c.rela_type, |
||||
|
subquery_t1.c.rela_eff_begn_date, |
||||
|
subquery_t1.c.rela_eff_end_date, |
||||
|
subquery_t1.c.upd_prsn, |
||||
|
|
||||
|
t2.data_ast_no, |
||||
|
t2.data_ast_eng_name, |
||||
|
t2.data_ast_cn_name, |
||||
|
t2.data_ast_type, |
||||
|
t2.data_ast_stat, |
||||
|
t2.data_ast_desc, |
||||
|
t2.data_ast_clas, |
||||
|
t2.data_ast_cont, |
||||
|
t2.data_ast_faq, |
||||
|
t2.data_ast_estb_time, |
||||
|
t2.data_ast_upd_time, |
||||
|
t2.data_ast_src, |
||||
|
t2.ast_no, |
||||
|
t3.bookmark_orde, |
||||
|
case( |
||||
|
(t3.rela_onum.isnot(None), 1), |
||||
|
else_=0 |
||||
|
).label('bookmark_flag') |
||||
|
) |
||||
|
.distinct() |
||||
|
.select_from(DataAstContent) |
||||
|
.outerjoin(subquery_t1, DataAstContent.content_onum == subquery_t1.c.content_onum) # 明确使用子查询别名 |
||||
|
.outerjoin(t2, subquery_t1.c.ast_onum == t2.ast_no) |
||||
|
.outerjoin(t3, and_( |
||||
|
subquery_t1.c.ast_onum == t3.data_ast_no, |
||||
|
t3.user_id == user_id |
||||
|
)) |
||||
|
.where(DataAstContent.content_stat == 1) |
||||
|
.order_by(DataAstContent.content_onum) |
||||
|
) |
||||
|
|
||||
|
# 使用分页工具进行查询 |
||||
|
data_ast_list = await PageUtil.paginate( |
||||
|
db, |
||||
|
query, |
||||
|
page_num=query_object.page_num, |
||||
|
page_size=query_object.page_size, |
||||
|
is_page=is_page |
||||
|
) |
||||
|
|
||||
|
return data_ast_list |
||||
|
|
||||
|
|
||||
|
|
||||
|
|
||||
|
@classmethod |
||||
|
async def add_catalog_dao(cls, db: AsyncSession, catalog1: dict, catalog2: dict): |
||||
|
""" |
||||
|
新增目录数据库操作 |
||||
|
|
||||
|
:param db: orm对象 |
||||
|
:param catalog: 目录对象 |
||||
|
:return: |
||||
|
""" |
||||
|
db_catalog = DataAstContent(**catalog1) |
||||
|
db.add(db_catalog) |
||||
|
await db.flush() |
||||
|
|
||||
|
|
||||
|
# 处理子关系(统一转换为 ORM 模型) |
||||
|
for child in catalog2.get('children', []): |
||||
|
# 如果是 Pydantic 模型实例,先转换为字典 |
||||
|
if isinstance(child, DataCatalogChild): |
||||
|
child_dict = child.model_dump() |
||||
|
elif isinstance(child, dict): |
||||
|
child_dict = child |
||||
|
else: |
||||
|
raise TypeError("不支持的子关系数据类型") |
||||
|
|
||||
|
# 创建 ORM 模型实例 |
||||
|
processed_child = dict(child_dict) |
||||
|
processed_child['content_onum'] = db_catalog.content_onum |
||||
|
db_child = DataAstContentRela(**processed_child) |
||||
|
|
||||
|
db.add(db_child) |
||||
|
await db.flush() |
||||
|
|
||||
|
return db_catalog |
||||
|
|
||||
|
|
||||
|
@classmethod |
||||
|
async def edit_catalog_leaf_dao(cls, db: AsyncSession, catalog: dict): |
||||
|
""" |
||||
|
编辑叶子节点目录数据库操作 |
||||
|
|
||||
|
:param db: orm对象 |
||||
|
:param catalog: 需要更新的目录字典 |
||||
|
:return: |
||||
|
""" |
||||
|
content_onum = catalog['content_onum'] |
||||
|
stmt = ( |
||||
|
update(DataAstContent) |
||||
|
.where(DataAstContent.content_onum == content_onum) |
||||
|
.values( |
||||
|
leaf_node_flag=catalog['leaf_node_flag'] |
||||
|
) |
||||
|
) |
||||
|
|
||||
|
await db.execute(stmt) |
||||
|
|
||||
|
|
||||
|
@classmethod |
||||
|
async def edit_catalog_child_dao(cls, db: AsyncSession, catalog: dict): |
||||
|
""" |
||||
|
编辑目录数据库操作 |
||||
|
|
||||
|
:param db: orm对象 |
||||
|
:param catalog: 需要更新的目录字典 |
||||
|
:return: |
||||
|
""" |
||||
|
content_onum = catalog['content_onum'] |
||||
|
|
||||
|
stmt = ( |
||||
|
update(DataAstContent) |
||||
|
.where(DataAstContent.content_onum == content_onum) |
||||
|
.values( |
||||
|
content_name=catalog['content_name'], |
||||
|
content_stat=catalog['content_stat'], |
||||
|
content_intr=catalog['content_intr'], |
||||
|
content_pic=catalog['content_pic'], |
||||
|
supr_content_onum=catalog['supr_content_onum'], |
||||
|
leaf_node_flag=catalog['leaf_node_flag'], |
||||
|
upd_prsn=catalog['upd_prsn'], |
||||
|
upd_time=datetime.now() |
||||
|
) ) |
||||
|
|
||||
|
await db.execute(stmt) |
||||
|
|
||||
|
# 处理子关系 |
||||
|
for child in catalog.get('children', []): |
||||
|
rela_onum = child.get('rela_onum') |
||||
|
if rela_onum: |
||||
|
st = ( |
||||
|
update(DataAstContentRela) |
||||
|
.where(DataAstContentRela.rela_onum == rela_onum) |
||||
|
.values( |
||||
|
content_onum=child.get('content_onum'), |
||||
|
ast_onum=child.get('ast_onum'), |
||||
|
rela_type=child.get('rela_type'), |
||||
|
rela_eff_begn_date=child.get('rela_eff_begn_date'), |
||||
|
rela_eff_end_date=child.get('rela_eff_end_date'), |
||||
|
upd_prsn=child.get('upd_prsn')) |
||||
|
) |
||||
|
|
||||
|
await db.execute(st) |
||||
|
await cls.update_leaf_node_flag(db) |
||||
|
else: |
||||
|
child['content_onum'] = content_onum |
||||
|
db_child = DataAstContentRela(**child) |
||||
|
db.add(db_child) |
||||
|
await db.flush() |
||||
|
await cls.update_leaf_node_flag(db) |
||||
|
|
||||
|
|
||||
|
|
||||
|
|
||||
|
@classmethod |
||||
|
async def edit_catalog_dao(cls, db: AsyncSession, catalog1: dict): |
||||
|
""" |
||||
|
编辑目录数据库操作 |
||||
|
|
||||
|
:param db: orm对象 |
||||
|
:param catalog: 需要更新的目录字典 |
||||
|
:return: |
||||
|
""" |
||||
|
content_onum = catalog1['content_onum'] |
||||
|
stmt = ( |
||||
|
update(DataAstContent) |
||||
|
.where(DataAstContent.content_onum == content_onum) |
||||
|
.values( |
||||
|
content_name=catalog1['content_name'], |
||||
|
content_stat=catalog1['content_stat'], |
||||
|
content_intr=catalog1['content_intr'], |
||||
|
content_pic=catalog1['content_pic'], |
||||
|
supr_content_onum=catalog1['supr_content_onum'], |
||||
|
leaf_node_flag=catalog1['leaf_node_flag'], |
||||
|
upd_prsn=catalog1['upd_prsn'], |
||||
|
upd_time=datetime.now() |
||||
|
) |
||||
|
) |
||||
|
|
||||
|
await db.execute(stmt) |
||||
|
await cls.update_leaf_node_flag(db) |
||||
|
|
||||
|
|
||||
|
@classmethod |
||||
|
async def delete_catalog_dao(cls, db: AsyncSession, catalog: DeleteDataCatalogModel): |
||||
|
""" |
||||
|
删除目录数据库操作 |
||||
|
|
||||
|
:param db: orm对象 |
||||
|
:param catalog: 目录对象 |
||||
|
:content_stat=0 作废 |
||||
|
:return: |
||||
|
""" |
||||
|
content_onums = catalog.content_onums.split(',') |
||||
|
logger.info(f"Updating DataAstContent with supr_content_onum in {content_onums}") |
||||
|
await db.execute( |
||||
|
update(DataAstContentRela) |
||||
|
.where(DataAstContentRela.content_onum.in_(content_onums)) |
||||
|
.values( |
||||
|
rela_status=0, |
||||
|
rela_eff_end_date=datetime.now() |
||||
|
) |
||||
|
) |
||||
|
await db.execute( |
||||
|
update(DataAstContent) |
||||
|
.where(DataAstContent.content_onum.in_(content_onums)) |
||||
|
.values( |
||||
|
content_stat=0, |
||||
|
upd_time=datetime.now() |
||||
|
) |
||||
|
) |
||||
|
|
||||
|
|
||||
|
logger.info("更新叶子节点标志成功") |
||||
|
await cls.update_leaf_node_flag(db) |
||||
|
|
||||
|
|
||||
|
|
||||
|
|
||||
|
@classmethod |
||||
|
async def get_data_asset_catalog_tree(cls, db: AsyncSession): |
||||
|
""" |
||||
|
获取数据资产树 |
||||
|
|
||||
|
:param db: 异步会话对象 |
||||
|
:return: 去重后的数据资产树数据 |
||||
|
""" |
||||
|
query = ( |
||||
|
select( |
||||
|
DataAstInfo.data_ast_src, |
||||
|
DataAstInfo.data_ast_eng_name, |
||||
|
DataAstInfo.data_ast_cn_name, |
||||
|
DataAstInfo.ast_no |
||||
|
) |
||||
|
.distinct() |
||||
|
.select_from(DataAstInfo) |
||||
|
.where( |
||||
|
DataAstInfo.data_ast_stat == 1, |
||||
|
not_( |
||||
|
DataAstInfo.ast_no.in_( |
||||
|
select(DataAstContentRela.ast_onum) |
||||
|
.where(DataAstContentRela.rela_status == 1) |
||||
|
) |
||||
|
) |
||||
|
) |
||||
|
) |
||||
|
|
||||
|
result = await db.execute(query) |
||||
|
rows = result.fetchall() |
||||
|
|
||||
|
return rows |
||||
|
|
||||
|
|
||||
|
|
||||
|
@classmethod |
||||
|
async def moved_catalog_instr_dao(cls, db: AsyncSession, moved_catalog_data: dict): |
||||
|
""" |
||||
|
编辑目录数据库操作 |
||||
|
|
||||
|
:param db: orm对象 |
||||
|
:param catalog: 需要更新的目录字典 |
||||
|
:return: |
||||
|
""" |
||||
|
# content_onum = moved_catalog_data['content_onum'] |
||||
|
|
||||
|
stmt = ( |
||||
|
update(DataAstContent) |
||||
|
.where(DataAstContent.content_onum == moved_catalog_data['content_onum'] , DataAstContent.supr_content_onum == moved_catalog_data['supr_content_onum']) |
||||
|
.values( |
||||
|
supr_content_onum=moved_catalog_data['supr_content_onum_after'], |
||||
|
upd_time=datetime.now() |
||||
|
) ) |
||||
|
|
||||
|
await db.execute(stmt) |
||||
|
await cls.update_leaf_node_flag(db) |
||||
|
|
||||
|
|
||||
|
|
||||
|
@classmethod |
||||
|
async def merge_catalog_instr_dao(cls, db: AsyncSession, merge_catalog_data: dict): |
||||
|
""" |
||||
|
编辑目录数据库操作 |
||||
|
|
||||
|
:param db: orm对象 |
||||
|
:param catalog: 需要更新的目录字典 |
||||
|
:return: |
||||
|
""" |
||||
|
|
||||
|
# stmt = ( |
||||
|
# update(DataAstContent) |
||||
|
# .where(DataAstContent.content_onum == merge_catalog_data['content_onum'] , DataAstContent.supr_content_onum == merge_catalog_data['supr_content_onum']) |
||||
|
# .values( |
||||
|
# content_onum=merge_catalog_data['content_onum_after'], |
||||
|
# supr_content_onum=merge_catalog_data['supr_content_onum_after'], |
||||
|
# upd_time=datetime.now() |
||||
|
# ) ) |
||||
|
|
||||
|
# await db.execute(stmt) |
||||
|
stmt1 = ( |
||||
|
update(DataAstContentRela) |
||||
|
.where( DataAstContentRela.content_onum == merge_catalog_data['content_onum'] and DataAstContentRela.rela_status == 1 ) |
||||
|
.values( |
||||
|
content_onum=merge_catalog_data['content_onum_after'], |
||||
|
rela_eff_begn_date=datetime.now() |
||||
|
) |
||||
|
) |
||||
|
await db.execute(stmt1) |
||||
|
|
||||
|
stmt2 = ( |
||||
|
update(DataAstContent) |
||||
|
.where(DataAstContent.content_onum == merge_catalog_data['content_onum'] , DataAstContent.supr_content_onum == merge_catalog_data['supr_content_onum']) |
||||
|
.values( content_stat = '0' ) |
||||
|
) |
||||
|
await db.execute(stmt2) |
||||
|
|
||||
|
await cls.update_leaf_node_flag(db) |
||||
|
|
||||
|
@classmethod |
||||
|
async def removerel_data_ast_catalog_dao(cls, db: AsyncSession, removerel_catalog_data: dict): |
||||
|
""" |
||||
|
编辑资产关系数据库操作 |
||||
|
|
||||
|
:param db: orm对象 |
||||
|
:param catalog: 需要更新的目录字典 |
||||
|
:return: |
||||
|
""" |
||||
|
|
||||
|
stmt = ( |
||||
|
update(DataAstContentRela) |
||||
|
.where(DataAstContentRela.rela_onum == removerel_catalog_data['rela_onum'] , DataAstContentRela.content_onum == removerel_catalog_data['content_onum']) |
||||
|
.values( |
||||
|
rela_status=removerel_catalog_data['rela_status'] |
||||
|
) ) |
||||
|
|
||||
|
await db.execute(stmt) |
||||
|
await cls.update_leaf_node_flag(db) |
||||
|
|
||||
|
|
||||
|
@classmethod |
||||
|
async def moverel_data_ast_catalog_dao(cls, db: AsyncSession, moverel_catalog_data: dict): |
||||
|
""" |
||||
|
编辑资产关系数据库操作 |
||||
|
|
||||
|
:param db: orm对象 |
||||
|
:param catalog: 需要更新的目录字典 |
||||
|
:return: |
||||
|
""" |
||||
|
|
||||
|
stmt = ( |
||||
|
update(DataAstContentRela) |
||||
|
.where(DataAstContentRela.rela_onum == moverel_catalog_data['rela_onum'] , DataAstContentRela.content_onum == moverel_catalog_data['content_onum']) |
||||
|
.values( |
||||
|
content_onum=moverel_catalog_data['content_onum_after'], |
||||
|
rela_eff_end_date=datetime.now() |
||||
|
) ) |
||||
|
|
||||
|
await db.execute(stmt) |
||||
|
await cls.update_leaf_node_flag(db) |
||||
|
|
||||
|
@classmethod |
||||
|
async def update_leaf_node_flag(cls, db: AsyncSession): |
||||
|
""" |
||||
|
更新leaf_node_flag字段 |
||||
|
""" |
||||
|
# 创建别名对象 |
||||
|
t2 = aliased(DataAstContent, name='t2') # 正确使用aliased创建别名 |
||||
|
subquery = ( |
||||
|
select(DataAstContent.content_onum) |
||||
|
.where( |
||||
|
DataAstContent.content_stat == '1', |
||||
|
DataAstContent.leaf_node_flag == 0, |
||||
|
not_( |
||||
|
select(1) |
||||
|
.select_from(t2) # 使用别名后的表 |
||||
|
.where( |
||||
|
t2.supr_content_onum == DataAstContent.content_onum, |
||||
|
t2.content_stat == '1' |
||||
|
) |
||||
|
.exists() # 添加exists()方法 |
||||
|
) |
||||
|
) |
||||
|
).alias('temp') |
||||
|
|
||||
|
stmt = ( |
||||
|
update(DataAstContent) |
||||
|
.where(DataAstContent.content_onum.in_(subquery)) |
||||
|
.values(leaf_node_flag=1, upd_time=datetime.now()) |
||||
|
) |
||||
|
await db.execute(stmt) |
||||
|
|
||||
|
|
||||
|
@classmethod |
||||
|
async def delete_ast_book_mark_rela_dao(cls, db: AsyncSession, catalog: DataAstBookmarkRelaRequest, user_name: str, user_id: str): |
||||
|
""" |
||||
|
取消收藏数据库操作 |
||||
|
|
||||
|
:param db: orm对象 |
||||
|
:param catalog: 收藏对象 |
||||
|
:return: 操作结果字典(包含成功状态和提示信息) |
||||
|
""" |
||||
|
try: |
||||
|
# 创建子查询:获取需要删除的资产编号 |
||||
|
ast_onum_subquery = ( |
||||
|
select(DataAstContentRela.ast_onum) |
||||
|
.where( |
||||
|
DataAstContentRela.rela_onum == catalog['rela_onum'], |
||||
|
DataAstBookmarkRela.user_id == user_id, |
||||
|
DataAstContentRela.rela_status == '1' |
||||
|
) |
||||
|
).subquery() |
||||
|
|
||||
|
# 构建删除语句 |
||||
|
stmt1 = ( |
||||
|
delete(DataAstContentRela) |
||||
|
.where( |
||||
|
DataAstContentRela.upd_prsn == user_name, |
||||
|
DataAstContentRela.content_onum == '2', # 考虑使用变量或常量 |
||||
|
DataAstContentRela.ast_onum.in_(ast_onum_subquery), |
||||
|
DataAstContentRela.rela_status == 1 |
||||
|
) |
||||
|
) |
||||
|
|
||||
|
stmt2 = ( |
||||
|
delete(DataAstBookmarkRela) |
||||
|
.where( |
||||
|
DataAstBookmarkRela.data_ast_no.in_(ast_onum_subquery), |
||||
|
DataAstBookmarkRela.user_id == user_id |
||||
|
) |
||||
|
) |
||||
|
|
||||
|
# 执行删除操作 |
||||
|
await db.execute(stmt1) |
||||
|
await db.execute(stmt2) |
||||
|
await db.commit() |
||||
|
|
||||
|
logger.info(f"成功删除收藏关系") |
||||
|
return {"is_success": True} |
||||
|
|
||||
|
except IntegrityError as ie: |
||||
|
await db.rollback() |
||||
|
logger.error(f"删除收藏关系失败(完整性错误): {str(ie)}") |
||||
|
return {"is_success": False, "message": "数据库完整性错误"} |
||||
|
except Exception as e: |
||||
|
await db.rollback() |
||||
|
logger.error(f"删除收藏关系失败: {str(e)}") |
||||
|
return {"is_success": False, "message": "未知错误"} |
||||
|
|
||||
|
|
||||
|
@classmethod |
||||
|
async def delete_ast_book_mark_rela_by_content_onum(cls, db: AsyncSession, content_onum: int, user_id: int): |
||||
|
""" |
||||
|
根据目录ID和用户ID删除收藏关系 |
||||
|
|
||||
|
:param db: orm对象 |
||||
|
:param content_onum: 目录ID |
||||
|
:param user_id: 用户ID |
||||
|
:return: |
||||
|
""" |
||||
|
# 创建别名对象 |
||||
|
t1 = aliased(DataAstContentRela, name='t1') |
||||
|
t2 = aliased(DataAstInfo, name='t2') |
||||
|
t3 = aliased(DataAstBookmarkRela, name='t3') |
||||
|
|
||||
|
|
||||
|
cte = ( |
||||
|
select(t3.rela_onum) |
||||
|
.select_from(DataAstContent) |
||||
|
.outerjoin(t1, DataAstContent.content_onum == t1.content_onum) |
||||
|
.outerjoin(t2, t1.ast_onum == t2.ast_no) |
||||
|
.outerjoin(t3, and_( |
||||
|
t2.data_ast_no == t3.data_ast_no, |
||||
|
t3.user_id == user_id |
||||
|
)) |
||||
|
.where(DataAstContent.content_onum == content_onum) |
||||
|
).cte('cte') |
||||
|
|
||||
|
stmt = ( |
||||
|
delete(DataAstBookmarkRela) |
||||
|
.where(DataAstBookmarkRela.rela_onum.in_(select(cte.c.rela_onum))) |
||||
|
) |
||||
|
|
||||
|
await db.execute(stmt) |
||||
|
await db.commit() |
||||
|
logger.info(" 删除收藏关系,操作成功") |
||||
|
|
||||
|
|
||||
|
|
||||
|
# @classmethod |
||||
|
# async def add_ast_book_mark_rela_dao(cls, db: AsyncSession, catalog: DataAstBookmarkRelaRequest): |
||||
|
# """ |
||||
|
# 添加收藏数据库操作 |
||||
|
|
||||
|
# :param db: orm对象 |
||||
|
# :param catalog: 收藏对象 |
||||
|
# :return: |
||||
|
# """ |
||||
|
# #如果catalog[user_id]下已经存在了catalog[data_ast_no],那么返回已收藏,否则添加收藏,新添加的收藏顺序号要求是插入之前最大顺序号加1 |
||||
|
# db_catalog = DataAstBookmarkRela(**catalog) |
||||
|
# db.add(db_catalog) |
||||
|
# await db.flush() |
||||
|
# logger.info(" 添加收藏,操作成功") |
||||
|
|
||||
|
@classmethod |
||||
|
async def add_ast_book_mark_rela_dao(cls, db: AsyncSession, catalog: DataAstBookmarkRelaRequest, user_name: str): |
||||
|
""" |
||||
|
添加收藏数据库操作 |
||||
|
|
||||
|
:param db: orm对象 |
||||
|
:param catalog: 收藏对象 |
||||
|
:return: 操作结果字典(包含成功状态和提示信息) |
||||
|
""" |
||||
|
try: |
||||
|
logger.info(f"开始处理收藏请求,用户ID: {catalog['user_id']}, 资产编号: {catalog['data_ast_no']}") |
||||
|
|
||||
|
# 获取通过资产序号,获取资产编号 |
||||
|
para_ast_no = ( |
||||
|
select( |
||||
|
DataAstInfo.ast_no |
||||
|
) |
||||
|
.where( |
||||
|
DataAstInfo.data_ast_no == catalog['data_ast_no'] |
||||
|
) |
||||
|
) |
||||
|
|
||||
|
# 1. 检查是否已存在相同收藏 |
||||
|
logger.info("检查是否已存在相同收藏...") |
||||
|
exists = await db.execute( |
||||
|
select(DataAstBookmarkRela) |
||||
|
.where( |
||||
|
DataAstBookmarkRela.user_id == catalog['user_id'], |
||||
|
DataAstBookmarkRela.data_ast_no == para_ast_no |
||||
|
) |
||||
|
) |
||||
|
if exists.scalar(): |
||||
|
logger.warning(f"用户 {catalog['user_id']} 已收藏资产 {para_ast_no}") |
||||
|
return {"is_success": False, "message": "该资产已被收藏"} |
||||
|
|
||||
|
# 2. 获取当前最大顺序号 |
||||
|
logger.info("获取当前最大顺序号...") |
||||
|
max_order = await db.execute( |
||||
|
select(func.max(DataAstBookmarkRela.bookmark_orde)) |
||||
|
.where(DataAstBookmarkRela.user_id == catalog['user_id']) |
||||
|
) |
||||
|
max_bookmark_orde = max_order.scalar() or 0 |
||||
|
logger.info(f"当前最大顺序号: {max_bookmark_orde}") |
||||
|
|
||||
|
# 3. 设置新顺序号 |
||||
|
logger.info("设置新顺序号...") |
||||
|
catalog_dict = { |
||||
|
'user_id': catalog["user_id"], |
||||
|
'data_ast_no': para_ast_no, |
||||
|
'bookmark_orde': max_bookmark_orde + 1, |
||||
|
'bookmark_time': datetime.now() |
||||
|
} |
||||
|
|
||||
|
# 4. 创建并保存新记录 |
||||
|
logger.info("创建并保存新记录...") |
||||
|
|
||||
|
db_catalog = DataAstBookmarkRela(**catalog_dict) |
||||
|
db.add(db_catalog) |
||||
|
# 添加收藏关系内容 |
||||
|
logger.info("添加收藏关系内容...") |
||||
|
|
||||
|
new_rela = DataAstContentRela( |
||||
|
content_onum=catalog["content_onum"], |
||||
|
ast_onum=para_ast_no, |
||||
|
rela_type=catalog["rela_type"], |
||||
|
rela_eff_begn_date=catalog["rela_eff_begn_date"], |
||||
|
rela_eff_end_date=catalog["rela_eff_end_date"], |
||||
|
upd_prsn=user_name, |
||||
|
rela_status="1" |
||||
|
) |
||||
|
db.add(new_rela) |
||||
|
|
||||
|
# 提交事务 |
||||
|
logger.info("提交事务...") |
||||
|
await db.flush() |
||||
|
logger.info(f"用户 {catalog['user_id']} 收藏资产 {para_ast_no} 成功") |
||||
|
return {"is_success": True, "message": "收藏成功"} |
||||
|
|
||||
|
except Exception as e: |
||||
|
logger.error(f"收藏操作失败: {str(e)}", exc_info=True) |
||||
|
await db.rollback() |
||||
|
return {"is_success": False, "message": "收藏操作失败"} |
||||
|
|
||||
|
@classmethod |
||||
|
async def get_data_ast_indx_list(cls, db: AsyncSession, query_object: DataAstIndxRequest): |
||||
|
""" |
||||
|
根据查询参数获取数据资产指标列表 |
||||
|
""" |
||||
|
query = ( |
||||
|
select( |
||||
|
DataAstIndx.ast_no, |
||||
|
DataAstIndx.indx_no, |
||||
|
DataAstIndx.indx_name, |
||||
|
DataAstIndx.indx_val |
||||
|
) |
||||
|
.where( |
||||
|
DataAstIndx.ast_no == query_object.ast_no if query_object.ast_no else True, |
||||
|
DataAstIndx.indx_no == query_object.indx_no if query_object.indx_no else True, |
||||
|
DataAstIndx.indx_name.like(f"%{query_object.indx_name}%") if query_object.indx_name else True |
||||
|
) |
||||
|
) |
||||
|
|
||||
|
result = await db.execute(query) |
||||
|
rows = result.mappings().all() # 直接获取字典列表 |
||||
|
|
||||
|
return rows |
@ -0,0 +1,73 @@ |
|||||
|
# # data_ast_content_dao.py |
||||
|
# from datetime import datetime |
||||
|
# from sqlalchemy import select, insert, update, delete, func, not_, and_ |
||||
|
# from sqlalchemy.ext.asyncio import AsyncSession |
||||
|
# from module_admin.entity.do.data_ast_content_do import DataAstInfo |
||||
|
# from module_admin.entity.vo.data_ast_content_vo import DataAstInfoRequest, DataAstInfoPageQueryModel |
||||
|
# from utils.page_util import PageUtil |
||||
|
# from utils.log_util import logger |
||||
|
|
||||
|
# class DataAstInfoDAO: |
||||
|
# """ |
||||
|
# 数据资产信息模块数据库操作层 |
||||
|
# """ |
||||
|
|
||||
|
# @classmethod |
||||
|
# async def add_data_ast_info_dao(cls, db: AsyncSession, data_ast_info: dict): |
||||
|
# """ |
||||
|
# 新增数据资产信息数据库操作 |
||||
|
|
||||
|
# :param db: orm对象 |
||||
|
# :param data_ast_info: 数据资产信息对象 |
||||
|
# :return: |
||||
|
# """ |
||||
|
# db_data_ast_info = DataAstInfo(**data_ast_info) |
||||
|
# db.add(db_data_ast_info) |
||||
|
# await db.flush() |
||||
|
# return db_data_ast_info |
||||
|
|
||||
|
# @classmethod |
||||
|
# async def add_data_ast_info_batch_dao(cls, db: AsyncSession, data_ast_info_list: list): |
||||
|
# """ |
||||
|
# 批量新增数据资产信息数据库操作 |
||||
|
|
||||
|
# :param db: orm对象 |
||||
|
# :param data_ast_info_list: 数据资产信息对象列表 |
||||
|
# :return: |
||||
|
# """ |
||||
|
# db_data_ast_info_list = [DataAstInfo(**data_ast_info) for data_ast_info in data_ast_info_list] |
||||
|
# db.add_all(db_data_ast_info_list) |
||||
|
# await db.flush() |
||||
|
# return db_data_ast_info_list |
||||
|
|
||||
|
# @classmethod |
||||
|
# async def get_data_ast_info_list(cls, db: AsyncSession, query_object: DataAstInfoPageQueryModel, is_page: bool = False): |
||||
|
# """ |
||||
|
# 根据查询参数获取数据资产信息列表 |
||||
|
|
||||
|
# :param db: 异步会话对象 |
||||
|
# :param query_object: 分页查询参数对象 |
||||
|
# :param is_page: 是否分页 |
||||
|
# :return: 数据资产信息分页列表 |
||||
|
# """ |
||||
|
# query = ( |
||||
|
# select(DataAstInfo) |
||||
|
# .where( |
||||
|
# DataAstInfo.data_ast_eng_name == query_object.data_ast_eng_name if query_object.data_ast_eng_name else True, |
||||
|
# DataAstInfo.data_ast_cn_name == query_object.data_ast_cn_name if query_object.data_ast_cn_name else True, |
||||
|
# DataAstInfo.data_ast_type == query_object.data_ast_type if query_object.data_ast_type else True, |
||||
|
# DataAstInfo.data_ast_clas == query_object.data_ast_clas if query_object.data_ast_clas else True |
||||
|
# ) |
||||
|
# .order_by(DataAstInfo.data_ast_no) |
||||
|
# ) |
||||
|
|
||||
|
# # 使用分页工具进行查询 |
||||
|
# data_ast_info_list = await PageUtil.paginate( |
||||
|
# db, |
||||
|
# query, |
||||
|
# page_num=query_object.page_num, |
||||
|
# page_size=query_object.page_size, |
||||
|
# is_page=is_page |
||||
|
# ) |
||||
|
|
||||
|
# return data_ast_info_list |
@ -0,0 +1,84 @@ |
|||||
|
from datetime import datetime |
||||
|
from sqlalchemy import Column, DateTime, Integer, String, Text, DateTime, ForeignKey, Date, Double |
||||
|
from config.database import Base |
||||
|
|
||||
|
# 定义数据资产目录表 |
||||
|
class DataAstContent(Base): |
||||
|
__tablename__ = "t_data_ast_content" |
||||
|
|
||||
|
content_onum = Column(Integer, primary_key=True, autoincrement=True, comment='目录序号') |
||||
|
content_name = Column(String(255), nullable=False, comment='目录名称') |
||||
|
content_stat = Column(String(10), nullable=False, comment='目录状态(有效/废弃/停用)') |
||||
|
content_intr = Column(Text, comment='目录简介') |
||||
|
content_pic = Column(String(255), comment='目录负责人') |
||||
|
supr_content_onum = Column(Integer, comment='上级目录序号') |
||||
|
leaf_node_flag = Column(Integer, default=1, comment='叶子节点标志') |
||||
|
upd_prsn = Column(String(255), nullable=False, comment='更新人员') |
||||
|
upd_time = Column(DateTime, default=datetime.now, onupdate=datetime.now, comment='更新时间') |
||||
|
|
||||
|
|
||||
|
|
||||
|
|
||||
|
class DataAstContentRela(Base): |
||||
|
__tablename__ = "t_data_ast_content_rela" |
||||
|
|
||||
|
rela_onum = Column(Integer, primary_key=True, autoincrement=True, comment='关系序号') |
||||
|
content_onum = Column(Integer, nullable=False, comment='目录序号') |
||||
|
ast_onum = Column(Integer, nullable=False, comment='资产序号') |
||||
|
rela_type = Column(String(50), default='归属关系', comment='关系类型') |
||||
|
rela_eff_begn_date = Column(Date, nullable=True, comment='关系生效开始日期') |
||||
|
rela_eff_end_date = Column(Date, nullable=True, comment='关系生效结束日期') |
||||
|
upd_prsn = Column(String(255), nullable=False, comment='更新人员') |
||||
|
rela_status = Column(String(18), nullable=True, comment='关系状态') |
||||
|
|
||||
|
|
||||
|
|
||||
|
|
||||
|
class DataAstInfo(Base): |
||||
|
__tablename__ = "t_data_ast_info" |
||||
|
|
||||
|
data_ast_no = Column(Integer, primary_key=True, autoincrement=True, comment='数据资产编号') |
||||
|
data_ast_eng_name = Column(String(255), nullable=False, comment='数据资产英文名称') |
||||
|
data_ast_cn_name = Column(String(255), nullable=True, comment='数据资产中文名称') |
||||
|
data_ast_type = Column(String(50), nullable=True, comment='数据资产类型') |
||||
|
data_ast_stat = Column(String(50), nullable=True, comment='数据资产状态') |
||||
|
data_ast_desc = Column(Text, nullable=True, comment='数据资产描述/说明') |
||||
|
data_ast_clas = Column(String(255), nullable=True, comment='数据资产标签') |
||||
|
data_ast_cont = Column(Text, nullable=True, comment='数据资产内容') |
||||
|
data_ast_faq = Column(Text, nullable=True, comment='数据资产常见问题') |
||||
|
data_ast_estb_time = Column(DateTime, default=datetime.now, comment='数据资产建立时间') |
||||
|
data_ast_upd_time = Column(DateTime, default=datetime.now, onupdate=datetime.now, comment='数据资产更新时间') |
||||
|
data_ast_src = Column(String(255), nullable=True, comment='数据资产来源') |
||||
|
ast_no = Column(Integer, nullable=True, comment='资产编号') |
||||
|
|
||||
|
|
||||
|
|
||||
|
class DataAstBookmarkRela(Base): |
||||
|
__tablename__ = "t_data_ast_bookmark_rela" |
||||
|
|
||||
|
rela_onum = Column(Integer, primary_key=True, autoincrement=True, comment='唯一关系序号') |
||||
|
user_id = Column(String(20), nullable=False, comment='用户ID') |
||||
|
data_ast_no = Column(String(32), nullable=False, comment='数据资产编号') |
||||
|
bookmark_orde = Column(Integer, default=0, comment='收藏顺序') |
||||
|
bookmark_time = Column(DateTime, default=datetime.now, comment='收藏时间') |
||||
|
|
||||
|
class UserDataAstRela(Base): |
||||
|
__tablename__ = "t_user_data_ast_rela" |
||||
|
|
||||
|
rela_onum = Column(Integer, primary_key=True, autoincrement=True, comment='唯一关系序号') |
||||
|
data_ast_no = Column(String(32), nullable=False, comment='数据资产编号') |
||||
|
user_id = Column(String(20), nullable=False, comment='用户ID') |
||||
|
rela_type = Column(String(50), default='', comment='关系类型') |
||||
|
user_data_ast_clas = Column(Text, comment='用户数据资产分类标签') |
||||
|
user_data_ast_faq = Column(Text, comment='用户常见问题记录') |
||||
|
rela_eff_begn_date = Column(Date, nullable=False, comment='生效起始日期') |
||||
|
rela_eff_end_date = Column(Date, nullable=True, comment='生效终止日期') |
||||
|
|
||||
|
|
||||
|
class DataAstIndx(Base): |
||||
|
__tablename__ = "t_data_ast_indx" |
||||
|
|
||||
|
ast_no = Column(String(32), primary_key=True, comment='资产编号') |
||||
|
indx_no = Column(String(32), primary_key=True, comment='指标编号') |
||||
|
indx_name = Column(String(100), nullable=False, comment='指标名称') |
||||
|
indx_val = Column(Double, default=0, comment='指标值') |
@ -0,0 +1,21 @@ |
|||||
|
# # data_ast_content_do.py |
||||
|
# from datetime import datetime |
||||
|
# from sqlalchemy import Column, DateTime, Integer, String, Text, ForeignKey, Date, Double |
||||
|
# from config.database import Base |
||||
|
|
||||
|
# class DataAstInfo(Base): |
||||
|
# __tablename__ = "t_data_ast_info" |
||||
|
|
||||
|
# data_ast_no = Column(Integer, primary_key=True, autoincrement=True, comment='数据资产编号') |
||||
|
# data_ast_eng_name = Column(String(255), nullable=False, comment='数据资产英文名称') |
||||
|
# data_ast_cn_name = Column(String(255), nullable=True, comment='数据资产中文名称') |
||||
|
# data_ast_type = Column(String(50), nullable=True, comment='数据资产类型') |
||||
|
# data_ast_stat = Column(String(50), nullable=True, comment='数据资产状态') |
||||
|
# data_ast_desc = Column(Text, nullable=True, comment='数据资产描述/说明') |
||||
|
# data_ast_clas = Column(String(255), nullable=True, comment='数据资产标签') |
||||
|
# data_ast_cont = Column(Text, nullable=True, comment='数据资产内容') |
||||
|
# data_ast_faq = Column(Text, nullable=True, comment='数据资产常见问题') |
||||
|
# data_ast_estb_time = Column(DateTime, default=datetime.now, comment='数据资产建立时间') |
||||
|
# data_ast_upd_time = Column(DateTime, default=datetime.now, onupdate=datetime.now, comment='数据资产更新时间') |
||||
|
# data_ast_src = Column(String(255), nullable=True, comment='数据资产来源') |
||||
|
# ast_no = Column(Integer, nullable=True, comment='资产编号') |
@ -0,0 +1,206 @@ |
|||||
|
from datetime import datetime |
||||
|
from pydantic import BaseModel, ConfigDict, Field, Extra |
||||
|
from pydantic.alias_generators import to_camel |
||||
|
from pydantic_validation_decorator import NotBlank, Size |
||||
|
from typing import Literal, Optional, List, Dict, Any |
||||
|
from module_admin.annotation.pydantic_annotation import as_query |
||||
|
|
||||
|
|
||||
|
class DataCatalogRequest(BaseModel): |
||||
|
model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True, from_attributes=True) |
||||
|
|
||||
|
content_onum: Optional[int] = Field(default=None, alias="contentOnum", description='目录序号') |
||||
|
content_name: Optional[str] = Field(default=None, alias="contentName", description='目录名称') |
||||
|
content_stat: Optional[str] = Field(default=None, alias="contentStat", description='目录状态') |
||||
|
content_intr: Optional[str] = Field(default=None, alias="contentIntr", description='目录简介') |
||||
|
content_pic: Optional[str] = Field(default=None, alias="contentPic", description='目录负责人') |
||||
|
supr_content_onum: Optional[int] = Field(default=None, alias="suprContentOnum", description='上级目录序号') |
||||
|
leaf_node_flag: Optional[int] = Field(default=None, alias="leafNodeFlag", description='叶子结点标志(自动计算)') |
||||
|
upd_prsn: Optional[str] = Field(default=None, alias="updPrsn", description='更新人员(必填)') |
||||
|
upd_time: Optional[datetime] = Field(default=None, alias="updTime", description='更新时间') |
||||
|
|
||||
|
# 校验逻辑保持不变 |
||||
|
@NotBlank(field_name='content_name', message='目录名称不能为空') |
||||
|
@Size(field_name='content_name', min_length=0, max_length=255, message='目录名称长度不能超过255个字符') |
||||
|
@NotBlank(field_name='upd_prsn', message='更新人员不能为空') |
||||
|
@Size(field_name='upd_prsn', min_length=0, max_length=255, message='更新人员长度不能超过255个字符') |
||||
|
@NotBlank(field_name='content_stat', message='目录状态不能为空') |
||||
|
@Size(field_name='content_stat', min_length=0, max_length=255) |
||||
|
def validate_fields(self): |
||||
|
pass |
||||
|
|
||||
|
|
||||
|
class DataCatalogResponse(BaseModel): |
||||
|
model_config = ConfigDict(alias_generator=to_camel, from_attributes=True) |
||||
|
|
||||
|
content_onum: Optional[int] = Field(default=None,alias="contentOnum", description='目录ID') |
||||
|
content_name: Optional[str] = Field(default=None,alias="contentName", description='目录名称') |
||||
|
content_stat: Optional[str] = Field(default=None,alias="contentStat", description='目录状态') |
||||
|
content_intr: Optional[str] = Field(default=None,alias="contentIntr", description='目录简介') |
||||
|
content_pic: Optional[str] = Field(default=None,alias="contentPic", description='目录负责人') |
||||
|
supr_content_onum: Optional[int] = Field(default=None,alias="suprContentOnum", description='上级目录序号') |
||||
|
leaf_node_flag: Optional[int] = Field(default=None,alias="leafNodeFlag", description='叶子结点标志') |
||||
|
upd_prsn: Optional[str] = Field(default=None,alias="updPrsn", description='更新人员') |
||||
|
upd_time: Optional[datetime] = Field(default=None,alias="updTime", description='更新时间') |
||||
|
|
||||
|
class DataCatalogQueryModel(DataCatalogRequest): |
||||
|
""" |
||||
|
目录管理不分页查询模型 |
||||
|
""" |
||||
|
|
||||
|
begin_time: Optional[str] = Field(default=None, description='开始时间') |
||||
|
end_time: Optional[str] = Field(default=None, description='结束时间') |
||||
|
|
||||
|
@as_query |
||||
|
class DataCatalogPageQueryModel(DataCatalogQueryModel): |
||||
|
""" |
||||
|
目录管理分页查询模型 |
||||
|
""" |
||||
|
|
||||
|
page_num: int = Field(default=1, description='当前页码') |
||||
|
page_size: int = Field(default=10, description='每页记录数') |
||||
|
|
||||
|
|
||||
|
class DeleteDataCatalogModel(BaseModel): |
||||
|
""" |
||||
|
删除目录模型 |
||||
|
""" |
||||
|
|
||||
|
model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True) |
||||
|
|
||||
|
content_onums: str = Field(alias="contentOnums", description='需要删除的目录ID') |
||||
|
|
||||
|
|
||||
|
|
||||
|
class DataCatalogChild(BaseModel): |
||||
|
model_config = ConfigDict(alias_generator=to_camel, from_attributes=True) |
||||
|
|
||||
|
rela_onum: Optional[int] = Field(default=None, alias="relaOnum", description='目录关系序号') |
||||
|
content_onum: Optional[int] = Field(default=None, alias="contentOnum", description='目录序号') |
||||
|
ast_onum: Optional[int] = Field(default=None, alias="astOnum", description='资产序号') |
||||
|
rela_type: Optional[str] = Field(default="归属关系", alias="relaType", description='关系类型') |
||||
|
rela_eff_begn_date: Optional[datetime] = Field(default=None, alias="relaEffBegnDate", description='关系有效起始日期') |
||||
|
rela_eff_end_date: Optional[datetime] = Field(default=None, alias="relaEffEndDate", description='关系有效终止日期') |
||||
|
upd_prsn: Optional[str] = Field(default=None, alias="updPrsn", description='更新人员') |
||||
|
rela_status: Optional[str] = Field(default=None, alias="relaStatus", description='关系状态') |
||||
|
|
||||
|
class DataCatalogResponseWithChildren(BaseModel): |
||||
|
model_config = ConfigDict(alias_generator=to_camel, from_attributes=True) |
||||
|
|
||||
|
content_onum: Optional[int] = Field(default=None, alias="contentOnum", description='目录ID') |
||||
|
content_name: Optional[str] = Field(default=None, alias="contentName", description='目录名称') |
||||
|
content_stat: Optional[str] = Field(default=None, alias="contentStat", description='目录状态') |
||||
|
content_intr: Optional[str] = Field(default=None, alias="contentIntr", description='目录简介') |
||||
|
content_pic: Optional[str] = Field(default=None, alias="contentPic", description='目录负责人') |
||||
|
supr_content_onum: Optional[int] = Field(default=None, alias="suprContentOnum", description='上级目录序号') |
||||
|
leaf_node_flag: Optional[int] = Field(default=None, alias="leafNodeFlag", description='叶子结点标志') |
||||
|
upd_prsn: Optional[str] = Field(default=None, alias="updPrsn", description='更新人员') |
||||
|
upd_time: Optional[datetime] = Field(default=None, alias="updTime", description='更新时间') |
||||
|
children: List[DataCatalogChild] = Field(default_factory=list, description='子关系列表') |
||||
|
|
||||
|
|
||||
|
# 数据资产树模型 |
||||
|
|
||||
|
class DataAssetCatalogTreeNode(BaseModel): |
||||
|
model_config = ConfigDict(alias_generator=to_camel, from_attributes=True) |
||||
|
|
||||
|
data_ast_eng_name: Optional[str] = Field(default=None, alias="dataAssetCatalogNo", description='数据资产序号') |
||||
|
data_ast_cn_name: Optional[str] = Field(default=None, alias="dataAssetCatalogName", description='数据资产名称') |
||||
|
ast_no: Optional[int] = Field(default=None, alias="dataAssetCatalogAstno", description='资产编号') |
||||
|
children: List['DataAssetCatalogTreeNode'] = Field(default_factory=list, description='子节点列表') |
||||
|
|
||||
|
|
||||
|
class DataAssetCatalogTreeResponse(BaseModel): |
||||
|
model_config = ConfigDict(alias_generator=to_camel, from_attributes=True) |
||||
|
|
||||
|
data_ast_src: Optional[str] = Field(default=None, alias="dataAssetSysName", description='数据资产系统名称') |
||||
|
children: List[DataAssetCatalogTreeNode] = Field(default_factory=list, description='子节点列表') |
||||
|
|
||||
|
|
||||
|
class DataCatalogMovedRequest(BaseModel): |
||||
|
model_config = ConfigDict( |
||||
|
alias_generator=to_camel, |
||||
|
populate_by_name=True, |
||||
|
extra='ignore', # 使用字符串形式替代Extra.ignore |
||||
|
from_attributes=True |
||||
|
) |
||||
|
|
||||
|
content_onum: Optional[int] = Field(default=None, alias="contentOnum", description='移动前的目录序号') |
||||
|
supr_content_onum: Optional[int] = Field(default=None, alias="suprContentOnum", description='移动前的上级目录序号') |
||||
|
supr_content_onum_after: Optional[int] = Field(default=None, alias="suprContentOnumAfter", description='移动后的上级目录序号') |
||||
|
|
||||
|
class DataCatalogMergeRequest(BaseModel): |
||||
|
model_config = ConfigDict( |
||||
|
alias_generator=to_camel, |
||||
|
populate_by_name=True, |
||||
|
extra='ignore', |
||||
|
from_attributes=True |
||||
|
) |
||||
|
|
||||
|
content_onum: Optional[int] = Field(default=None, alias="contentOnum", description='移动前的目录序号') |
||||
|
supr_content_onum: Optional[int] = Field(default=None, alias="suprContentOnum", description='移动前的上级目录序号') |
||||
|
content_onum_after: Optional[int] = Field(default=None, alias="contentOnumAfter", description='合并前的上级目录序号') |
||||
|
supr_content_onum_after: Optional[int] = Field(default=None, alias="suprContentOnumAfter", description='合并后的上级目录序号') |
||||
|
|
||||
|
class DataCatalogMoverelRequest(BaseModel): |
||||
|
model_config = ConfigDict( |
||||
|
alias_generator=to_camel, |
||||
|
populate_by_name=True, |
||||
|
extra='ignore', |
||||
|
from_attributes=True |
||||
|
) |
||||
|
|
||||
|
rela_onum: Optional[int] = Field(default=None, alias="relaOnum", description='关系序号') |
||||
|
content_onum: Optional[int] = Field(default=None, alias="contentOnum", description='目录序号') |
||||
|
content_onum_after: Optional[int] = Field(default=None, alias="contentOnumAfter", description='移动后的目录序号') |
||||
|
|
||||
|
|
||||
|
|
||||
|
class DataCatalogMoverelRequest(BaseModel): |
||||
|
model_config = ConfigDict( |
||||
|
alias_generator=to_camel, |
||||
|
populate_by_name=True, |
||||
|
extra='ignore', |
||||
|
from_attributes=True |
||||
|
) |
||||
|
|
||||
|
rela_onum: Optional[int] = Field(default=None, alias="relaOnum", description='关系序号') |
||||
|
content_onum: Optional[int] = Field(default=None, alias="contentOnum", description='目录序号') |
||||
|
content_onum_after: Optional[int] = Field(default=None, alias="contentOnumAfter", description='移动后的目录序号') |
||||
|
|
||||
|
|
||||
|
class DataAstBookmarkRelaRequest(BaseModel): |
||||
|
model_config = ConfigDict( |
||||
|
alias_generator=to_camel, |
||||
|
populate_by_name=True, |
||||
|
extra='ignore', |
||||
|
from_attributes=True |
||||
|
) |
||||
|
|
||||
|
rela_onum: Optional[int] = Field(default=None, alias="relaOnum", description='唯一关系序号') |
||||
|
user_id: Optional[str] = Field(default=None, alias="userId", description='用户ID') |
||||
|
data_ast_no: Optional[str] = Field(default=None, alias="dataAstNo", description='数据资产编号') |
||||
|
bookmark_orde: Optional[int] = Field(default=0, alias="bookmarkOrde", description='收藏顺序') |
||||
|
bookmark_time: Optional[datetime] = Field(default_factory=datetime.now, alias="bookmarkTime", description='收藏时间') |
||||
|
content_onum: Optional[int] = Field(default=None, alias="contentOnum", description='目录序号') |
||||
|
rela_type: Optional[str] = Field(default="归属关系", alias="relaType", description='关系类型') |
||||
|
rela_eff_begn_date: Optional[datetime] = Field(default=None, alias="relaEffBegnDate", description='关系有效起始日期') |
||||
|
rela_eff_end_date: Optional[datetime] = Field(default=None, alias="relaEffEndDate", description='关系有效终止日期') |
||||
|
|
||||
|
|
||||
|
|
||||
|
class DataAstIndxRequest(BaseModel): |
||||
|
model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True, from_attributes=True) |
||||
|
|
||||
|
ast_no: Optional[str] = Field(default=None, alias="astNo", description='资产编号') |
||||
|
indx_no: Optional[str] = Field(default=None, alias="indxNo", description='指标编号') |
||||
|
indx_name: Optional[str] = Field(default=None, alias="indxName", description='指标名称') |
||||
|
indx_val: Optional[float] = Field(default=None, alias="indxVal", description='指标值') |
||||
|
|
||||
|
class DataAstIndxResponse(BaseModel): |
||||
|
model_config = ConfigDict(alias_generator=to_camel, from_attributes=True) |
||||
|
|
||||
|
ast_no: Optional[str] = Field(default=None, alias="astNo", description='资产编号') |
||||
|
indx_no: Optional[str] = Field(default=None, alias="indxNo", description='指标编号') |
||||
|
indx_name: Optional[str] = Field(default=None, alias="indxName", description='指标名称') |
||||
|
indx_val: Optional[float] = Field(default=None, alias="indxVal", description='指标值') |
@ -0,0 +1,52 @@ |
|||||
|
# # data_ast_content_vo.py |
||||
|
# from datetime import datetime |
||||
|
# from pydantic import BaseModel, ConfigDict, Field, Extra |
||||
|
# from pydantic.alias_generators import to_camel |
||||
|
# from typing import Optional, List, Dict, Any |
||||
|
# from module_admin.annotation.pydantic_annotation import as_query |
||||
|
|
||||
|
# class DataAstInfoRequest(BaseModel): |
||||
|
# model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True, from_attributes=True) |
||||
|
|
||||
|
# data_ast_eng_name: str = Field(alias="astEngName", description='资产英文名') |
||||
|
# data_ast_cn_name: Optional[str] = Field(default=None, alias="astCnName", description='资产中文名') |
||||
|
# data_ast_type: Optional[str] = Field(default=None, alias="astType", description='资产类型') |
||||
|
# data_ast_stat: Optional[str] = Field(default=None, alias="astStat", description='资产状态') |
||||
|
# data_ast_desc: Optional[str] = Field(default=None, alias="astDesc", description='数据资产描述') |
||||
|
# data_ast_clas: Optional[str] = Field(default=None, alias="astClas", description='资产标签') |
||||
|
# data_ast_cont: Optional[str] = Field(default=None, alias="astCont", description='数据资产内容') |
||||
|
# data_ast_faq: Optional[str] = Field(default=None, alias="astFaq", description='数据资产常见问题') |
||||
|
# data_ast_src: Optional[str] = Field(default=None, alias="astSrc", description='数据资产来源') |
||||
|
# ast_no: Optional[int] = Field(default=None, alias="astNo", description='资产编号') |
||||
|
|
||||
|
# class DataAstInfoResponse(BaseModel): |
||||
|
# model_config = ConfigDict(alias_generator=to_camel, from_attributes=True) |
||||
|
|
||||
|
# data_ast_no: Optional[int] = Field(default=None, alias="dataAstNo", description='数据资产编号') |
||||
|
# data_ast_eng_name: Optional[str] = Field(default=None, alias="astEngName", description='资产英文名') |
||||
|
# data_ast_cn_name: Optional[str] = Field(default=None, alias="astCnName", description='资产中文名') |
||||
|
# data_ast_type: Optional[str] = Field(default=None, alias="astType", description='资产类型') |
||||
|
# data_ast_stat: Optional[str] = Field(default=None, alias="astStat", description='资产状态') |
||||
|
# data_ast_desc: Optional[str] = Field(default=None, alias="astDesc", description='数据资产描述') |
||||
|
# data_ast_clas: Optional[str] = Field(default=None, alias="astClas", description='资产标签') |
||||
|
# data_ast_cont: Optional[str] = Field(default=None, alias="astCont", description='数据资产内容') |
||||
|
# data_ast_faq: Optional[str] = Field(default=None, alias="astFaq", description='数据资产常见问题') |
||||
|
# data_ast_estb_time: Optional[datetime] = Field(default=None, alias="astEstbTime", description='数据资产收录时间') |
||||
|
# data_ast_upd_time: Optional[datetime] = Field(default=None, alias="astUpdTime", description='数据资产更新时间') |
||||
|
# data_ast_src: Optional[str] = Field(default=None, alias="astSrc", description='数据资产来源') |
||||
|
# ast_no: Optional[int] = Field(default=None, alias="astNo", description='资产编号') |
||||
|
|
||||
|
# @as_query |
||||
|
# class DataAstInfoQueryModel(DataAstInfoRequest): |
||||
|
# """ |
||||
|
# 数据资产不分页查询模型 |
||||
|
# """ |
||||
|
# pass |
||||
|
|
||||
|
# @as_query |
||||
|
# class DataAstInfoPageQueryModel(DataAstInfoQueryModel): |
||||
|
# """ |
||||
|
# 数据资产分页查询模型 |
||||
|
# """ |
||||
|
# page_num: int = Field(default=1, description='当前页码') |
||||
|
# page_size: int = Field(default=10, description='每页记录数') |
@ -0,0 +1,536 @@ |
|||||
|
from datetime import datetime |
||||
|
from utils.log_util import logger |
||||
|
from collections import defaultdict |
||||
|
from pyecharts.options import LabelOpts |
||||
|
from fastapi.responses import JSONResponse |
||||
|
from pyecharts.charts import Pie, Bar, Page |
||||
|
from sqlalchemy.ext.asyncio import AsyncSession |
||||
|
from exceptions.exception import ServiceException |
||||
|
from module_admin.dao.data_ast_content_dao import DataCatalogDAO |
||||
|
from module_admin.entity.vo.common_vo import CrudResponseModel |
||||
|
from module_admin.entity.vo.data_ast_content_vo import DataCatalogRequest, DataCatalogResponse, DataCatalogPageQueryModel, DeleteDataCatalogModel,DataCatalogResponseWithChildren, DataAssetCatalogTreeResponse, DataAssetCatalogTreeNode,DataCatalogMovedRequest,DataCatalogMergeRequest,DataCatalogChild,DataCatalogMoverelRequest,DataAstBookmarkRelaRequest,DataAstIndxRequest |
||||
|
|
||||
|
|
||||
|
class DataCatalogService: |
||||
|
""" |
||||
|
数据目录管理模块服务层 |
||||
|
""" |
||||
|
|
||||
|
@classmethod |
||||
|
async def get_catalog_list_services( |
||||
|
cls, query_db: AsyncSession, query_object: DataCatalogPageQueryModel, user_id: int, is_page: bool = False |
||||
|
): |
||||
|
""" |
||||
|
获取数据目录列表信息service |
||||
|
|
||||
|
:param query_db: orm对象 |
||||
|
:param query_object: 查询参数对象 |
||||
|
:param is_page: 是否开启分页 |
||||
|
:return: 数据目录列表信息对象 |
||||
|
""" |
||||
|
catalog_list_result = await DataCatalogDAO.get_catalog_list(query_db, query_object, user_id, is_page) |
||||
|
|
||||
|
# 按contentOnum分组 |
||||
|
grouped = defaultdict(list) |
||||
|
for item in catalog_list_result.rows: |
||||
|
grouped[item['contentOnum']].append(item) |
||||
|
|
||||
|
nodes = {} # 存储所有处理后的节点 |
||||
|
|
||||
|
# 处理每个组,生成节点 |
||||
|
for content_onum, items in grouped.items(): |
||||
|
first_item = items[0] |
||||
|
is_leaf = first_item['leafNodeFlag'] == 1 |
||||
|
rela_onum = first_item['relaOnum'] is not None |
||||
|
|
||||
|
# 公共字段提取 |
||||
|
common_fields = { |
||||
|
'contentOnum': first_item['contentOnum'], |
||||
|
'contentName': first_item['contentName'], |
||||
|
'contentStat': first_item['contentStat'], |
||||
|
'contentIntr': first_item['contentIntr'], |
||||
|
'contentPic': first_item['contentPic'], |
||||
|
'suprContentOnum': first_item['suprContentOnum'], |
||||
|
'leafNodeFlag': first_item['leafNodeFlag'], |
||||
|
'updPrsn': first_item['updPrsn'], |
||||
|
'updTime': first_item['updTime'], |
||||
|
'children': [] |
||||
|
} |
||||
|
|
||||
|
# 处理叶子节点的数据资产子节点 |
||||
|
if is_leaf and rela_onum : |
||||
|
for item in items: |
||||
|
asset_child = { |
||||
|
'relaOnum': item['relaOnum'], |
||||
|
'contentOnum': first_item['contentOnum'], |
||||
|
'astOnum': item['astOnum'], |
||||
|
'relaType': item['relaType'], |
||||
|
'relaEffBegnDate': item['relaEffBegnDate'], |
||||
|
'relaEffEndDate': item['relaEffEndDate'], |
||||
|
'updPrsn1': item['updPrsn1'], |
||||
|
'dataAstNo': item['dataAstNo'], |
||||
|
'dataAstEngName': item['dataAstEngName'], |
||||
|
'dataAstCnName': item['dataAstCnName'], |
||||
|
'dataAstType': item['dataAstType'], |
||||
|
'dataAstStat': item['dataAstStat'], |
||||
|
'dataAstDesc': item['dataAstDesc'], |
||||
|
'dataAstClas': item['dataAstClas'], |
||||
|
'dataAstCont': item['dataAstCont'], |
||||
|
'dataAstFaq': item['dataAstFaq'], |
||||
|
'dataAstEstbTime': item['dataAstEstbTime'], |
||||
|
'dataAstUpdTime': item['dataAstUpdTime'], |
||||
|
'dataAstSrc': item['dataAstSrc'], |
||||
|
'astNo': item['astNo'], |
||||
|
'relaOnum': item['relaOnum'], |
||||
|
'bookmarkOrde': item['bookmarkOrde'], |
||||
|
'bookmarkFlag': item['bookmarkFlag'] |
||||
|
} |
||||
|
common_fields['children'].append(asset_child) |
||||
|
|
||||
|
nodes[content_onum] = common_fields |
||||
|
|
||||
|
# 构建父子关系 |
||||
|
root = None |
||||
|
for content_onum, node in nodes.items(): |
||||
|
supr = node['suprContentOnum'] |
||||
|
if supr is None: |
||||
|
root = node |
||||
|
else: |
||||
|
parent = nodes.get(supr) |
||||
|
if parent: |
||||
|
parent['children'].append(node) |
||||
|
|
||||
|
print('获取数据清单内容:',root) |
||||
|
|
||||
|
catalog_list_result.rows = [root] |
||||
|
|
||||
|
return catalog_list_result |
||||
|
|
||||
|
@classmethod |
||||
|
async def get_catalog_detail_services(cls, query_db: AsyncSession, content_onum: int): |
||||
|
""" |
||||
|
获取数据目录详细信息service |
||||
|
|
||||
|
:param query_db: orm对象 |
||||
|
:param content_onum: 数据目录ID |
||||
|
:return: 数据目录详细信息对象 |
||||
|
""" |
||||
|
|
||||
|
catalog_detail_result = await DataCatalogDAO.get_catalog_by_id(query_db, content_onum) |
||||
|
|
||||
|
return catalog_detail_result |
||||
|
|
||||
|
@classmethod |
||||
|
async def add_catalog_services(cls, query_db: AsyncSession, request: DataCatalogResponseWithChildren): |
||||
|
""" |
||||
|
新增数据目录信息service |
||||
|
|
||||
|
:param query_db: orm对象 |
||||
|
:param request: 新增数据目录请求对象 |
||||
|
:return: 新增目录操作结果 |
||||
|
""" |
||||
|
catalog_data1 = { |
||||
|
'content_name': request.content_name, |
||||
|
'content_stat': request.content_stat, |
||||
|
'content_intr': request.content_intr, |
||||
|
'content_pic': request.content_pic, |
||||
|
'supr_content_onum': request.supr_content_onum, |
||||
|
'leaf_node_flag': request.leaf_node_flag, |
||||
|
'upd_prsn': request.upd_prsn |
||||
|
} |
||||
|
catalog_data2 = { |
||||
|
'content_name': request.content_name, |
||||
|
'content_stat': request.content_stat, |
||||
|
'content_intr': request.content_intr, |
||||
|
'content_pic': request.content_pic, |
||||
|
'supr_content_onum': request.supr_content_onum, |
||||
|
'leaf_node_flag': request.leaf_node_flag, |
||||
|
'upd_prsn': request.upd_prsn, |
||||
|
'children': [child.model_dump() for child in request.children] # 将 children 转换为字典列表 |
||||
|
} |
||||
|
|
||||
|
|
||||
|
try: |
||||
|
for child in catalog_data2["children"]: |
||||
|
child["rela_eff_begn_date"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 设置默认值,当前时间 |
||||
|
child["rela_eff_end_date"] = datetime(year=2999, month=12, day=31, hour=0, minute=0, second=0).strftime("%Y-%m-%d %H:%M:%S"), # 设置默认值,2999-12-31 |
||||
|
child["upd_prsn"] = request.upd_prsn, |
||||
|
child["rela_status"] = "1" |
||||
|
|
||||
|
new_catalog = await DataCatalogDAO.add_catalog_dao(query_db, catalog_data1, catalog_data2) |
||||
|
await query_db.commit() |
||||
|
return CrudResponseModel(is_success=True, message='新增成功', data=new_catalog) |
||||
|
except Exception as e: |
||||
|
await query_db.rollback() |
||||
|
raise ServiceException(message=f"创建目录时发生错误: {str(e)}") |
||||
|
|
||||
|
@classmethod |
||||
|
async def edit_catalog_leaf_services(cls, query_db: AsyncSession,content_onum : int, leaf_node_flag : int): |
||||
|
""" |
||||
|
编辑数据目录信息service |
||||
|
|
||||
|
:param query_db: orm对象 |
||||
|
:param request: 编辑数据目录请求对象 |
||||
|
:return: 编辑目录操作结果 |
||||
|
""" |
||||
|
catalog_data1 = { |
||||
|
'content_onum': content_onum, |
||||
|
'leaf_node_flag': leaf_node_flag |
||||
|
} |
||||
|
try: |
||||
|
await DataCatalogDAO.edit_catalog_leaf_dao(query_db, catalog_data1) |
||||
|
await query_db.commit() |
||||
|
return CrudResponseModel(is_success=True, message='更新成功') |
||||
|
except Exception as e: |
||||
|
await query_db.rollback() |
||||
|
raise ServiceException(message=f"更新目录时发生错误: {str(e)}") |
||||
|
|
||||
|
|
||||
|
@classmethod |
||||
|
async def edit_catalog_child_services(cls, query_db: AsyncSession, request: DataCatalogResponseWithChildren): |
||||
|
""" |
||||
|
编辑数据目录信息service |
||||
|
|
||||
|
:param query_db: orm对象 |
||||
|
:param request: 编辑数据目录请求对象 |
||||
|
:return: 编辑目录操作结果 |
||||
|
""" |
||||
|
|
||||
|
catalog_data = { |
||||
|
'content_onum': request.content_onum, |
||||
|
'content_name': request.content_name, |
||||
|
'content_stat': request.content_stat, |
||||
|
'content_intr': request.content_intr, |
||||
|
'content_pic': request.content_pic, |
||||
|
'supr_content_onum': request.supr_content_onum, |
||||
|
'leaf_node_flag': request.leaf_node_flag, |
||||
|
'upd_prsn': request.upd_prsn, |
||||
|
'children': [child.model_dump() for child in request.children] # 将 children 转换为字典列表 |
||||
|
} |
||||
|
|
||||
|
try: |
||||
|
for child in catalog_data["children"]: |
||||
|
# 设置 rela_eff_begn_date |
||||
|
if child.get("rela_eff_begn_date"): |
||||
|
child["rela_eff_begn_date"] = child["rela_eff_begn_date"].strftime("%Y-%m-%d %H:%M:%S") |
||||
|
else: |
||||
|
child["rela_eff_begn_date"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
||||
|
|
||||
|
# 设置 rela_eff_end_date |
||||
|
if child.get("rela_eff_end_date"): |
||||
|
child["rela_eff_end_date"] = child["rela_eff_end_date"].strftime("%Y-%m-%d %H:%M:%S") |
||||
|
else: |
||||
|
child["rela_eff_end_date"] = datetime(year=2999, month=12, day=31, hour=0, minute=0, second=0).strftime("%Y-%m-%d %H:%M:%S") |
||||
|
|
||||
|
child["upd_prsn"] = request.upd_prsn |
||||
|
child["rela_status"] = "1" |
||||
|
await DataCatalogDAO.edit_catalog_child_dao(query_db, catalog_data) |
||||
|
await query_db.commit() |
||||
|
return CrudResponseModel(is_success=True, message='更新成功') |
||||
|
except Exception as e: |
||||
|
await query_db.rollback() |
||||
|
raise ServiceException(message=f"更新目录时发生错误: {str(e)}") |
||||
|
|
||||
|
@classmethod |
||||
|
async def delete_catalog_services(cls, query_db: AsyncSession, request: DeleteDataCatalogModel,user_id): |
||||
|
""" |
||||
|
删除数据目录信息service |
||||
|
|
||||
|
:param query_db: orm对象 |
||||
|
:param request: 删除数据目录请求对象 |
||||
|
:return: 删除目录操作结果 |
||||
|
""" |
||||
|
if request.content_onums: |
||||
|
content_onum_list = request.content_onums.split(',') |
||||
|
try: |
||||
|
for content_onum in content_onum_list: |
||||
|
catalog = await cls.get_catalog_detail_services(query_db, int(content_onum)) |
||||
|
if not catalog: |
||||
|
raise ServiceException(message=f'目录ID {content_onum} 不存在') |
||||
|
await DataCatalogDAO.delete_catalog_dao(query_db, DeleteDataCatalogModel(content_onums=content_onum)) |
||||
|
await DataCatalogDAO.delete_ast_book_mark_rela_by_content_onum(query_db, int(content_onum), user_id) |
||||
|
await query_db.commit() |
||||
|
return CrudResponseModel(is_success=True, message='删除成功') |
||||
|
except Exception as e: |
||||
|
await query_db.rollback() |
||||
|
raise e |
||||
|
else: |
||||
|
raise ServiceException(message='传入目录id为空') |
||||
|
|
||||
|
|
||||
|
|
||||
|
|
||||
|
@classmethod |
||||
|
async def get_data_asset_catalog_tree_services(cls, query_db: AsyncSession): |
||||
|
""" |
||||
|
获取数据资产树信息service |
||||
|
|
||||
|
:param query_db: orm对象 |
||||
|
:return: 数据资产树信息对象 |
||||
|
""" |
||||
|
# 从 DAO 层获取数据 |
||||
|
rows = await DataCatalogDAO.get_data_asset_catalog_tree(query_db) |
||||
|
# 使用字典暂存分组数据 |
||||
|
sys_groups = {} |
||||
|
|
||||
|
for item in rows: |
||||
|
sys_name, eng_name, cn_name, ast_no = item |
||||
|
# 创建或获取系统分组 |
||||
|
if sys_name not in sys_groups: |
||||
|
sys_groups[sys_name] = { |
||||
|
"dataAssetSysName": sys_name, |
||||
|
"children": [] |
||||
|
} |
||||
|
|
||||
|
# 添加子节点 |
||||
|
sys_groups[sys_name]["children"].append({ |
||||
|
"dataAssetCatalogNo": eng_name, |
||||
|
"dataAssetCatalogName": cn_name, |
||||
|
"dataAssetCatalogAstno": ast_no |
||||
|
}) |
||||
|
results = list(sys_groups.values()) |
||||
|
# 转换为最终列表格式 |
||||
|
return results |
||||
|
|
||||
|
@classmethod |
||||
|
async def moved_catalog_instr_services(cls, query_db: AsyncSession, request: DataCatalogMovedRequest): |
||||
|
|
||||
|
""" |
||||
|
移动数据目录service |
||||
|
""" |
||||
|
|
||||
|
moved_catalog_data = { |
||||
|
'content_onum': request.content_onum, |
||||
|
'supr_content_onum': request.supr_content_onum, |
||||
|
'supr_content_onum_after': request.supr_content_onum_after |
||||
|
} |
||||
|
|
||||
|
|
||||
|
try: |
||||
|
await DataCatalogDAO.moved_catalog_instr_dao(query_db, moved_catalog_data) |
||||
|
await query_db.commit() |
||||
|
return CrudResponseModel(is_success=True, message='目录移动成功') |
||||
|
except Exception as e: |
||||
|
await query_db.rollback() |
||||
|
raise ServiceException(message=f"移动目录时发生错误: {str(e)}") |
||||
|
|
||||
|
|
||||
|
|
||||
|
|
||||
|
@classmethod |
||||
|
async def merge_catalog_instr_services(cls, query_db: AsyncSession, request: DataCatalogMergeRequest): |
||||
|
|
||||
|
""" |
||||
|
移动数据目录service |
||||
|
""" |
||||
|
|
||||
|
merge_catalog_data = { |
||||
|
'content_onum': request.content_onum, |
||||
|
'supr_content_onum': request.supr_content_onum, |
||||
|
'content_onum_after': request.content_onum_after, |
||||
|
'supr_content_onum_after': request.supr_content_onum_after |
||||
|
} |
||||
|
try: |
||||
|
await DataCatalogDAO.merge_catalog_instr_dao(query_db, merge_catalog_data) |
||||
|
await query_db.commit() |
||||
|
return CrudResponseModel(is_success=True, message='目录合并成功') |
||||
|
except Exception as e: |
||||
|
await query_db.rollback() |
||||
|
raise ServiceException(message=f"目录合并时发生错误: {str(e)}") |
||||
|
|
||||
|
|
||||
|
@classmethod |
||||
|
async def removerel_data_ast_catalog_services(cls, query_db: AsyncSession, request: DataCatalogChild): |
||||
|
|
||||
|
""" |
||||
|
移除数据资产目录service |
||||
|
""" |
||||
|
|
||||
|
removerel_catalog_data = { |
||||
|
'rela_onum': request.rela_onum, |
||||
|
'content_onum': request.content_onum, |
||||
|
'ast_onum': request.ast_onum, |
||||
|
'rela_type': request.rela_type, |
||||
|
'rela_eff_begn_date': request.rela_eff_begn_date, |
||||
|
'rela_eff_end_date': request.rela_eff_end_date, |
||||
|
'upd_prsn': request.upd_prsn, |
||||
|
'rela_status': request.rela_status |
||||
|
} |
||||
|
|
||||
|
|
||||
|
try: |
||||
|
await DataCatalogDAO.removerel_data_ast_catalog_dao(query_db, removerel_catalog_data) |
||||
|
await query_db.commit() |
||||
|
return CrudResponseModel(is_success=True, message='资产移除成功') |
||||
|
except Exception as e: |
||||
|
await query_db.rollback() |
||||
|
raise ServiceException(message=f"移除资产时发生错误: {str(e)}") |
||||
|
|
||||
|
|
||||
|
|
||||
|
@classmethod |
||||
|
async def moverel_data_ast_catalog_services(cls, query_db: AsyncSession, request: DataCatalogMoverelRequest): |
||||
|
|
||||
|
""" |
||||
|
移动数据目录service |
||||
|
""" |
||||
|
|
||||
|
moverel_catalog_data = { |
||||
|
'rela_onum': request.rela_onum, |
||||
|
'content_onum': request.content_onum, |
||||
|
'content_onum_after': request.content_onum_after |
||||
|
} |
||||
|
|
||||
|
|
||||
|
try: |
||||
|
await DataCatalogDAO.moverel_data_ast_catalog_dao(query_db, moverel_catalog_data) |
||||
|
await query_db.commit() |
||||
|
return CrudResponseModel(is_success=True, message='资产移动成功') |
||||
|
except Exception as e: |
||||
|
await query_db.rollback() |
||||
|
raise ServiceException(message=f"资产移动时发生错误: {str(e)}") |
||||
|
|
||||
|
|
||||
|
|
||||
|
@classmethod |
||||
|
async def delete_ast_book_mark_rela_services(cls, db: AsyncSession, request: DataAstBookmarkRelaRequest, user_name: str, user_id: str): |
||||
|
""" |
||||
|
取消收藏数据库操作 |
||||
|
|
||||
|
""" |
||||
|
delete_rela_onum = { |
||||
|
'rela_onum': request.rela_onum |
||||
|
} |
||||
|
try: |
||||
|
await DataCatalogDAO.delete_ast_book_mark_rela_dao(db, delete_rela_onum,user_name,user_id) |
||||
|
await db.commit() |
||||
|
return CrudResponseModel(is_success=True, message='取消收藏,操作成功') |
||||
|
except Exception as e: |
||||
|
await db.rollback() |
||||
|
logger.error(" 取消收藏,操作失败") |
||||
|
|
||||
|
|
||||
|
# @classmethod |
||||
|
# async def add_ast_book_mark_rela_services(cls, db: AsyncSession, request: DataAstBookmarkRelaRequest): |
||||
|
# """ |
||||
|
# 添加收藏数据库操作 |
||||
|
|
||||
|
# """ |
||||
|
# add_rela_onum = { |
||||
|
# 'user_id': request.user_id, |
||||
|
# 'data_ast_no': request.data_ast_no |
||||
|
# } |
||||
|
# try: |
||||
|
# await DataCatalogDAO.add_ast_book_mark_rela_dao(db, add_rela_onum) |
||||
|
# await db.commit() |
||||
|
# return CrudResponseModel(is_success=True, message='添加收藏,操作成功') |
||||
|
# except Exception as e: |
||||
|
# await db.rollback() |
||||
|
# logger.error(" 添加收藏,操作失败") |
||||
|
|
||||
|
@classmethod |
||||
|
async def add_ast_book_mark_rela_services(cls, db: AsyncSession, request: DataAstBookmarkRelaRequest, user_name: str): |
||||
|
""" |
||||
|
添加收藏数据库操作 |
||||
|
|
||||
|
:param db: 异步会话对象 |
||||
|
:param request: 收藏请求对象,包含用户ID、资产编号等信息 |
||||
|
:param user_name: 当前操作用户的名称 |
||||
|
:return: 标准化响应模型,包含操作是否成功、消息和返回数据 |
||||
|
""" |
||||
|
# 构建DAO参数 |
||||
|
add_rela_onum = { |
||||
|
'user_id': request.user_id, |
||||
|
'data_ast_no': request.data_ast_no, # 我的收藏 |
||||
|
'content_onum': 2, |
||||
|
'rela_type': "归属关系", |
||||
|
'rela_eff_begn_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 设置默认值,当前时间 |
||||
|
'rela_eff_end_date': datetime(year=2999, month=12, day=31, hour=0, minute=0, second=0).strftime("%Y-%m-%d %H:%M:%S") |
||||
|
} |
||||
|
|
||||
|
try: |
||||
|
# 调用DAO层操作 |
||||
|
dao_result = await DataCatalogDAO.add_ast_book_mark_rela_dao(db, add_rela_onum, user_name) |
||||
|
|
||||
|
if not dao_result.get("is_success"): |
||||
|
# 业务逻辑错误(如重复收藏) |
||||
|
logger.warning(f"用户{request.user_id}收藏资产{request.data_ast_no}失败: {dao_result.get('message')}") |
||||
|
return CrudResponseModel( |
||||
|
is_success=False, |
||||
|
message=dao_result.get("message", "操作失败"), |
||||
|
data=None |
||||
|
) |
||||
|
|
||||
|
# 执行成功时提交事务 |
||||
|
await db.commit() |
||||
|
|
||||
|
# 记录成功日志 |
||||
|
if dao_result.get("data") is not None: |
||||
|
logger.info(f"用户{request.user_id}收藏资产{request.data_ast_no}成功,顺序号: {dao_result.get('data').bookmark_orde}") |
||||
|
return CrudResponseModel( |
||||
|
is_success=True, |
||||
|
message=dao_result.get("message", "操作成功"), |
||||
|
data={ |
||||
|
"rela_onum": dao_result.get("data").rela_onum, |
||||
|
"bookmark_orde": dao_result.get("data").bookmark_orde |
||||
|
} |
||||
|
) |
||||
|
else: |
||||
|
logger.warning(f"用户{request.user_id}收藏资产{request.data_ast_no}失败: 返回数据为空") |
||||
|
return CrudResponseModel( |
||||
|
is_success=False, |
||||
|
message="操作失败:返回数据为空", |
||||
|
data=None |
||||
|
) |
||||
|
|
||||
|
except Exception as e: |
||||
|
await db.rollback() |
||||
|
logger.error( |
||||
|
f"用户 {request.user_id} 收藏操作失败: {str(e)}", # 直接使用 request.user_id |
||||
|
exc_info=True |
||||
|
) |
||||
|
return CrudResponseModel( |
||||
|
is_success=False, |
||||
|
message=f"收藏操作失败: {str(e)}" |
||||
|
) |
||||
|
|
||||
|
@classmethod |
||||
|
async def get_data_ast_indx_list_services(cls, query_db: AsyncSession, query_object: DataAstIndxRequest): |
||||
|
|
||||
|
""" |
||||
|
获取数据资产指标列表信息service |
||||
|
""" |
||||
|
try: |
||||
|
indx_list_dict = await DataCatalogDAO.get_data_ast_indx_list(query_db, query_object) |
||||
|
|
||||
|
# 提取指标数据 |
||||
|
indx_names = [item["indx_name"] for item in indx_list_dict] |
||||
|
indx_vals = [item["indx_val"] for item in indx_list_dict] |
||||
|
|
||||
|
# 创建图表 |
||||
|
pie = ( |
||||
|
Pie() |
||||
|
.add("", list(zip(indx_names, indx_vals))) |
||||
|
.set_global_opts(title_opts={"text": "指标饼图"}) |
||||
|
.set_series_opts(label_opts=LabelOpts(formatter="{b}: {c}")) |
||||
|
) |
||||
|
|
||||
|
bar = ( |
||||
|
Bar() |
||||
|
.add_xaxis(indx_names) |
||||
|
.add_yaxis("指标值", indx_vals) |
||||
|
.set_global_opts(title_opts={"text": "指标柱状图"}) |
||||
|
) |
||||
|
|
||||
|
# 组合图表 |
||||
|
page = Page() |
||||
|
page.add(pie, bar) |
||||
|
except Exception as e: |
||||
|
logger.error(f"获取数据资产指标列表信息失败: {str(e)}") |
||||
|
raise ServiceException(message=f"获取数据资产指标列表信息失败: {str(e)}") |
||||
|
else: |
||||
|
return page.render_embed() |
||||
|
logger.info(f"获取数据资产指标列表信息成功") |
||||
|
|
||||
|
|
||||
|
|
@ -0,0 +1,90 @@ |
|||||
|
# # data_ast_content_service.py |
||||
|
# from datetime import datetime |
||||
|
# from utils.log_util import logger |
||||
|
# from collections import defaultdict |
||||
|
# from sqlalchemy.ext.asyncio import AsyncSession |
||||
|
# from exceptions.exception import ServiceException |
||||
|
# from module_admin.dao.data_ast_content_dao import DataAstInfoDAO |
||||
|
# from module_admin.entity.vo.common_vo import CrudResponseModel |
||||
|
# from module_admin.entity.vo.data_ast_content_vo import DataAstInfoRequest, DataAstInfoResponse, DataAstInfoPageQueryModel |
||||
|
|
||||
|
# class DataAstInfoService: |
||||
|
# """ |
||||
|
# 数据资产信息模块服务层 |
||||
|
# """ |
||||
|
|
||||
|
# @classmethod |
||||
|
# async def add_data_ast_info_services(cls, query_db: AsyncSession, request: DataAstInfoRequest): |
||||
|
# """ |
||||
|
# 新增数据资产信息service |
||||
|
|
||||
|
# :param query_db: orm对象 |
||||
|
# :param request: 新增数据资产信息请求对象 |
||||
|
# :return: 新增数据资产信息操作结果 |
||||
|
# """ |
||||
|
# data_ast_info_data = { |
||||
|
# 'data_ast_eng_name': request.data_ast_eng_name, |
||||
|
# 'data_ast_cn_name': request.data_ast_cn_name, |
||||
|
# 'data_ast_type': request.data_ast_type, |
||||
|
# 'data_ast_stat': request.data_ast_stat, |
||||
|
# 'data_ast_desc': request.data_ast_desc, |
||||
|
# 'data_ast_clas': request.data_ast_clas, |
||||
|
# 'data_ast_cont': request.data_ast_cont, |
||||
|
# 'data_ast_faq': request.data_ast_faq, |
||||
|
# 'data_ast_src': request.data_ast_src, |
||||
|
# 'ast_no': request.ast_no |
||||
|
# } |
||||
|
|
||||
|
# try: |
||||
|
# new_data_ast_info = await DataAstInfoDAO.add_data_ast_info_dao(query_db, data_ast_info_data) |
||||
|
# await query_db.commit() |
||||
|
# return CrudResponseModel(is_success=True, message='新增成功', data=new_data_ast_info) |
||||
|
# except Exception as e: |
||||
|
# await query_db.rollback() |
||||
|
# raise ServiceException(message=f"创建数据资产信息时发生错误: {str(e)}") |
||||
|
|
||||
|
# @classmethod |
||||
|
# async def add_data_ast_info_batch_services(cls, query_db: AsyncSession, request_list: List[DataAstInfoRequest]): |
||||
|
# """ |
||||
|
# 批量新增数据资产信息service |
||||
|
|
||||
|
# :param query_db: orm对象 |
||||
|
# :param request_list: 新增数据资产信息请求对象列表 |
||||
|
# :return: 新增数据资产信息操作结果 |
||||
|
# """ |
||||
|
# data_ast_info_list = [ |
||||
|
# { |
||||
|
# 'data_ast_eng_name': request.data_ast_eng_name, |
||||
|
# 'data_ast_cn_name': request.data_ast_cn_name, |
||||
|
# 'data_ast_type': request.data_ast_type, |
||||
|
# 'data_ast_stat': request.data_ast_stat, |
||||
|
# 'data_ast_desc': request.data_ast_desc, |
||||
|
# 'data_ast_clas': request.data_ast_clas, |
||||
|
# 'data_ast_cont': request.data_ast_cont, |
||||
|
# 'data_ast_faq': request.data_ast_faq, |
||||
|
# 'data_ast_src': request.data_ast_src, |
||||
|
# 'ast_no': request.ast_no |
||||
|
# } |
||||
|
# for request in request_list |
||||
|
# ] |
||||
|
|
||||
|
# try: |
||||
|
# new_data_ast_info_list = await DataAstInfoDAO.add_data_ast_info_batch_dao(query_db, data_ast_info_list) |
||||
|
# await query_db.commit() |
||||
|
# return CrudResponseModel(is_success=True, message='批量新增成功', data=new_data_ast_info_list) |
||||
|
# except Exception as e: |
||||
|
# await query_db.rollback() |
||||
|
# raise ServiceException(message=f"批量创建数据资产信息时发生错误: {str(e)}") |
||||
|
|
||||
|
# @classmethod |
||||
|
# async def get_data_ast_info_list_services(cls, query_db: AsyncSession, query_object: DataAstInfoPageQueryModel, is_page: bool = False): |
||||
|
# """ |
||||
|
# 获取数据资产信息列表service |
||||
|
|
||||
|
# :param query_db: orm对象 |
||||
|
# :param query_object: 查询参数对象 |
||||
|
# :param is_page: 是否开启分页 |
||||
|
# :return: 数据资产信息列表信息对象 |
||||
|
# """ |
||||
|
# data_ast_info_list_result = await DataAstInfoDAO.get_data_ast_info_list(query_db, query_object, is_page) |
||||
|
# return data_ast_info_list_result |
Loading…
Reference in new issue