from datetime import datetime

from fastapi import APIRouter, Depends, Request, Form
from pydantic_validation_decorator import ValidateFields
from sqlalchemy.ext.asyncio import AsyncSession

from module_admin.entity.vo.metadata_config_vo import (
    MetadataClasModel,
    MetadataClasPageQueryModel,
    MetadataSecModel,
    MetadataSecPageQueryModel,
    SecuBizConfigModel,
    SecuBizConfigPageQueryModel,
    SecuBizConfigQueryModel,
    SecuBizPermiConfigModel,
    SecuBizPermiConfigBatchModel,
    SecuBizConfigAddModel,
    SecuBizPermiConfigPageQueryModel,
    BatchBusiLabelConfigModel,
    BatchDataopLabelConfigModel,
    BatchDatatypeLabelConfigModel,
    BatchBusiLabelConfigPageQueryModel,
    BatchDataopLabelConfigModelVo,
    BatchDataopLabelModelPageQueryModel,
    BatchDatatypeLabelConfigPageQueryModel,
    DatasecConfigPageQueryModel,
    DatasecConfigModel,
)
from module_admin.service.metadata_config_service import MetadataConfigService
from module_admin.service.batch_label_config_service import BatchLabelConfigService
from module_admin.service.datasec_config_service import DatasecConfigService
from config.get_db import get_db
from utils.response_util import ResponseUtil
from utils.page_util import PageResponseModel
from utils.log_util import logger
from module_admin.service.login_service import LoginService
from module_admin.entity.vo.user_vo import CurrentUserModel


# Router for every metadata-configuration endpoint; all paths below are
# relative to the '/metadataConfig' prefix.
# NOTE(review): the router declares no shared auth dependency, so handlers
# that do not themselves depend on LoginService.get_current_user (the list,
# detail and delete handlers) are not guarded by it here - confirm that
# authentication is enforced elsewhere.
metadataConfigController = APIRouter(prefix='/metadataConfig')


# ------------------------ MetadataClas endpoints ------------------------ #
# For @ValidateFields, validate_model must be the NAME OF THE HANDLER
# PARAMETER that holds the pydantic model; a name that matches no parameter
# makes the decorator skip validation silently.


@metadataConfigController.get('/clas/list', response_model=PageResponseModel)
async def get_metadata_clas_list(
    request: Request,
    query: MetadataClasPageQueryModel = Depends(MetadataClasPageQueryModel.as_query),
    query_db: AsyncSession = Depends(get_db),
):
    """Query metadata classification records with paging enabled.

    :param request: incoming request (unused by the handler body)
    :param query: query/paging parameters built by ``MetadataClasPageQueryModel.as_query``
    :param query_db: database session
    :return: success response wrapping the service result as ``model_content``
    """
    result = await MetadataConfigService.get_metadata_clas_list_services(query_db, query, is_page=True)
    logger.info('获取元数据分类列表成功')
    return ResponseUtil.success(model_content=result)


