You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
618 lines
36 KiB
618 lines
36 KiB
import asyncio
import json
import re
from datetime import datetime

import requests
from fastapi import Request,Depends
from sqlalchemy.ext.asyncio import AsyncSession

from config.constant import CommonConstant
from config.enums import RedisInitKeyConfig
from config.env import AppConfig
from exceptions.exception import ServiceException
from module_admin.dao.metatask_dao import MetataskDao
from module_admin.entity.vo.common_vo import CrudResponseModel
# from module_admin.entity.vo.dataSource_vo import DataSource,Datasouceall,AlertGroups,Environment,WorkerGroup,ProcessDefinition,ParmScheduleVo,ParmSchedule,ProcessInstancePage
from module_admin.entity.vo.dataSource_vo import *
from module_admin.entity.vo.metaprocessconfig_vo import MetaprocessconfigQueryModel,MetaprocessconfigModel
from module_admin.entity.vo.metatask_vo import MetataskQueryModel, MetataskModel, DeleteMetataskModel
from module_admin.entity.vo.user_vo import CurrentUserModel
from module_admin.service.metaprocessconfig_service import MetaprocessconfigService
from utils.common_util import CamelCaseUtil
from utils.page_util import PageResponseModel
|
|
|
|
|
|
class MetataskService:
|
|
"""
|
|
参数配置管理模块服务层
|
|
"""
|
|
|
|
@classmethod
async def get_metatask_list_services(
    cls, query_db: AsyncSession, query_object: MetataskQueryModel, is_page: bool = False
):
    """
    Fetch the metadata-task list by delegating to the DAO layer.

    :param query_db: ORM session
    :param query_object: query filter object
    :param is_page: forwarded unchanged to ``MetataskDao.get_metatask_list``
    :return: whatever ``MetataskDao.get_metatask_list`` returns
    """
    return await MetataskDao.get_metatask_list(query_db, query_object, is_page)
|
|
|
|
@classmethod
async def get_data_source_tree(cls, request: Request, current_user: CurrentUserModel):
    """
    Fetch the first page (up to 100 entries) of DolphinScheduler data sources.

    :param request: Request object (not used by this method)
    :param current_user: current user; the user name and password are sent
        as the ``dashUserName`` / ``dashPassword`` headers
    :return: list of ``DataSource`` objects on HTTP 200, otherwise a dict
        with a single ``error`` key
    """
    url = f'{AppConfig.ds_server_url}/dolphinscheduler/datasources?pageNo=1&pageSize=100'
    headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}

    # requests is synchronous: run it in a worker thread so the event loop
    # keeps serving other requests while we wait for DolphinScheduler.
    response = await asyncio.to_thread(requests.get, url, headers=headers)

    # Decide on the numeric status code; the reason phrase is free text that
    # servers and proxies are allowed to omit or change.
    if response.status_code != 200:
        return {'error': f'Request failed with status code {response.status_code}'}

    total_list = json.loads(response.text)["data"]["totalList"]
    for item in total_list:
        # connectionParams arrives as a JSON string; decode it into a dict
        item["connectionParams"] = json.loads(item["connectionParams"])
    return [DataSource(**item) for item in total_list]
|
|
@classmethod
async def get_data_source_all(cls, request: Request, current_user: CurrentUserModel):
    """
    Collect the worker groups, alert groups and environments from DolphinScheduler.

    Each of the three lookups is independent: a lookup that does not answer
    with HTTP 200 simply leaves its field on the result untouched.

    :param request: Request object (not used by this method)
    :param current_user: current user; the user name and password are sent
        as the ``dashUserName`` / ``dashPassword`` headers
    :return: ``Datasouceall`` with ``workerGroup``, ``alertGroups`` and
        ``environment`` filled in for every lookup that succeeded
    """
    base_url = f'{AppConfig.ds_server_url}/dolphinscheduler'
    headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}

    # The three calls do not depend on each other, so issue them concurrently
    # in worker threads instead of blocking the event loop three times in a row.
    worker_response, alert_response, environment_response = await asyncio.gather(
        asyncio.to_thread(requests.get, f'{base_url}/worker-groups/all', headers=headers),
        asyncio.to_thread(requests.get, f'{base_url}/alert-groups/list', headers=headers),
        asyncio.to_thread(requests.get, f'{base_url}/environment/query-environment-list', headers=headers),
    )

    result = Datasouceall()
    if worker_response.status_code == 200:
        # The worker-group endpoint returns a list of plain names
        names = json.loads(worker_response.text)["data"]
        result.workerGroup = [WorkerGroup(name=name) for name in names]
    if alert_response.status_code == 200:
        alert_items = json.loads(alert_response.text)["data"]
        result.alertGroups = [AlertGroups(**item) for item in alert_items]
    if environment_response.status_code == 200:
        environment_items = json.loads(environment_response.text)["data"]
        result.environment = [Environment(**item) for item in environment_items]
    return result
