You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

119 lines
6.3 KiB

1 month ago
from fastapi import APIRouter, Request
from fastapi import Depends
from config.get_db import get_db
from module_admin.service.login_service import get_current_user, CurrentUserInfoServiceResponse
from module_admin.service.newtest_service import *
from module_admin.entity.vo.newtest_vo import *
from utils.response_util import *
from utils.log_util import *
from utils.page_util import get_page_obj
from utils.common_util import bytes2file_response
from module_admin.aspect.interface_auth import CheckUserInterfaceAuth
from module_admin.annotation.log_annotation import log_decorator
newtestController = APIRouter(dependencies=[Depends(get_current_user)])
@newtestController.post("/newtest/forSelectOption", response_model=NewtestSelectOptionResponseModel, dependencies=[Depends(CheckUserInterfaceAuth('common'))])
async def get_system_newtest_select(request: Request, query_db: Session = Depends(get_db)):
try:
role_query_result = NewtestService.get_newtest_select_option_services(query_db)
logger.info('获取成功')
return response_200(data=role_query_result, message="获取成功")
except Exception as e:
logger.exception(e)
return response_500(data="", message=str(e))
@newtestController.post("/newtest/get", response_model=NewtestPageObjectResponse, dependencies=[Depends(CheckUserInterfaceAuth('system:newtest:list'))])
async def get_system_newtest_list(request: Request, newtest_page_query: NewtestPageObject, query_db: Session = Depends(get_db)):
try:
newtest_query = NewtestModel(**newtest_page_query.dict())
# 获取全量数据
newtest_query_result = NewtestService.get_newtest_list_services(query_db, newtest_query)
# 分页操作
newtest_page_query_result = get_page_obj(newtest_query_result, newtest_page_query.page_num, newtest_page_query.page_size)
logger.info('获取成功')
return response_200(data=newtest_page_query_result, message="获取成功")
except Exception as e:
logger.exception(e)
return response_500(data="", message=str(e))
@newtestController.post("/newtest/add", response_model=CrudNewtestResponse, dependencies=[Depends(CheckUserInterfaceAuth('system:newtest:add'))])
@log_decorator(title='测试管理', business_type=1)
async def add_system_newtest(request: Request, add_newtest: NewtestModel, query_db: Session = Depends(get_db), current_user: CurrentUserInfoServiceResponse = Depends(get_current_user)):
try:
add_newtest.create_by = current_user.user.user_name
add_newtest.update_by = current_user.user.user_name
add_newtest_result = NewtestService.add_newtest_services(query_db, add_newtest)
if add_newtest_result.is_success:
logger.info(add_newtest_result.message)
return response_200(data=add_newtest_result, message=add_newtest_result.message)
else:
logger.warning(add_newtest_result.message)
return response_400(data="", message=add_newtest_result.message)
except Exception as e:
logger.exception(e)
return response_500(data="", message=str(e))
@newtestController.patch("/newtest/edit", response_model=CrudNewtestResponse, dependencies=[Depends(CheckUserInterfaceAuth('system:newtest:edit'))])
@log_decorator(title='测试管理', business_type=2)
async def edit_system_newtest(request: Request, edit_newtest: NewtestModel, query_db: Session = Depends(get_db), current_user: CurrentUserInfoServiceResponse = Depends(get_current_user)):
try:
edit_newtest.update_by = current_user.user.user_name
edit_newtest.update_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
edit_newtest_result = NewtestService.edit_newtest_services(query_db, edit_newtest)
if edit_newtest_result.is_success:
logger.info(edit_newtest_result.message)
return response_200(data=edit_newtest_result, message=edit_newtest_result.message)
else:
logger.warning(edit_newtest_result.message)
return response_400(data="", message=edit_newtest_result.message)
except Exception as e:
logger.exception(e)
return response_500(data="", message=str(e))
@newtestController.post("/newtest/delete", response_model=CrudNewtestResponse, dependencies=[Depends(CheckUserInterfaceAuth('system:newtest:remove'))])
@log_decorator(title='测试管理', business_type=3)
async def delete_system_newtest(request: Request, delete_newtest: DeleteNewtestModel, query_db: Session = Depends(get_db)):
try:
delete_newtest_result = NewtestService.delete_newtest_services(query_db, delete_newtest)
if delete_newtest_result.is_success:
logger.info(delete_newtest_result.message)
return response_200(data=delete_newtest_result, message=delete_newtest_result.message)
else:
logger.warning(delete_newtest_result.message)
return response_400(data="", message=delete_newtest_result.message)
except Exception as e:
logger.exception(e)
return response_500(data="", message=str(e))
@newtestController.get("/newtest/{newtest_id}", response_model=NewtestModel, dependencies=[Depends(CheckUserInterfaceAuth('system:newtest:query'))])
async def query_detail_system_newtest(request: Request, newtest_id: int, query_db: Session = Depends(get_db)):
try:
detail_newtest_result = NewtestService.detail_newtest_services(query_db, newtest_id)
logger.info(f'获取newtest_id为{newtest_id}的信息成功')
return response_200(data=detail_newtest_result, message='获取成功')
except Exception as e:
logger.exception(e)
return response_500(data="", message=str(e))
@newtestController.post("/newtest/export", dependencies=[Depends(CheckUserInterfaceAuth('system:newtest:export'))])
@log_decorator(title='测试管理', business_type=5)
async def export_system_newtest_list(request: Request, newtest_query: NewtestModel, query_db: Session = Depends(get_db)):
try:
# 获取全量数据
newtest_query_result = NewtestService.get_newtest_list_services(query_db, newtest_query)
newtest_export_result = NewtestService.export_newtest_list_services(newtest_query_result)
logger.info('导出成功')
return streaming_response_200(data=bytes2file_response(newtest_export_result))
except Exception as e:
logger.exception(e)
return response_500(data="", message=str(e))