Browse Source

多类型采集任务

master
si@aidatagov.com 2 months ago
parent
commit
531b93a90d
  1. 819
      vue-fastapi-backend/module_admin/service/metatask_service.py
  2. 7
      vue-fastapi-frontend/src/views/meta/metatask/index.vue

819
vue-fastapi-backend/module_admin/service/metatask_service.py

@ -129,7 +129,18 @@ class MetataskService:
processconfig.ac_target=page_object.ac_target
processconfig.db_type=page_object.dbCode
processConfigList =await MetaprocessconfigService.get_metaprocessconfig_list_services(query_db,processconfig,False)
message= await cls.mysql_process_defind_change_add(request,processConfigList,page_object,current_user)
if processconfig.db_type=="MYSQL":
message= await cls.mysql_process_defind_change_add(request,processConfigList,page_object,current_user)
elif processconfig.db_type=="POSTGRESQL":
message= await cls.pg_process_defind_change_add(request,processConfigList,page_object,current_user)
elif processconfig.db_type=="DB2":
message= await cls.db2_process_defind_change_add(request,processConfigList,page_object,current_user)
elif processconfig.db_type=="ORACLE":
message= await cls.oracle_process_defind_change_add(request,processConfigList,page_object,current_user)
elif processconfig.db_type=="HIVE":
message= await cls.hive_process_defind_change_add(request,processConfigList,page_object,current_user)
else:
raise ServiceException(message=f"不支持的数据库类型: {processconfig.db_type}")
if "成功" not in message:
await query_db.rollback()
raise ServiceException(message=f'新增元数据任务{page_object.metatask_name}失败,dolphinscheduler创建失败')
@ -141,6 +152,46 @@ class MetataskService:
await query_db.rollback()
raise e
@classmethod
async def edit_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: MetataskModel,current_user: CurrentUserModel):
"""
编辑参数配置信息service
:param request: Request对象
:param query_db: orm对象
:param page_object: 编辑参数配置对象
:return: 编辑参数配置校验结果
"""
edit_metatask = page_object.model_dump(exclude_unset=True)
metatask_info = await cls.metatask_detail_services(query_db, page_object.metatask_id)
if metatask_info.metatask_id:
if not await cls.check_metatask_name_unique_services(query_db, page_object):
raise ServiceException(message=f'修改任务{page_object.metatask_name}失败,任务名称已存在')
else:
try:
metatask_old = await cls.metatask_detail_services(query_db, metatask_info.metatask_id)
if metatask_old.dbCode=="MYSQL":
message= await cls.mysql_process_defind_change_update(request,page_object,metatask_old,current_user)
elif metatask_old.dbCode=="POSTGRESQL":
message= await cls.mysql_process_defind_change_update(request,page_object,metatask_old,current_user)
elif metatask_old.dbCode=="DB2":
message= await cls.mysql_process_defind_change_update(request,page_object,metatask_old,current_user)
elif metatask_old.dbCode=="ORACLE":
message= await cls.mysql_process_defind_change_update(request,page_object,metatask_old,current_user)
elif metatask_old.dbCode=="HIVE":
message= await cls.mysql_process_defind_change_update(request,page_object,metatask_old,current_user)
else:
raise ServiceException(message=f"不支持的数据库类型: {metatask_old.dbCode}")
if "成功" not in message:
await query_db.rollback()
raise ServiceException(message=f'修改元数据任务{page_object.metatask_name}失败,dolphinscheduler修改失败')
await MetataskDao.edit_metatask_dao(query_db, edit_metatask)
await query_db.commit()
return CrudResponseModel(is_success=True, message='更新成功')
except Exception as e:
await query_db.rollback()
raise e
else:
raise ServiceException(message='更新失败')
@classmethod
async def mysql_process_defind_change_add(cls,request: Request, processConfigList: list[MetaprocessconfigModel],page_object:MetataskModel,current_user: CurrentUserModel):
"""
mysql类型
@ -227,7 +278,7 @@ class MetataskService:
page_object.ds_ids=','.join([str(i) for i in intdsids])
page_object.ds_types=','.join([str(i) for i in dstypes])
return message
@classmethod
async def mysql_process_defind_change_update(cls,request: Request,page_object:MetataskModel,metatask_old:MetataskModel,current_user: CurrentUserModel):
"""
@ -325,34 +376,746 @@ class MetataskService:
message += page_object.metatask_name + "-存储过程采集修改失败"
return message
@classmethod
async def edit_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: MetataskModel,current_user: CurrentUserModel):
async def oracle_process_defind_change_update(cls,request: Request,page_object:MetataskModel,metatask_old:MetataskModel,current_user: CurrentUserModel):
"""
编辑参数配置信息service
:param request: Request对象
:param query_db: orm对象
:param page_object: 编辑参数配置对象
:return: 编辑参数配置校验结果
mysql类型
"""
edit_metatask = page_object.model_dump(exclude_unset=True)
metatask_info = await cls.metatask_detail_services(query_db, page_object.metatask_id)
if metatask_info.metatask_id:
if not await cls.check_metatask_name_unique_services(query_db, page_object):
raise ServiceException(message=f'修改任务{page_object.metatask_name}失败,任务名称已存在')
else:
try:
metatask_old = await cls.metatask_detail_services(query_db, metatask_info.metatask_id)
message= await cls.mysql_process_defind_change_update(request,page_object,metatask_old,current_user)
if "成功" not in message:
await query_db.rollback()
raise ServiceException(message=f'修改元数据任务{page_object.metatask_name}失败,dolphinscheduler修改失败')
await MetataskDao.edit_metatask_dao(query_db, edit_metatask)
await query_db.commit()
return CrudResponseModel(is_success=True, message='更新成功')
except Exception as e:
await query_db.rollback()
raise e
else:
raise ServiceException(message='更新失败')
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
dstypes=page_object.ds_types.split(",")
dsids=page_object.ds_ids.split(",")
result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dstypes, dsids)]
message=''
# 查询接口
url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, }
headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
for config in result_list:
# mysql表字段
if config['dstype']=='0' :
response = requests.get(f"{url}/{config['dsid']}", headers=headers, verify=False)
text= response.text
responsJson = json.loads(text)
if responsJson['msg'] == 'success':
modified_json_str = json.dumps(responsJson['data']['taskDefinitionList'], ensure_ascii=False, indent=0).replace( f"'{metatask_old.dbSName}'", f"'{page_object.dbSName}'")
getTaskRelationList=responsJson['data']['processTaskRelationList']
putTaskRelationList=[]
for item in getTaskRelationList:
new_item = {
"name": item['name'],
"preTaskCode":item['preTaskCode'] ,
"preTaskVersion":item['preTaskVersion'] ,
"postTaskCode":item['postTaskCode'] ,
"conditionType":item['conditionType'] ,
"conditionParams":item['conditionParams']
}
putTaskRelationList.append(new_item)
modified_json_str2= json.dumps(putTaskRelationList, ensure_ascii=False, indent=0)
modified_json_str2=re.sub(r'\s+', '', modified_json_str2)
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description=page_object.remark, # 替换工作流备注
locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations
name =page_object.metatask_name+"-表字段采集" ,# 替换工作流名称
tenantCode =responsJson['data']['processDefinition']['tenantCode'] ,
taskRelationJson =modified_json_str2,# 替换taskRelationJson
).model_dump(exclude_unset=True, by_alias=True)
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data,verify=False)
putText= response_put0.text
responsPutJson=json.loads(putText)
if responsPutJson['msg'] == 'success':
if message:
message += ", "
message += page_object.metatask_name + "-表字段采集修改成功"
else:
if message:
message += ", "
message += page_object.metatask_name + "-表字段采集修改失败"
if config['dstype']=='1':
response = requests.get(f"{url}/{config['dsid']}", headers=headers)
text= response.text
responsJson = json.loads(text)
if responsJson['msg'] == 'success':
modified_json_str = json.dumps(responsJson['data']['taskDefinitionList'], ensure_ascii=False, indent=4).replace( f"'{metatask_old.dbSName}'", f"'{page_object.dbSName}'")
getTaskRelationList=responsJson['data']['processTaskRelationList']
putTaskRelationList=[]
for item in getTaskRelationList:
new_item = {
"name": item['name'],
"preTaskCode":item['preTaskCode'] ,
"preTaskVersion":item['preTaskVersion'] ,
"postTaskCode":item['postTaskCode'] ,
"conditionType":item['conditionType'] ,
"conditionParams":item['conditionParams']
}
putTaskRelationList.append(new_item)
modified_json_str2= json.dumps(putTaskRelationList, ensure_ascii=False, indent=4)
modified_json_str2=re.sub(r'\s+', '', modified_json_str2)
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description=page_object.remark, # 替换工作流备注
locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations
name =page_object.metatask_name+"-存储过程采集" ,# 替换工作流名称
tenantCode =responsJson['data']['processDefinition']['tenantCode'] ,
taskRelationJson =modified_json_str2,# 替换taskRelationJson
).model_dump(exclude_unset=True, by_alias=True)
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data,verify=False)
putText= response_put0.text
responsPutJson=json.loads(putText)
if responsPutJson['msg'] == 'success':
if message:
message += ", "
message += page_object.metatask_name + "-存储过程采集修改成功"
else:
if message:
message += ", "
message += page_object.metatask_name + "-存储过程采集修改失败"
return message
@classmethod
async def oracle_process_defind_change_add(cls,request: Request, processConfigList: list[MetaprocessconfigModel],page_object:MetataskModel,current_user: CurrentUserModel):
"""
mysql类型
"""
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/task-definition/gen-task-codes?genNum=5'
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
# 新增接口
url2=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
response = requests.get(url, headers=headers, verify=False)
if response.reason == 'OK':
intdsids=[]
message=''
dstypes=[]
response_text = response.text
data = json.loads(response_text)
code_list = data["data"]
str_list = list(map(str, code_list))
for config in processConfigList:
# mysql表字段
if config.ac_target=='0':
modified_json_str = config.taskDefinitionJson.replace("16199683466336", str_list[0]).replace("16199683466337", str_list[1]).replace("16199683466338", str_list[2]).replace("'dash_test_w'", f"'{page_object.dbSName}'").replace("'mysql_conn'", f"'{page_object.dbRName}'")
modified_json_str2=config.taskRelationJson.replace("16199683466336", str_list[0]).replace("16199683466337", str_list[1]).replace("16199683466338", str_list[2])
modified_json_str3=config.locations.replace("16199683466336", str_list[0]).replace("16199683466337", str_list[1]).replace("16199683466338", str_list[2])
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description=page_object.remark, # 替换工作流备注
locations=modified_json_str3,# 替换locations
name =page_object.metatask_name+"-表字段采集" ,# 替换工作流名称
timeout=config.timeout,
globalParams =config.globalParams ,
tenantCode =config.tenantCode ,
taskRelationJson =modified_json_str2,# 替换taskRelationJson
executionType =config.executionType ,
releaseState=config.releaseState
).model_dump(exclude_unset=True, by_alias=True)
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response_post0 = requests.post(url2, headers=headers2, data=form_data, verify=False)
# text= '{"code":0,"msg":"success","data":{"id":null,"code":16244510692320,"name":"测试2250-表字段采集","version":0,"releaseState":null,"projectCode":15094503753824,"description":"111","globalParams":"[]","globalParamList":null,"globalParamMap":{},"createTime":"2025-01-08 12:50:39","updateTime":"2025-01-08 12:50:39","flag":"YES","userId":1,"userName":null,"projectName":null,"locations":"[{\\"taskCode\\":16244510678624,\\"x\\":334,\\"y\\":265},{\\"taskCode\\":16244510678625,\\"x\\":334,\\"y\\":390},{\\"taskCode\\":16244510678626,\\"x\\":687,\\"y\\":335}]","scheduleReleaseState":null,"timeout":0,"tenantId":1,"tenantCode":null,"modifyBy":null,"warningGroupId":0,"executionType":"PARALLEL"},"failed":false,"success":true}'
text= response_post0.text
responsJson = json.loads(text)
if responsJson['msg'] == 'success':
intdsids.append(responsJson['data']['code'])
dstypes.append('0')
if message:
message += ", "
message += page_object.metatask_name + "-表字段采集新增成功"
else:
if message:
message += ", "
message += page_object.metatask_name + "-表字段采集新增失败"
if config.ac_target=='1':
modified_json_str = config.taskDefinitionJson.replace("16286410625888", str_list[0]).replace("16286410625889", str_list[1]).replace("16286410625890", str_list[2]).replace("16286410625891", str_list[3]).replace("'dash_test_w'", f"'{page_object.dbSName}'").replace("'mysql_conn'", f"'{page_object.dbRName}'").replace("mysql_conn dash_test_w", f"{page_object.dbRName} {page_object.dbSName}")
modified_json_str2=config.taskRelationJson.replace("16286410625888", str_list[0]).replace("16286410625889", str_list[1]).replace("16286410625890", str_list[2]).replace("16286410625891", str_list[3])
modified_json_str3=config.locations.replace("16286410625888", str_list[0]).replace("16286410625889", str_list[1]).replace("16286410625890", str_list[2]).replace("16286410625891", str_list[3])
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description=page_object.remark, # 替换工作流备注
locations=modified_json_str3,# 替换locations
name =page_object.metatask_name+"-存储过程采集" ,# 替换工作流名称
timeout=config.timeout,
globalParams =config.globalParams ,
tenantCode =config.tenantCode ,
taskRelationJson =modified_json_str2,# 替换taskRelationJson
executionType =config.executionType ,
releaseState=config.releaseState
).model_dump(exclude_unset=True, by_alias=True)
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response_post0 = requests.post(url2, headers=headers2,data=form_data,verify=False)
#text= '{"code":0,"msg":"success","data":{"id":null,"code":16244510692320,"name":"测试2250-表字段采集","version":0,"releaseState":null,"projectCode":15094503753824,"description":"111","globalParams":"[]","globalParamList":null,"globalParamMap":{},"createTime":"2025-01-08 12:50:39","updateTime":"2025-01-08 12:50:39","flag":"YES","userId":1,"userName":null,"projectName":null,"locations":"[{\\"taskCode\\":16244510678624,\\"x\\":334,\\"y\\":265},{\\"taskCode\\":16244510678625,\\"x\\":334,\\"y\\":390},{\\"taskCode\\":16244510678626,\\"x\\":687,\\"y\\":335}]","scheduleReleaseState":null,"timeout":0,"tenantId":1,"tenantCode":null,"modifyBy":null,"warningGroupId":0,"executionType":"PARALLEL"},"failed":false,"success":true}'
text= response_post0.text
responsJson = json.loads(text)
if responsJson['msg'] == 'success':
dstypes.append('1')
intdsids.append(responsJson['data']['code'])
if message:
message += ", "
message += page_object.metatask_name + "-存储过程采集新增成功"
else:
if message:
message += ", "
message += page_object.metatask_name + "-存储过程采集新增失败"
if len(intdsids)>0:
page_object.ds_ids=','.join([str(i) for i in intdsids])
page_object.ds_types=','.join([str(i) for i in dstypes])
return message
@classmethod
async def db2_process_defind_change_add(cls,request: Request, processConfigList: list[MetaprocessconfigModel],page_object:MetataskModel,current_user: CurrentUserModel):
"""
mysql类型
"""
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/task-definition/gen-task-codes?genNum=5'
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
# 新增接口
url2=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
response = requests.get(url, headers=headers, verify=False)
if response.reason == 'OK':
intdsids=[]
message=''
dstypes=[]
response_text = response.text
data = json.loads(response_text)
code_list = data["data"]
str_list = list(map(str, code_list))
for config in processConfigList:
# mysql表字段
if config.ac_target=='0':
modified_json_str = config.taskDefinitionJson.replace("16199683466336", str_list[0]).replace("16199683466337", str_list[1]).replace("16199683466338", str_list[2]).replace("'dash_test_w'", f"'{page_object.dbSName}'").replace("'mysql_conn'", f"'{page_object.dbRName}'")
modified_json_str2=config.taskRelationJson.replace("16199683466336", str_list[0]).replace("16199683466337", str_list[1]).replace("16199683466338", str_list[2])
modified_json_str3=config.locations.replace("16199683466336", str_list[0]).replace("16199683466337", str_list[1]).replace("16199683466338", str_list[2])
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description=page_object.remark, # 替换工作流备注
locations=modified_json_str3,# 替换locations
name =page_object.metatask_name+"-表字段采集" ,# 替换工作流名称
timeout=config.timeout,
globalParams =config.globalParams ,
tenantCode =config.tenantCode ,
taskRelationJson =modified_json_str2,# 替换taskRelationJson
executionType =config.executionType ,
releaseState=config.releaseState
).model_dump(exclude_unset=True, by_alias=True)
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response_post0 = requests.post(url2, headers=headers2, data=form_data, verify=False)
# text= '{"code":0,"msg":"success","data":{"id":null,"code":16244510692320,"name":"测试2250-表字段采集","version":0,"releaseState":null,"projectCode":15094503753824,"description":"111","globalParams":"[]","globalParamList":null,"globalParamMap":{},"createTime":"2025-01-08 12:50:39","updateTime":"2025-01-08 12:50:39","flag":"YES","userId":1,"userName":null,"projectName":null,"locations":"[{\\"taskCode\\":16244510678624,\\"x\\":334,\\"y\\":265},{\\"taskCode\\":16244510678625,\\"x\\":334,\\"y\\":390},{\\"taskCode\\":16244510678626,\\"x\\":687,\\"y\\":335}]","scheduleReleaseState":null,"timeout":0,"tenantId":1,"tenantCode":null,"modifyBy":null,"warningGroupId":0,"executionType":"PARALLEL"},"failed":false,"success":true}'
text= response_post0.text
responsJson = json.loads(text)
if responsJson['msg'] == 'success':
intdsids.append(responsJson['data']['code'])
dstypes.append('0')
if message:
message += ", "
message += page_object.metatask_name + "-表字段采集新增成功"
else:
if message:
message += ", "
message += page_object.metatask_name + "-表字段采集新增失败"
if config.ac_target=='1':
modified_json_str = config.taskDefinitionJson.replace("16286410625888", str_list[0]).replace("16286410625889", str_list[1]).replace("16286410625890", str_list[2]).replace("16286410625891", str_list[3]).replace("'dash_test_w'", f"'{page_object.dbSName}'").replace("'mysql_conn'", f"'{page_object.dbRName}'").replace("mysql_conn dash_test_w", f"{page_object.dbRName} {page_object.dbSName}")
modified_json_str2=config.taskRelationJson.replace("16286410625888", str_list[0]).replace("16286410625889", str_list[1]).replace("16286410625890", str_list[2]).replace("16286410625891", str_list[3])
modified_json_str3=config.locations.replace("16286410625888", str_list[0]).replace("16286410625889", str_list[1]).replace("16286410625890", str_list[2]).replace("16286410625891", str_list[3])
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description=page_object.remark, # 替换工作流备注
locations=modified_json_str3,# 替换locations
name =page_object.metatask_name+"-存储过程采集" ,# 替换工作流名称
timeout=config.timeout,
globalParams =config.globalParams ,
tenantCode =config.tenantCode ,
taskRelationJson =modified_json_str2,# 替换taskRelationJson
executionType =config.executionType ,
releaseState=config.releaseState
).model_dump(exclude_unset=True, by_alias=True)
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response_post0 = requests.post(url2, headers=headers2,data=form_data,verify=False)
#text= '{"code":0,"msg":"success","data":{"id":null,"code":16244510692320,"name":"测试2250-表字段采集","version":0,"releaseState":null,"projectCode":15094503753824,"description":"111","globalParams":"[]","globalParamList":null,"globalParamMap":{},"createTime":"2025-01-08 12:50:39","updateTime":"2025-01-08 12:50:39","flag":"YES","userId":1,"userName":null,"projectName":null,"locations":"[{\\"taskCode\\":16244510678624,\\"x\\":334,\\"y\\":265},{\\"taskCode\\":16244510678625,\\"x\\":334,\\"y\\":390},{\\"taskCode\\":16244510678626,\\"x\\":687,\\"y\\":335}]","scheduleReleaseState":null,"timeout":0,"tenantId":1,"tenantCode":null,"modifyBy":null,"warningGroupId":0,"executionType":"PARALLEL"},"failed":false,"success":true}'
text= response_post0.text
responsJson = json.loads(text)
if responsJson['msg'] == 'success':
dstypes.append('1')
intdsids.append(responsJson['data']['code'])
if message:
message += ", "
message += page_object.metatask_name + "-存储过程采集新增成功"
else:
if message:
message += ", "
message += page_object.metatask_name + "-存储过程采集新增失败"
if len(intdsids)>0:
page_object.ds_ids=','.join([str(i) for i in intdsids])
page_object.ds_types=','.join([str(i) for i in dstypes])
return message
@classmethod
async def db2_process_defind_change_update(cls,request: Request,page_object:MetataskModel,metatask_old:MetataskModel,current_user: CurrentUserModel):
"""
mysql类型
"""
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
dstypes=page_object.ds_types.split(",")
dsids=page_object.ds_ids.split(",")
result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dstypes, dsids)]
message=''
# 查询接口
url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, }
headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
for config in result_list:
# mysql表字段
if config['dstype']=='0' :
response = requests.get(f"{url}/{config['dsid']}", headers=headers, verify=False)
text= response.text
responsJson = json.loads(text)
if responsJson['msg'] == 'success':
modified_json_str = json.dumps(responsJson['data']['taskDefinitionList'], ensure_ascii=False, indent=0).replace( f"'{metatask_old.dbSName}'", f"'{page_object.dbSName}'")
getTaskRelationList=responsJson['data']['processTaskRelationList']
putTaskRelationList=[]
for item in getTaskRelationList:
new_item = {
"name": item['name'],
"preTaskCode":item['preTaskCode'] ,
"preTaskVersion":item['preTaskVersion'] ,
"postTaskCode":item['postTaskCode'] ,
"conditionType":item['conditionType'] ,
"conditionParams":item['conditionParams']
}
putTaskRelationList.append(new_item)
modified_json_str2= json.dumps(putTaskRelationList, ensure_ascii=False, indent=0)
modified_json_str2=re.sub(r'\s+', '', modified_json_str2)
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description=page_object.remark, # 替换工作流备注
locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations
name =page_object.metatask_name+"-表字段采集" ,# 替换工作流名称
tenantCode =responsJson['data']['processDefinition']['tenantCode'] ,
taskRelationJson =modified_json_str2,# 替换taskRelationJson
).model_dump(exclude_unset=True, by_alias=True)
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data,verify=False)
putText= response_put0.text
responsPutJson=json.loads(putText)
if responsPutJson['msg'] == 'success':
if message:
message += ", "
message += page_object.metatask_name + "-表字段采集修改成功"
else:
if message:
message += ", "
message += page_object.metatask_name + "-表字段采集修改失败"
if config['dstype']=='1':
response = requests.get(f"{url}/{config['dsid']}", headers=headers)
text= response.text
responsJson = json.loads(text)
if responsJson['msg'] == 'success':
modified_json_str = json.dumps(responsJson['data']['taskDefinitionList'], ensure_ascii=False, indent=4).replace( f"'{metatask_old.dbSName}'", f"'{page_object.dbSName}'")
getTaskRelationList=responsJson['data']['processTaskRelationList']
putTaskRelationList=[]
for item in getTaskRelationList:
new_item = {
"name": item['name'],
"preTaskCode":item['preTaskCode'] ,
"preTaskVersion":item['preTaskVersion'] ,
"postTaskCode":item['postTaskCode'] ,
"conditionType":item['conditionType'] ,
"conditionParams":item['conditionParams']
}
putTaskRelationList.append(new_item)
modified_json_str2= json.dumps(putTaskRelationList, ensure_ascii=False, indent=4)
modified_json_str2=re.sub(r'\s+', '', modified_json_str2)
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description=page_object.remark, # 替换工作流备注
locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations
name =page_object.metatask_name+"-存储过程采集" ,# 替换工作流名称
tenantCode =responsJson['data']['processDefinition']['tenantCode'] ,
taskRelationJson =modified_json_str2,# 替换taskRelationJson
).model_dump(exclude_unset=True, by_alias=True)
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data,verify=False)
putText= response_put0.text
responsPutJson=json.loads(putText)
if responsPutJson['msg'] == 'success':
if message:
message += ", "
message += page_object.metatask_name + "-存储过程采集修改成功"
else:
if message:
message += ", "
message += page_object.metatask_name + "-存储过程采集修改失败"
return message
@classmethod
async def pg_process_defind_change_add(cls,request: Request, processConfigList: list[MetaprocessconfigModel],page_object:MetataskModel,current_user: CurrentUserModel):
"""
POSTGRESQL类型
"""
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/task-definition/gen-task-codes?genNum=5'
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
# 新增接口
url2=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
response = requests.get(url, headers=headers, verify=False)
if response.reason == 'OK':
intdsids=[]
message=''
dstypes=[]
response_text = response.text
data = json.loads(response_text)
code_list = data["data"]
str_list = list(map(str, code_list))
for config in processConfigList:
# POSTGRESQL表字段
if config.ac_target=='0':
modified_json_str = config.taskDefinitionJson.replace("16688213802592", str_list[0]).replace("16688213802593", str_list[1]).replace("16688213802594", str_list[2])
modified_json_str2=config.taskRelationJson.replace("16688213802592", str_list[0]).replace("16688213802593", str_list[1]).replace("16688213802594", str_list[2])
modified_json_str3=config.locations.replace("16688213802592", str_list[0]).replace("16688213802593", str_list[1]).replace("16688213802594", str_list[2])
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description=page_object.remark, # 替换工作流备注
locations=modified_json_str3,# 替换locations
name =page_object.metatask_name+"-表字段采集" ,# 替换工作流名称
timeout=config.timeout,
globalParams =config.globalParams ,
tenantCode =config.tenantCode ,
taskRelationJson =modified_json_str2,# 替换taskRelationJson
executionType =config.executionType ,
releaseState=config.releaseState
).model_dump(exclude_unset=True, by_alias=True)
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response_post0 = requests.post(url2, headers=headers2, data=form_data, verify=False)
text= response_post0.text
responsJson = json.loads(text)
if responsJson['msg'] == 'success':
intdsids.append(responsJson['data']['code'])
dstypes.append('0')
if message:
message += ", "
message += page_object.metatask_name + "-表字段采集新增成功"
else:
if message:
message += ", "
message += page_object.metatask_name + "-表字段采集新增失败"
elif config.ac_target=='1':
modified_json_str = config.taskDefinitionJson.replace("16688477124320", str_list[0]).replace("16688477124321", str_list[1]).replace("16688477124322", str_list[2]).replace("16688477124323", str_list[3])
modified_json_str2=config.taskRelationJson.replace("16688477124320", str_list[0]).replace("16688477124321", str_list[1]).replace("16688477124322", str_list[2]).replace("16688477124323", str_list[3])
modified_json_str3=config.locations.replace("16688477124320", str_list[0]).replace("16688477124321", str_list[1]).replace("16688477124322", str_list[2]).replace("16688477124323", str_list[3])
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description=page_object.remark, # 替换工作流备注
locations=modified_json_str3,# 替换locations
name =page_object.metatask_name+"-存储过程采集" ,# 替换工作流名称
timeout=config.timeout,
globalParams =config.globalParams ,
tenantCode =config.tenantCode ,
taskRelationJson =modified_json_str2,# 替换taskRelationJson
executionType =config.executionType ,
releaseState=config.releaseState
).model_dump(exclude_unset=True, by_alias=True)
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response_post0 = requests.post(url2, headers=headers2,data=form_data,verify=False)
text= response_post0.text
responsJson = json.loads(text)
if responsJson['msg'] == 'success':
dstypes.append('1')
intdsids.append(responsJson['data']['code'])
if message:
message += ", "
message += page_object.metatask_name + "-存储过程采集新增成功"
else:
if message:
message += ", "
message += page_object.metatask_name + "-存储过程采集新增失败"
if len(intdsids)>0:
page_object.ds_ids=','.join([str(i) for i in intdsids])
page_object.ds_types=','.join([str(i) for i in dstypes])
return message
@classmethod
async def pg_process_defind_change_update(cls,request: Request,page_object:MetataskModel,metatask_old:MetataskModel,current_user: CurrentUserModel):
"""
mysql类型
"""
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
dstypes=page_object.ds_types.split(",")
dsids=page_object.ds_ids.split(",")
result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dstypes, dsids)]
message=''
# 查询接口
url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, }
headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
for config in result_list:
# mysql表字段
if config['dstype']=='0' :
response = requests.get(f"{url}/{config['dsid']}", headers=headers, verify=False)
text= response.text
responsJson = json.loads(text)
if responsJson['msg'] == 'success':
modified_json_str = json.dumps(responsJson['data']['taskDefinitionList'], ensure_ascii=False, indent=0).replace( f"'{metatask_old.dbSName}'", f"'{page_object.dbSName}'")
getTaskRelationList=responsJson['data']['processTaskRelationList']
putTaskRelationList=[]
for item in getTaskRelationList:
new_item = {
"name": item['name'],
"preTaskCode":item['preTaskCode'] ,
"preTaskVersion":item['preTaskVersion'] ,
"postTaskCode":item['postTaskCode'] ,
"conditionType":item['conditionType'] ,
"conditionParams":item['conditionParams']
}
putTaskRelationList.append(new_item)
modified_json_str2= json.dumps(putTaskRelationList, ensure_ascii=False, indent=0)
modified_json_str2=re.sub(r'\s+', '', modified_json_str2)
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description=page_object.remark, # 替换工作流备注
locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations
name =page_object.metatask_name+"-表字段采集" ,# 替换工作流名称
tenantCode =responsJson['data']['processDefinition']['tenantCode'] ,
taskRelationJson =modified_json_str2,# 替换taskRelationJson
).model_dump(exclude_unset=True, by_alias=True)
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data,verify=False)
putText= response_put0.text
responsPutJson=json.loads(putText)
if responsPutJson['msg'] == 'success':
if message:
message += ", "
message += page_object.metatask_name + "-表字段采集修改成功"
else:
if message:
message += ", "
message += page_object.metatask_name + "-表字段采集修改失败"
if config['dstype']=='1':
response = requests.get(f"{url}/{config['dsid']}", headers=headers)
text= response.text
responsJson = json.loads(text)
if responsJson['msg'] == 'success':
modified_json_str = json.dumps(responsJson['data']['taskDefinitionList'], ensure_ascii=False, indent=4).replace( f"'{metatask_old.dbSName}'", f"'{page_object.dbSName}'")
getTaskRelationList=responsJson['data']['processTaskRelationList']
putTaskRelationList=[]
for item in getTaskRelationList:
new_item = {
"name": item['name'],
"preTaskCode":item['preTaskCode'] ,
"preTaskVersion":item['preTaskVersion'] ,
"postTaskCode":item['postTaskCode'] ,
"conditionType":item['conditionType'] ,
"conditionParams":item['conditionParams']
}
putTaskRelationList.append(new_item)
modified_json_str2= json.dumps(putTaskRelationList, ensure_ascii=False, indent=4)
modified_json_str2=re.sub(r'\s+', '', modified_json_str2)
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description=page_object.remark, # 替换工作流备注
locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations
name =page_object.metatask_name+"-存储过程采集" ,# 替换工作流名称
tenantCode =responsJson['data']['processDefinition']['tenantCode'] ,
taskRelationJson =modified_json_str2,# 替换taskRelationJson
).model_dump(exclude_unset=True, by_alias=True)
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data,verify=False)
putText= response_put0.text
responsPutJson=json.loads(putText)
if responsPutJson['msg'] == 'success':
if message:
message += ", "
message += page_object.metatask_name + "-存储过程采集修改成功"
else:
if message:
message += ", "
message += page_object.metatask_name + "-存储过程采集修改失败"
return message
@classmethod
async def hive_process_defind_change_add(cls,request: Request, processConfigList: list[MetaprocessconfigModel],page_object:MetataskModel,current_user: CurrentUserModel):
"""
mysql类型
"""
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/task-definition/gen-task-codes?genNum=5'
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
# 新增接口
url2=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
response = requests.get(url, headers=headers, verify=False)
if response.reason == 'OK':
intdsids=[]
message=''
dstypes=[]
response_text = response.text
data = json.loads(response_text)
code_list = data["data"]
str_list = list(map(str, code_list))
for config in processConfigList:
# mysql表字段
if config.ac_target=='0':
modified_json_str = config.taskDefinitionJson.replace("16199683466336", str_list[0]).replace("16199683466337", str_list[1]).replace("16199683466338", str_list[2]).replace("'dash_test_w'", f"'{page_object.dbSName}'").replace("'mysql_conn'", f"'{page_object.dbRName}'")
modified_json_str2=config.taskRelationJson.replace("16199683466336", str_list[0]).replace("16199683466337", str_list[1]).replace("16199683466338", str_list[2])
modified_json_str3=config.locations.replace("16199683466336", str_list[0]).replace("16199683466337", str_list[1]).replace("16199683466338", str_list[2])
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description=page_object.remark, # 替换工作流备注
locations=modified_json_str3,# 替换locations
name =page_object.metatask_name+"-表字段采集" ,# 替换工作流名称
timeout=config.timeout,
globalParams =config.globalParams ,
tenantCode =config.tenantCode ,
taskRelationJson =modified_json_str2,# 替换taskRelationJson
executionType =config.executionType ,
releaseState=config.releaseState
).model_dump(exclude_unset=True, by_alias=True)
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response_post0 = requests.post(url2, headers=headers2, data=form_data, verify=False)
# text= '{"code":0,"msg":"success","data":{"id":null,"code":16244510692320,"name":"测试2250-表字段采集","version":0,"releaseState":null,"projectCode":15094503753824,"description":"111","globalParams":"[]","globalParamList":null,"globalParamMap":{},"createTime":"2025-01-08 12:50:39","updateTime":"2025-01-08 12:50:39","flag":"YES","userId":1,"userName":null,"projectName":null,"locations":"[{\\"taskCode\\":16244510678624,\\"x\\":334,\\"y\\":265},{\\"taskCode\\":16244510678625,\\"x\\":334,\\"y\\":390},{\\"taskCode\\":16244510678626,\\"x\\":687,\\"y\\":335}]","scheduleReleaseState":null,"timeout":0,"tenantId":1,"tenantCode":null,"modifyBy":null,"warningGroupId":0,"executionType":"PARALLEL"},"failed":false,"success":true}'
text= response_post0.text
responsJson = json.loads(text)
if responsJson['msg'] == 'success':
intdsids.append(responsJson['data']['code'])
dstypes.append('0')
if message:
message += ", "
message += page_object.metatask_name + "-表字段采集新增成功"
else:
if message:
message += ", "
message += page_object.metatask_name + "-表字段采集新增失败"
if config.ac_target=='1':
modified_json_str = config.taskDefinitionJson.replace("16286410625888", str_list[0]).replace("16286410625889", str_list[1]).replace("16286410625890", str_list[2]).replace("16286410625891", str_list[3]).replace("'dash_test_w'", f"'{page_object.dbSName}'").replace("'mysql_conn'", f"'{page_object.dbRName}'").replace("mysql_conn dash_test_w", f"{page_object.dbRName} {page_object.dbSName}")
modified_json_str2=config.taskRelationJson.replace("16286410625888", str_list[0]).replace("16286410625889", str_list[1]).replace("16286410625890", str_list[2]).replace("16286410625891", str_list[3])
modified_json_str3=config.locations.replace("16286410625888", str_list[0]).replace("16286410625889", str_list[1]).replace("16286410625890", str_list[2]).replace("16286410625891", str_list[3])
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description=page_object.remark, # 替换工作流备注
locations=modified_json_str3,# 替换locations
name =page_object.metatask_name+"-存储过程采集" ,# 替换工作流名称
timeout=config.timeout,
globalParams =config.globalParams ,
tenantCode =config.tenantCode ,
taskRelationJson =modified_json_str2,# 替换taskRelationJson
executionType =config.executionType ,
releaseState=config.releaseState
).model_dump(exclude_unset=True, by_alias=True)
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response_post0 = requests.post(url2, headers=headers2,data=form_data,verify=False)
#text= '{"code":0,"msg":"success","data":{"id":null,"code":16244510692320,"name":"测试2250-表字段采集","version":0,"releaseState":null,"projectCode":15094503753824,"description":"111","globalParams":"[]","globalParamList":null,"globalParamMap":{},"createTime":"2025-01-08 12:50:39","updateTime":"2025-01-08 12:50:39","flag":"YES","userId":1,"userName":null,"projectName":null,"locations":"[{\\"taskCode\\":16244510678624,\\"x\\":334,\\"y\\":265},{\\"taskCode\\":16244510678625,\\"x\\":334,\\"y\\":390},{\\"taskCode\\":16244510678626,\\"x\\":687,\\"y\\":335}]","scheduleReleaseState":null,"timeout":0,"tenantId":1,"tenantCode":null,"modifyBy":null,"warningGroupId":0,"executionType":"PARALLEL"},"failed":false,"success":true}'
text= response_post0.text
responsJson = json.loads(text)
if responsJson['msg'] == 'success':
dstypes.append('1')
intdsids.append(responsJson['data']['code'])
if message:
message += ", "
message += page_object.metatask_name + "-存储过程采集新增成功"
else:
if message:
message += ", "
message += page_object.metatask_name + "-存储过程采集新增失败"
if len(intdsids)>0:
page_object.ds_ids=','.join([str(i) for i in intdsids])
page_object.ds_types=','.join([str(i) for i in dstypes])
return message
@classmethod
async def hive_process_defind_change_update(cls,request: Request,page_object:MetataskModel,metatask_old:MetataskModel,current_user: CurrentUserModel):
"""
mysql类型
"""
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
dstypes=page_object.ds_types.split(",")
dsids=page_object.ds_ids.split(",")
result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dstypes, dsids)]
message=''
# 查询接口
url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, }
headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
for config in result_list:
# mysql表字段
if config['dstype']=='0' :
response = requests.get(f"{url}/{config['dsid']}", headers=headers, verify=False)
text= response.text
responsJson = json.loads(text)
if responsJson['msg'] == 'success':
modified_json_str = json.dumps(responsJson['data']['taskDefinitionList'], ensure_ascii=False, indent=0).replace( f"'{metatask_old.dbSName}'", f"'{page_object.dbSName}'")
getTaskRelationList=responsJson['data']['processTaskRelationList']
putTaskRelationList=[]
for item in getTaskRelationList:
new_item = {
"name": item['name'],
"preTaskCode":item['preTaskCode'] ,
"preTaskVersion":item['preTaskVersion'] ,
"postTaskCode":item['postTaskCode'] ,
"conditionType":item['conditionType'] ,
"conditionParams":item['conditionParams']
}
putTaskRelationList.append(new_item)
modified_json_str2= json.dumps(putTaskRelationList, ensure_ascii=False, indent=0)
modified_json_str2=re.sub(r'\s+', '', modified_json_str2)
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description=page_object.remark, # 替换工作流备注
locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations
name =page_object.metatask_name+"-表字段采集" ,# 替换工作流名称
tenantCode =responsJson['data']['processDefinition']['tenantCode'] ,
taskRelationJson =modified_json_str2,# 替换taskRelationJson
).model_dump(exclude_unset=True, by_alias=True)
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data,verify=False)
putText= response_put0.text
responsPutJson=json.loads(putText)
if responsPutJson['msg'] == 'success':
if message:
message += ", "
message += page_object.metatask_name + "-表字段采集修改成功"
else:
if message:
message += ", "
message += page_object.metatask_name + "-表字段采集修改失败"
if config['dstype']=='1':
response = requests.get(f"{url}/{config['dsid']}", headers=headers)
text= response.text
responsJson = json.loads(text)
if responsJson['msg'] == 'success':
modified_json_str = json.dumps(responsJson['data']['taskDefinitionList'], ensure_ascii=False, indent=4).replace( f"'{metatask_old.dbSName}'", f"'{page_object.dbSName}'")
getTaskRelationList=responsJson['data']['processTaskRelationList']
putTaskRelationList=[]
for item in getTaskRelationList:
new_item = {
"name": item['name'],
"preTaskCode":item['preTaskCode'] ,
"preTaskVersion":item['preTaskVersion'] ,
"postTaskCode":item['postTaskCode'] ,
"conditionType":item['conditionType'] ,
"conditionParams":item['conditionParams']
}
putTaskRelationList.append(new_item)
modified_json_str2= json.dumps(putTaskRelationList, ensure_ascii=False, indent=4)
modified_json_str2=re.sub(r'\s+', '', modified_json_str2)
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description=page_object.remark, # 替换工作流备注
locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations
name =page_object.metatask_name+"-存储过程采集" ,# 替换工作流名称
tenantCode =responsJson['data']['processDefinition']['tenantCode'] ,
taskRelationJson =modified_json_str2,# 替换taskRelationJson
).model_dump(exclude_unset=True, by_alias=True)
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data,verify=False)
putText= response_put0.text
responsPutJson=json.loads(putText)
if responsPutJson['msg'] == 'success':
if message:
message += ", "
message += page_object.metatask_name + "-存储过程采集修改成功"
else:
if message:
message += ", "
message += page_object.metatask_name + "-存储过程采集修改失败"
return message
@classmethod
async def up_or_down_metatask_services(

7
vue-fastapi-frontend/src/views/meta/metatask/index.vue

@ -670,9 +670,10 @@ onMounted(() => {
// Methods
const handleChangeType = (value) => {
form.metataskType = value[value.length - 1];
if (form.metataskType === '00') form.acTarget = '0';
else if (form.metataskType === '01') form.acTarget = '1';
form.value.metataskType = value[value.length - 1];
if (form.value.metataskType === '00') form.value.acTarget = '0';
else if (form.value.metataskType === '01') form.value.acTarget = '1';
console.log(form.value.metataskType ,"改变metataskType ")
};
const getList = async () => {

Loading…
Cancel
Save