From 980974db2beedf350f016d060171d9fc9a3d2056 Mon Sep 17 00:00:00 2001 From: siyaqi Date: Sat, 5 Jul 2025 19:58:08 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/metadata_config_controller.py | 61 +- .../entity/vo/metadata_config_vo.py | 4 +- .../service/metadata_config_service.py | 333 ++++++++- .../api/taskMetadataConfig/metadataConfig.js | 32 +- .../metadataConfig/taskBizConfig/index.vue | 636 ++++++++++++++---- 5 files changed, 906 insertions(+), 160 deletions(-) diff --git a/vue-fastapi-backend/module_admin/controller/metadata_config_controller.py b/vue-fastapi-backend/module_admin/controller/metadata_config_controller.py index 1f4beb5..b837846 100644 --- a/vue-fastapi-backend/module_admin/controller/metadata_config_controller.py +++ b/vue-fastapi-backend/module_admin/controller/metadata_config_controller.py @@ -632,35 +632,41 @@ async def get_task_biz_config_list_all( @metadataConfigController.post('/taskBizConfig/add') @ValidateFields(['bizModule', 'configType', 'data_sec_lvl', 'applyType']) async def add_task_biz_config( + request: Request, item: TaskBizConfigAddModel, db: AsyncSession = Depends(get_db), current_user: CurrentUserModel = Depends(LoginService.get_current_user), ): item.create_by = current_user.user.user_name item.create_time = datetime.now() - result = await MetadataConfigService.add_task_biz_config_services(db, item) + result = await MetadataConfigService.add_task_biz_config_services(db, item,current_user,request) return ResponseUtil.success(msg=result.message) @metadataConfigController.post('/taskBizConfig/edit') @ValidateFields(['onum']) async def edit_task_biz_config( + request: Request, item: TaskBizConfigAddModel, db: AsyncSession = Depends(get_db), current_user: CurrentUserModel = Depends(LoginService.get_current_user), ): item.update_by = current_user.user.user_name item.update_time = datetime.now() - result = await MetadataConfigService.edit_task_biz_config_services(db, item) - return ResponseUtil.success(msg=result.message) - - -@metadataConfigController.delete('/taskBizConfig/delete') -async def delete_task_biz_config(onums: str, db: AsyncSession = Depends(get_db)): - result = await MetadataConfigService.delete_task_biz_config_services(db, onums) + result = await MetadataConfigService.edit_task_biz_config_services(db, item,current_user,request) return ResponseUtil.success(msg=result.message) +# @metadataConfigController.delete('/taskBizConfig/delete') +# async def delete_task_biz_config(onums: str, db: AsyncSession = Depends(get_db)): +# result = await MetadataConfigService.delete_task_biz_config_services(db, onums) +# return ResponseUtil.success(msg=result.message) +@metadataConfigController.delete('/taskBizConfig/{metatask_ids}/{ds_ids}') +async def delete_task_biz_config(request: Request, metatask_ids: str,ds_ids:str, query_db: AsyncSession = Depends(get_db),current_user: CurrentUserModel = Depends(LoginService.get_current_user)): + delete_config = DeleteMetataskModel(metatask_ids=metatask_ids,ds_ids=ds_ids) + delete_config_result = await MetadataConfigService.delete_task_biz_config_services(request, query_db, delete_config,current_user) + logger.info(delete_config_result.message) + return ResponseUtil.success(msg=delete_config_result.message) @metadataConfigController.get('/taskBizConfig/detail') async def get_task_biz_config_detail(onum: str, db: AsyncSession = Depends(get_db)): data = await MetadataConfigService.get_task_biz_config_detail_services(db, onum) @@ -672,4 +678,41 @@ async def get_task_biz_rela_config_list( ): ai_chat_list_result = await MetadataConfigService.get_task_biz_config_rela_list_services(query_db, bizOnum) logger.info('获取成功') - return ResponseUtil.success(data=ai_chat_list_result) \ No newline at end of file + return ResponseUtil.success(data=ai_chat_list_result) +@metadataConfigController.put('/taskBiz/upOrdown') +async def task_biz_up_or_down_meta_metatask( + request: Request, + DownOrUpdate: OperaMetataskModel, + query_db: AsyncSession = Depends(get_db), + current_user: CurrentUserModel = Depends(LoginService.get_current_user), +): + + edit_config_result = await MetadataConfigService.up_or_down_metatask_services(request, query_db,current_user, DownOrUpdate.id,DownOrUpdate.type) + logger.info(edit_config_result.message) + return ResponseUtil.success(msg=edit_config_result.message) + + + +@metadataConfigController.put('/taskBiz/DS') +async def task_biz_DS_meta_metatask( + request: Request, + process: ParmScheduleVo, + query_db: AsyncSession = Depends(get_db), + current_user: CurrentUserModel = Depends(LoginService.get_current_user) +): + + edit_config_result = await MetadataConfigService.ds_metatask_services(request, query_db, process,current_user) + + return ResponseUtil.success(msg=edit_config_result) + +@metadataConfigController.delete('/taskBiz/DS') +async def task_biz_DS_meta_metatask_delete( + request: Request, + process: ParmScheduleVo, + query_db: AsyncSession = Depends(get_db), + current_user: CurrentUserModel = Depends(LoginService.get_current_user) +): + + edit_config_result = await MetadataConfigService.ds_metatask_delete(request, query_db, process,current_user) + + return ResponseUtil.success(msg=edit_config_result) \ No newline at end of file diff --git a/vue-fastapi-backend/module_admin/entity/vo/metadata_config_vo.py b/vue-fastapi-backend/module_admin/entity/vo/metadata_config_vo.py index 061f88a..55f0305 100644 --- a/vue-fastapi-backend/module_admin/entity/vo/metadata_config_vo.py +++ b/vue-fastapi-backend/module_admin/entity/vo/metadata_config_vo.py @@ -362,7 +362,7 @@ class TaskBizConfigAddModel(BaseModel): create_time: Optional[datetime] = Field(default=None, description='创建时间') update_by: Optional[str] = Field(default=None, description='更新者') update_time: Optional[datetime] = Field(default=None, description='更新时间') - status: Optional[str] = Field(default="N", description="状态") + status: Optional[str] = Field(default="OFFLINE", description="状态") ds_time: Optional[datetime] = Field(default=None, description="调度时间") ds_ids: Optional[str] = Field(default=None, description="任务ID") schId: Optional[str] = Field(default=None, description="调度id") @@ -381,7 +381,7 @@ class TaskBizConfigModel(BaseModel): create_time: Optional[datetime] = Field(default=None, description='创建时间') update_by: Optional[str] = Field(default=None, description='更新者') update_time: Optional[datetime] = Field(default=None, description='更新时间') - status: Optional[str] = Field(default="N", description="状态") + status: Optional[str] = Field(default="OFFLINE", description="状态") ds_time: Optional[datetime] = Field(default=None, description="调度时间") ds_ids: Optional[str] = Field(default=None, description="任务ID") schId: Optional[str] = Field(default=None, description="调度id") diff --git a/vue-fastapi-backend/module_admin/service/metadata_config_service.py b/vue-fastapi-backend/module_admin/service/metadata_config_service.py index 71a153a..d015c55 100644 --- a/vue-fastapi-backend/module_admin/service/metadata_config_service.py +++ b/vue-fastapi-backend/module_admin/service/metadata_config_service.py @@ -9,9 +9,20 @@ from utils.common_util import CamelCaseUtil from module_admin.entity.do.metadata_config_do import SecuBizConfigRela,TaskBizConfigRela # ORM 类 from exceptions.exception import ServiceException import uuid +from module_admin.entity.vo.dataSource_vo import ProcessDefinition,ParmScheduleVo,ProcessInstancePage,ParmSchedule from typing import List from datetime import datetime - +from module_admin.entity.vo.metaprocessconfig_vo import MetaprocessconfigQueryModel,MetaprocessconfigModel +from module_admin.service.metaprocessconfig_service import MetaprocessconfigService +import requests +from fastapi import Request,Depends +import json +import re +from config.enums import RedisInitKeyConfig +from config.env import AppConfig +from module_admin.entity.vo.user_vo import CurrentUserModel +from module_admin.entity.vo.metatask_vo import DeleteMetataskModel +from datetime import datetime class MetadataConfigService: """ 元数据分类管理 Service @@ -355,7 +366,7 @@ class MetadataConfigService: return await MetadataConfigDao.get_task_biz_config_list(db, query_object, is_page) @classmethod - async def add_task_biz_config_services(cls, db: AsyncSession, page_object: TaskBizConfigAddModel): + async def add_task_biz_config_services(cls, db: AsyncSession, page_object: TaskBizConfigAddModel, current_user: CurrentUserModel, request: Request): try: config = TaskBizConfigModel() config.onum = page_object.onum @@ -370,8 +381,8 @@ class MetadataConfigService: config.update_time = page_object.update_time obj = await MetadataConfigDao.add_task_biz_config(db, config) - - # 添加关联信息 + page_object.onum=obj.onum + # 添加关联信息 records: List[TaskBizConfigRela] = [] for tab_onum in page_object.tab_onum_list: record = TaskBizConfigRela() @@ -382,14 +393,77 @@ class MetadataConfigService: records.append(record) await MetadataConfigDao.add_batch_task_rela_dao(db, records) + # 获取流程配置 + processconfig = MetaprocessconfigQueryModel() + processconfig.db_type = "BizConfig" + processConfigList = await MetaprocessconfigService.get_metaprocessconfig_list_services(db, processconfig, False) + + message = await cls.biz_process_defind_change_add(request, processConfigList, page_object, current_user) + if "成功" not in message: + await db.rollback() + raise ServiceException(message=f'新增批次标签任务 {page_object.biz_name} 失败,dolphinscheduler 创建失败'+message) + config.onum=page_object.onum + config.ds_ids=page_object.ds_ids + edit_data = config.model_dump(exclude_unset=True) + await MetadataConfigDao.edit_task_biz_config(db, page_object.onum, edit_data) await db.commit() return CrudResponseModel(is_success=True, message="新增成功") except Exception as e: await db.rollback() raise e - @classmethod - async def edit_task_biz_config_services(cls, db: AsyncSession, page_object: TaskBizConfigAddModel): + async def biz_process_defind_change_add(cls,request: Request, processConfigList: list[MetaprocessconfigModel],page_object:TaskBizConfigAddModel,current_user: CurrentUserModel): + + projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') + url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/task-definition/gen-task-codes?genNum=5' + headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password} + + # 新增接口 + url2=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition' + headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'} + response = requests.get(url, headers=headers, verify=False) + if response.reason == 'OK': + intdsids=[] + message='' + dstypes=[] + response_text = response.text + data = json.loads(response_text) + code_list = data["data"] + str_list = list(map(str, code_list)) + for config in processConfigList: + modified_json_str = config.taskDefinitionJson.replace("18093081592672", str_list[0]).replace("sh /home/xx/code/remote_python.sh", "sh /home/xx/code/remote_python.sh "+str(page_object.onum)) + modified_json_str2=config.taskRelationJson.replace("18093081592672", str_list[0]) + modified_json_str3=config.locations.replace("18093081592672", str_list[0]) + metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson + description="", # 替换工作流备注 + locations=modified_json_str3,# 替换locations + name =page_object.biz_name,# 替换工作流名称 + timeout=config.timeout, + globalParams =config.globalParams , + tenantCode =config.tenantCode , + taskRelationJson =modified_json_str2,# 替换taskRelationJson + executionType =config.executionType , + releaseState=config.releaseState + ).model_dump(exclude_unset=True, by_alias=True) + form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()} + response_post0 = requests.post(url2, headers=headers2,data=form_data,verify=False) + text= response_post0.text + responsJson = json.loads(text) + if responsJson['msg'] == 'success': + dstypes.append('1') + intdsids.append(responsJson['data']['code']) + if message: + message += ", " + message += page_object.biz_name + "-批次标签新增成功" + else: + if message: + message += ", " + message += page_object.biz_name + "-批次标签新增失败" + if len(intdsids)>0: + page_object.ds_ids=','.join([str(i) for i in intdsids]) + return message + @classmethod + async def edit_task_biz_config_services(cls, db: AsyncSession, page_object: TaskBizConfigAddModel,current_user: CurrentUserModel,request: Request): config = TaskBizConfigModel() config.onum = page_object.onum config.biz_name = page_object.biz_name @@ -420,6 +494,10 @@ class MetadataConfigService: records.append(record) await MetadataConfigDao.add_batch_task_rela_dao(db, records) + message= await cls.sec_process_defind_change_update(request,page_object,info,current_user) + if "成功" not in message: + await db.rollback() + raise ServiceException(message=f'更新批次标签任务 {page_object.biz_name} 失败,dolphinscheduler 创建失败'+message) await db.commit() return CrudResponseModel(is_success=True, message="更新成功") except Exception as e: @@ -427,24 +505,241 @@ class MetadataConfigService: raise e else: raise ServiceException(message="任务业务配置不存在") - @classmethod - async def delete_task_biz_config_services(cls, db: AsyncSession, onum_list: str): - if not onum_list: - raise ServiceException(message="传入的编号为空") - - id_list = [onum.strip() for onum in onum_list.split(",") if onum.strip()] - if not id_list: - raise ServiceException(message="无效的编号列表") + async def sec_process_defind_change_update(cls,request: Request,page_object:TaskBizConfigAddModel,metatask_old:TaskBizConfigAddModel,current_user: CurrentUserModel): + + projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') + dsids=page_object.ds_ids.split(",") + result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dsids, dsids)] + message='' + # 查询接口 + url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition' + headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, } + headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'} + for config in result_list: + response = requests.get(f"{url}/{config['dsid']}", headers=headers, verify=False) + text= response.text + responsJson = json.loads(text) + if responsJson['msg'] == 'success': + task_def_list = responsJson['data']['taskDefinitionList'] + + # 遍历修改 rawScript + task_def_list = responsJson['data']['taskDefinitionList'] + # 再序列化为 JSON 字符串 + modified_json_str = json.dumps(task_def_list, ensure_ascii=False, indent=0) + getTaskRelationList=responsJson['data']['processTaskRelationList'] + putTaskRelationList=[] + for item in getTaskRelationList: + new_item = { + "name": item['name'], + "preTaskCode":item['preTaskCode'] , + "preTaskVersion":item['preTaskVersion'] , + "postTaskCode":item['postTaskCode'] , + "conditionType":item['conditionType'] , + "conditionParams":item['conditionParams'] + } + putTaskRelationList.append(new_item) + + modified_json_str2= json.dumps(putTaskRelationList, ensure_ascii=False, indent=0) + modified_json_str2=re.sub(r'\s+', '', modified_json_str2) + metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson + description="", # 替换工作流备注 + locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations + name =page_object.biz_name,# 替换工作流名称 + tenantCode =responsJson['data']['processDefinition']['tenantCode'] , + taskRelationJson =modified_json_str2,# 替换taskRelationJson + ).model_dump(exclude_unset=True, by_alias=True) + form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()} + response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data,verify=False) + putText= response_put0.text + responsPutJson=json.loads(putText) + if responsPutJson['msg'] == 'success': + if message: + message += ", " + message += page_object.biz_name + "-批次标签成功" + else: + if message: + message += ", " + message += page_object.biz_name + "-批次标签修改失败" + return message + @classmethod + async def delete_task_biz_config_services( + cls, + request: Request, + query_db: AsyncSession, + page_object: DeleteMetataskModel, + current_user: CurrentUserModel + ): + if page_object.metatask_ids and page_object.ds_ids: + metatask_id_list = page_object.metatask_ids.split(',') + try: + projectCode = await request.app.state.redis.get( + f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode' + ) + + # 构造请求参数 + url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/process-definition/batch-delete' + form_data = {'codes': page_object.ds_ids} + headers = { + 'dashUserName': current_user.user.user_name, + 'dashPassword': current_user.user.password, + 'Content-Type': 'application/x-www-form-urlencoded' + } + + # 发送 POST 请求 + response = requests.post(url, headers=headers, data=form_data, verify=False) + responsJson = json.loads(response.text) + + if responsJson['success'] is True: + # 删除本地元任务配置 + await MetadataConfigDao.delete_task_biz_config(query_db, metatask_id_list) + await MetadataConfigDao.delete_task_rela_batch(query_db, metatask_id_list) + await query_db.commit() + return CrudResponseModel(is_success=True, message='删除成功') + else: + raise ServiceException(message='ds删除失败') + except Exception as e: + await query_db.rollback() + raise e + else: + raise ServiceException(message='传入参数配置id为空') + @classmethod + async def up_or_down_metatask_services( + cls, request: Request, query_db: AsyncSession, current_user: CurrentUserModel, id: str, type: str + ): + metatask_info = await cls.get_task_biz_config_detail_services(query_db, id) + metatask_info.update_by = current_user.user.user_name + metatask_info.update_time = datetime.now() + type_str: str + projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') + dsids=metatask_info.ds_ids.split(",") + result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dsids, dsids)] + message='' + # 查询接口 + url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition' + headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'} + for config in result_list: + metaprocessconfig_dict = { + 'name': metatask_info.biz_name, + 'releaseState': type + } + form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()} + response = requests.post(f"{url}/{config['dsid']}/release", headers=headers, data=form_data, verify=False) + text= response.text + responsJson = json.loads(text) + if responsJson['success'] is True: + message='成功!' + else: + raise ServiceException(message='失败'+responsJson['msg']) + if type == 'OFFLINE': + # 下线 + type_str = '下线' + metatask_info.status = 'OFFLINE' + else: + # 上线 + type_str = '上线' + metatask_info.status = 'ONLINE' + edit_metatask = metatask_info.model_dump(exclude_unset=True) try: - await MetadataConfigDao.delete_task_biz_config(db, id_list) - await MetadataConfigDao.delete_task_rela_batch(db, id_list) - await db.commit() - return CrudResponseModel(is_success=True, message="删除成功") + await MetadataConfigDao.edit_task_biz_config(query_db, metatask_info.onum, edit_metatask) + await query_db.commit() + return CrudResponseModel(is_success=True, message=message) except Exception as e: - await db.rollback() + await query_db.rollback() raise e + else: + raise ServiceException(message='更新失败') + @classmethod + async def ds_metatask_services( + cls, request: Request, query_db: AsyncSession, process: ParmScheduleVo,current_user: CurrentUserModel + ): + projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') + # 先查询是否建立定时任务 + getdsurl=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/schedules?pageSize=10&pageNo=1&processDefinitionCode='+str(process.processDefinitionCode) + headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'} + getdsresponse = requests.get(getdsurl, headers=headers, verify=False) + getdstext= getdsresponse.text + responsJson = json.loads(getdstext) + if responsJson['msg'] == 'success': + if responsJson['data']['total']>0: + # getds_json_list = json.dumps(responsJson['data']['totalList'], ensure_ascii=False, indent=4) + getds_json_list = responsJson['data']['totalList'] + for item in getds_json_list: + if item['releaseState']=='ONLINE': + # 先下线在删除 + offdsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{item['id']}/offline" + offresponse = requests.post(offdsurl, headers=headers, verify=False) + # 删除对应的调度 + deldsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{item['id']}?scheduleId={item['id']}" + delresponse = requests.delete(deldsurl, headers=headers, verify=False) + + parm =ParmSchedule() + parm.failureStrategy='CONTINUE' + parm.warningType='NONE' + parm.warningGroupId=process.warningGroupId + parm.workerGroup=process.workerGroup + parm.processDefinitionCode =process.processDefinitionCode + parm.environmentCode=process.environmentCode + parm.processInstancePriority='MEDIUM' + parm.schedule = ( + '{"startTime":"' + process.beginTime.strftime('%Y-%m-%d %H:%M:%S') + + '", "endTime":"' + process.endTime.strftime('%Y-%m-%d %H:%M:%S') + + '", "crontab":"' + process.crontab + + '", "timezoneId":"Asia/Shanghai"}') + url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/schedules' + # form_data = {key: str(value) for key, value in process.__dict__.items()} + form_data = {key: value for key, value in parm.__dict__.items()} + response = requests.post(url, headers=headers, data=form_data, verify=False) + text= response.text + responsJson = json.loads(text) + if responsJson['msg'] == 'success': + scheduleId= responsJson['data']['id'] + ondsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{scheduleId}/online" + ondsurl = requests.post(ondsurl, headers=headers, verify=False) + metatask_info = await cls.get_task_biz_config_detail_services(query_db, process.metaTaskId) + metatask_info.schId=scheduleId + metatask_info2 = metatask_info.model_dump(exclude_unset=True) + await MetadataConfigDao.edit_task_biz_config(query_db,metatask_info.onum, metatask_info2) + await query_db.commit() + return "调度运行成功!" + else: + raise ServiceException(message='运行失败!') + + @classmethod + async def ds_metatask_delete( + cls, request: Request, query_db: AsyncSession, process: ParmScheduleVo,current_user: CurrentUserModel + ): + projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') + # 先查询是否建立定时任务 + getdsurl=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/schedules?pageSize=10&pageNo=1&processDefinitionCode='+str(process.processDefinitionCode) + headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'} + getdsresponse = requests.get(getdsurl, headers=headers, verify=False) + getdstext= getdsresponse.text + responsJson = json.loads(getdstext) + if responsJson['msg'] == 'success': + if responsJson['data']['total']>0: + # getds_json_list = json.dumps(responsJson['data']['totalList'], ensure_ascii=False, indent=4) + getds_json_list = responsJson['data']['totalList'] + for item in getds_json_list: + if item['releaseState']=='ONLINE': + # 先下线在删除 + offdsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{item['id']}/offline" + offresponse = requests.post(offdsurl, headers=headers, verify=False) + # 删除对应的调度 + deldsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{item['id']}?scheduleId={item['id']}" + delresponse = requests.delete(deldsurl, headers=headers, verify=False) + deldstext= delresponse.text + delresponsJson = json.loads(deldstext) + if delresponsJson['msg'] == 'success': + metatask_info = await cls.get_task_biz_config_detail_services(query_db, process.metaTaskId) + metatask_info2 = metatask_info.model_dump(exclude_unset=True) + metatask_info2.schId="" + await MetadataConfigDao.edit_task_biz_config(query_db,metatask_info.onum, metatask_info2) + await query_db.commit() + return "调度删除成功!" + return "调度删除成功!" + @classmethod async def get_task_biz_config_detail_services(cls, db: AsyncSession, onum: str): diff --git a/vue-fastapi-frontend/src/api/taskMetadataConfig/metadataConfig.js b/vue-fastapi-frontend/src/api/taskMetadataConfig/metadataConfig.js index b386a48..302868e 100644 --- a/vue-fastapi-frontend/src/api/taskMetadataConfig/metadataConfig.js +++ b/vue-fastapi-frontend/src/api/taskMetadataConfig/metadataConfig.js @@ -46,11 +46,10 @@ export function updateTaskBizConfig(data) { } // 删除任务业务配置(支持批量) -export function delTaskBizConfig(onumStr) { +export function delTaskBizConfig(metataskId,dsIds) { return request({ - url: '/default-api/metadataConfig/taskBizConfig/delete', + url: '/default-api/metadataConfig/taskBizConfig/'+ metataskId+"/"+dsIds, method: 'delete', - params: { onums: onumStr } }) } @@ -60,4 +59,31 @@ export function getTaskBizRelaList(onum) { url: '/default-api/metadataConfig/taskBizConfigRela/list/' + onum, method: 'get' }) +} + +export function downOrUpmetatask(id,type) { + const data = { + id: String(id), // 显式转换为字符串 + type + } + return request({ + url: '/default-api/metadataConfig/taskBiz/upOrdown', + method: 'put', + data: data + }) +} +export function dsmetataskSec(data) { + return request({ + url: '/default-api/metadataConfig/taskBiz/DS', + method: 'put', + data: data + }) +} + +export function dsmetataskdelete(data) { + return request({ + url: '/default-api/metadataConfig/taskBiz/DS', + method: 'delete', + data: data + }) } \ No newline at end of file diff --git a/vue-fastapi-frontend/src/views/metadataConfig/taskBizConfig/index.vue b/vue-fastapi-frontend/src/views/metadataConfig/taskBizConfig/index.vue index c765fe1..c35f3f7 100644 --- a/vue-fastapi-frontend/src/views/metadataConfig/taskBizConfig/index.vue +++ b/vue-fastapi-frontend/src/views/metadataConfig/taskBizConfig/index.vue @@ -1,53 +1,244 @@ + + + + +