@metadataConfigController.post('/clas')
@ValidateFields(validate_model='add_clas')
async def add_metadata_clas(
    request: Request,
    add_clas: MetadataClasModel,
    query_db: AsyncSession = Depends(get_db),
    current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
    """Create a metadata classification record.

    :param request: incoming request (unused by the handler body)
    :param add_clas: record to create, taken from the request body
    :param query_db: database session
    :param current_user: authenticated user; its user name is stored as the submitter
    :return: success response carrying the service message
    """
    # Stamp the record with the modification time and the submitting user.
    add_clas.upd_time = datetime.now()
    add_clas.rec_subm_prsn = current_user.user.user_name
    result = await MetadataConfigService.add_metadata_clas_services(query_db, add_clas)
    logger.info(result.message)
    return ResponseUtil.success(msg=result.message)


@metadataConfigController.put('/clas')
@ValidateFields(validate_model='edit_clas')
async def edit_metadata_clas(
    request: Request,
    edit_clas: MetadataClasModel,
    query_db: AsyncSession = Depends(get_db),
    current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
    """Update a metadata classification record.

    :param request: incoming request (unused by the handler body)
    :param edit_clas: record to update, taken from the request body
    :param query_db: database session
    :param current_user: authenticated user; its user name is stored as the submitter
    :return: success response carrying the service message
    """
    edit_clas.upd_time = datetime.now()
    edit_clas.rec_subm_prsn = current_user.user.user_name
    result = await MetadataConfigService.edit_metadata_clas_services(query_db, edit_clas)
    logger.info(result.message)
    return ResponseUtil.success(msg=result.message)


@metadataConfigController.delete('/clas/{clas_ids}')
async def delete_metadata_clas(
    request: Request,
    clas_ids: str,
    query_db: AsyncSession = Depends(get_db),
):
    """Delete metadata classification records.

    :param request: incoming request (unused by the handler body)
    :param clas_ids: path value passed unchanged to the service
        (presumably one or more ids in a single string - confirm the format in the service)
    :param query_db: database session
    :return: success response carrying the service message
    """
    result = await MetadataConfigService.delete_metadata_clas_services(query_db, clas_ids)
    logger.info(result.message)
    return ResponseUtil.success(msg=result.message)


@metadataConfigController.get('/clas/{clas_id}', response_model=MetadataClasModel)
async def get_metadata_clas_detail(
    request: Request,
    clas_id: int,
    query_db: AsyncSession = Depends(get_db),
):
    """Fetch one metadata classification record by id.

    :param request: incoming request (unused by the handler body)
    :param clas_id: integer id taken from the path
    :param query_db: database session
    :return: success response with the service result as ``data``
    """
    result = await MetadataConfigService.get_metadata_clas_detail_services(query_db, clas_id)
    logger.info(f'获取元数据分类ID={clas_id}详情成功')
    return ResponseUtil.success(data=result)


# ------------------------ MetadataSec endpoints ------------------------ #


@metadataConfigController.get('/sec/list', response_model=PageResponseModel)
async def get_metadata_sec_list(
    request: Request,
    query: MetadataSecPageQueryModel = Depends(MetadataSecPageQueryModel.as_query),
    query_db: AsyncSession = Depends(get_db),
):
    """Query data-security configuration records with paging enabled.

    :param request: incoming request (unused by the handler body)
    :param query: query/paging parameters built by ``MetadataSecPageQueryModel.as_query``
    :param query_db: database session
    :return: success response wrapping the service result as ``model_content``
    """
    result = await MetadataConfigService.get_metadata_sec_list_services(query_db, query, is_page=True)
    logger.info('获取数据安全配置列表成功')
    return ResponseUtil.success(model_content=result)


@metadataConfigController.post('/sec')
@ValidateFields(validate_model='add_sec')
async def add_metadata_sec(
    request: Request,
    add_sec: MetadataSecModel,
    query_db: AsyncSession = Depends(get_db),
    current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
    """Create a data-security configuration record.

    :param request: incoming request (unused by the handler body)
    :param add_sec: record to create, taken from the request body
    :param query_db: database session
    :param current_user: authenticated user; its user name is stored as the submitter
    :return: success response carrying the service message
    """
    add_sec.upd_time = datetime.now()
    add_sec.rec_subm_prsn = current_user.user.user_name
    result = await MetadataConfigService.add_metadata_sec_services(query_db, add_sec)
    logger.info(result.message)
    return ResponseUtil.success(msg=result.message)


@metadataConfigController.put('/sec')
@ValidateFields(validate_model='edit_sec')
async def edit_metadata_sec(
    request: Request,
    edit_sec: MetadataSecModel,
    query_db: AsyncSession = Depends(get_db),
    current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
    """Update a data-security configuration record.

    :param request: incoming request (unused by the handler body)
    :param edit_sec: record to update, taken from the request body
    :param query_db: database session
    :param current_user: authenticated user; its user name is stored as the submitter
    :return: success response carrying the service message
    """
    edit_sec.upd_time = datetime.now()
    edit_sec.rec_subm_prsn = current_user.user.user_name
    result = await MetadataConfigService.edit_metadata_sec_services(query_db, edit_sec)
    logger.info(result.message)
    return ResponseUtil.success(msg=result.message)


@metadataConfigController.delete('/sec/{sec_ids}')
async def delete_metadata_sec(
    request: Request,
    sec_ids: str,
    query_db: AsyncSession = Depends(get_db),
):
    """Delete data-security configuration records.

    :param request: incoming request (unused by the handler body)
    :param sec_ids: path value passed unchanged to the service
        (presumably one or more ids in a single string - confirm the format in the service)
    :param query_db: database session
    :return: success response carrying the service message
    """
    result = await MetadataConfigService.delete_metadata_sec_services(query_db, sec_ids)
    logger.info(result.message)
    return ResponseUtil.success(msg=result.message)


@metadataConfigController.get('/sec/{sec_id}', response_model=MetadataSecModel)
async def get_metadata_sec_detail(
    request: Request,
    sec_id: str,
    query_db: AsyncSession = Depends(get_db),
):
    """Fetch one data-security configuration record.

    :param request: incoming request (unused by the handler body)
    :param sec_id: identifier taken from the path (a string here)
    :param query_db: database session
    :return: success response with the service result as ``data``
    """
    result = await MetadataConfigService.get_metadata_sec_detail_services(query_db, sec_id)
    logger.info(f'获取数据安全配置 onum={sec_id} 详情成功')
    return ResponseUtil.success(data=result)


# ---------- t_secu_biz_config endpoints (reworked) ----------
# For @ValidateFields, validate_model must be the NAME OF THE HANDLER
# PARAMETER that holds the pydantic model (a string); a name that matches no
# parameter makes the decorator skip validation silently.


@metadataConfigController.get('/bizConfig/list', response_model=PageResponseModel)
async def get_biz_config_list(
    request: Request,
    query: SecuBizConfigQueryModel = Depends(SecuBizConfigPageQueryModel.as_query),
    db: AsyncSession = Depends(get_db),
):
    """Query business security configurations with paging enabled.

    :param request: incoming request (unused by the handler body)
    :param query: query parameters built by ``SecuBizConfigPageQueryModel.as_query``
    :param db: database session
    :return: success response wrapping the service result as ``model_content``
    """
    result = await MetadataConfigService.get_biz_config_list_services(db, query, is_page=True)
    return ResponseUtil.success(model_content=result)


@metadataConfigController.get('/bizConfig/listall')
async def get_biz_config_list_all(
    request: Request,
    query: SecuBizConfigQueryModel = Depends(SecuBizConfigPageQueryModel.as_query),
    db: AsyncSession = Depends(get_db),
):
    """Query business security configurations with paging disabled.

    :param request: incoming request (unused by the handler body)
    :param query: query parameters built by ``SecuBizConfigPageQueryModel.as_query``
    :param db: database session
    :return: success response with the service result as ``data``
    """
    result = await MetadataConfigService.get_biz_config_list_services(db, query, is_page=False)
    return ResponseUtil.success(data=result)


# NOTE(review): this decorator previously received the list
# ['bizModule', 'configType', 'data_sec_lvl', 'applyType'] instead of a
# parameter name. If those fields are meant to be mandatory, declare the
# checks in the model's validation method - confirm against the model.
@metadataConfigController.post('/bizConfig/add')
@ValidateFields(validate_model='item')
async def add_biz_config(
    item: SecuBizConfigAddModel,
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
    """Create a business security configuration.

    :param item: configuration to create, taken from the request body
    :param db: database session
    :param current_user: authenticated user; its user name is stored as the creator
    :return: success response carrying the service message
    """
    item.create_by = current_user.user.user_name
    item.create_time = datetime.now()
    result = await MetadataConfigService.add_biz_config_services(db, item)
    return ResponseUtil.success(msg=result.message)


# NOTE(review): this decorator previously received ['onum'] instead of a
# parameter name; a mandatory-onum check belongs in the model's validation
# method - confirm against the model.
@metadataConfigController.post('/bizConfig/edit')
@ValidateFields(validate_model='item')
async def edit_biz_config(
    item: SecuBizConfigAddModel,
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
    """Update a business security configuration.

    :param item: configuration to update, taken from the request body
    :param db: database session
    :param current_user: authenticated user; its user name is stored as the updater
    :return: success response carrying the service message
    """
    item.update_by = current_user.user.user_name
    item.update_time = datetime.now()
    result = await MetadataConfigService.edit_biz_config_services(db, item)
    return ResponseUtil.success(msg=result.message)


@metadataConfigController.delete('/bizConfig/delete')
async def delete_biz_config(onums: str, db: AsyncSession = Depends(get_db)):
    """Delete business security configurations.

    :param onums: query-string value passed unchanged to the service
        (presumably one or more onum values in one string - confirm in the service)
    :param db: database session
    :return: success response carrying the service message
    """
    result = await MetadataConfigService.delete_biz_config_services(db, onums)
    return ResponseUtil.success(msg=result.message)


@metadataConfigController.get('/bizConfig/detail')
async def get_biz_config_detail(onum: str, db: AsyncSession = Depends(get_db)):
    """Fetch one business security configuration.

    :param onum: identifier taken from the query string
    :param db: database session
    :return: success response with the service result as ``data``
    """
    data = await MetadataConfigService.get_biz_config_detail_services(db, onum)
    return ResponseUtil.success(data=data)


# ---------- t_secu_biz_permi_config endpoints ----------


@metadataConfigController.get('/bizPermiConfig/list', response_model=PageResponseModel)
async def get_biz_permi_config_list(
    request: Request,
    query: SecuBizConfigModel = Depends(SecuBizPermiConfigPageQueryModel.as_query),
    query_db: AsyncSession = Depends(get_db),
):
    """Query business permission configurations with paging enabled.

    :param request: incoming request (unused by the handler body)
    :param query: query parameters built by ``SecuBizPermiConfigPageQueryModel.as_query``
    :param query_db: database session
    :return: success response wrapping the service result as ``model_content``
    """
    result = await MetadataConfigService.get_biz_permi_config_list_services(query_db, query, is_page=True)
    return ResponseUtil.success(model_content=result)


@metadataConfigController.post('/bizPermiConfig/add')
async def add_biz_permi_config(
    item: SecuBizPermiConfigBatchModel,
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
    """Create business permission configurations from a batch model.

    The creator name and creation time are passed to the service as separate
    arguments instead of being written onto ``item``.

    :param item: batch payload taken from the request body
    :param db: database session
    :param current_user: authenticated user; its user name is passed as the creator
    :return: success response carrying the service message
    """
    create_by = current_user.user.user_name
    create_time = datetime.now()
    result = await MetadataConfigService.add_biz_permi_config_services(db, item, create_by, create_time)
    return ResponseUtil.success(msg=result.message)


# NOTE(review): this decorator previously received ['onum'] instead of a
# parameter name; a mandatory-onum check belongs in the model's validation
# method - confirm against the model.
@metadataConfigController.post('/bizPermiConfig/edit')
@ValidateFields(validate_model='item')
async def edit_biz_permi_config(
    item: SecuBizPermiConfigModel,
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
    """Update a business permission configuration.

    :param item: configuration to update, taken from the request body
    :param db: database session
    :param current_user: authenticated user; its user name is stored as the updater
    :return: success response carrying the service message
    """
    item.update_by = current_user.user.user_name
    item.update_time = datetime.now()
    result = await MetadataConfigService.edit_biz_permi_config_services(db, item)
    return ResponseUtil.success(msg=result.message)


@metadataConfigController.delete('/bizPermiConfig/delete')
async def delete_biz_permi_config(onums: str, db: AsyncSession = Depends(get_db)):
    """Delete business permission configurations.

    :param onums: query-string value passed unchanged to the service
        (presumably one or more onum values in one string - confirm in the service)
    :param db: database session
    :return: success response carrying the service message
    """
    result = await MetadataConfigService.delete_biz_permi_config_services(db, onums)
    return ResponseUtil.success(msg=result.message)


@metadataConfigController.get('/bizPermiConfig/detail')
async def get_biz_permi_config_detail(onum: str, db: AsyncSession = Depends(get_db)):
    """Fetch one business permission configuration.

    :param onum: identifier taken from the query string
    :param db: database session
    :return: success response with the service result as ``data``
    """
    data = await MetadataConfigService.get_biz_permi_config_detail_services(db, onum)
    return ResponseUtil.success(data=data)


@metadataConfigController.get('/bizConfigRela/list/{bizOnum}')
async def get_biz_rela_config_list(
    bizOnum: int,
    query_db: AsyncSession = Depends(get_db),
):
    """List the relation records of one business configuration.

    :param bizOnum: integer identifier taken from the path
    :param query_db: database session
    :return: success response with the service result as ``data``
    """
    result = await MetadataConfigService.get_biz_config_rela_list_services(query_db, bizOnum)
    logger.info('获取成功')
    return ResponseUtil.success(data=result)


# ---------- business label endpoints ----------


@metadataConfigController.get('/busiLabel/list', response_model=PageResponseModel)
async def get_busi_label_list(
    request: Request,
    query: BatchBusiLabelConfigModel = Depends(BatchBusiLabelConfigPageQueryModel.as_query),
    query_db: AsyncSession = Depends(get_db),
):
    """Query business label configurations.

    :param request: incoming request (unused by the handler body)
    :param query: query parameters built by ``BatchBusiLabelConfigPageQueryModel.as_query``
    :param query_db: database session
    :return: success response with the service result as ``data``
    """
    # The third positional argument is False (presumably the is_page flag,
    # i.e. paging disabled - confirm against the service signature).
    result = await BatchLabelConfigService.get_busi_list_services(query_db, query, False)
    logger.info('获取业务标签配置列表成功')
    return ResponseUtil.success(data=result)


@metadataConfigController.post('/busiLabel')
@ValidateFields(validate_model='model')
async def add_busi_label(
    request: Request,
    model: BatchBusiLabelConfigModel,
    query_db: AsyncSession = Depends(get_db),
    current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
    """Create a business label configuration.

    :param request: incoming request (unused by the handler body)
    :param model: configuration to create, taken from the request body
    :param query_db: database session
    :param current_user: authenticated user; its user name is stored in ``upd_by``
    :return: success response carrying the service message
    """
    model.upd_by = current_user.user.user_name
    model.upd_time = datetime.now()
    result = await BatchLabelConfigService.add_busi_services(query_db, model)
    logger.info(result.message)
    return ResponseUtil.success(msg=result.message)


@metadataConfigController.put('/busiLabel')
@ValidateFields(validate_model='model')
async def edit_busi_label(
    request: Request,
    model: BatchBusiLabelConfigModel,
    query_db: AsyncSession = Depends(get_db),
    current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
    """Update a business label configuration.

    :param request: incoming request (unused by the handler body)
    :param model: configuration to update, taken from the request body
    :param query_db: database session
    :param current_user: authenticated user; its user name is stored in ``upd_by``
    :return: success response carrying the service message
    """
    model.upd_by = current_user.user.user_name
    model.upd_time = datetime.now()
    result = await BatchLabelConfigService.edit_busi_services(query_db, model)
    logger.info(result.message)
    return ResponseUtil.success(msg=result.message)


@metadataConfigController.delete('/busiLabel/{onum_list}')
async def delete_busi_label(
    request: Request,
    onum_list: str,
    query_db: AsyncSession = Depends(get_db),
):
    """Delete business label configurations.

    :param request: incoming request (unused by the handler body)
    :param onum_list: path value passed unchanged to the service
        (presumably one or more onum values in one string - confirm in the service)
    :param query_db: database session
    :return: success response carrying the service message
    """
    result = await BatchLabelConfigService.delete_busi_services(query_db, onum_list)
    logger.info(result.message)
    return ResponseUtil.success(msg=result.message)


@metadataConfigController.get('/busiLabel/{onum}', response_model=BatchBusiLabelConfigModel)
async def get_busi_label_detail(
    request: Request,
    onum: str,
    query_db: AsyncSession = Depends(get_db),
):
    """Fetch one business label configuration.

    :param request: incoming request (unused by the handler body)
    :param onum: identifier taken from the path
    :param query_db: database session
    :return: success response with the service result as ``data``
    """
    result = await BatchLabelConfigService.get_busi_detail_services(query_db, onum)
    logger.info(f'获取业务标签配置 onum={onum} 详情成功')
    return ResponseUtil.success(data=result)


# ---------- data-operation label endpoints ----------


@metadataConfigController.get('/dataopLabel/list', response_model=PageResponseModel)
async def get_dataop_label_list(
    request: Request,
    query: BatchDataopLabelConfigModel = Depends(BatchDataopLabelModelPageQueryModel.as_query),
    query_db: AsyncSession = Depends(get_db),
):
    """Query data-operation label configurations with paging enabled.

    :param request: incoming request (unused by the handler body)
    :param query: query parameters built by ``BatchDataopLabelModelPageQueryModel.as_query``
    :param query_db: database session
    :return: success response with the service result as ``data``
    """
    result = await BatchLabelConfigService.get_dataop_vo_services(query_db, query, is_page=True)
    logger.info('获取操作标签配置列表成功')
    return ResponseUtil.success(data=result)


@metadataConfigController.post('/dataopLabel')
@ValidateFields(validate_model='model')
async def add_dataop_label(
    request: Request,
    model: BatchDataopLabelConfigModelVo,
    query_db: AsyncSession = Depends(get_db),
    current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
    """Create a data-operation label configuration.

    :param request: incoming request (unused by the handler body)
    :param model: configuration to create, taken from the request body
    :param query_db: database session
    :param current_user: authenticated user; its user name is stored in ``upd_by``
    :return: success response carrying the service message
    """
    model.upd_by = current_user.user.user_name
    model.upd_time = datetime.now()
    result = await BatchLabelConfigService.add_dataop_services(query_db, model)
    logger.info(result.message)
    return ResponseUtil.success(msg=result.message)


@metadataConfigController.put('/dataopLabel')
@ValidateFields(validate_model='model')
async def edit_dataop_label(
    request: Request,
    model: BatchDataopLabelConfigModel,
    query_db: AsyncSession = Depends(get_db),
    current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
    """Update a data-operation label configuration.

    :param request: incoming request (unused by the handler body)
    :param model: configuration to update, taken from the request body
    :param query_db: database session
    :param current_user: authenticated user; its user name is stored in ``upd_by``
    :return: success response carrying the service message
    """
    model.upd_by = current_user.user.user_name
    model.upd_time = datetime.now()
    result = await BatchLabelConfigService.edit_dataop_services(query_db, model)
    logger.info(result.message)
    return ResponseUtil.success(msg=result.message)


@metadataConfigController.delete('/dataopLabel/{onum_list}')
async def delete_dataop_label(
    request: Request,
    onum_list: str,
    query_db: AsyncSession = Depends(get_db),
):
    """Delete data-operation label configurations.

    :param request: incoming request (unused by the handler body)
    :param onum_list: path value passed unchanged to the service
        (presumably one or more onum values in one string - confirm in the service)
    :param query_db: database session
    :return: success response carrying the service message
    """
    result = await BatchLabelConfigService.delete_dataop_services(query_db, onum_list)
    logger.info(result.message)
    return ResponseUtil.success(msg=result.message)


@metadataConfigController.get('/dataopLabel/{onum}', response_model=BatchDataopLabelConfigModel)
async def get_dataop_label_detail(
    request: Request,
    onum: str,
    query_db: AsyncSession = Depends(get_db),
):
    """Fetch one data-operation label configuration.

    :param request: incoming request (unused by the handler body)
    :param onum: identifier taken from the path
    :param query_db: database session
    :return: success response with the service result as ``data``
    """
    result = await BatchLabelConfigService.get_dataop_detail_services(query_db, onum)
    logger.info(f'获取操作标签配置 onum={onum} 详情成功')
    return ResponseUtil.success(data=result)


# ---------- data-type label endpoints ----------


@metadataConfigController.get('/datatypeLabel/list', response_model=PageResponseModel)
async def get_datatype_label_list(
    request: Request,
    query: BatchDatatypeLabelConfigModel = Depends(BatchDatatypeLabelConfigPageQueryModel.as_query),
    query_db: AsyncSession = Depends(get_db),
):
    """Query data-type label configurations.

    :param request: incoming request (unused by the handler body)
    :param query: query parameters built by ``BatchDatatypeLabelConfigPageQueryModel.as_query``
    :param query_db: database session
    :return: success response with the service result as ``data``
    """
    # The third positional argument is False (presumably the is_page flag,
    # i.e. paging disabled - confirm against the service signature).
    result = await BatchLabelConfigService.get_datatype_list_services(query_db, query, False)
    logger.info('获取数据类型标签配置列表成功')
    return ResponseUtil.success(data=result)


@metadataConfigController.post('/datatypeLabel')
@ValidateFields(validate_model='model')
async def add_datatype_label(
    request: Request,
    model: BatchDatatypeLabelConfigModel,
    query_db: AsyncSession = Depends(get_db),
    current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
    """Create a data-type label configuration.

    :param request: incoming request (unused by the handler body)
    :param model: configuration to create, taken from the request body
    :param query_db: database session
    :param current_user: authenticated user; its user name is stored in ``upd_by``
    :return: success response carrying the service message
    """
    model.upd_by = current_user.user.user_name
    model.upd_time = datetime.now()
    result = await BatchLabelConfigService.add_datatype_services(query_db, model)
    logger.info(result.message)
    return ResponseUtil.success(msg=result.message)


@metadataConfigController.put('/datatypeLabel')
@ValidateFields(validate_model='model')
async def edit_datatype_label(
    request: Request,
    model: BatchDatatypeLabelConfigModel,
    query_db: AsyncSession = Depends(get_db),
    current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
    """Update a data-type label configuration.

    :param request: incoming request (unused by the handler body)
    :param model: configuration to update, taken from the request body
    :param query_db: database session
    :param current_user: authenticated user; its user name is stored in ``upd_by``
    :return: success response carrying the service message
    """
    model.upd_by = current_user.user.user_name
    model.upd_time = datetime.now()
    result = await BatchLabelConfigService.edit_datatype_services(query_db, model)
    logger.info(result.message)
    return ResponseUtil.success(msg=result.message)


@metadataConfigController.delete('/datatypeLabel/{onum_list}')
async def delete_datatype_label(
    request: Request,
    onum_list: str,
    query_db: AsyncSession = Depends(get_db),
):
    """Delete data-type label configurations.

    :param request: incoming request (unused by the handler body)
    :param onum_list: path value passed unchanged to the service
        (presumably one or more onum values in one string - confirm in the service)
    :param query_db: database session
    :return: success response carrying the service message
    """
    result = await BatchLabelConfigService.delete_datatype_services(query_db, onum_list)
    logger.info(result.message)
    return ResponseUtil.success(msg=result.message)


@metadataConfigController.get('/datatypeLabel/{onum}', response_model=BatchDatatypeLabelConfigModel)
async def get_datatype_label_detail(
    request: Request,
    onum: str,
    query_db: AsyncSession = Depends(get_db),
):
    """Fetch one data-type label configuration.

    :param request: incoming request (unused by the handler body)
    :param onum: identifier taken from the path
    :param query_db: database session
    :return: success response with the service result as ``data``
    """
    result = await BatchLabelConfigService.get_datatype_detail_services(query_db, onum)
    logger.info(f'获取数据类型标签配置 onum={onum} 详情成功')
    return ResponseUtil.success(data=result)


# ---------- data-security parameter configuration endpoints ----------


@metadataConfigController.get("/datasecConfig/list", response_model=PageResponseModel)
async def get_datasec_config_list(
    request: Request,
    query: DatasecConfigModel = Depends(DatasecConfigPageQueryModel.as_query),
    query_db: AsyncSession = Depends(get_db),
):
    """Query data-security parameter configurations.

    :param request: incoming request (unused by the handler body)
    :param query: query parameters built by ``DatasecConfigPageQueryModel.as_query``
    :param query_db: database session
    :return: success response with the service result as ``data``
    """
    # The third positional argument is False (presumably the is_page flag,
    # i.e. paging disabled - confirm against the service signature).
    result = await DatasecConfigService.get_datasec_list_services(query_db, query, False)
    logger.info("获取数据安全参数配置列表成功")
    return ResponseUtil.success(data=result)


@metadataConfigController.post("/datasecConfig")
@ValidateFields(validate_model="model")
async def add_datasec_config(
    request: Request,
    model: DatasecConfigModel,
    query_db: AsyncSession = Depends(get_db),
    current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
    """Create a data-security parameter configuration.

    :param request: incoming request (unused by the handler body)
    :param model: configuration to create, taken from the request body
    :param query_db: database session
    :param current_user: authenticated user; its user name is stored as the creator
    :return: success response carrying the service message
    """
    model.create_by = current_user.user.user_name
    model.create_time = datetime.now()
    result = await DatasecConfigService.add_datasec_services(query_db, model)
    logger.info(result.message)
    return ResponseUtil.success(msg=result.message)
# For @ValidateFields, validate_model must be the NAME OF THE HANDLER
# PARAMETER that holds the pydantic model; a name that matches no parameter
# makes the decorator skip validation silently.
@metadataConfigController.put("/datasecConfig")
@ValidateFields(validate_model="model")
async def edit_datasec_config(
    request: Request,
    model: DatasecConfigModel,
    query_db: AsyncSession = Depends(get_db),
    current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
    """Update a data-security parameter configuration.

    :param request: incoming request (unused by the handler body)
    :param model: configuration to update, taken from the request body
    :param query_db: database session
    :param current_user: authenticated user; its user name is stored as the updater
    :return: success response carrying the service message
    """
    model.update_by = current_user.user.user_name
    model.update_time = datetime.now()
    result = await DatasecConfigService.edit_datasec_services(query_db, model)
    logger.info(result.message)
    return ResponseUtil.success(msg=result.message)


@metadataConfigController.delete("/datasecConfig/{onum_list}")
async def delete_datasec_config(
    request: Request,
    onum_list: str,
    query_db: AsyncSession = Depends(get_db),
):
    """Delete data-security parameter configurations.

    :param request: incoming request (unused by the handler body)
    :param onum_list: path value passed unchanged to the service
        (presumably one or more onum values in one string - confirm in the service)
    :param query_db: database session
    :return: success response carrying the service message
    """
    result = await DatasecConfigService.delete_datasec_services(query_db, onum_list)
    logger.info(result.message)
    return ResponseUtil.success(msg=result.message)


@metadataConfigController.get("/datasecConfig/{onum}", response_model=DatasecConfigModel)
async def get_datasec_config_detail(
    request: Request,
    onum: int,
    query_db: AsyncSession = Depends(get_db),
):
    """Fetch one data-security parameter configuration.

    :param request: incoming request (unused by the handler body)
    :param onum: integer identifier taken from the path
    :param query_db: database session
    :return: success response with the service result as ``data``
    """
    result = await DatasecConfigService.get_datasec_detail_services(query_db, onum)
    logger.info(f"获取数据安全参数配置 onum={onum} 详情成功")
    return ResponseUtil.success(data=result)