diff --git a/vue-fastapi-backend/module_admin/controller/metadata_config_controller.py b/vue-fastapi-backend/module_admin/controller/metadata_config_controller.py index 6f7ff72..79cc857 100644 --- a/vue-fastapi-backend/module_admin/controller/metadata_config_controller.py +++ b/vue-fastapi-backend/module_admin/controller/metadata_config_controller.py @@ -33,6 +33,8 @@ from utils.page_util import PageResponseModel from utils.log_util import logger from module_admin.service.login_service import LoginService from module_admin.entity.vo.user_vo import CurrentUserModel +from module_admin.entity.vo.metatask_vo import DeleteMetataskModel,OperaMetataskModel +from module_admin.entity.vo.dataSource_vo import ProcessDefinition,ParmScheduleVo,ProcessInstancePage metadataConfigController = APIRouter(prefix='/metadataConfig') @@ -527,16 +529,22 @@ async def edit_datasec_config( return ResponseUtil.success(msg=result.message) -@metadataConfigController.delete("/datasecConfig/{onum_list}") -async def delete_datasec_config( - request: Request, - onum_list: str, - query_db: AsyncSession = Depends(get_db), -): - result = await DatasecConfigService.delete_datasec_services(query_db, onum_list) - logger.info(result.message) - return ResponseUtil.success(msg=result.message) +# @metadataConfigController.delete("/datasecConfig/{onum_list}") +# async def delete_datasec_config( +# request: Request, +# onum_list: str, +# query_db: AsyncSession = Depends(get_db), +# ): +# result = await DatasecConfigService.delete_datasec_services(query_db, onum_list) +# logger.info(result.message) +# return ResponseUtil.success(msg=result.message) +@metadataConfigController.delete('/datasecConfig/{metatask_ids}/{ds_ids}') +async def delete_system_config(request: Request, metatask_ids: str,ds_ids:str, query_db: AsyncSession = Depends(get_db),current_user: CurrentUserModel = Depends(LoginService.get_current_user)): + delete_config = DeleteMetataskModel(metatask_ids=metatask_ids,ds_ids=ds_ids) + delete_config_result = await DatasecConfigService.delete_datasec_services(request, query_db, delete_config,current_user) + logger.info(delete_config_result.message) + return ResponseUtil.success(msg=delete_config_result.message) @metadataConfigController.get("/datasecConfig/{onum}", response_model=DatasecConfigModel) async def get_datasec_config_detail( @@ -547,3 +555,51 @@ async def get_datasec_config_detail( result = await DatasecConfigService.get_datasec_detail_services(query_db, onum) logger.info(f"获取数据安全参数配置 onum={onum} 详情成功") return ResponseUtil.success(data=result) +@metadataConfigController.put('/upOrdown') +async def up_or_down_meta_metatask( + request: Request, + DownOrUpdate: OperaMetataskModel, + query_db: AsyncSession = Depends(get_db), + current_user: CurrentUserModel = Depends(LoginService.get_current_user), +): + + edit_config_result = await DatasecConfigService.up_or_down_metatask_services(request, query_db,current_user, DownOrUpdate.id,DownOrUpdate.type) + logger.info(edit_config_result.message) + return ResponseUtil.success(msg=edit_config_result.message) +# 元数据任务调度 +@metadataConfigController.put('/DS') +async def DS_meta_metatask( + request: Request, + process: ParmScheduleVo, + query_db: AsyncSession = Depends(get_db), + current_user: CurrentUserModel = Depends(LoginService.get_current_user) +): + + edit_config_result = await DatasecConfigService.ds_metatask_services(request, query_db, process,current_user) + + return ResponseUtil.success(msg=edit_config_result) + +@metadataConfigController.get('/DS/{id}') + +async def DS_meta_metatask_detail( + request: Request, + id: int, + query_db: AsyncSession = Depends(get_db), + current_user: CurrentUserModel = Depends(LoginService.get_current_user) +): + process=ParmScheduleVo() + process.processDefinitionCode=id + edit_config_result = await DatasecConfigService.ds_metatask_detail(request, query_db, process,current_user) + + return ResponseUtil.success(data=edit_config_result) +@metadataConfigController.delete('/DS') +async def DS_meta_metatask( + request: Request, + process: ParmScheduleVo, + query_db: AsyncSession = Depends(get_db), + current_user: CurrentUserModel = Depends(LoginService.get_current_user) +): + + edit_config_result = await DatasecConfigService.ds_metatask_delete(request, query_db, process,current_user) + + return ResponseUtil.success(msg=edit_config_result) \ No newline at end of file diff --git a/vue-fastapi-backend/module_admin/dao/datasec_config_dao.py b/vue-fastapi-backend/module_admin/dao/datasec_config_dao.py index 3a52ad8..1f346c9 100644 --- a/vue-fastapi-backend/module_admin/dao/datasec_config_dao.py +++ b/vue-fastapi-backend/module_admin/dao/datasec_config_dao.py @@ -58,12 +58,22 @@ class DatasecConfigDao: .values(**update_dict) ) + # @classmethod + # async def delete(cls, db: AsyncSession, onum_list: List[int]): + # """ + # 批量删除任务配置 + # """ + # await db.execute(delete(DatasecConfig).where(DatasecConfig.onum.in_(onum_list))) + @classmethod - async def delete(cls, db: AsyncSession, onum_list: List[int]): + async def delete(cls, db: AsyncSession, metatask: DatasecConfig): """ - 批量删除任务配置 + 删除参数配置数据库操作 + :param db: orm对象 + :param config: 参数配置对象 + :return: """ - await db.execute(delete(DatasecConfig).where(DatasecConfig.onum.in_(onum_list))) + await db.execute(delete(DatasecConfig).where(DatasecConfig.onum.in_([metatask.onum]))) @classmethod async def check_name_or_param_exist(cls, db: AsyncSession, metatask_name: str, metatask_param: str, exclude_onum: int = None): """ diff --git a/vue-fastapi-backend/module_admin/entity/vo/metadata_config_vo.py b/vue-fastapi-backend/module_admin/entity/vo/metadata_config_vo.py index 333b743..f1e5499 100644 --- a/vue-fastapi-backend/module_admin/entity/vo/metadata_config_vo.py +++ b/vue-fastapi-backend/module_admin/entity/vo/metadata_config_vo.py @@ -328,7 +328,7 @@ class DatasecConfigModel(BaseModel): update_by: Optional[str] = Field(default=None, description="更新者") update_time: Optional[datetime] = Field(default=None, description="更新时间") metatask_param: Optional[str] = Field(default=None, description="参数") - status: Optional[str] = Field(default="N", description="状态") + status: Optional[str] = Field(default=None, description="状态") ds_time: Optional[datetime] = Field(default=None, description="调度时间") ds_ids: Optional[str] = Field(default=None, description="任务ID") schId: Optional[str] = Field(default=None, description="调度id") diff --git a/vue-fastapi-backend/module_admin/service/datasec_config_service.py b/vue-fastapi-backend/module_admin/service/datasec_config_service.py index 5db5010..3909553 100644 --- a/vue-fastapi-backend/module_admin/service/datasec_config_service.py +++ b/vue-fastapi-backend/module_admin/service/datasec_config_service.py @@ -9,11 +9,13 @@ from module_admin.service.metaprocessconfig_service import MetaprocessconfigServ import requests import json import re +from module_admin.entity.vo.dataSource_vo import ProcessDefinition,ParmScheduleVo,ProcessInstancePage,ParmSchedule from config.enums import RedisInitKeyConfig from fastapi import Request,Depends from config.env import AppConfig from module_admin.entity.vo.user_vo import CurrentUserModel - +from module_admin.entity.vo.metatask_vo import DeleteMetataskModel +from datetime import datetime class DatasecConfigService: """ 数据安全参数配置表 Service 层 @@ -211,23 +213,56 @@ class DatasecConfigService: message += page_object.metatask_name + "-数据安全_批次标签修改失败" return message + # @classmethod + # async def delete_datasec_services(cls, db: AsyncSession, onum_list: str): + # """ + # 批量删除配置 + # """ + # id_list = [int(x.strip()) for x in onum_list.split(",") if x.strip().isdigit()] + # if not id_list: + # raise ServiceException(message="无效的编号列表") + + # try: + # await DatasecConfigDao.delete(db, id_list) + # await db.commit() + # return CrudResponseModel(is_success=True, message="删除成功") + # except Exception as e: + # await db.rollback() + # raise e @classmethod - async def delete_datasec_services(cls, db: AsyncSession, onum_list: str): + async def delete_datasec_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteMetataskModel,current_user: CurrentUserModel): """ - 批量删除配置 + 删除参数配置信息service + :param request: Request对象 + :param query_db: orm对象 + :param page_object: 删除参数配置对象 + :return: 删除参数配置校验结果 """ - id_list = [int(x.strip()) for x in onum_list.split(",") if x.strip().isdigit()] - if not id_list: - raise ServiceException(message="无效的编号列表") - - try: - await DatasecConfigDao.delete(db, id_list) - await db.commit() - return CrudResponseModel(is_success=True, message="删除成功") - except Exception as e: - await db.rollback() - raise e + + if page_object.metatask_ids and page_object.ds_ids: + metatask_id_list = page_object.metatask_ids.split(',') + try: + projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') + # 查询接口 + url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition/batch-delete' + form_data={'codes':page_object.ds_ids} + headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'} + response = requests.post(url, headers=headers, data=form_data, verify=False) + text= response.text + responsJson = json.loads(text) + if responsJson['success'] is True: + for metatask_id in metatask_id_list: + await DatasecConfigDao.delete(query_db, DatasecConfigModel(onum=int(metatask_id))) + await query_db.commit() + return CrudResponseModel(is_success=True, message='删除成功') + else : + raise ServiceException(message='ds删除失败') + except Exception as e: + await query_db.rollback() + raise e + else: + raise ServiceException(message='传入参数配置id为空') @classmethod async def get_datasec_detail_services(cls, db: AsyncSession, onum: int): """ @@ -237,3 +272,178 @@ class DatasecConfigService: if result: return DatasecConfigModel(**CamelCaseUtil.transform_result(result)) return DatasecConfigModel(**dict()) + @classmethod + async def up_or_down_metatask_services( + cls, request: Request, query_db: AsyncSession, current_user: CurrentUserModel, id: str, type: str + ): + metatask_info = await cls.get_datasec_detail_services(query_db, id) + metatask_info.update_by = current_user.user.user_name + metatask_info.update_time = datetime.now() + type_str: str + projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') + dsids=metatask_info.ds_ids.split(",") + result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dsids, dsids)] + message='' + # 查询接口 + url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition' + headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'} + for config in result_list: + metaprocessconfig_dict = { + 'name': metatask_info.metatask_name, + 'releaseState': type + } + form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()} + response = requests.post(f"{url}/{config['dsid']}/release", headers=headers, data=form_data, verify=False) + text= response.text + responsJson = json.loads(text) + if responsJson['success'] is True: + message='成功!' + else: + raise ServiceException(message='失败'+responsJson['msg']) + if type == 'OFFLINE': + # 下线 + type_str = '下线' + metatask_info.status = 'OFFLINE' + else: + # 上线 + type_str = '上线' + metatask_info.status = 'ONLINE' + + edit_metatask = metatask_info.model_dump(exclude_unset=True) + try: + await DatasecConfigDao.edit(query_db,metatask_info.onum, edit_metatask) + await query_db.commit() + return CrudResponseModel(is_success=True, message=message) + except Exception as e: + await query_db.rollback() + raise e + else: + raise ServiceException(message='更新失败') + @classmethod + async def datasec_detail_services(cls, query_db: AsyncSession, metatask_id: int): + """ + 获取参数配置详细信息service + + :param query_db: orm对象 + :param metatask_id: 参数配置id + :return: 参数配置id对应的信息 + """ + metatask = await DatasecConfigDao.get_detail_by_id(query_db, metatask_id=metatask_id) + if metatask: + result = DatasecConfigModel(**CamelCaseUtil.transform_result(metatask)) + else: + result = DatasecConfigModel(**dict()) + return result + @classmethod + async def ds_metatask_services( + cls, request: Request, query_db: AsyncSession, process: ParmScheduleVo,current_user: CurrentUserModel + ): + projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') + # 先查询是否建立定时任务 + getdsurl=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/schedules?pageSize=10&pageNo=1&processDefinitionCode='+str(process.processDefinitionCode) + headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'} + getdsresponse = requests.get(getdsurl, headers=headers, verify=False) + getdstext= getdsresponse.text + responsJson = json.loads(getdstext) + if responsJson['msg'] == 'success': + if responsJson['data']['total']>0: + # getds_json_list = json.dumps(responsJson['data']['totalList'], ensure_ascii=False, indent=4) + getds_json_list = responsJson['data']['totalList'] + for item in getds_json_list: + if item['releaseState']=='ONLINE': + # 先下线在删除 + offdsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{item['id']}/offline" + offresponse = requests.post(offdsurl, headers=headers, verify=False) + # 删除对应的调度 + deldsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{item['id']}?scheduleId={item['id']}" + delresponse = requests.delete(deldsurl, headers=headers, verify=False) + + parm =ParmSchedule() + parm.failureStrategy='CONTINUE' + parm.warningType='NONE' + parm.warningGroupId=process.warningGroupId + parm.workerGroup=process.workerGroup + parm.processDefinitionCode =process.processDefinitionCode + parm.environmentCode=process.environmentCode + parm.processInstancePriority='MEDIUM' + parm.schedule = ( + '{"startTime":"' + process.beginTime.strftime('%Y-%m-%d %H:%M:%S') + + '", "endTime":"' + process.endTime.strftime('%Y-%m-%d %H:%M:%S') + + '", "crontab":"' + process.crontab + + '", "timezoneId":"Asia/Shanghai"}') + url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/schedules' + # form_data = {key: str(value) for key, value in process.__dict__.items()} + form_data = {key: value for key, value in parm.__dict__.items()} + response = requests.post(url, headers=headers, data=form_data, verify=False) + text= response.text + responsJson = json.loads(text) + if responsJson['msg'] == 'success': + scheduleId= responsJson['data']['id'] + ondsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{scheduleId}/online" + ondsurl = requests.post(ondsurl, headers=headers, verify=False) + metatask_info = await cls.get_datasec_detail_services(query_db, process.metaTaskId) + metatask_info.schId=scheduleId + metatask_info2 = metatask_info.model_dump(exclude_unset=True) + await DatasecConfigDao.edit(query_db,metatask_info.onum, metatask_info2) + await query_db.commit() + return "调度运行成功!" + else: + raise ServiceException(message='运行失败!') + @classmethod + async def ds_metatask_detail( + cls, request: Request, query_db: AsyncSession, process: ParmScheduleVo,current_user: CurrentUserModel + ): + projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') + # 先查询是否建立定时任务 + getdsurl=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/schedules?pageSize=10&pageNo=1&processDefinitionCode='+str(process.processDefinitionCode) + headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'} + getdsresponse = requests.get(getdsurl, headers=headers, verify=False) + getdstext= getdsresponse.text + processVo=ParmScheduleVo() + responsJson = json.loads(getdstext) + if responsJson['msg'] == 'success': + if responsJson['data']['total']>0: + # getds_json_list = json.dumps(responsJson['data']['totalList'], ensure_ascii=False, indent=4) + getds_json_list = responsJson['data']['totalList'] + for item in getds_json_list: + processVo.crontab=item['crontab'] + processVo.beginTime=item['startTime'] + processVo.endTime=item['endTime'] + processVo.workerGroup=item['workerGroup'] + processVo.warningGroupId=item['warningGroupId'] + processVo.environmentCode=item['environmentCode'] + return processVo + + @classmethod + async def ds_metatask_delete( + cls, request: Request, query_db: AsyncSession, process: ParmScheduleVo,current_user: CurrentUserModel + ): + projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') + # 先查询是否建立定时任务 + getdsurl=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/schedules?pageSize=10&pageNo=1&processDefinitionCode='+str(process.processDefinitionCode) + headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'} + getdsresponse = requests.get(getdsurl, headers=headers, verify=False) + getdstext= getdsresponse.text + responsJson = json.loads(getdstext) + if responsJson['msg'] == 'success': + if responsJson['data']['total']>0: + # getds_json_list = json.dumps(responsJson['data']['totalList'], ensure_ascii=False, indent=4) + getds_json_list = responsJson['data']['totalList'] + for item in getds_json_list: + if item['releaseState']=='ONLINE': + # 先下线在删除 + offdsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{item['id']}/offline" + offresponse = requests.post(offdsurl, headers=headers, verify=False) + # 删除对应的调度 + deldsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{item['id']}?scheduleId={item['id']}" + delresponse = requests.delete(deldsurl, headers=headers, verify=False) + deldstext= delresponse.text + delresponsJson = json.loads(deldstext) + if delresponsJson['msg'] == 'success': + metatask_info = await cls.get_datasec_detail_services(query_db, process.metaTaskId) + metatask_info.schId="" + metatask_info2 = metatask_info.model_dump(exclude_unset=True) + await DatasecConfigDao.edit(query_db,metatask_info.onum, metatask_info2) + await query_db.commit() + return "调度删除成功!" + return "调度删除成功!" \ No newline at end of file diff --git a/vue-fastapi-frontend/src/api/metadataConfig/metadataConfig.js b/vue-fastapi-frontend/src/api/metadataConfig/metadataConfig.js index 4baa88f..2bab37d 100644 --- a/vue-fastapi-frontend/src/api/metadataConfig/metadataConfig.js +++ b/vue-fastapi-frontend/src/api/metadataConfig/metadataConfig.js @@ -343,13 +343,23 @@ export function editDatasecConfig(data) { } // 删除数据安全参数配置 -export function deleteDatasecConfig(onumList) { - return request({ - url: '/default-api/metadataConfig/datasecConfig/' + onumList, - method: 'delete' +// export function deleteDatasecConfig(onumList) { +// return request({ +// url: '/default-api/metadataConfig/datasecConfig/' + onumList, +// method: 'delete' +// }) +// } +// 删除元数据任务 +export function deleteDatasecConfig(metataskId,dsIds) { + const data = { + metataskId:metataskId, + dsIds:dsIds + } + return request({ + url: '/default-api/metadataConfig/datasecConfig/' + metataskId+"/"+dsIds, + method: 'delete', }) } - // 获取数据安全参数配置详情 export function getDatasecConfigDetail(onum) { return request({ @@ -357,3 +367,31 @@ export function getDatasecConfigDetail(onum) { method: 'get' }) } +// 上下线 +export function downOrUpmetatask(id,type) { + const data = { + id, + type + } + return request({ + url: '/default-api/metadataConfig/upOrdown', + method: 'put', + data: data + }) +} + +export function dsmetataskSec(data) { + return request({ + url: '/default-api/metadataConfig/DS', + method: 'put', + data: data + }) +} + +export function dsmetataskdelete(data) { + return request({ + url: '/default-api/metadataConfig/DS', + method: 'delete', + data: data + }) +} \ No newline at end of file diff --git a/vue-fastapi-frontend/src/views/meta/metatask/dsDialog.vue b/vue-fastapi-frontend/src/views/meta/metatask/dsDialog.vue index 19e4c0e..b8b2471 100644 --- a/vue-fastapi-frontend/src/views/meta/metatask/dsDialog.vue +++ b/vue-fastapi-frontend/src/views/meta/metatask/dsDialog.vue @@ -114,6 +114,7 @@