|
|
@classmethod
async def check_metatask_name_unique_services(cls, query_db: AsyncSession, page_object: MetataskModel):
    """
    Check whether the task name in ``page_object`` is already used by another task.

    :param query_db: ORM session
    :param page_object: task object carrying the name (and optionally the id) to check
    :return: ``CommonConstant.UNIQUE`` when no other record has this name,
        otherwise ``CommonConstant.NOT_UNIQUE``
    """
    # A record that has no id yet can never match an existing row's id
    own_id = page_object.metatask_id
    if own_id is None:
        own_id = -1

    same_name = await MetataskDao.get_metatask_detail_by_info(
        query_db, MetataskModel(metataskName=page_object.metatask_name)
    )
    # Finding the record itself (same id) does not count as a conflict
    if not same_name or same_name.metatask_id == own_id:
        return CommonConstant.UNIQUE
    return CommonConstant.NOT_UNIQUE
|
|
|
|
@classmethod
async def add_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: MetataskModel, current_user: CurrentUserModel):
    """
    Create a metadata task: build its DolphinScheduler workflows, then store the task row.

    :param request: Request object
    :param query_db: ORM session
    :param page_object: task to create; ``mysql_process_defind_change_add``
        writes the created workflow codes into it before it is persisted
    :param current_user: current user, forwarded to the scheduler calls
    :return: ``CrudResponseModel`` whose message lists the workflow creation results
    :raises ServiceException: when the task name is taken or no workflow could be created
    """
    if not await cls.check_metatask_name_unique_services(query_db, page_object):
        raise ServiceException(message=f'新增元数据任务{page_object.metatask_name}失败,任务名已存在')

    try:
        # Look up the workflow templates matching this target / database type
        template_query = MetaprocessconfigQueryModel()
        template_query.ac_target = page_object.ac_target
        template_query.db_type = page_object.dbCode
        process_configs = await MetaprocessconfigService.get_metaprocessconfig_list_services(
            query_db, template_query, False
        )

        message = await cls.mysql_process_defind_change_add(request, process_configs, page_object, current_user)
        # An empty or missing message means nothing was created; guarding it
        # here avoids a TypeError from evaluating `in` against None.
        if not message or "成功" not in message:
            raise ServiceException(message=f'新增元数据任务{page_object.metatask_name}失败,dolphinscheduler创建失败')

        await MetataskDao.add_metatask_dao(query_db, page_object)
        await query_db.commit()
        return CrudResponseModel(is_success=True, message=message)
    except Exception:
        # Single rollback point for every failure above; the bare raise keeps
        # the original traceback intact.
        await query_db.rollback()
        raise
