"""FastAPI routes for the ``/mmdt`` resource.

Every route registered on ``mmdtController`` depends on ``get_current_user``;
each route additionally declares a ``CheckUserInterfaceAuth`` dependency with
its own permission string.

NOTE: the handlers below are documented with ``#`` comments rather than
docstrings, because FastAPI publishes a handler's docstring as the operation
description in the generated OpenAPI schema.
"""
from fastapi import APIRouter, Request
from fastapi import Depends
from config.get_db import get_db
from module_admin.service.login_service import get_current_user, CurrentUserInfoServiceResponse
from module_admin.service.mmdt_service import *
from module_admin.entity.vo.mmdt_vo import *
from utils.response_util import *
from utils.log_util import *
from utils.page_util import get_page_obj
from utils.common_util import bytes2file_response
from module_admin.aspect.interface_auth import CheckUserInterfaceAuth
from module_admin.annotation.log_annotation import log_decorator


mmdtController = APIRouter(dependencies=[Depends(get_current_user)])


# POST /mmdt/forSelectOption
# Returns whatever MmdtService.get_mmdt_select_option_services yields for the
# current DB session; any exception is logged and turned into a 500 response.
@mmdtController.post(
    "/mmdt/forSelectOption",
    response_model=MmdtSelectOptionResponseModel,
    dependencies=[Depends(CheckUserInterfaceAuth('common'))],
)
async def get_dasset_mmdt_select(request: Request, query_db: Session = Depends(get_db)):
    try:
        select_options = MmdtService.get_mmdt_select_option_services(query_db)
        logger.info('获取成功')
        return response_200(data=select_options, message="获取成功")
    except Exception as exc:
        logger.exception(exc)
        return response_500(data="", message=str(exc))


# POST /mmdt/get
# Builds an MmdtModel filter from the page query, fetches the matching records
# through the service, then paginates them with get_page_obj.
@mmdtController.post(
    "/mmdt/get",
    response_model=MmdtPageObjectResponse,
    dependencies=[Depends(CheckUserInterfaceAuth('dasset:mmdt:list'))],
)
async def get_dasset_mmdt_list(
    request: Request,
    mmdt_page_query: MmdtPageObject,
    query_db: Session = Depends(get_db),
):
    try:
        filter_fields = mmdt_page_query.dict()
        filter_model = MmdtModel(**filter_fields)
        # Fetch the full (unpaginated) result set.
        all_records = MmdtService.get_mmdt_list_services(query_db, filter_model)
        # Paginate the full result set.
        page = get_page_obj(
            all_records,
            mmdt_page_query.page_num,
            mmdt_page_query.page_size,
        )
        logger.info('获取成功')
        return response_200(data=page, message="获取成功")
    except Exception as exc:
        logger.exception(exc)
        return response_500(data="", message=str(exc))


# POST /mmdt/add
# Stamps create_by / update_by with the current user's name and delegates to
# MmdtService.add_mmdt_services. A result with a falsy is_success yields a 400.
@mmdtController.post(
    "/mmdt/add",
    response_model=CrudMmdtResponse,
    dependencies=[Depends(CheckUserInterfaceAuth('dasset:mmdt:add'))],
)
@log_decorator(title='模型市场管理', business_type=1)
async def add_dasset_mmdt(
    request: Request,
    add_mmdt: MmdtModel,
    query_db: Session = Depends(get_db),
    current_user: CurrentUserInfoServiceResponse = Depends(get_current_user),
):
    try:
        operator_name = current_user.user.user_name
        add_mmdt.create_by = operator_name
        add_mmdt.update_by = operator_name
        outcome = MmdtService.add_mmdt_services(query_db, add_mmdt)
        if not outcome.is_success:
            logger.warning(outcome.message)
            return response_400(data="", message=outcome.message)
        logger.info(outcome.message)
        return response_200(data=outcome, message=outcome.message)
    except Exception as exc:
        logger.exception(exc)
        return response_500(data="", message=str(exc))


