diff --git a/vue-fastapi-backend/module_admin/controller/metadata_config_controller.py b/vue-fastapi-backend/module_admin/controller/metadata_config_controller.py
index 1f4beb5..b837846 100644
--- a/vue-fastapi-backend/module_admin/controller/metadata_config_controller.py
+++ b/vue-fastapi-backend/module_admin/controller/metadata_config_controller.py
@@ -632,35 +632,41 @@ async def get_task_biz_config_list_all(
@metadataConfigController.post('/taskBizConfig/add')
@ValidateFields(['bizModule', 'configType', 'data_sec_lvl', 'applyType'])
async def add_task_biz_config(
+ request: Request,
item: TaskBizConfigAddModel,
db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
item.create_by = current_user.user.user_name
item.create_time = datetime.now()
- result = await MetadataConfigService.add_task_biz_config_services(db, item)
+ result = await MetadataConfigService.add_task_biz_config_services(db, item,current_user,request)
return ResponseUtil.success(msg=result.message)
@metadataConfigController.post('/taskBizConfig/edit')
@ValidateFields(['onum'])
async def edit_task_biz_config(
+ request: Request,
item: TaskBizConfigAddModel,
db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
item.update_by = current_user.user.user_name
item.update_time = datetime.now()
- result = await MetadataConfigService.edit_task_biz_config_services(db, item)
- return ResponseUtil.success(msg=result.message)
-
-
-@metadataConfigController.delete('/taskBizConfig/delete')
-async def delete_task_biz_config(onums: str, db: AsyncSession = Depends(get_db)):
- result = await MetadataConfigService.delete_task_biz_config_services(db, onums)
+ result = await MetadataConfigService.edit_task_biz_config_services(db, item,current_user,request)
return ResponseUtil.success(msg=result.message)
+# @metadataConfigController.delete('/taskBizConfig/delete')
+# async def delete_task_biz_config(onums: str, db: AsyncSession = Depends(get_db)):
+# result = await MetadataConfigService.delete_task_biz_config_services(db, onums)
+# return ResponseUtil.success(msg=result.message)
+@metadataConfigController.delete('/taskBizConfig/{metatask_ids}/{ds_ids}')
+async def delete_task_biz_config(request: Request, metatask_ids: str,ds_ids:str, query_db: AsyncSession = Depends(get_db),current_user: CurrentUserModel = Depends(LoginService.get_current_user)):
+ delete_config = DeleteMetataskModel(metatask_ids=metatask_ids,ds_ids=ds_ids)
+ delete_config_result = await MetadataConfigService.delete_task_biz_config_services(request, query_db, delete_config,current_user)
+ logger.info(delete_config_result.message)
+ return ResponseUtil.success(msg=delete_config_result.message)
@metadataConfigController.get('/taskBizConfig/detail')
async def get_task_biz_config_detail(onum: str, db: AsyncSession = Depends(get_db)):
data = await MetadataConfigService.get_task_biz_config_detail_services(db, onum)
@@ -672,4 +678,41 @@ async def get_task_biz_rela_config_list(
):
ai_chat_list_result = await MetadataConfigService.get_task_biz_config_rela_list_services(query_db, bizOnum)
logger.info('获取成功')
- return ResponseUtil.success(data=ai_chat_list_result)
\ No newline at end of file
+ return ResponseUtil.success(data=ai_chat_list_result)
+@metadataConfigController.put('/taskBiz/upOrdown')
+async def task_biz_up_or_down_meta_metatask(
+ request: Request,
+ DownOrUpdate: OperaMetataskModel,
+ query_db: AsyncSession = Depends(get_db),
+ current_user: CurrentUserModel = Depends(LoginService.get_current_user),
+):
+
+ edit_config_result = await MetadataConfigService.up_or_down_metatask_services(request, query_db,current_user, DownOrUpdate.id,DownOrUpdate.type)
+ logger.info(edit_config_result.message)
+ return ResponseUtil.success(msg=edit_config_result.message)
+
+
+
+@metadataConfigController.put('/taskBiz/DS')
+async def task_biz_DS_meta_metatask(
+ request: Request,
+ process: ParmScheduleVo,
+ query_db: AsyncSession = Depends(get_db),
+ current_user: CurrentUserModel = Depends(LoginService.get_current_user)
+):
+
+ edit_config_result = await MetadataConfigService.ds_metatask_services(request, query_db, process,current_user)
+
+ return ResponseUtil.success(msg=edit_config_result)
+
+@metadataConfigController.delete('/taskBiz/DS')
+async def task_biz_DS_meta_metatask_delete(
+ request: Request,
+ process: ParmScheduleVo,
+ query_db: AsyncSession = Depends(get_db),
+ current_user: CurrentUserModel = Depends(LoginService.get_current_user)
+):
+
+ edit_config_result = await MetadataConfigService.ds_metatask_delete(request, query_db, process,current_user)
+
+ return ResponseUtil.success(msg=edit_config_result)
\ No newline at end of file
diff --git a/vue-fastapi-backend/module_admin/entity/vo/metadata_config_vo.py b/vue-fastapi-backend/module_admin/entity/vo/metadata_config_vo.py
index 061f88a..55f0305 100644
--- a/vue-fastapi-backend/module_admin/entity/vo/metadata_config_vo.py
+++ b/vue-fastapi-backend/module_admin/entity/vo/metadata_config_vo.py
@@ -362,7 +362,7 @@ class TaskBizConfigAddModel(BaseModel):
create_time: Optional[datetime] = Field(default=None, description='创建时间')
update_by: Optional[str] = Field(default=None, description='更新者')
update_time: Optional[datetime] = Field(default=None, description='更新时间')
- status: Optional[str] = Field(default="N", description="状态")
+ status: Optional[str] = Field(default="OFFLINE", description="状态")
ds_time: Optional[datetime] = Field(default=None, description="调度时间")
ds_ids: Optional[str] = Field(default=None, description="任务ID")
schId: Optional[str] = Field(default=None, description="调度id")
@@ -381,7 +381,7 @@ class TaskBizConfigModel(BaseModel):
create_time: Optional[datetime] = Field(default=None, description='创建时间')
update_by: Optional[str] = Field(default=None, description='更新者')
update_time: Optional[datetime] = Field(default=None, description='更新时间')
- status: Optional[str] = Field(default="N", description="状态")
+ status: Optional[str] = Field(default="OFFLINE", description="状态")
ds_time: Optional[datetime] = Field(default=None, description="调度时间")
ds_ids: Optional[str] = Field(default=None, description="任务ID")
schId: Optional[str] = Field(default=None, description="调度id")
diff --git a/vue-fastapi-backend/module_admin/service/metadata_config_service.py b/vue-fastapi-backend/module_admin/service/metadata_config_service.py
index 71a153a..d015c55 100644
--- a/vue-fastapi-backend/module_admin/service/metadata_config_service.py
+++ b/vue-fastapi-backend/module_admin/service/metadata_config_service.py
@@ -9,9 +9,20 @@ from utils.common_util import CamelCaseUtil
from module_admin.entity.do.metadata_config_do import SecuBizConfigRela,TaskBizConfigRela # ORM 类
from exceptions.exception import ServiceException
import uuid
+from module_admin.entity.vo.dataSource_vo import ProcessDefinition,ParmScheduleVo,ProcessInstancePage,ParmSchedule
from typing import List
from datetime import datetime
-
+from module_admin.entity.vo.metaprocessconfig_vo import MetaprocessconfigQueryModel,MetaprocessconfigModel
+from module_admin.service.metaprocessconfig_service import MetaprocessconfigService
+import requests
+from fastapi import Request,Depends
+import json
+import re
+from config.enums import RedisInitKeyConfig
+from config.env import AppConfig
+from module_admin.entity.vo.user_vo import CurrentUserModel
+from module_admin.entity.vo.metatask_vo import DeleteMetataskModel
+from datetime import datetime
class MetadataConfigService:
"""
元数据分类管理 Service
@@ -355,7 +366,7 @@ class MetadataConfigService:
return await MetadataConfigDao.get_task_biz_config_list(db, query_object, is_page)
@classmethod
- async def add_task_biz_config_services(cls, db: AsyncSession, page_object: TaskBizConfigAddModel):
+ async def add_task_biz_config_services(cls, db: AsyncSession, page_object: TaskBizConfigAddModel, current_user: CurrentUserModel, request: Request):
try:
config = TaskBizConfigModel()
config.onum = page_object.onum
@@ -370,8 +381,8 @@ class MetadataConfigService:
config.update_time = page_object.update_time
obj = await MetadataConfigDao.add_task_biz_config(db, config)
-
- # 添加关联信息
+ page_object.onum=obj.onum
+ # 添加关联信息
records: List[TaskBizConfigRela] = []
for tab_onum in page_object.tab_onum_list:
record = TaskBizConfigRela()
@@ -382,14 +393,77 @@ class MetadataConfigService:
records.append(record)
await MetadataConfigDao.add_batch_task_rela_dao(db, records)
+ # 获取流程配置
+ processconfig = MetaprocessconfigQueryModel()
+ processconfig.db_type = "BizConfig"
+ processConfigList = await MetaprocessconfigService.get_metaprocessconfig_list_services(db, processconfig, False)
+
+ message = await cls.biz_process_defind_change_add(request, processConfigList, page_object, current_user)
+ if "成功" not in message:
+ await db.rollback()
+ raise ServiceException(message=f'新增批次标签任务 {page_object.biz_name} 失败,dolphinscheduler 创建失败'+message)
+ config.onum=page_object.onum
+ config.ds_ids=page_object.ds_ids
+ edit_data = config.model_dump(exclude_unset=True)
+ await MetadataConfigDao.edit_task_biz_config(db, page_object.onum, edit_data)
await db.commit()
return CrudResponseModel(is_success=True, message="新增成功")
except Exception as e:
await db.rollback()
raise e
-
@classmethod
- async def edit_task_biz_config_services(cls, db: AsyncSession, page_object: TaskBizConfigAddModel):
+ async def biz_process_defind_change_add(cls,request: Request, processConfigList: list[MetaprocessconfigModel],page_object:TaskBizConfigAddModel,current_user: CurrentUserModel):
+
+ projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
+ url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/task-definition/gen-task-codes?genNum=5'
+ headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
+
+ # 新增接口
+ url2=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
+ headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
+ response = requests.get(url, headers=headers, verify=False)
+ if response.reason == 'OK':
+ intdsids=[]
+ message=''
+ dstypes=[]
+ response_text = response.text
+ data = json.loads(response_text)
+ code_list = data["data"]
+ str_list = list(map(str, code_list))
+ for config in processConfigList:
+ modified_json_str = config.taskDefinitionJson.replace("18093081592672", str_list[0]).replace("sh /home/xx/code/remote_python.sh", "sh /home/xx/code/remote_python.sh "+str(page_object.onum))
+ modified_json_str2=config.taskRelationJson.replace("18093081592672", str_list[0])
+ modified_json_str3=config.locations.replace("18093081592672", str_list[0])
+ metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
+ description="", # 替换工作流备注
+ locations=modified_json_str3,# 替换locations
+ name =page_object.biz_name,# 替换工作流名称
+ timeout=config.timeout,
+ globalParams =config.globalParams ,
+ tenantCode =config.tenantCode ,
+ taskRelationJson =modified_json_str2,# 替换taskRelationJson
+ executionType =config.executionType ,
+ releaseState=config.releaseState
+ ).model_dump(exclude_unset=True, by_alias=True)
+ form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
+ response_post0 = requests.post(url2, headers=headers2,data=form_data,verify=False)
+ text= response_post0.text
+ responsJson = json.loads(text)
+ if responsJson['msg'] == 'success':
+ dstypes.append('1')
+ intdsids.append(responsJson['data']['code'])
+ if message:
+ message += ", "
+ message += page_object.biz_name + "-批次标签新增成功"
+ else:
+ if message:
+ message += ", "
+ message += page_object.biz_name + "-批次标签新增失败"
+ if len(intdsids)>0:
+ page_object.ds_ids=','.join([str(i) for i in intdsids])
+ return message
+ @classmethod
+ async def edit_task_biz_config_services(cls, db: AsyncSession, page_object: TaskBizConfigAddModel,current_user: CurrentUserModel,request: Request):
config = TaskBizConfigModel()
config.onum = page_object.onum
config.biz_name = page_object.biz_name
@@ -420,6 +494,10 @@ class MetadataConfigService:
records.append(record)
await MetadataConfigDao.add_batch_task_rela_dao(db, records)
+ message= await cls.sec_process_defind_change_update(request,page_object,info,current_user)
+ if "成功" not in message:
+ await db.rollback()
+ raise ServiceException(message=f'更新批次标签任务 {page_object.biz_name} 失败,dolphinscheduler 创建失败'+message)
await db.commit()
return CrudResponseModel(is_success=True, message="更新成功")
except Exception as e:
@@ -427,24 +505,241 @@ class MetadataConfigService:
raise e
else:
raise ServiceException(message="任务业务配置不存在")
-
@classmethod
- async def delete_task_biz_config_services(cls, db: AsyncSession, onum_list: str):
- if not onum_list:
- raise ServiceException(message="传入的编号为空")
-
- id_list = [onum.strip() for onum in onum_list.split(",") if onum.strip()]
- if not id_list:
- raise ServiceException(message="无效的编号列表")
+ async def sec_process_defind_change_update(cls,request: Request,page_object:TaskBizConfigAddModel,metatask_old:TaskBizConfigAddModel,current_user: CurrentUserModel):
+
+ projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
+ dsids=page_object.ds_ids.split(",")
+ result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dsids, dsids)]
+ message=''
+ # 查询接口
+ url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
+ headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, }
+ headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
+ for config in result_list:
+ response = requests.get(f"{url}/{config['dsid']}", headers=headers, verify=False)
+ text= response.text
+ responsJson = json.loads(text)
+ if responsJson['msg'] == 'success':
+ task_def_list = responsJson['data']['taskDefinitionList']
+
+ # 遍历修改 rawScript
+ task_def_list = responsJson['data']['taskDefinitionList']
+ # 再序列化为 JSON 字符串
+ modified_json_str = json.dumps(task_def_list, ensure_ascii=False, indent=0)
+ getTaskRelationList=responsJson['data']['processTaskRelationList']
+ putTaskRelationList=[]
+ for item in getTaskRelationList:
+ new_item = {
+ "name": item['name'],
+ "preTaskCode":item['preTaskCode'] ,
+ "preTaskVersion":item['preTaskVersion'] ,
+ "postTaskCode":item['postTaskCode'] ,
+ "conditionType":item['conditionType'] ,
+ "conditionParams":item['conditionParams']
+ }
+ putTaskRelationList.append(new_item)
+
+ modified_json_str2= json.dumps(putTaskRelationList, ensure_ascii=False, indent=0)
+ modified_json_str2=re.sub(r'\s+', '', modified_json_str2)
+ metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
+ description="", # 替换工作流备注
+ locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations
+ name =page_object.biz_name,# 替换工作流名称
+ tenantCode =responsJson['data']['processDefinition']['tenantCode'] ,
+ taskRelationJson =modified_json_str2,# 替换taskRelationJson
+ ).model_dump(exclude_unset=True, by_alias=True)
+ form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
+ response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data,verify=False)
+ putText= response_put0.text
+ responsPutJson=json.loads(putText)
+ if responsPutJson['msg'] == 'success':
+ if message:
+ message += ", "
+ message += page_object.biz_name + "-批次标签成功"
+ else:
+ if message:
+ message += ", "
+ message += page_object.biz_name + "-批次标签修改失败"
+ return message
+ @classmethod
+ async def delete_task_biz_config_services(
+ cls,
+ request: Request,
+ query_db: AsyncSession,
+ page_object: DeleteMetataskModel,
+ current_user: CurrentUserModel
+ ):
+ if page_object.metatask_ids and page_object.ds_ids:
+ metatask_id_list = page_object.metatask_ids.split(',')
+ try:
+ projectCode = await request.app.state.redis.get(
+ f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode'
+ )
+
+ # 构造请求参数
+ url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/process-definition/batch-delete'
+ form_data = {'codes': page_object.ds_ids}
+ headers = {
+ 'dashUserName': current_user.user.user_name,
+ 'dashPassword': current_user.user.password,
+ 'Content-Type': 'application/x-www-form-urlencoded'
+ }
+
+ # 发送 POST 请求
+ response = requests.post(url, headers=headers, data=form_data, verify=False)
+ responsJson = json.loads(response.text)
+
+ if responsJson['success'] is True:
+ # 删除本地元任务配置
+ await MetadataConfigDao.delete_task_biz_config(query_db, metatask_id_list)
+ await MetadataConfigDao.delete_task_rela_batch(query_db, metatask_id_list)
+ await query_db.commit()
+ return CrudResponseModel(is_success=True, message='删除成功')
+ else:
+ raise ServiceException(message='ds删除失败')
+ except Exception as e:
+ await query_db.rollback()
+ raise e
+ else:
+ raise ServiceException(message='传入参数配置id为空')
+ @classmethod
+ async def up_or_down_metatask_services(
+ cls, request: Request, query_db: AsyncSession, current_user: CurrentUserModel, id: str, type: str
+ ):
+ metatask_info = await cls.get_task_biz_config_detail_services(query_db, id)
+ metatask_info.update_by = current_user.user.user_name
+ metatask_info.update_time = datetime.now()
+ type_str: str
+ projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
+ dsids=metatask_info.ds_ids.split(",")
+ result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dsids, dsids)]
+ message=''
+ # 查询接口
+ url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
+ headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
+ for config in result_list:
+ metaprocessconfig_dict = {
+ 'name': metatask_info.biz_name,
+ 'releaseState': type
+ }
+ form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
+ response = requests.post(f"{url}/{config['dsid']}/release", headers=headers, data=form_data, verify=False)
+ text= response.text
+ responsJson = json.loads(text)
+ if responsJson['success'] is True:
+ message='成功!'
+ else:
+ raise ServiceException(message='失败'+responsJson['msg'])
+ if type == 'OFFLINE':
+ # 下线
+ type_str = '下线'
+ metatask_info.status = 'OFFLINE'
+ else:
+ # 上线
+ type_str = '上线'
+ metatask_info.status = 'ONLINE'
+ edit_metatask = metatask_info.model_dump(exclude_unset=True)
try:
- await MetadataConfigDao.delete_task_biz_config(db, id_list)
- await MetadataConfigDao.delete_task_rela_batch(db, id_list)
- await db.commit()
- return CrudResponseModel(is_success=True, message="删除成功")
+ await MetadataConfigDao.edit_task_biz_config(query_db, metatask_info.onum, edit_metatask)
+ await query_db.commit()
+ return CrudResponseModel(is_success=True, message=message)
except Exception as e:
- await db.rollback()
+ await query_db.rollback()
raise e
+ else:
+ raise ServiceException(message='更新失败')
+ @classmethod
+ async def ds_metatask_services(
+ cls, request: Request, query_db: AsyncSession, process: ParmScheduleVo,current_user: CurrentUserModel
+ ):
+ projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
+ # 先查询是否建立定时任务
+ getdsurl=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/schedules?pageSize=10&pageNo=1&processDefinitionCode='+str(process.processDefinitionCode)
+ headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
+ getdsresponse = requests.get(getdsurl, headers=headers, verify=False)
+ getdstext= getdsresponse.text
+ responsJson = json.loads(getdstext)
+ if responsJson['msg'] == 'success':
+ if responsJson['data']['total']>0:
+ # getds_json_list = json.dumps(responsJson['data']['totalList'], ensure_ascii=False, indent=4)
+ getds_json_list = responsJson['data']['totalList']
+ for item in getds_json_list:
+ if item['releaseState']=='ONLINE':
+ # 先下线在删除
+ offdsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{item['id']}/offline"
+ offresponse = requests.post(offdsurl, headers=headers, verify=False)
+ # 删除对应的调度
+ deldsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{item['id']}?scheduleId={item['id']}"
+ delresponse = requests.delete(deldsurl, headers=headers, verify=False)
+
+ parm =ParmSchedule()
+ parm.failureStrategy='CONTINUE'
+ parm.warningType='NONE'
+ parm.warningGroupId=process.warningGroupId
+ parm.workerGroup=process.workerGroup
+ parm.processDefinitionCode =process.processDefinitionCode
+ parm.environmentCode=process.environmentCode
+ parm.processInstancePriority='MEDIUM'
+ parm.schedule = (
+ '{"startTime":"' + process.beginTime.strftime('%Y-%m-%d %H:%M:%S') +
+ '", "endTime":"' + process.endTime.strftime('%Y-%m-%d %H:%M:%S') +
+ '", "crontab":"' + process.crontab +
+ '", "timezoneId":"Asia/Shanghai"}')
+ url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/schedules'
+ # form_data = {key: str(value) for key, value in process.__dict__.items()}
+ form_data = {key: value for key, value in parm.__dict__.items()}
+ response = requests.post(url, headers=headers, data=form_data, verify=False)
+ text= response.text
+ responsJson = json.loads(text)
+ if responsJson['msg'] == 'success':
+ scheduleId= responsJson['data']['id']
+ ondsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{scheduleId}/online"
+ ondsurl = requests.post(ondsurl, headers=headers, verify=False)
+ metatask_info = await cls.get_task_biz_config_detail_services(query_db, process.metaTaskId)
+ metatask_info.schId=scheduleId
+ metatask_info2 = metatask_info.model_dump(exclude_unset=True)
+ await MetadataConfigDao.edit_task_biz_config(query_db,metatask_info.onum, metatask_info2)
+ await query_db.commit()
+ return "调度运行成功!"
+ else:
+ raise ServiceException(message='运行失败!')
+
+ @classmethod
+ async def ds_metatask_delete(
+ cls, request: Request, query_db: AsyncSession, process: ParmScheduleVo,current_user: CurrentUserModel
+ ):
+ projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
+ # 先查询是否建立定时任务
+ getdsurl=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/schedules?pageSize=10&pageNo=1&processDefinitionCode='+str(process.processDefinitionCode)
+ headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
+ getdsresponse = requests.get(getdsurl, headers=headers, verify=False)
+ getdstext= getdsresponse.text
+ responsJson = json.loads(getdstext)
+ if responsJson['msg'] == 'success':
+ if responsJson['data']['total']>0:
+ # getds_json_list = json.dumps(responsJson['data']['totalList'], ensure_ascii=False, indent=4)
+ getds_json_list = responsJson['data']['totalList']
+ for item in getds_json_list:
+ if item['releaseState']=='ONLINE':
+ # 先下线在删除
+ offdsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{item['id']}/offline"
+ offresponse = requests.post(offdsurl, headers=headers, verify=False)
+ # 删除对应的调度
+ deldsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{item['id']}?scheduleId={item['id']}"
+ delresponse = requests.delete(deldsurl, headers=headers, verify=False)
+ deldstext= delresponse.text
+ delresponsJson = json.loads(deldstext)
+ if delresponsJson['msg'] == 'success':
+ metatask_info = await cls.get_task_biz_config_detail_services(query_db, process.metaTaskId)
+ metatask_info2 = metatask_info.model_dump(exclude_unset=True)
+ metatask_info2.schId=""
+ await MetadataConfigDao.edit_task_biz_config(query_db,metatask_info.onum, metatask_info2)
+ await query_db.commit()
+ return "调度删除成功!"
+ return "调度删除成功!"
+
@classmethod
async def get_task_biz_config_detail_services(cls, db: AsyncSession, onum: str):
diff --git a/vue-fastapi-frontend/src/api/taskMetadataConfig/metadataConfig.js b/vue-fastapi-frontend/src/api/taskMetadataConfig/metadataConfig.js
index b386a48..302868e 100644
--- a/vue-fastapi-frontend/src/api/taskMetadataConfig/metadataConfig.js
+++ b/vue-fastapi-frontend/src/api/taskMetadataConfig/metadataConfig.js
@@ -46,11 +46,10 @@ export function updateTaskBizConfig(data) {
}
// 删除任务业务配置(支持批量)
-export function delTaskBizConfig(onumStr) {
+export function delTaskBizConfig(metataskId,dsIds) {
return request({
- url: '/default-api/metadataConfig/taskBizConfig/delete',
+ url: '/default-api/metadataConfig/taskBizConfig/'+ metataskId+"/"+dsIds,
method: 'delete',
- params: { onums: onumStr }
})
}
@@ -60,4 +59,31 @@ export function getTaskBizRelaList(onum) {
url: '/default-api/metadataConfig/taskBizConfigRela/list/' + onum,
method: 'get'
})
+}
+
+export function downOrUpmetatask(id,type) {
+ const data = {
+ id: String(id), // 显式转换为字符串
+ type
+ }
+ return request({
+ url: '/default-api/metadataConfig/taskBiz/upOrdown',
+ method: 'put',
+ data: data
+ })
+}
+export function dsmetataskSec(data) {
+ return request({
+ url: '/default-api/metadataConfig/taskBiz/DS',
+ method: 'put',
+ data: data
+ })
+}
+
+export function dsmetataskdelete(data) {
+ return request({
+ url: '/default-api/metadataConfig/taskBiz/DS',
+ method: 'delete',
+ data: data
+ })
}
\ No newline at end of file
diff --git a/vue-fastapi-frontend/src/views/metadataConfig/taskBizConfig/index.vue b/vue-fastapi-frontend/src/views/metadataConfig/taskBizConfig/index.vue
index c765fe1..c35f3f7 100644
--- a/vue-fastapi-frontend/src/views/metadataConfig/taskBizConfig/index.vue
+++ b/vue-fastapi-frontend/src/views/metadataConfig/taskBizConfig/index.vue
@@ -1,53 +1,244 @@
+
-
-
+
+
- 搜索
+ 搜索
重置
+
- 新增
+ 新建任务
- 修改
+ 修改
- 删除
+ 上线
+
+ 下线
+
+
+ 调度
+
+
+ 运行
+
+
+ 日志
+
+
+ 删除
+
+
+ 删除调度
+
+
+
-
-
-
-
-
-
- {{ formatDateTime(row.createTime) }}
-
-
-
-
-
- {{ formatDateTime(row.updateTime) }}
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+ {{scope.row.schId}}
+
+
+
+
+ {{ parseTime(scope.row.createTime) }}
+
+
+
+
+ {{ parseTime(scope.row.updateTime) }}
+
+
+
+
+
+
+
+
+ 编辑
+ 删除
+
+
+
+
+
+
+
+
+
+
-
+
@@ -58,35 +249,81 @@
-
-
+
+
-
+
查询
-
+
-
+
- >
- <
+
+ >
+
+
+ <
+
-
+
@@ -100,41 +337,76 @@
保存
+
+
+
+
+