You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
193 lines
11 KiB
193 lines
11 KiB
1 month ago
|
from fastapi import APIRouter, Request
|
||
|
from fastapi import Depends
|
||
|
from config.get_db import get_db
|
||
|
from module_admin.service.login_service import get_current_user, CurrentUserInfoServiceResponse
|
||
|
from module_admin.service.ndstand_service import *
|
||
|
from module_admin.service.fddict_service import FddictService
|
||
|
from module_admin.entity.vo.ndstand_vo import *
|
||
|
from utils.response_util import *
|
||
|
from utils.log_util import *
|
||
|
from utils.page_util import get_page_obj
|
||
|
from utils.common_util import bytes2file_response
|
||
|
from module_admin.aspect.interface_auth import CheckUserInterfaceAuth
|
||
|
from module_admin.annotation.log_annotation import log_decorator
|
||
|
|
||
|
|
||
|
# Router for the ndstand endpoints in this module. get_current_user is attached
# as a router-level dependency, so it runs for every route registered on it.
ndstandController = APIRouter(dependencies=[Depends(get_current_user)])
|
||
|
|
||
|
|
||
|
# Select-option endpoint: returns the value produced by
# NdstandService.get_ndstand_select_option_services for the current DB session,
# wrapped in a 200 response. Any exception is logged and turned into a 500
# response whose message is the exception text.
@ndstandController.post(
    "/ndstand/forSelectOption",
    response_model=NdstandSelectOptionResponseModel,
    dependencies=[Depends(CheckUserInterfaceAuth('common'))],
)
async def get_dasset_ndstand_select(
    request: Request,
    query_db: Session = Depends(get_db),
):
    try:
        select_options = NdstandService.get_ndstand_select_option_services(query_db)
        logger.info('获取成功')
        return response_200(data=select_options, message="获取成功")
    except Exception as exc:
        logger.exception(exc)
        return response_500(data="", message=str(exc))
|
||
|
|
||
|
|
||
|
# Paged list endpoint: converts the page-query payload into an NdstandModel
# filter, asks NdstandService for the matching records, then pages them with
# get_page_obj using the payload's page_num / page_size. Any exception is
# logged and returned as a 500 response carrying the exception text.
@ndstandController.post(
    "/ndstand/get",
    response_model=NdstandPageObjectResponse,
    dependencies=[Depends(CheckUserInterfaceAuth('dasset:ndstand:list'))],
)
async def get_dasset_ndstand_list(
    request: Request,
    ndstand_page_query: NdstandPageObject,
    query_db: Session = Depends(get_db),
):
    try:
        filters = NdstandModel(**ndstand_page_query.dict())
        # Fetch the full (unpaged) result set first ...
        all_records = NdstandService.get_ndstand_list_services(query_db, filters)
        # ... then cut out the requested page.
        page = get_page_obj(
            all_records,
            ndstand_page_query.page_num,
            ndstand_page_query.page_size,
        )
        logger.info('获取成功')
        return response_200(data=page, message="获取成功")
    except Exception as exc:
        logger.exception(exc)
        return response_500(data="", message=str(exc))
|
||
|
|
||
|
|
||
|
# Create endpoint: stamps the incoming record's create_by / update_by with the
# current user's user_name and hands it to NdstandService.add_ndstand_services.
# A result with is_success set yields a 200 response; otherwise the result's
# message is logged as a warning and returned in a 400 response. Any exception
# is logged and returned as a 500 response carrying the exception text.
@ndstandController.post(
    "/ndstand/add",
    response_model=CrudNdstandResponse,
    dependencies=[Depends(CheckUserInterfaceAuth('dasset:ndstand:add'))],
)
@log_decorator(title='数据字典管理', business_type=1)
async def add_dasset_ndstand(
    request: Request,
    add_ndstand: NdstandModel,
    query_db: Session = Depends(get_db),
    current_user: CurrentUserInfoServiceResponse = Depends(get_current_user),
):
    try:
        add_ndstand.create_by = current_user.user.user_name
        add_ndstand.update_by = current_user.user.user_name
        outcome = NdstandService.add_ndstand_services(query_db, add_ndstand)

        # Guard clause: report a rejected insert and stop here.
        if not outcome.is_success:
            logger.warning(outcome.message)
            return response_400(data="", message=outcome.message)

        logger.info(outcome.message)
        return response_200(data=outcome, message=outcome.message)
    except Exception as exc:
        logger.exception(exc)
        return response_500(data="", message=str(exc))