|
|
@classmethod
async def mysql_process_defind_change_add(cls, request: Request, processConfigList: list[MetaprocessconfigModel], page_object: MetataskModel, current_user: CurrentUserModel):
    """
    Create DolphinScheduler workflow definitions for a MySQL task from templates.

    For every template whose ``ac_target`` is ``'0'`` (table/column collection)
    or ``'1'`` (stored-procedure collection) the template's placeholder task
    codes are replaced with freshly generated ones, the placeholder schema and
    connection names are replaced with the task's ``dbSName`` / ``dbRName``,
    and the result is posted as a new process definition.

    :param request: Request object; used to read the project code from Redis
    :param processConfigList: workflow templates to instantiate
    :param page_object: task being created; on success ``ds_ids`` and
        ``ds_types`` are set to the comma-joined created codes and their types
    :param current_user: current user; sent as ``dashUserName`` / ``dashPassword`` headers
    :return: ", "-joined per-workflow result text (empty when no template
        matched); always a string, also when task-code generation fails
    """
    projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
    project_url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/' + projectCode
    auth_headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
    form_headers = {**auth_headers, 'Content-Type': 'application/x-www-form-urlencoded'}

    # requests is synchronous: run it in a worker thread to keep the event loop free
    codes_response = await asyncio.to_thread(
        requests.get, project_url + '/task-definition/gen-task-codes?genNum=5', headers=auth_headers
    )
    if codes_response.status_code != 200:
        # Previously this path fell off the end and returned None, which made
        # the caller's `"成功" not in message` check raise TypeError.
        return f'{page_object.metatask_name}-任务编码生成失败'
    new_codes = [str(code) for code in json.loads(codes_response.text)["data"]]

    name_substitutions = [
        ("'dash_test_w'", f"'{page_object.dbSName}'"),
        ("'mysql_conn'", f"'{page_object.dbRName}'"),
    ]
    # ac_target -> (placeholder task codes baked into the template,
    #               workflow name suffix,
    #               text substitutions applied to taskDefinitionJson, in order)
    template_specs = {
        '0': (
            ("16199683466336", "16199683466337", "16199683466338"),
            "-表字段采集",
            name_substitutions,
        ),
        '1': (
            ("16286410625888", "16286410625889", "16286410625890", "16286410625891"),
            "-存储过程采集",
            name_substitutions
            + [("mysql_conn dash_test_w", f"{page_object.dbRName} {page_object.dbSName}")],
        ),
    }

    def swap_task_codes(text, placeholders):
        """Replace the i-th placeholder code with the i-th generated code."""
        for index, placeholder in enumerate(placeholders):
            text = text.replace(placeholder, new_codes[index])
        return text

    messages = []
    created_codes = []
    created_types = []
    for config in processConfigList:
        spec = template_specs.get(config.ac_target)
        if spec is None:
            continue
        placeholders, suffix, substitutions = spec
        workflow_name = page_object.metatask_name + suffix

        task_definition_json = swap_task_codes(config.taskDefinitionJson, placeholders)
        for old_text, new_text in substitutions:
            task_definition_json = task_definition_json.replace(old_text, new_text)

        payload = MetaprocessconfigModel(
            taskDefinitionJson=task_definition_json,
            description=page_object.remark,
            locations=swap_task_codes(config.locations, placeholders),
            name=workflow_name,
            timeout=config.timeout,
            globalParams=config.globalParams,
            tenantCode=config.tenantCode,
            taskRelationJson=swap_task_codes(config.taskRelationJson, placeholders),
            executionType=config.executionType,
            releaseState=config.releaseState,
        ).model_dump(exclude_unset=True, by_alias=True)
        # The endpoint takes a url-encoded form, so every value is sent as text
        form_data = {key: str(value) for key, value in payload.items()}

        create_response = await asyncio.to_thread(
            requests.post, project_url + '/process-definition', headers=form_headers, data=form_data
        )
        created = json.loads(create_response.text)
        if created['msg'] == 'success':
            created_codes.append(created['data']['code'])
            created_types.append(config.ac_target)
            messages.append(workflow_name + "新增成功")
        else:
            messages.append(workflow_name + "新增失败")

    if created_codes:
        # Remember which scheduler workflows belong to this task
        page_object.ds_ids = ','.join(str(code) for code in created_codes)
        page_object.ds_types = ','.join(str(ds_type) for ds_type in created_types)
    return ", ".join(messages)