# PATCH /mmdt/edit
# Stamps update_by with the current user's name and update_time with the
# current local time, then delegates to MmdtService.edit_mmdt_services.
@mmdtController.patch(
    "/mmdt/edit",
    response_model=CrudMmdtResponse,
    dependencies=[Depends(CheckUserInterfaceAuth('dasset:mmdt:edit'))],
)
@log_decorator(title='模型市场管理', business_type=2)
async def edit_dasset_mmdt(
    request: Request,
    edit_mmdt: MmdtModel,
    query_db: Session = Depends(get_db),
    current_user: CurrentUserInfoServiceResponse = Depends(get_current_user),
):
    try:
        edit_mmdt.update_by = current_user.user.user_name
        modified_at = datetime.now()
        edit_mmdt.update_time = modified_at.strftime("%Y-%m-%d %H:%M:%S")
        outcome = MmdtService.edit_mmdt_services(query_db, edit_mmdt)
        if not outcome.is_success:
            logger.warning(outcome.message)
            return response_400(data="", message=outcome.message)
        logger.info(outcome.message)
        return response_200(data=outcome, message=outcome.message)
    except Exception as exc:
        logger.exception(exc)
        return response_500(data="", message=str(exc))


# POST /mmdt/delete
# Delegates to MmdtService.delete_mmdt_services and maps the result's
# is_success flag to a 200 or 400 response.
@mmdtController.post(
    "/mmdt/delete",
    response_model=CrudMmdtResponse,
    dependencies=[Depends(CheckUserInterfaceAuth('dasset:mmdt:remove'))],
)
@log_decorator(title='模型市场管理', business_type=3)
async def delete_dasset_mmdt(
    request: Request,
    delete_mmdt: DeleteMmdtModel,
    query_db: Session = Depends(get_db),
):
    try:
        outcome = MmdtService.delete_mmdt_services(query_db, delete_mmdt)
        if not outcome.is_success:
            logger.warning(outcome.message)
            return response_400(data="", message=outcome.message)
        logger.info(outcome.message)
        return response_200(data=outcome, message=outcome.message)
    except Exception as exc:
        logger.exception(exc)
        return response_500(data="", message=str(exc))


# GET /mmdt/{onum}
# Returns the result of MmdtService.detail_mmdt_services for the given onum.
@mmdtController.get(
    "/mmdt/{onum}",
    response_model=MmdtModel,
    dependencies=[Depends(CheckUserInterfaceAuth('dasset:mmdt:query'))],
)
async def query_detail_dasset_mmdt(
    request: Request,
    onum: int,
    query_db: Session = Depends(get_db),
):
    try:
        detail = MmdtService.detail_mmdt_services(query_db, onum)
        logger.info(f'获取onum为{onum}的信息成功')
        return response_200(data=detail, message='获取成功')
    except Exception as exc:
        logger.exception(exc)
        return response_500(data="", message=str(exc))


# POST /mmdt/export
# Fetches the records matching the query, passes them to
# MmdtService.export_mmdt_list_services, and streams the result back wrapped
# by bytes2file_response.
@mmdtController.post(
    "/mmdt/export",
    dependencies=[Depends(CheckUserInterfaceAuth('dasset:mmdt:export'))],
)
@log_decorator(title='模型市场管理', business_type=5)
async def export_dasset_mmdt_list(
    request: Request,
    mmdt_query: MmdtModel,
    query_db: Session = Depends(get_db),
):
    try:
        # Fetch the full (unpaginated) result set.
        all_records = MmdtService.get_mmdt_list_services(query_db, mmdt_query)
        exported = MmdtService.export_mmdt_list_services(all_records)
        logger.info('导出成功')
        file_payload = bytes2file_response(exported)
        return streaming_response_200(data=file_payload)
    except Exception as exc:
        logger.exception(exc)
        return response_500(data="", message=str(exc))