|
||
|
|
||
|
|
||
|
# Update endpoint: stamps the incoming record with the current user's
# user_name (update_by) and the current local time formatted as
# "%Y-%m-%d %H:%M:%S" (update_time), then calls
# NdstandService.edit_ndstand_services. A result with is_success set yields a
# 200 response; otherwise its message is logged as a warning and returned in a
# 400 response. Any exception is logged and returned as a 500 response.
@ndstandController.patch(
    "/ndstand/edit",
    response_model=CrudNdstandResponse,
    dependencies=[Depends(CheckUserInterfaceAuth('dasset:ndstand:edit'))],
)
@log_decorator(title='数据字典管理', business_type=2)
async def edit_dasset_ndstand(
    request: Request,
    edit_ndstand: NdstandModel,
    query_db: Session = Depends(get_db),
    current_user: CurrentUserInfoServiceResponse = Depends(get_current_user),
):
    try:
        edit_ndstand.update_by = current_user.user.user_name
        edit_ndstand.update_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        outcome = NdstandService.edit_ndstand_services(query_db, edit_ndstand)

        # Guard clause: report a rejected update and stop here.
        if not outcome.is_success:
            logger.warning(outcome.message)
            return response_400(data="", message=outcome.message)

        logger.info(outcome.message)
        return response_200(data=outcome, message=outcome.message)
    except Exception as exc:
        logger.exception(exc)
        return response_500(data="", message=str(exc))
|
||
|
|
||
|
|
||
|
# Delete endpoint: forwards the delete payload to
# NdstandService.delete_ndstand_services. A result with is_success set yields
# a 200 response; otherwise its message is logged as a warning and returned in
# a 400 response. Any exception is logged and returned as a 500 response
# carrying the exception text.
@ndstandController.post(
    "/ndstand/delete",
    response_model=CrudNdstandResponse,
    dependencies=[Depends(CheckUserInterfaceAuth('dasset:ndstand:remove'))],
)
@log_decorator(title='数据字典管理', business_type=3)
async def delete_dasset_ndstand(
    request: Request,
    delete_ndstand: DeleteNdstandModel,
    query_db: Session = Depends(get_db),
):
    try:
        outcome = NdstandService.delete_ndstand_services(query_db, delete_ndstand)

        # Guard clause: report a rejected delete and stop here.
        if not outcome.is_success:
            logger.warning(outcome.message)
            return response_400(data="", message=outcome.message)

        logger.info(outcome.message)
        return response_200(data=outcome, message=outcome.message)
    except Exception as exc:
        logger.exception(exc)
        return response_500(data="", message=str(exc))
|
||
|
|
||
|
|
||
|
# Detail endpoint: looks up one record by the integer path parameter `onum`
# through NdstandService.detail_ndstand_services and returns it in a 200
# response. Any exception is logged and returned as a 500 response carrying
# the exception text.
@ndstandController.get(
    "/ndstand/{onum}",
    response_model=NdstandModel,
    dependencies=[Depends(CheckUserInterfaceAuth('dasset:ndstand:query'))],
)
async def query_detail_dasset_ndstand(
    request: Request,
    onum: int,
    query_db: Session = Depends(get_db),
):
    try:
        detail = NdstandService.detail_ndstand_services(query_db, onum)
        logger.info(f'获取onum为{onum}的信息成功')
        return response_200(data=detail, message='获取成功')
    except Exception as exc:
        logger.exception(exc)
        return response_500(data="", message=str(exc))
|
||
|
|
||
|
|
||
|
# Export endpoint: fetches the records matching the query model, passes them
# to NdstandService.export_ndstand_list_services, and streams the result back
# via bytes2file_response / streaming_response_200. Any exception is logged
# and returned as a 500 response carrying the exception text.
@ndstandController.post(
    "/ndstand/export",
    dependencies=[Depends(CheckUserInterfaceAuth('dasset:ndstand:export'))],
)
@log_decorator(title='数据字典管理', business_type=5)
async def export_dasset_ndstand_list(
    request: Request,
    ndstand_query: NdstandModel,
    query_db: Session = Depends(get_db),
):
    try:
        # Fetch the full (unpaged) result set for the export.
        all_records = NdstandService.get_ndstand_list_services(query_db, ndstand_query)
        exported = NdstandService.export_ndstand_list_services(all_records)
        logger.info('导出成功')
        file_body = bytes2file_response(exported)
        return streaming_response_200(data=file_body)
    except Exception as exc:
        logger.exception(exc)
        return response_500(data="", message=str(exc))
|
||
|
|
||
|
|
||
|
# Batch-import endpoint: forwards the import payload and the current user to
# NdstandService.batch_import_ndstand_services. A result with is_success set
# yields a 200 response; otherwise its message is logged as a warning and
# returned in a 400 response. Any exception is logged and returned as a 500
# response carrying the exception text.
@ndstandController.post(
    "/ndstand/importData",
    dependencies=[Depends(CheckUserInterfaceAuth('dasset:ndstand:import'))],
)
@log_decorator(title='数据标准导入管理', business_type=6)
async def batch_import_dasset_ndstand(
    request: Request,
    ndstand_import: ImportNdstandModel,
    query_db: Session = Depends(get_db),
    current_user: CurrentUserInfoServiceResponse = Depends(get_current_user),
):
    try:
        outcome = NdstandService.batch_import_ndstand_services(
            query_db, ndstand_import, current_user
        )

        # Guard clause: report a rejected import and stop here.
        if not outcome.is_success:
            logger.warning(outcome.message)
            return response_400(data="", message=outcome.message)

        logger.info(outcome.message)
        return response_200(data=outcome, message=outcome.message)
    except Exception as exc:
        logger.exception(exc)
        return response_500(data="", message=str(exc))