|
|
|
|
@classmethod
async def mysql_process_defind_change_update(cls, request: Request, page_object: MetataskModel, metatask_old: MetataskModel, current_user: CurrentUserModel):
    """
    Update the DolphinScheduler workflows that belong to a MySQL metadata task.

    Each workflow is read back from the scheduler, occurrences of the old
    quoted schema name in its task definitions are replaced with the new one,
    and the definition is written back under the (possibly new) task name
    and remark.

    :param request: Request object; used to read the project code from Redis
    :param page_object: edited task (new name, remark, ``dbSName``)
    :param metatask_old: task as currently stored (old ``dbSName``)
    :param current_user: current user; sent as ``dashUserName`` / ``dashPassword`` headers
    :return: ", "-joined per-workflow result text; empty when no workflow matched
    """
    projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')

    # The edit form may not carry the workflow bookkeeping fields; fall back
    # to the stored record instead of crashing on None.split().
    source = page_object if (page_object.ds_types and page_object.ds_ids) else metatask_old
    ds_types = (source.ds_types or '').split(",")
    ds_ids = (source.ds_ids or '').split(",")

    url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/' + projectCode + '/process-definition'
    auth_headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
    form_headers = {**auth_headers, 'Content-Type': 'application/x-www-form-urlencoded'}

    # workflow type -> name suffix of the workflow created for that type
    suffixes = {'0': "-表字段采集", '1': "-存储过程采集"}
    relation_keys = (
        "name", "preTaskCode", "preTaskVersion", "postTaskCode", "conditionType", "conditionParams",
    )

    messages = []
    for ds_type, ds_id in zip(ds_types, ds_ids):
        suffix = suffixes.get(ds_type)
        if suffix is None:
            continue
        workflow_name = page_object.metatask_name + suffix
        definition_url = f"{url}/{ds_id}"

        # requests is synchronous: run it in a worker thread to keep the event loop free
        get_response = await asyncio.to_thread(requests.get, definition_url, headers=auth_headers)
        current = json.loads(get_response.text)
        if current['msg'] != 'success':
            # Report the workflow we could not read instead of skipping it silently
            messages.append(workflow_name + "修改失败")
            continue
        definition = current['data']

        task_definition_json = json.dumps(
            definition['taskDefinitionList'], ensure_ascii=False
        ).replace(f"'{metatask_old.dbSName}'", f"'{page_object.dbSName}'")

        # The update endpoint only needs these relation fields
        relations = [
            {key: relation[key] for key in relation_keys}
            for relation in definition['processTaskRelationList']
        ]
        # Compact separators give whitespace-free JSON without touching
        # whitespace that is part of a string value (a regex strip would).
        task_relation_json = json.dumps(relations, ensure_ascii=False, separators=(',', ':'))

        payload = MetaprocessconfigModel(
            taskDefinitionJson=task_definition_json,
            description=page_object.remark,
            locations=definition['processDefinition']['locations'],
            name=workflow_name,
            tenantCode=definition['processDefinition']['tenantCode'],
            taskRelationJson=task_relation_json,
        ).model_dump(exclude_unset=True, by_alias=True)
        form_data = {key: str(value) for key, value in payload.items()}

        put_response = await asyncio.to_thread(
            requests.put, definition_url, headers=form_headers, data=form_data
        )
        if json.loads(put_response.text)['msg'] == 'success':
            messages.append(workflow_name + "修改成功")
        else:
            messages.append(workflow_name + "修改失败")
    return ", ".join(messages)
|
|
@classmethod
async def edit_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: MetataskModel, current_user: CurrentUserModel):
    """
    Edit a metadata task: update its DolphinScheduler workflows, then the task row.

    :param request: Request object
    :param query_db: ORM session
    :param page_object: edited task; only the fields that were explicitly set are written
    :param current_user: current user, forwarded to the scheduler calls
    :return: ``CrudResponseModel`` on success
    :raises ServiceException: when the task does not exist, the name is taken,
        or no workflow could be updated
    """
    edit_metatask = page_object.model_dump(exclude_unset=True)

    # One lookup serves both as the existence check and as the "old" state
    # handed to the workflow update.
    metatask_old = await cls.metatask_detail_services(query_db, page_object.metatask_id)
    if not metatask_old.metatask_id:
        raise ServiceException(message='更新失败')
    if not await cls.check_metatask_name_unique_services(query_db, page_object):
        raise ServiceException(message=f'修改任务{page_object.metatask_name}失败,任务名称已存在')

    try:
        message = await cls.mysql_process_defind_change_update(request, page_object, metatask_old, current_user)
        # An empty or missing message means nothing was updated
        if not message or "成功" not in message:
            raise ServiceException(message=f'修改元数据任务{page_object.metatask_name}失败,dolphinscheduler修改失败')
        await MetataskDao.edit_metatask_dao(query_db, edit_metatask)
        await query_db.commit()
        return CrudResponseModel(is_success=True, message='更新成功')
    except Exception:
        # Single rollback point; the bare raise keeps the original traceback
        await query_db.rollback()
        raise
