from fastapi import APIRouter, Request from fastapi import Depends import base64 from config.get_db import get_db from module_admin.service.login_service import get_current_user, CurrentUserInfoServiceResponse from module_admin.service.vecset_service import * from module_admin.entity.vo.vecset_vo import * from module_admin.dao.vecset_dao import * from utils.page_util import get_page_obj from utils.response_util import * from utils.log_util import * from utils.common_util import bytes2file_response from module_admin.aspect.interface_auth import CheckUserInterfaceAuth from module_admin.aspect.data_scope import GetDataScope from module_admin.annotation.log_annotation import log_decorator vecsetController = APIRouter(dependencies=[Depends(get_current_user)]) # @vecsetController.post("/vecset/get", response_model=VecsetPageObjectResponse, dependencies=[Depends(CheckUserInterfaceAuth('dataint:vecset:list'))]) async def get_dasset_vecset_list(request: Request, vecset_page_query: VecsetPageObject, query_db: Session = Depends(get_db), data_scope_sql: str = Depends(GetDataScope('SysVecset'))): try: vecset_query = VecsetQueryModel(**vecset_page_query.dict()) # 获取全量数据 vecset_query_result = VecsetService.get_vecset_list_services(query_db, vecset_query, data_scope_sql) # 分页操作 vecset_page_query_result = get_page_obj(vecset_query_result, vecset_page_query.page_num, vecset_page_query.page_size) logger.info('获取成功') return response_200(data=vecset_page_query_result, message="获取成功") except Exception as e: logger.exception(e) return response_500(data="", message=str(e)) @vecsetController.post("/vecset/add", response_model=CrudVecsetResponse, dependencies=[Depends(CheckUserInterfaceAuth('dataint:vecset:add'))]) @log_decorator(title='智能语句配置', business_type=1) async def add_dasset_vecset(request: Request, add_vecset: AddVecsetModel, query_db: Session = Depends(get_db), current_user: CurrentUserInfoServiceResponse = Depends(get_current_user)): try: add_vecset.create_by = current_user.user.user_name add_vecset.update_by = current_user.user.user_name add_vecset_result = VecsetService.add_vecset_services(query_db, add_vecset) if add_vecset_result.is_success: logger.info(add_vecset_result.message) return response_200(data=add_vecset_result, message=add_vecset_result.message) else: logger.warning(add_vecset_result.message) return response_400(data="", message=add_vecset_result.message) except Exception as e: logger.exception(e) return response_500(data="", message=str(e)) @vecsetController.patch("/vecset/edit", response_model=CrudVecsetResponse, dependencies=[Depends(CheckUserInterfaceAuth('dataint:vecset:edit'))]) @log_decorator(title='智能语句配置', business_type=2) async def edit_dasset_vecset(request: Request, edit_vecset: AddVecsetModel, query_db: Session = Depends(get_db), current_user: CurrentUserInfoServiceResponse = Depends(get_current_user)): try: edit_vecset.update_by = current_user.user.user_name edit_vecset.update_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") edit_vecset_result = VecsetService.edit_vecset_services(query_db, edit_vecset) if edit_vecset_result.is_success: logger.info(edit_vecset_result.message) return response_200(data=edit_vecset_result, message=edit_vecset_result.message) else: logger.warning(edit_vecset_result.message) return response_400(data="", message=edit_vecset_result.message) except Exception as e: logger.exception(e) return response_500(data="", message=str(e)) @vecsetController.post("/vecset/delete", response_model=CrudVecsetResponse, dependencies=[Depends(CheckUserInterfaceAuth('dataint:vecset:delete'))]) @log_decorator(title='智能语句配置', business_type=3) async def delete_dasset_vecset(request: Request, delete_vecset: DeleteVecsetModel, query_db: Session = Depends(get_db), current_user: CurrentUserInfoServiceResponse = Depends(get_current_user)): try: delete_vecset.update_by = current_user.user.user_name delete_vecset.update_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") #print('ppppppppppppppppppppppppppppp',delete_vecset) delete_vecset_result = VecsetService.delete_vecset_services(query_db, delete_vecset) #print(999999999999999,delete_vecset_result,8888888888888888888) if delete_vecset_result.is_success: logger.info(delete_vecset_result.message) return response_200(data=delete_vecset_result, message=delete_vecset_result.message) else: logger.warning(delete_vecset_result.message) return response_400(data="", message=delete_vecset_result.message) except Exception as e: logger.exception(e) return response_500(data="", message=str(e)) #@vecsetController.get("/vecset/{onum}", response_model=VecsetDetailModel, dependencies=[Depends(CheckUserInterfaceAuth('dataint:vecset:query'))]) @vecsetController.get("/vecset/{onum}", response_model=VecsetDetailModel, dependencies=[Depends(CheckUserInterfaceAuth('dataint:vecset:query'))]) async def query_detail_dasset_vecset(request: Request, onum: int, query_db: Session = Depends(get_db)): try: delete_vecset_result = VecsetService.detail_vecset_services(query_db, onum) logger.info(f'获取onum为{onum}的信息成功') return response_200(data=delete_vecset_result, message='获取成功') except Exception as e: logger.exception(e) return response_500(data="", message=str(e)) @vecsetController.post("/vecset/importData", dependencies=[Depends(CheckUserInterfaceAuth('dataint:vecset:import'))]) @log_decorator(title='智能语句配置', business_type=6) async def batch_import_dasset_vecset(request: Request, vecset_import: ImportVecsetModel, query_db: Session = Depends(get_db), current_user: CurrentUserInfoServiceResponse = Depends(get_current_user)): try: batch_import_result = VecsetService.batch_import_vecset_services(query_db, vecset_import, current_user) if batch_import_result.is_success: logger.info(batch_import_result.message) return response_200(data=batch_import_result, message=batch_import_result.message) else: logger.warning(batch_import_result.message) return response_400(data="", message=batch_import_result.message) except Exception as e: logger.exception(e) return response_500(data="", message=str(e)) @vecsetController.post("/vecset/importTemplate", dependencies=[Depends(CheckUserInterfaceAuth('dataint:vecset:import'))]) async def export_dasset_vecset_template(request: Request, query_db: Session = Depends(get_db)): try: vecset_import_template_result = VecsetService.get_vecset_import_template_services() logger.info('获取成功') return streaming_response_200(data=bytes2file_response(vecset_import_template_result)) except Exception as e: logger.exception(e) return response_500(data="", message=str(e)) @vecsetController.post("/vecset/export", dependencies=[Depends(CheckUserInterfaceAuth('dataint:vecset:export'))]) @log_decorator(title='智能语句配置', business_type=5) async def export_dasset_vecset_list(request: Request, vecset_query: VecsetQueryModel, query_db: Session = Depends(get_db), data_scope_sql: str = Depends(GetDataScope('SysVecset'))): try: # 获取全量数据 vecset_query_result = VecsetService.get_vecset_list_services(query_db, vecset_query, data_scope_sql) vecset_export_result = VecsetService.export_vecset_list_services(vecset_query_result) logger.info('导出成功') return streaming_response_200(data=bytes2file_response(vecset_export_result)) except Exception as e: logger.exception(e) return response_500(data="", message=str(e))