|
||
|
|
||
|
|
||
|
# Import-template endpoint: obtains the template from
# NdstandService.get_ndstand_import_template_services and streams it back via
# bytes2file_response / streaming_response_200. The DB session parameter is
# accepted but not used by this handler. Any exception is logged and returned
# as a 500 response carrying the exception text.
@ndstandController.post(
    "/ndstand/importTemplate",
    dependencies=[Depends(CheckUserInterfaceAuth('dasset:ndstand:import'))],
)
async def export_dasset_ndstand_template(
    request: Request,
    query_db: Session = Depends(get_db),
):
    try:
        template = NdstandService.get_ndstand_import_template_services()
        logger.info('获取成功')
        file_body = bytes2file_response(template)
        return streaming_response_200(data=file_body)
    except Exception as exc:
        logger.exception(exc)
        return response_500(data="", message=str(exc))
|
||
|
|
||
|
|
||
|
# "All records" endpoint: returns the value produced by
# NdstandService.get_all_ndstand_services in a 200 response. The request body
# (ndstand_query) is accepted but not passed on to the service call. Any
# exception is logged and returned as a 500 response carrying the exception
# text.
@ndstandController.post(
    "/ndstand/all",
    dependencies=[Depends(CheckUserInterfaceAuth('dasset:ndstand:list'))],
)
async def get_dasset_all_ndstand(
    request: Request,
    ndstand_query: NdstandQueryModel,
    query_db: Session = Depends(get_db),
):
    try:
        all_records = NdstandService.get_all_ndstand_services(query_db)
        logger.info('获取成功')
        return response_200(data=all_records, message="获取成功")
    except Exception as exc:
        logger.exception(exc)
        return response_500(data="", message=str(exc))
|
||
|
|
||
|
|
||
|
# Paged fddict list endpoint: converts the page-query payload into a
# FddictModel filter, asks FddictService for the matching records, then pages
# them with get_page_obj. Any exception is logged and returned as a 500
# response carrying the exception text.
@ndstandController.post(
    "/nfddictData/get",
    response_model=FddictPageObjectResponse,
    dependencies=[Depends(CheckUserInterfaceAuth('dasset:ndstand:list'))],
)
async def get_dasset_nfddict_data_list(
    request: Request,
    nfddict_data_page_query: FddictPageObject,
    query_db: Session = Depends(get_db),
):
    try:
        # Serialize the payload once and reuse it for both the filter model
        # and the paging fields.
        payload = nfddict_data_page_query.dict()
        nfddict_data_query = FddictModel(**payload)

        # Fetch the full (unpaged) result set.
        nfddict_data_query_result = FddictService.get_fddict_list_services(query_db, nfddict_data_query)

        # NOTE(review): the paging fields are deliberately read from the raw
        # payload dict instead of from nfddict_data_query; the original
        # author's note says going through the FddictModel instance raised an
        # error that was not resolved. `.get` yields None for a missing key.
        page_num = payload.get('page_num')
        page_size = payload.get('page_size')

        nfddict_data_page_query_result = get_page_obj(nfddict_data_query_result, page_num, page_size)
        logger.info('获取成功')
        return response_200(data=nfddict_data_page_query_result, message="获取成功")
    except Exception as e:
        logger.exception(e)
        return response_500(data="", message=str(e))
|
||
|
|
||
|
|
||
|
# Cached fddict lookup endpoint: awaits
# FddictService.query_fddict_list_from_cache_services with the redis handle
# stored on request.app.state and the `fddict_col_no` path parameter, and
# returns the result in a 200 response. The DB session parameter is accepted
# but not used by this handler. Any exception is logged and returned as a 500
# response carrying the exception text.
@ndstandController.get(
    "/nfddictData/query/{fddict_col_no}",
    dependencies=[Depends(CheckUserInterfaceAuth('dasset:ndstand:list'))],
)
async def query_dasset_nfddict_data_list(
    request: Request,
    fddict_col_no: str,
    query_db: Session = Depends(get_db),
):
    try:
        redis_handle = request.app.state.redis
        cached_rows = await FddictService.query_fddict_list_from_cache_services(
            redis_handle, fddict_col_no
        )
        logger.info('获取成功')
        return response_200(data=cached_rows, message="获取成功")
    except Exception as exc:
        logger.exception(exc)
        return response_500(data="", message=str(exc))
|