|
|
|
|
@classmethod
async def up_or_down_metatask_services(
    cls, request: Request, query_db: AsyncSession, current_user: CurrentUserModel, id: str, type: str
):
    """
    Set the release state of a task's DolphinScheduler workflows and store the new status.

    :param request: Request object; used to read the project code from Redis
    :param query_db: ORM session
    :param current_user: current user; recorded as ``update_by`` and sent as
        ``dashUserName`` / ``dashPassword`` headers
    :param id: id of the metadata task
    :param type: release state sent to the scheduler; ``'OFFLINE'`` stores
        status ``OFFLINE``, any other value stores ``ONLINE``
    :return: ``CrudResponseModel`` on success
    :raises ServiceException: when the task does not exist or the scheduler
        rejects a release call
    """
    metatask_info = await cls.metatask_detail_services(query_db, id)
    if not metatask_info.metatask_id:
        # Unknown task: fail cleanly rather than crash on its missing ds_types
        raise ServiceException(message='更新失败')

    metatask_info.update_by = current_user.user.user_name
    metatask_info.update_time = datetime.now()

    projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
    url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/' + projectCode + '/process-definition'
    headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}

    # workflow type -> name suffix of the workflow created for that type
    suffixes = {'0': '-表字段采集', '1': '-存储过程采集'}
    ds_types = (metatask_info.ds_types or '').split(",")
    ds_ids = (metatask_info.ds_ids or '').split(",")

    message = ''
    for ds_type, ds_id in zip(ds_types, ds_ids):
        suffix = suffixes.get(ds_type)
        if suffix is None:
            continue
        form_data = {'name': f'{metatask_info.metatask_name + suffix}', 'releaseState': f'{type}'}
        # requests is synchronous: run it in a worker thread to keep the event loop free
        response = await asyncio.to_thread(
            requests.post, f"{url}/{ds_id}/release", headers=headers, data=form_data
        )
        result = json.loads(response.text)
        if result['success'] is not True:
            raise ServiceException(message='失败' + result['msg'])
        message = '成功!'

    metatask_info.status = 'OFFLINE' if type == 'OFFLINE' else 'ONLINE'
    edit_metatask = metatask_info.model_dump(exclude_unset=True)
    try:
        await MetataskDao.edit_metatask_dao(query_db, edit_metatask)
        await query_db.commit()
        return CrudResponseModel(is_success=True, message=message)
    except Exception:
        await query_db.rollback()
        raise
|
|
|
|
@classmethod
async def run_metatask_services(
    cls, request: Request, query_db: AsyncSession, process: ProcessDefinition, current_user: CurrentUserModel
):
    """
    Start one DolphinScheduler process instance for the given definition.

    The fixed execution options below overwrite whatever the caller put into
    ``process`` for those fields; every attribute of ``process`` is then
    posted as form data.

    :param request: Request object; used to read the project code from Redis
    :param query_db: ORM session (not used by this method)
    :param process: start parameters; mutated in place
    :param current_user: current user; sent as ``dashUserName`` / ``dashPassword`` headers
    :return: success text
    :raises ServiceException: when the scheduler does not report success
    """
    process.failureStrategy = 'CONTINUE'
    process.warningType = 'NONE'
    process.execType = 'START_PROCESS'
    process.taskDependType = 'TASK_POST'
    process.complementDependentMode = 'OFF_MODE'
    process.runMode = 'RUN_MODE_SERIAL'
    process.processInstancePriority = 'MEDIUM'
    process.dryRun = 0
    # NOTE(review): the schedule window is a fixed date - confirm this is intended
    process.scheduleTime = "{complementStartDate:'2025-01-12 00:00:00',complementEndDate:'2025-01-12 00:00:00'}"

    projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
    url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/' + projectCode + '/executors/start-process-instance'
    headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
    form_data = dict(process.__dict__)

    # requests is synchronous: run it in a worker thread to keep the event loop free
    response = await asyncio.to_thread(requests.post, url, headers=headers, data=form_data)
    if json.loads(response.text)['success'] is not True:
        raise ServiceException(message='运行失败!')
    return "运行成功!"
