You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
449 lines
25 KiB
449 lines
25 KiB
import asyncio
import json
import re
from datetime import datetime

import requests
from fastapi import Request,Depends
from sqlalchemy.ext.asyncio import AsyncSession

from module_admin.dao.datasec_config_dao import DatasecConfigDao
from module_admin.entity.vo.metadata_config_vo import DatasecConfigModel
from module_admin.entity.vo.common_vo import CrudResponseModel
from exceptions.exception import ServiceException
from utils.common_util import CamelCaseUtil
from module_admin.entity.vo.metaprocessconfig_vo import MetaprocessconfigQueryModel,MetaprocessconfigModel
from module_admin.service.metaprocessconfig_service import MetaprocessconfigService
from module_admin.entity.vo.dataSource_vo import ProcessDefinition,ParmScheduleVo,ProcessInstancePage,ParmSchedule
from config.enums import RedisInitKeyConfig
from config.env import AppConfig
from module_admin.entity.vo.user_vo import CurrentUserModel
from module_admin.entity.vo.metatask_vo import DeleteMetataskModel
|
|
class DatasecConfigService:
    """
    Service layer for the data-security parameter configuration table.

    Database access is delegated to ``DatasecConfigDao``. Each configuration
    is also mirrored as a workflow (process definition) in DolphinScheduler
    (DS), so most methods call the DS REST API as well.
    """

    # Task code hard-coded in the stored workflow templates; it is replaced by
    # a freshly generated task code when a new workflow is created.
    _TEMPLATE_TASK_CODE = "18093081592672"
    # Shell command whose trailing argument carries the task parameter.
    _SEC_SCRIPT = "sh /home/xx/code/remote_python_sec.sh"

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _split_ids(ids) -> list[str]:
        """
        Split a comma-separated id string into its non-empty, stripped parts.

        :param ids: comma-separated string (``None`` / empty allowed)
        :return: list of id strings, empty when nothing usable was given
        """
        if not ids:
            return []
        return [part.strip() for part in str(ids).split(",") if part.strip()]

    @staticmethod
    def _schedule_json(begin_time, end_time, crontab) -> str:
        """
        Build the ``schedule`` form field sent to the DS schedules endpoint.

        ``json.dumps`` is used instead of string concatenation so that special
        characters inside ``crontab`` cannot produce invalid JSON.

        :param begin_time: object with ``strftime`` marking the schedule start
        :param end_time: object with ``strftime`` marking the schedule end
        :param crontab: cron expression
        :return: JSON text with startTime / endTime / crontab / timezoneId
        """
        return json.dumps(
            {
                "startTime": begin_time.strftime('%Y-%m-%d %H:%M:%S'),
                "endTime": end_time.strftime('%Y-%m-%d %H:%M:%S'),
                "crontab": crontab,
                "timezoneId": "Asia/Shanghai",
            },
            ensure_ascii=False,
        )

    @staticmethod
    def _ds_headers(current_user: CurrentUserModel, form: bool = True) -> dict:
        """
        Build the DS request headers carrying the current user's credentials.

        :param current_user: current user
        :param form: when True, also declare a form-urlencoded body
        :return: header dict
        """
        headers = {
            'dashUserName': current_user.user.user_name,
            'dashPassword': current_user.user.password,
        }
        if form:
            headers['Content-Type'] = 'application/x-www-form-urlencoded'
        return headers

    @staticmethod
    async def _project_url(request: Request) -> str:
        """
        Return the DS REST base URL of the configured project.

        The project code is read from Redis (``sys.ds.projectcode``).

        :raises ServiceException: when no project code is configured
        """
        project_code = await request.app.state.redis.get(
            f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode'
        )
        if not project_code:
            # Previously this surfaced as a TypeError from str + None.
            raise ServiceException(message='未获取到 dolphinscheduler 项目编码(sys.ds.projectcode)')
        return f'{AppConfig.ds_server_url}/dolphinscheduler/projects/' + project_code

    @staticmethod
    async def _ds_request(method: str, url: str, **kwargs):
        """
        Send one HTTP request to DS without blocking the event loop.

        ``requests`` is synchronous, so the call runs in a worker thread.

        :param method: HTTP method name ('get', 'post', 'put', 'delete')
        :param url: absolute URL
        :param kwargs: forwarded to ``requests.request`` (headers, data, ...)
        :return: the ``requests`` response object
        """
        # NOTE(review): verify=False is kept from the original calls and
        # disables TLS certificate verification. No timeout is set either, so
        # a stalled DS server keeps the worker thread busy. Confirm both.
        return await asyncio.to_thread(requests.request, method, url, verify=False, **kwargs)

    @classmethod
    async def _list_schedules(cls, project_url: str, headers: dict, process_definition_code) -> list:
        """
        Return the schedules DS reports for one process definition.

        :return: the ``totalList`` entries, or an empty list when the query
                 is not successful or reports no schedules
        """
        url = (
            project_url
            + '/schedules?pageSize=10&pageNo=1&processDefinitionCode='
            + str(process_definition_code)
        )
        response = await cls._ds_request('get', url, headers=headers)
        result = json.loads(response.text)
        if result['msg'] == 'success' and result['data']['total'] > 0:
            return result['data']['totalList']
        return []

    @classmethod
    async def _drop_schedule(cls, project_url: str, headers: dict, item: dict):
        """
        Delete one DS schedule, taking it offline first when it is online.

        :param item: schedule entry as returned by the DS schedules query
        :return: the response of the delete request
        """
        if item['releaseState'] == 'ONLINE':
            # DS requires a schedule to be offline before it can be deleted.
            await cls._ds_request(
                'post', f"{project_url}/schedules/{item['id']}/offline", headers=headers
            )
        return await cls._ds_request(
            'delete',
            f"{project_url}/schedules/{item['id']}?scheduleId={item['id']}",
            headers=headers,
        )

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    @classmethod
    async def get_datasec_list_services(cls, db: AsyncSession, query_object, is_page: bool = False):
        """
        Get the configuration list (optionally paginated).

        :param db: orm session
        :param query_object: query conditions, passed through to the DAO
        :param is_page: whether to paginate
        :return: whatever ``DatasecConfigDao.get_list`` returns
        """
        return await DatasecConfigDao.get_list(db, query_object, is_page)

    @classmethod
    async def add_datasec_services(cls, db: AsyncSession, page_object: DatasecConfigModel, current_user: CurrentUserModel, request: Request):
        """
        Add a configuration and create its DS workflow.

        :param db: orm session
        :param page_object: configuration to add; ``ds_ids`` is filled in with
                            the created workflow codes
        :param current_user: current user (credentials are sent to DS)
        :param request: Request object (gives access to Redis)
        :return: success response
        :raises ServiceException: on duplicate name/parameter or DS failure
        """
        try:
            # Reject duplicates of the task name or task parameter.
            exists = await DatasecConfigDao.check_name_or_param_exist(
                db, page_object.metatask_name, page_object.metatask_param
            )
            if exists:
                raise ServiceException(message="任务名称或参数字段已存在")

            # Load the workflow templates registered for "SecConfig".
            processconfig = MetaprocessconfigQueryModel()
            processconfig.db_type = "SecConfig"
            processConfigList = await MetaprocessconfigService.get_metaprocessconfig_list_services(
                db, processconfig, False
            )

            # Create the workflow(s) in DS before touching the database.
            message = await cls.sec_process_defind_change_add(
                request, processConfigList, page_object, current_user
            )
            # NOTE(review): one success among several templates counts as
            # overall success here; confirm that partial success is intended.
            if "成功" not in message:
                raise ServiceException(message=f'新增数据安全_批次标签任务 {page_object.metatask_name} 失败,dolphinscheduler 创建失败'+message)

            await DatasecConfigDao.add(db, page_object)
            await db.commit()
            return CrudResponseModel(is_success=True, message="新增成功")
        except Exception:
            await db.rollback()
            raise

    @classmethod
    async def sec_process_defind_change_add(cls,request: Request, processConfigList: list[MetaprocessconfigModel],page_object:DatasecConfigModel,current_user: CurrentUserModel):
        """
        Create one DS process definition per template for a new configuration.

        On success ``page_object.ds_ids`` is set to the comma-joined codes of
        the created process definitions.

        :param request: Request object (gives access to Redis)
        :param processConfigList: workflow templates to instantiate
        :param page_object: configuration being added
        :param current_user: current user (credentials are sent to DS)
        :return: per-template result messages joined by ", "; always a string,
                 and it contains "成功" only if at least one creation succeeded
        """
        failure = page_object.metatask_name + "-数据安全_批次标签新增失败"
        project_url = await cls._project_url(request)

        # Ask DS for fresh task codes to put in place of the template's code.
        response = await cls._ds_request(
            'get',
            project_url + '/task-definition/gen-task-codes?genNum=5',
            headers=cls._ds_headers(current_user, form=False),
        )
        # status_code is used instead of the reason phrase, which some servers
        # omit. A failed call used to leave `message` unbound and crash.
        if response.status_code != 200:
            return failure
        str_list = list(map(str, json.loads(response.text)["data"]))
        if not str_list:
            return failure
        new_task_code = str_list[0]

        create_url = project_url + '/process-definition'
        form_headers = cls._ds_headers(current_user)
        created_codes = []
        messages = []
        for config in processConfigList:
            # NOTE(review): metatask_param is inserted verbatim into a shell
            # command inside JSON text; it is neither shell- nor JSON-escaped.
            # Confirm it is validated upstream.
            task_definition_json = config.taskDefinitionJson.replace(
                cls._TEMPLATE_TASK_CODE, new_task_code
            ).replace(cls._SEC_SCRIPT, cls._SEC_SCRIPT + " " + page_object.metatask_param)
            task_relation_json = config.taskRelationJson.replace(cls._TEMPLATE_TASK_CODE, new_task_code)
            locations = config.locations.replace(cls._TEMPLATE_TASK_CODE, new_task_code)

            payload = MetaprocessconfigModel(
                taskDefinitionJson=task_definition_json,
                description="",
                locations=locations,
                name=page_object.metatask_name,
                timeout=config.timeout,
                globalParams=config.globalParams,
                tenantCode=config.tenantCode,
                taskRelationJson=task_relation_json,
                executionType=config.executionType,
                releaseState=config.releaseState,
            ).model_dump(exclude_unset=True, by_alias=True)
            form_data = {key: str(value) for key, value in payload.items()}

            create_response = await cls._ds_request(
                'post', create_url, headers=form_headers, data=form_data
            )
            result = json.loads(create_response.text)
            if result['msg'] == 'success':
                created_codes.append(result['data']['code'])
                messages.append(page_object.metatask_name + "-数据安全_批次标签新增成功")
            else:
                messages.append(failure)

        if created_codes:
            page_object.ds_ids = ','.join(str(code) for code in created_codes)
        return ", ".join(messages)

    @classmethod
    async def edit_datasec_services(cls, db: AsyncSession, page_object: DatasecConfigModel,current_user: CurrentUserModel,request: Request,):
        """
        Edit a configuration and update its DS workflow.

        :param db: orm session
        :param page_object: new configuration values (``onum`` selects the row)
        :param current_user: current user (credentials are sent to DS)
        :param request: Request object (gives access to Redis)
        :return: success response
        :raises ServiceException: when the row does not exist, the name or
                                  parameter is a duplicate, or DS fails
        """
        edit_data = page_object.model_dump(exclude_unset=True)

        # Load the stored row; an empty model means it does not exist.
        info = await cls.get_datasec_detail_services(db, page_object.onum)
        if not (info and info.onum):
            raise ServiceException(message="配置数据不存在")

        # Reject duplicates of name/parameter, ignoring the row itself.
        exists = await DatasecConfigDao.check_name_or_param_exist(
            db,
            metatask_name=page_object.metatask_name,
            metatask_param=page_object.metatask_param,
            exclude_onum=page_object.onum,
        )
        if exists:
            raise ServiceException(message="任务名称或参数字段已存在")

        try:
            message = await cls.sec_process_defind_change_update(
                request, page_object, info, current_user
            )
            if "成功" not in message:
                raise ServiceException(message=f'更新数据安全_批次标签任务 {page_object.metatask_name} 失败,dolphinscheduler 创建失败'+message)
            await DatasecConfigDao.edit(db, page_object.onum, edit_data)
            await db.commit()
            return CrudResponseModel(is_success=True, message="更新成功")
        except Exception:
            await db.rollback()
            raise

    @classmethod
    async def sec_process_defind_change_update(cls,request: Request,page_object:DatasecConfigModel,metatask_old:DatasecConfigModel,current_user: CurrentUserModel):
        """
        Push the new task name and parameter to the existing DS workflows.

        :param request: Request object (gives access to Redis)
        :param page_object: new configuration values
        :param metatask_old: stored configuration; its ``ds_ids`` are used
                             when ``page_object`` carries none
        :param current_user: current user (credentials are sent to DS)
        :return: per-workflow result messages joined by ", "; contains "成功"
                 only if at least one workflow was updated
        """
        failure = page_object.metatask_name + "-数据安全_批次标签修改失败"
        project_url = await cls._project_url(request)

        # The request may omit ds_ids; the stored row then supplies them.
        ds_ids = cls._split_ids(page_object.ds_ids) or cls._split_ids(metatask_old.ds_ids)
        if not ds_ids:
            return failure

        url = project_url + '/process-definition'
        query_headers = cls._ds_headers(current_user, form=False)
        form_headers = cls._ds_headers(current_user)
        messages = []
        for ds_id in ds_ids:
            detail_url = f"{url}/{ds_id}"
            response = await cls._ds_request('get', detail_url, headers=query_headers)
            detail = json.loads(response.text)
            if detail['msg'] != 'success':
                messages.append(failure)
                continue

            # Rewrite the script line of matching tasks with the new parameter.
            task_def_list = detail['data']['taskDefinitionList']
            for task in task_def_list:
                task_params = task.get("taskParams")
                if task_params and isinstance(task_params, dict):
                    raw_script = task_params.get("rawScript")
                    if raw_script and raw_script.startswith(cls._SEC_SCRIPT):
                        task_params["rawScript"] = f"{cls._SEC_SCRIPT} {page_object.metatask_param}"
            task_definition_json = json.dumps(task_def_list, ensure_ascii=False, indent=0)

            # Only these relation fields are sent back on update.
            relations = [
                {
                    "name": item['name'],
                    "preTaskCode": item['preTaskCode'],
                    "preTaskVersion": item['preTaskVersion'],
                    "postTaskCode": item['postTaskCode'],
                    "conditionType": item['conditionType'],
                    "conditionParams": item['conditionParams'],
                }
                for item in detail['data']['processTaskRelationList']
            ]
            # Compact separators give whitespace-free JSON without stripping
            # whitespace that belongs to string values (the old regex did).
            task_relation_json = json.dumps(relations, ensure_ascii=False, separators=(',', ':'))

            payload = MetaprocessconfigModel(
                taskDefinitionJson=task_definition_json,
                description="",
                locations=detail['data']['processDefinition']['locations'],
                name=page_object.metatask_name,
                tenantCode=detail['data']['processDefinition']['tenantCode'],
                taskRelationJson=task_relation_json,
            ).model_dump(exclude_unset=True, by_alias=True)
            form_data = {key: str(value) for key, value in payload.items()}

            put_response = await cls._ds_request(
                'put', detail_url, headers=form_headers, data=form_data
            )
            if json.loads(put_response.text)['msg'] == 'success':
                messages.append(page_object.metatask_name + "-数据安全_批次标签成功")
            else:
                messages.append(failure)
        return ", ".join(messages)

    @classmethod
    async def delete_datasec_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteMetataskModel,current_user: CurrentUserModel):
        """
        Delete configurations and their DS workflows.

        :param request: Request object
        :param query_db: orm session
        :param page_object: holds the comma-separated ``metatask_ids`` (rows)
                            and ``ds_ids`` (DS process definition codes)
        :param current_user: current user (credentials are sent to DS)
        :return: success response
        :raises ServiceException: when ids are missing or DS refuses
        :raises ValueError: when a metatask id is not an integer
        """
        if not (page_object.metatask_ids and page_object.ds_ids):
            raise ServiceException(message='传入参数配置id为空')

        try:
            # Parse the ids first so a malformed id fails before anything is
            # deleted in DS.
            onum_list = [int(metatask_id) for metatask_id in page_object.metatask_ids.split(',')]

            project_url = await cls._project_url(request)
            response = await cls._ds_request(
                'post',
                project_url + '/process-definition/batch-delete',
                headers=cls._ds_headers(current_user),
                data={'codes': page_object.ds_ids},
            )
            if json.loads(response.text)['success'] is not True:
                raise ServiceException(message='ds删除失败')

            for onum in onum_list:
                await DatasecConfigDao.delete(query_db, DatasecConfigModel(onum=onum))
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='删除成功')
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def get_datasec_detail_services(cls, db: AsyncSession, onum: int):
        """
        Get one configuration by its id.

        :param db: orm session
        :param onum: configuration id
        :return: the configuration, or an empty ``DatasecConfigModel`` when
                 no row matches
        """
        result = await DatasecConfigDao.get_detail_by_id(db, onum)
        if result:
            return DatasecConfigModel(**CamelCaseUtil.transform_result(result))
        return DatasecConfigModel()

    @classmethod
    async def up_or_down_metatask_services(
        cls, request: Request, query_db: AsyncSession, current_user: CurrentUserModel, id: str, type: str
    ):
        """
        Release (online) or withdraw (offline) the DS workflows of a
        configuration and store the resulting status.

        :param request: Request object
        :param query_db: orm session
        :param current_user: current user (credentials are sent to DS)
        :param id: configuration id
        :param type: target release state; 'OFFLINE' takes the workflows
                     offline, any other value is stored as 'ONLINE'
        :return: success response
        :raises ServiceException: when the configuration or its workflow ids
                                  are missing, or DS rejects the change
        """
        metatask_info = await cls.get_datasec_detail_services(query_db, id)
        ds_ids = cls._split_ids(metatask_info.ds_ids)
        # An empty model (unknown id) used to crash on None.split below.
        if not metatask_info.onum or not ds_ids:
            raise ServiceException(message='更新失败')

        metatask_info.update_by = current_user.user.user_name
        metatask_info.update_time = datetime.now()

        url = await cls._project_url(request) + '/process-definition'
        headers = cls._ds_headers(current_user)
        message = ''
        for ds_id in ds_ids:
            form_data = {
                'name': str(metatask_info.metatask_name),
                'releaseState': str(type),
            }
            response = await cls._ds_request(
                'post', f"{url}/{ds_id}/release", headers=headers, data=form_data
            )
            result = json.loads(response.text)
            if result['success'] is not True:
                raise ServiceException(message='失败' + str(result['msg']))
            message = '成功!'

        metatask_info.status = 'OFFLINE' if type == 'OFFLINE' else 'ONLINE'
        edit_metatask = metatask_info.model_dump(exclude_unset=True)
        try:
            await DatasecConfigDao.edit(query_db, metatask_info.onum, edit_metatask)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message=message)
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def datasec_detail_services(cls, query_db: AsyncSession, metatask_id: int):
        """
        Get the details of one configuration.

        Delegates to ``get_datasec_detail_services`` so both entry points call
        the DAO the same way (the id is passed positionally).

        :param query_db: orm session
        :param metatask_id: configuration id
        :return: the configuration, or an empty ``DatasecConfigModel``
        """
        return await cls.get_datasec_detail_services(query_db, metatask_id)

    @classmethod
    async def ds_metatask_services(
        cls, request: Request, query_db: AsyncSession, process: ParmScheduleVo,current_user: CurrentUserModel
    ):
        """
        (Re)create and bring online the DS schedule of a workflow.

        Existing schedules of the process definition are deleted first; the
        new schedule id is then stored on the configuration row.

        :param request: Request object
        :param query_db: orm session
        :param process: schedule parameters (times, crontab, groups, codes)
        :param current_user: current user (credentials are sent to DS)
        :return: success text
        :raises ServiceException: when DS does not create the schedule
        """
        project_url = await cls._project_url(request)
        headers = cls._ds_headers(current_user)

        # Remove any schedule that already exists for this workflow.
        for item in await cls._list_schedules(project_url, headers, process.processDefinitionCode):
            await cls._drop_schedule(project_url, headers, item)

        parm = ParmSchedule()
        parm.failureStrategy = 'CONTINUE'
        parm.warningType = 'NONE'
        parm.warningGroupId = process.warningGroupId
        parm.workerGroup = process.workerGroup
        parm.processDefinitionCode = process.processDefinitionCode
        parm.environmentCode = process.environmentCode
        parm.processInstancePriority = 'MEDIUM'
        parm.schedule = cls._schedule_json(process.beginTime, process.endTime, process.crontab)

        response = await cls._ds_request(
            'post', project_url + '/schedules', headers=headers, data=dict(parm.__dict__)
        )
        result = json.loads(response.text)
        if result['msg'] != 'success':
            raise ServiceException(message='运行失败!')

        scheduleId = result['data']['id']
        await cls._ds_request(
            'post', f"{project_url}/schedules/{scheduleId}/online", headers=headers
        )

        # Remember the schedule id on the configuration row.
        metatask_info = await cls.get_datasec_detail_services(query_db, process.metaTaskId)
        metatask_info.schId = scheduleId
        await DatasecConfigDao.edit(
            query_db, metatask_info.onum, metatask_info.model_dump(exclude_unset=True)
        )
        await query_db.commit()
        return "调度运行成功!"

    @classmethod
    async def ds_metatask_detail(
        cls, request: Request, query_db: AsyncSession, process: ParmScheduleVo,current_user: CurrentUserModel
    ):
        """
        Read the DS schedule settings of a workflow.

        :param request: Request object
        :param query_db: orm session (not used by this method)
        :param process: supplies ``processDefinitionCode``
        :param current_user: current user (credentials are sent to DS)
        :return: ``ParmScheduleVo`` filled from the last schedule DS returns;
                 left at its defaults when there is none
        """
        project_url = await cls._project_url(request)
        headers = cls._ds_headers(current_user)

        processVo = ParmScheduleVo()
        for item in await cls._list_schedules(project_url, headers, process.processDefinitionCode):
            processVo.crontab = item['crontab']
            processVo.beginTime = item['startTime']
            processVo.endTime = item['endTime']
            processVo.workerGroup = item['workerGroup']
            processVo.warningGroupId = item['warningGroupId']
            processVo.environmentCode = item['environmentCode']
        return processVo

    @classmethod
    async def ds_metatask_delete(
        cls, request: Request, query_db: AsyncSession, process: ParmScheduleVo,current_user: CurrentUserModel
    ):
        """
        Delete the DS schedule of a workflow and clear the stored schedule id.

        :param request: Request object
        :param query_db: orm session
        :param process: supplies ``processDefinitionCode`` and ``metaTaskId``
        :param current_user: current user (credentials are sent to DS)
        :return: success text. NOTE(review): the same text is returned when
                 no schedule exists or DS refuses the delete; confirm.
        """
        project_url = await cls._project_url(request)
        headers = cls._ds_headers(current_user)

        for item in await cls._list_schedules(project_url, headers, process.processDefinitionCode):
            delresponse = await cls._drop_schedule(project_url, headers, item)
            if json.loads(delresponse.text)['msg'] == 'success':
                metatask_info = await cls.get_datasec_detail_services(query_db, process.metaTaskId)
                # Clear the id on the model before dumping; the old code set
                # an attribute on the dumped dict and raised AttributeError.
                metatask_info.schId = ""
                await DatasecConfigDao.edit(
                    query_db, metatask_info.onum, metatask_info.model_dump(exclude_unset=True)
                )
                await query_db.commit()
                return "调度删除成功!"
        return "调度删除成功!"
|