|
|
@classmethod
async def ds_metatask_services(
    cls, request: Request, query_db: AsyncSession, process: ParmScheduleVo, current_user: CurrentUserModel
):
    """
    Create a DolphinScheduler schedule (timer) for a process definition.

    :param request: Request object; used to read the project code from Redis
    :param query_db: ORM session (not used by this method)
    :param process: schedule request (definition code, environment, alert
        group, begin/end time, crontab)
    :param current_user: current user; sent as ``dashUserName`` / ``dashPassword`` headers
    :return: success text
    :raises ServiceException: when the scheduler does not report success
    """
    parm = ParmSchedule()
    parm.failureStrategy = 'CONTINUE'
    parm.warningType = 'NONE'
    parm.warningGroupId = process.warningGroupId
    # NOTE(review): 'TASK_POST' looks like a task-depend-type value, not a
    # worker group name - confirm against the scheduler configuration.
    parm.workerGroup = 'TASK_POST'
    parm.processDefinitionCode = process.processDefinitionCode
    parm.environmentCode = process.environmentCode
    parm.processInstancePriority = 'MEDIUM'
    # Serialise with json.dumps so quotes or backslashes in the crontab are
    # escaped instead of producing malformed JSON.
    parm.schedule = json.dumps(
        {
            "startTime": process.beginTime.strftime('%Y-%m-%d %H:%M:%S'),
            "endTime": process.endTime.strftime('%Y-%m-%d %H:%M:%S'),
            "crontab": process.crontab,
            "timezoneId": "Asia/Shanghai",
        },
        ensure_ascii=False,
    )

    projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
    url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/' + projectCode + '/schedules'
    headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
    form_data = dict(parm.__dict__)

    # requests is synchronous: run it in a worker thread to keep the event loop free
    response = await asyncio.to_thread(requests.post, url, headers=headers, data=form_data)
    if json.loads(response.text)['success'] is not True:
        raise ServiceException(message='运行失败!')
    return "运行成功!"
|
|
|
|
|
|
@classmethod
async def get_metatask_logs_services(
    cls, request: Request, current_user: CurrentUserModel, query_db: AsyncSession, id: str
):
    """
    Mark a metadata task as running and persist the change.

    :param request: Request object (not used by this method)
    :param current_user: current user; recorded as ``update_by``
    :param query_db: ORM session
    :param id: id of the metadata task
    :return: ``CrudResponseModel`` on success
    :raises ServiceException: when the task does not exist
    """
    metatask_info = await cls.metatask_detail_services(query_db, id)
    if not metatask_info.metatask_id:
        raise ServiceException(message='更新失败')

    metatask_info.update_by = current_user.user.user_name
    metatask_info.update_time = datetime.now()
    # '3' = running. The previous code rebound metatask_info itself to '3',
    # so the model_dump() below always failed on a str.
    # NOTE(review): assumes the status field is the intended target - confirm.
    metatask_info.status = '3'

    edit_metatask = metatask_info.model_dump(exclude_unset=True)
    try:
        await MetataskDao.edit_metatask_dao(query_db, edit_metatask)
        await query_db.commit()
        return CrudResponseModel(is_success=True, message=metatask_info.metatask_name + '任务:日志获取成功')
    except Exception:
        await query_db.rollback()
        raise
|
|
|
|
@classmethod
async def delete_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteMetataskModel, current_user: CurrentUserModel):
    """
    Delete metadata tasks together with their DolphinScheduler workflows.

    :param request: Request object; used to read the project code from Redis
    :param query_db: ORM session
    :param page_object: carries the comma-separated task ids (``metatask_ids``)
        and workflow codes (``ds_ids``) to delete
    :param current_user: current user; sent as ``dashUserName`` / ``dashPassword`` headers
    :return: ``CrudResponseModel`` on success
    :raises ServiceException: when ids are missing or the scheduler refuses the delete
    """
    if not (page_object.metatask_ids and page_object.ds_ids):
        raise ServiceException(message='传入参数配置id为空')

    try:
        # Parse the ids before touching the scheduler: a malformed id must
        # fail here, not after the remote workflows are already gone.
        metatask_ids = [int(raw_id) for raw_id in page_object.metatask_ids.split(',')]

        projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
        url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/' + projectCode + '/process-definition/batch-delete'
        headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
        form_data = {'codes': page_object.ds_ids}

        # requests is synchronous: run it in a worker thread to keep the event loop free
        response = await asyncio.to_thread(requests.post, url, headers=headers, data=form_data)
        if json.loads(response.text)['success'] is not True:
            raise ServiceException(message='ds删除失败')

        for metatask_id in metatask_ids:
            await MetataskDao.delete_metatask_dao(query_db, MetataskModel(metataskId=metatask_id))
        await query_db.commit()
        return CrudResponseModel(is_success=True, message='删除成功')
    except Exception:
        await query_db.rollback()
        raise
|
|
|
|
@classmethod
async def metatask_detail_services(cls, query_db: AsyncSession, metatask_id: int):
    """
    Load one metadata task by id.

    :param query_db: ORM session
    :param metatask_id: id of the task to load
    :return: ``MetataskModel`` built from the stored row, or an empty
        ``MetataskModel`` when no row was found
    """
    row = await MetataskDao.get_metatask_detail_by_id(query_db, metatask_id=metatask_id)
    if not row:
        # No such task: hand back a model with nothing set
        return MetataskModel()
    return MetataskModel(**CamelCaseUtil.transform_result(row))
|
|
@classmethod
async def get_process_instances_services(
    cls, request: Request, query_object: ProcessInstancePage, current_user: CurrentUserModel
):
    """
    Fetch one page of DolphinScheduler process instances.

    :param request: Request object; used to read the project code from Redis
    :param query_object: paging and search parameters
        (``page_num``, ``page_size``, ``searchVal``)
    :param current_user: current user; sent as ``dashUserName`` / ``dashPassword`` headers
    :return: ``PageResponseModel`` on HTTP 200, otherwise a dict with an ``error`` key
    """
    projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
    url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/process-instances'
    headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
    # Let requests build the query string: values are URL-encoded and a
    # missing searchVal is omitted rather than sent as the text "None".
    params = {
        'pageNo': query_object.page_num,
        'pageSize': query_object.page_size,
        'searchVal': query_object.searchVal,
    }

    # requests is synchronous: run it in a worker thread to keep the event loop free
    response = await asyncio.to_thread(requests.get, url, headers=headers, params=params)
    if response.status_code != 200:
        return {'error': f'Request failed with status code {response.status_code}'}

    page = json.loads(response.text)["data"]
    return PageResponseModel(rows=page["totalList"], total=page["total"])
|
|
|
|
@classmethod
async def get_task_nodes_services(
    cls, request: Request, id: int, current_user: CurrentUserModel
):
    """
    Fetch the task nodes of one DolphinScheduler process instance.

    :param request: Request object; used to read the project code from Redis
    :param id: process instance id
    :param current_user: current user; sent as ``dashUserName`` / ``dashPassword`` headers
    :return: list of ``TaskNode`` objects on HTTP 200, otherwise a dict with an ``error`` key
    """
    projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
    url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/process-instances/{id}/tasks'
    headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}

    # Issue the request once (it used to be sent twice in a row) and off the
    # event loop, since requests is synchronous.
    response = await asyncio.to_thread(requests.get, url, headers=headers)
    if response.status_code != 200:
        return {'error': f'Request failed with status code {response.status_code}'}

    task_items = json.loads(response.text)["data"]["taskList"]
    return [TaskNode(**item) for item in task_items]
|
|
|
|
@classmethod
async def get_log_details_services(
    cls, request: Request, id: int, current_user: CurrentUserModel
):
    """
    Fetch the first 1000 log lines of one DolphinScheduler task instance.

    :param request: Request object (not used by this method)
    :param id: task instance id
    :param current_user: current user; sent as ``dashUserName`` / ``dashPassword`` headers
    :return: the log text on HTTP 200, otherwise a dict with an ``error`` key
    """
    url = f'{AppConfig.ds_server_url}/dolphinscheduler/log/detail?taskInstanceId={id}&limit=1000&skipLineNum=0'
    headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}

    # Issue the request once (it used to be sent twice in a row) and off the
    # event loop, since requests is synchronous.
    response = await asyncio.to_thread(requests.get, url, headers=headers)
    if response.status_code != 200:
        return {'error': f'Request failed with status code {response.status_code}'}

    return json.loads(response.text)["data"]["message"]
|
|
|