from fastapi import Request , Depends
from sqlalchemy . ext . asyncio import AsyncSession
from module_admin . entity . vo . metatask_vo import MetataskQueryModel , MetataskModel , DeleteMetataskModel
from module_admin . dao . metatask_dao import MetataskDao
from utils . page_util import PageResponseModel
# from module_admin.entity.vo.dataSource_vo import DataSource,Datasouceall,AlertGroups,Environment,WorkerGroup,ProcessDefinition,ParmScheduleVo,ParmSchedule,ProcessInstancePage
from module_admin . entity . vo . dataSource_vo import *
from module_admin . entity . vo . metaprocessconfig_vo import MetaprocessconfigQueryModel , MetaprocessconfigModel
from config . constant import CommonConstant
from exceptions . exception import ServiceException
from module_admin . entity . vo . common_vo import CrudResponseModel
from utils . common_util import CamelCaseUtil
from module_admin . entity . vo . user_vo import CurrentUserModel
from datetime import datetime
import requests
import json
import re
from config . enums import RedisInitKeyConfig
from module_admin . service . metaprocessconfig_service import MetaprocessconfigService
from config . env import AppConfig
class MetataskService :
"""
参数配置管理模块服务层
"""
@classmethod
async def get_metatask_list_services (
cls , query_db : AsyncSession , query_object : MetataskQueryModel , is_page : bool = False
) :
"""
获取参数配置列表信息service
: param query_db : orm对象
: param query_object : 查询参数对象
: return : 参数配置列表信息对象
"""
metatask_list_result = await MetataskDao . get_metatask_list ( query_db , query_object , is_page )
return metatask_list_result
@classmethod
async def get_data_source_tree ( cls , request : Request , current_user : CurrentUserModel ) :
url = f ' { AppConfig . ds_server_url } /dolphinscheduler/datasources?pageNo=1&pageSize=100 '
headers = { ' dashUserName ' : current_user . user . user_name , ' dashPassword ' : current_user . user . password , }
response = requests . get ( url , headers = headers )
if response . reason == ' OK ' :
response_text = response . text
data = json . loads ( response_text )
total_list = data [ " data " ] [ " totalList " ]
# 解析 connectionParams 字符串为字典
for item in total_list :
item [ " connectionParams " ] = json . loads ( item [ " connectionParams " ] )
# 使用 Pydantic 创建 DataSource 对象列表
data_sources = [ DataSource ( * * item ) for item in total_list ]
return data_sources
else :
return { ' error ' : f ' Request failed with status code { response . status_code } ' }
@classmethod
async def get_data_source_all ( cls , request : Request , current_user : CurrentUserModel ) :
# Worker分组
url1 = f ' { AppConfig . ds_server_url } /dolphinscheduler/worker-groups/all '
# 警告组
url2 = f ' { AppConfig . ds_server_url } /dolphinscheduler/alert-groups/list '
# 工作环境
url3 = f ' { AppConfig . ds_server_url } /dolphinscheduler/environment/query-environment-list '
dataspurceVo = Datasouceall ( )
headers = { ' dashUserName ' : current_user . user . user_name , ' dashPassword ' : current_user . user . password }
response1 = requests . get ( url1 , headers = headers )
response2 = requests . get ( url2 , headers = headers )
response3 = requests . get ( url3 , headers = headers )
if response1 . reason == ' OK ' :
response_text = response1 . text
data = json . loads ( response_text )
total_list = data [ " data " ]
# 使用 Pydantic 创建 DataSource 对象列表
data_sources = [ WorkerGroup ( name = item ) for item in total_list ]
dataspurceVo . workerGroup = data_sources
if response2 . reason == ' OK ' :
response_text = response2 . text
data = json . loads ( response_text )
total_list = data [ " data " ]
# 使用 Pydantic 创建 DataSource 对象列表
alertGroups = [ AlertGroups ( * * item ) for item in total_list ]
dataspurceVo . alertGroups = alertGroups
if response3 . reason == ' OK ' :
response_text = response3 . text
data = json . loads ( response_text )
total_list = data [ " data " ]
data_sources = [ Environment ( * * item ) for item in total_list ]
dataspurceVo . environment = data_sources
return dataspurceVo
@classmethod
async def check_metatask_name_unique_services ( cls , query_db : AsyncSession , page_object : MetataskModel ) :
"""
校验参数键名是否唯一service
: param query_db : orm对象
: param page_object : 参数配置对象
: return : 校验结果
"""
metatask_id = - 1 if page_object . metatask_id is None else page_object . metatask_id
metatask = await MetataskDao . get_metatask_detail_by_info (
query_db , MetataskModel ( metataskName = page_object . metatask_name )
)
if metatask and metatask . metatask_id != metatask_id :
return CommonConstant . NOT_UNIQUE
return CommonConstant . UNIQUE
@classmethod
async def add_metatask_services ( cls , request : Request , query_db : AsyncSession , page_object : MetataskModel , current_user : CurrentUserModel ) :
"""
新增参数配置信息service
: param request : Request对象
: param query_db : orm对象
: param page_object : 新增参数配置对象
: return : 新增参数配置校验结果
"""
if not await cls . check_metatask_name_unique_services ( query_db , page_object ) :
raise ServiceException ( message = f ' 新增元数据任务 { page_object . metatask_name } 失败,任务名已存在 ' )
else :
try :
# 查询模板
processconfig = MetaprocessconfigQueryModel ( )
processconfig . ac_target = page_object . ac_target
processconfig . db_type = page_object . dbCode
processConfigList = await MetaprocessconfigService . get_metaprocessconfig_list_services ( query_db , processconfig , False )
message = await cls . mysql_process_defind_change_add ( request , processConfigList , page_object , current_user )
if " 成功 " not in message :
await query_db . rollback ( )
raise ServiceException ( message = f ' 新增元数据任务 { page_object . metatask_name } 失败,dolphinscheduler创建失败 ' )
await MetataskDao . add_metatask_dao ( query_db , page_object )
await query_db . commit ( )
return CrudResponseModel ( is_success = True , message = message )
except Exception as e :
await query_db . rollback ( )
raise e
@classmethod
async def mysql_process_defind_change_add ( cls , request : Request , processConfigList : list [ MetaprocessconfigModel ] , page_object : MetataskModel , current_user : CurrentUserModel ) :
"""
mysql类型
"""
projectCode = await request . app . state . redis . get ( f ' { RedisInitKeyConfig . SYS_CONFIG . key } :sys.ds.projectcode ' )
url = f ' { AppConfig . ds_server_url } /dolphinscheduler/projects/ ' + projectCode + ' /task-definition/gen-task-codes?genNum=5 '
headers = { ' dashUserName ' : current_user . user . user_name , ' dashPassword ' : current_user . user . password }
# 新增接口
url2 = f ' { AppConfig . ds_server_url } /dolphinscheduler/projects/ ' + projectCode + ' /process-definition '
headers2 = { ' dashUserName ' : current_user . user . user_name , ' dashPassword ' : current_user . user . password , ' Content-Type ' : ' application/x-www-form-urlencoded ' }
response = requests . get ( url , headers = headers )
if response . reason == ' OK ' :
intdsids = [ ]
message = ' '
dstypes = [ ]
response_text = response . text
data = json . loads ( response_text )
code_list = data [ " data " ]
str_list = list ( map ( str , code_list ) )
for config in processConfigList :
# mysql表字段
if config . ac_target == ' 0 ' :
modified_json_str = config . taskDefinitionJson . replace ( " 16199683466336 " , str_list [ 0 ] ) . replace ( " 16199683466337 " , str_list [ 1 ] ) . replace ( " 16199683466338 " , str_list [ 2 ] ) . replace ( " ' dash_test_w ' " , f " ' { page_object . dbSName } ' " ) . replace ( " ' mysql_conn ' " , f " ' { page_object . dbRName } ' " )
modified_json_str2 = config . taskRelationJson . replace ( " 16199683466336 " , str_list [ 0 ] ) . replace ( " 16199683466337 " , str_list [ 1 ] ) . replace ( " 16199683466338 " , str_list [ 2 ] )
modified_json_str3 = config . locations . replace ( " 16199683466336 " , str_list [ 0 ] ) . replace ( " 16199683466337 " , str_list [ 1 ] ) . replace ( " 16199683466338 " , str_list [ 2 ] )
metaprocessconfig_dict = MetaprocessconfigModel ( taskDefinitionJson = modified_json_str , # 替换taskDefinitionJson
description = page_object . remark , # 替换工作流备注
locations = modified_json_str3 , # 替换locations
name = page_object . metatask_name + " -表字段采集 " , # 替换工作流名称
timeout = config . timeout ,
globalParams = config . globalParams ,
tenantCode = config . tenantCode ,
taskRelationJson = modified_json_str2 , # 替换taskRelationJson
executionType = config . executionType ,
releaseState = config . releaseState
) . model_dump ( exclude_unset = True , by_alias = True )
form_data = { key : str ( value ) for key , value in metaprocessconfig_dict . items ( ) }
response_post0 = requests . post ( url2 , headers = headers2 , data = form_data )
# text= '{"code":0,"msg":"success","data":{"id":null,"code":16244510692320,"name":"测试2250-表字段采集","version":0,"releaseState":null,"projectCode":15094503753824,"description":"111","globalParams":"[]","globalParamList":null,"globalParamMap":{},"createTime":"2025-01-08 12:50:39","updateTime":"2025-01-08 12:50:39","flag":"YES","userId":1,"userName":null,"projectName":null,"locations":"[{\\"taskCode\\":16244510678624,\\"x\\":334,\\"y\\":265},{\\"taskCode\\":16244510678625,\\"x\\":334,\\"y\\":390},{\\"taskCode\\":16244510678626,\\"x\\":687,\\"y\\":335}]","scheduleReleaseState":null,"timeout":0,"tenantId":1,"tenantCode":null,"modifyBy":null,"warningGroupId":0,"executionType":"PARALLEL"},"failed":false,"success":true}'
text = response_post0 . text
responsJson = json . loads ( text )
if responsJson [ ' msg ' ] == ' success ' :
intdsids . append ( responsJson [ ' data ' ] [ ' code ' ] )
dstypes . append ( ' 0 ' )
if message :
message + = " , "
message + = page_object . metatask_name + " -表字段采集新增成功 "
else :
if message :
message + = " , "
message + = page_object . metatask_name + " -表字段采集新增失败 "
if config . ac_target == ' 1 ' :
modified_json_str = config . taskDefinitionJson . replace ( " 16286410625888 " , str_list [ 0 ] ) . replace ( " 16286410625889 " , str_list [ 1 ] ) . replace ( " 16286410625890 " , str_list [ 2 ] ) . replace ( " 16286410625891 " , str_list [ 3 ] ) . replace ( " ' dash_test_w ' " , f " ' { page_object . dbSName } ' " ) . replace ( " ' mysql_conn ' " , f " ' { page_object . dbRName } ' " ) . replace ( " mysql_conn dash_test_w " , f " { page_object . dbRName } { page_object . dbSName } " )
modified_json_str2 = config . taskRelationJson . replace ( " 16286410625888 " , str_list [ 0 ] ) . replace ( " 16286410625889 " , str_list [ 1 ] ) . replace ( " 16286410625890 " , str_list [ 2 ] ) . replace ( " 16286410625891 " , str_list [ 3 ] )
modified_json_str3 = config . locations . replace ( " 16286410625888 " , str_list [ 0 ] ) . replace ( " 16286410625889 " , str_list [ 1 ] ) . replace ( " 16286410625890 " , str_list [ 2 ] ) . replace ( " 16286410625891 " , str_list [ 3 ] )
metaprocessconfig_dict = MetaprocessconfigModel ( taskDefinitionJson = modified_json_str , # 替换taskDefinitionJson
description = page_object . remark , # 替换工作流备注
locations = modified_json_str3 , # 替换locations
name = page_object . metatask_name + " -存储过程采集 " , # 替换工作流名称
timeout = config . timeout ,
globalParams = config . globalParams ,
tenantCode = config . tenantCode ,
taskRelationJson = modified_json_str2 , # 替换taskRelationJson
executionType = config . executionType ,
releaseState = config . releaseState
) . model_dump ( exclude_unset = True , by_alias = True )
form_data = { key : str ( value ) for key , value in metaprocessconfig_dict . items ( ) }
response_post0 = requests . post ( url2 , headers = headers2 , data = form_data )
#text= '{"code":0,"msg":"success","data":{"id":null,"code":16244510692320,"name":"测试2250-表字段采集","version":0,"releaseState":null,"projectCode":15094503753824,"description":"111","globalParams":"[]","globalParamList":null,"globalParamMap":{},"createTime":"2025-01-08 12:50:39","updateTime":"2025-01-08 12:50:39","flag":"YES","userId":1,"userName":null,"projectName":null,"locations":"[{\\"taskCode\\":16244510678624,\\"x\\":334,\\"y\\":265},{\\"taskCode\\":16244510678625,\\"x\\":334,\\"y\\":390},{\\"taskCode\\":16244510678626,\\"x\\":687,\\"y\\":335}]","scheduleReleaseState":null,"timeout":0,"tenantId":1,"tenantCode":null,"modifyBy":null,"warningGroupId":0,"executionType":"PARALLEL"},"failed":false,"success":true}'
text = response_post0 . text
responsJson = json . loads ( text )
if responsJson [ ' msg ' ] == ' success ' :
dstypes . append ( ' 1 ' )
intdsids . append ( responsJson [ ' data ' ] [ ' code ' ] )
if message :
message + = " , "
message + = page_object . metatask_name + " -存储过程采集新增成功 "
else :
if message :
message + = " , "
message + = page_object . metatask_name + " -存储过程采集新增失败 "
if len ( intdsids ) > 0 :
page_object . ds_ids = ' , ' . join ( [ str ( i ) for i in intdsids ] )
page_object . ds_types = ' , ' . join ( [ str ( i ) for i in dstypes ] )
return message
@classmethod
async def mysql_process_defind_change_update ( cls , request : Request , page_object : MetataskModel , metatask_old : MetataskModel , current_user : CurrentUserModel ) :
"""
mysql类型
"""
projectCode = await request . app . state . redis . get ( f ' { RedisInitKeyConfig . SYS_CONFIG . key } :sys.ds.projectcode ' )
dstypes = page_object . ds_types . split ( " , " )
dsids = page_object . ds_ids . split ( " , " )
result_list = [ { ' dstype ' : t , ' dsid ' : i } for t , i in zip ( dstypes , dsids ) ]
message = ' '
# 查询接口
url = f ' { AppConfig . ds_server_url } /dolphinscheduler/projects/ ' + projectCode + ' /process-definition '
headers = { ' dashUserName ' : current_user . user . user_name , ' dashPassword ' : current_user . user . password , }
headers2 = { ' dashUserName ' : current_user . user . user_name , ' dashPassword ' : current_user . user . password , ' Content-Type ' : ' application/x-www-form-urlencoded ' }
for config in result_list :
# mysql表字段
if config [ ' dstype ' ] == ' 0 ' :
response = requests . get ( f " { url } / { config [ ' dsid ' ] } " , headers = headers )
text = response . text
responsJson = json . loads ( text )
if responsJson [ ' msg ' ] == ' success ' :
modified_json_str = json . dumps ( responsJson [ ' data ' ] [ ' taskDefinitionList ' ] , ensure_ascii = False , indent = 0 ) . replace ( f " ' { metatask_old . dbSName } ' " , f " ' { page_object . dbSName } ' " )
getTaskRelationList = responsJson [ ' data ' ] [ ' processTaskRelationList ' ]
putTaskRelationList = [ ]
for item in getTaskRelationList :
new_item = {
" name " : item [ ' name ' ] ,
" preTaskCode " : item [ ' preTaskCode ' ] ,
" preTaskVersion " : item [ ' preTaskVersion ' ] ,
" postTaskCode " : item [ ' postTaskCode ' ] ,
" conditionType " : item [ ' conditionType ' ] ,
" conditionParams " : item [ ' conditionParams ' ]
}
putTaskRelationList . append ( new_item )
modified_json_str2 = json . dumps ( putTaskRelationList , ensure_ascii = False , indent = 0 )
modified_json_str2 = re . sub ( r ' \ s+ ' , ' ' , modified_json_str2 )
metaprocessconfig_dict = MetaprocessconfigModel ( taskDefinitionJson = modified_json_str , # 替换taskDefinitionJson
description = page_object . remark , # 替换工作流备注
locations = responsJson [ ' data ' ] [ ' processDefinition ' ] [ ' locations ' ] , # 替换locations
name = page_object . metatask_name + " -表字段采集 " , # 替换工作流名称
tenantCode = responsJson [ ' data ' ] [ ' processDefinition ' ] [ ' tenantCode ' ] ,
taskRelationJson = modified_json_str2 , # 替换taskRelationJson
) . model_dump ( exclude_unset = True , by_alias = True )
form_data = { key : str ( value ) for key , value in metaprocessconfig_dict . items ( ) }
response_put0 = requests . put ( f " { url } / { config [ ' dsid ' ] } " , headers = headers2 , data = form_data )
putText = response_put0 . text
responsPutJson = json . loads ( putText )
if responsPutJson [ ' msg ' ] == ' success ' :
if message :
message + = " , "
message + = page_object . metatask_name + " -表字段采集修改成功 "
else :
if message :
message + = " , "
message + = page_object . metatask_name + " -表字段采集修改失败 "
if config [ ' dstype ' ] == ' 1 ' :
response = requests . get ( f " { url } / { config [ ' dsid ' ] } " , headers = headers )
text = response . text
responsJson = json . loads ( text )
if responsJson [ ' msg ' ] == ' success ' :
modified_json_str = json . dumps ( responsJson [ ' data ' ] [ ' taskDefinitionList ' ] , ensure_ascii = False , indent = 4 ) . replace ( f " ' { metatask_old . dbSName } ' " , f " ' { page_object . dbSName } ' " )
getTaskRelationList = responsJson [ ' data ' ] [ ' processTaskRelationList ' ]
putTaskRelationList = [ ]
for item in getTaskRelationList :
new_item = {
" name " : item [ ' name ' ] ,
" preTaskCode " : item [ ' preTaskCode ' ] ,
" preTaskVersion " : item [ ' preTaskVersion ' ] ,
" postTaskCode " : item [ ' postTaskCode ' ] ,
" conditionType " : item [ ' conditionType ' ] ,
" conditionParams " : item [ ' conditionParams ' ]
}
putTaskRelationList . append ( new_item )
modified_json_str2 = json . dumps ( putTaskRelationList , ensure_ascii = False , indent = 4 )
modified_json_str2 = re . sub ( r ' \ s+ ' , ' ' , modified_json_str2 )
metaprocessconfig_dict = MetaprocessconfigModel ( taskDefinitionJson = modified_json_str , # 替换taskDefinitionJson
description = page_object . remark , # 替换工作流备注
locations = responsJson [ ' data ' ] [ ' processDefinition ' ] [ ' locations ' ] , # 替换locations
name = page_object . metatask_name + " -存储过程采集 " , # 替换工作流名称
tenantCode = responsJson [ ' data ' ] [ ' processDefinition ' ] [ ' tenantCode ' ] ,
taskRelationJson = modified_json_str2 , # 替换taskRelationJson
) . model_dump ( exclude_unset = True , by_alias = True )
form_data = { key : str ( value ) for key , value in metaprocessconfig_dict . items ( ) }
response_put0 = requests . put ( f " { url } / { config [ ' dsid ' ] } " , headers = headers2 , data = form_data )
putText = response_put0 . text
responsPutJson = json . loads ( putText )
if responsPutJson [ ' msg ' ] == ' success ' :
if message :
message + = " , "
message + = page_object . metatask_name + " -存储过程采集修改成功 "
else :
if message :
message + = " , "
message + = page_object . metatask_name + " -存储过程采集修改失败 "
return message
@classmethod
async def edit_metatask_services ( cls , request : Request , query_db : AsyncSession , page_object : MetataskModel , current_user : CurrentUserModel ) :
"""
编辑参数配置信息service
: param request : Request对象
: param query_db : orm对象
: param page_object : 编辑参数配置对象
: return : 编辑参数配置校验结果
"""
edit_metatask = page_object . model_dump ( exclude_unset = True )
metatask_info = await cls . metatask_detail_services ( query_db , page_object . metatask_id )
if metatask_info . metatask_id :
if not await cls . check_metatask_name_unique_services ( query_db , page_object ) :
raise ServiceException ( message = f ' 修改任务 { page_object . metatask_name } 失败,任务名称已存在 ' )
else :
try :
metatask_old = await cls . metatask_detail_services ( query_db , metatask_info . metatask_id )
message = await cls . mysql_process_defind_change_update ( request , page_object , metatask_old , current_user )
if " 成功 " not in message :
await query_db . rollback ( )
raise ServiceException ( message = f ' 修改元数据任务 { page_object . metatask_name } 失败,dolphinscheduler修改失败 ' )
await MetataskDao . edit_metatask_dao ( query_db , edit_metatask )
await query_db . commit ( )
return CrudResponseModel ( is_success = True , message = ' 更新成功 ' )
except Exception as e :
await query_db . rollback ( )
raise e
else :
raise ServiceException ( message = ' 更新失败 ' )
@classmethod
async def up_or_down_metatask_services (
cls , request : Request , query_db : AsyncSession , current_user : CurrentUserModel , id : str , type : str
) :
metatask_info = await cls . metatask_detail_services ( query_db , id )
metatask_info . update_by = current_user . user . user_name
metatask_info . update_time = datetime . now ( )
type_str : str
projectCode = await request . app . state . redis . get ( f ' { RedisInitKeyConfig . SYS_CONFIG . key } :sys.ds.projectcode ' )
dstypes = metatask_info . ds_types . split ( " , " )
dsids = metatask_info . ds_ids . split ( " , " )
result_list = [ { ' dstype ' : t , ' dsid ' : i } for t , i in zip ( dstypes , dsids ) ]
message = ' '
# 查询接口
url = f ' { AppConfig . ds_server_url } /dolphinscheduler/projects/ ' + projectCode + ' /process-definition '
headers = { ' dashUserName ' : current_user . user . user_name , ' dashPassword ' : current_user . user . password , ' Content-Type ' : ' application/x-www-form-urlencoded ' }
for config in result_list :
# mysql表字段
if config [ ' dstype ' ] == ' 0 ' :
metaprocessconfig_dict = {
' name ' : metatask_info . metatask_name + ' -表字段采集 ' ,
' releaseState ' : type
}
form_data = { key : str ( value ) for key , value in metaprocessconfig_dict . items ( ) }
response = requests . post ( f " { url } / { config [ ' dsid ' ] } /release " , headers = headers , data = form_data )
text = response . text
responsJson = json . loads ( text )
if responsJson [ ' success ' ] is True :
message = ' 成功! '
else :
raise ServiceException ( message = ' 失败 ' + responsJson [ ' msg ' ] )
# mysql表字段
if config [ ' dstype ' ] == ' 1 ' :
metaprocessconfig_dict = {
' name ' : metatask_info . metatask_name + ' -存储过程采集 ' ,
' releaseState ' : type
}
form_data = { key : str ( value ) for key , value in metaprocessconfig_dict . items ( ) }
response = requests . post ( f " { url } / { config [ ' dsid ' ] } /release " , headers = headers , data = form_data )
text = response . text
responsJson = json . loads ( text )
if responsJson [ ' success ' ] is True :
message = ' 成功! '
else :
raise ServiceException ( message = ' 失败 ' + responsJson [ ' msg ' ] )
if type == ' OFFLINE ' :
# 下线
type_str = ' 下线 '
metatask_info . status = ' OFFLINE '
else :
# 上线
type_str = ' 上线 '
metatask_info . status = ' ONLINE '
edit_metatask = metatask_info . model_dump ( exclude_unset = True )
try :
await MetataskDao . edit_metatask_dao ( query_db , edit_metatask )
await query_db . commit ( )
return CrudResponseModel ( is_success = True , message = message )
except Exception as e :
await query_db . rollback ( )
raise e
else :
raise ServiceException ( message = ' 更新失败 ' )
@classmethod
async def run_metatask_services (
cls , request : Request , query_db : AsyncSession , process : ProcessDefinition , current_user : CurrentUserModel
) :
process . failureStrategy = ' CONTINUE '
process . warningType = ' NONE '
process . execType = ' START_PROCESS '
process . taskDependType = ' TASK_POST '
process . complementDependentMode = ' OFF_MODE '
process . runMode = ' RUN_MODE_SERIAL '
process . processInstancePriority = ' MEDIUM '
process . dryRun = 0
process . scheduleTime = " { complementStartDate: ' 2025-01-12 00:00:00 ' ,complementEndDate: ' 2025-01-12 00:00:00 ' } "
projectCode = await request . app . state . redis . get ( f ' { RedisInitKeyConfig . SYS_CONFIG . key } :sys.ds.projectcode ' )
url = f ' { AppConfig . ds_server_url } /dolphinscheduler/projects/ ' + projectCode + ' /executors/start-process-instance '
headers = { ' dashUserName ' : current_user . user . user_name , ' dashPassword ' : current_user . user . password , ' Content-Type ' : ' application/x-www-form-urlencoded ' }
# form_data = {key: str(value) for key, value in process.__dict__.items()}
form_data = { key : value for key , value in process . __dict__ . items ( ) }
response = requests . post ( url , headers = headers , data = form_data )
text = response . text
responsJson = json . loads ( text )
if responsJson [ ' success ' ] is True :
return " 运行成功! "
else :
raise ServiceException ( message = ' 运行失败! ' )
@classmethod
async def ds_metatask_services (
cls , request : Request , query_db : AsyncSession , process : ParmScheduleVo , current_user : CurrentUserModel
) :
parm = ParmSchedule (
)
parm . failureStrategy = ' CONTINUE '
parm . warningType = ' NONE '
parm . warningGroupId = process . warningGroupId
parm . workerGroup = ' TASK_POST '
parm . processDefinitionCode = process . processDefinitionCode
parm . environmentCode = process . environmentCode
parm . processInstancePriority = ' MEDIUM '
parm . schedule = (
' { " startTime " : " ' + process . beginTime . strftime ( ' % Y- % m- %d % H: % M: % S ' ) +
' " , " endTime " : " ' + process . endTime . strftime ( ' % Y- % m- %d % H: % M: % S ' ) +
' " , " crontab " : " ' + process . crontab +
' " , " timezoneId " : " Asia/Shanghai " } ' )
projectCode = await request . app . state . redis . get ( f ' { RedisInitKeyConfig . SYS_CONFIG . key } :sys.ds.projectcode ' )
url = f ' { AppConfig . ds_server_url } /dolphinscheduler/projects/ ' + projectCode + ' /schedules '
headers = { ' dashUserName ' : current_user . user . user_name , ' dashPassword ' : current_user . user . password , ' Content-Type ' : ' application/x-www-form-urlencoded ' }
# form_data = {key: str(value) for key, value in process.__dict__.items()}
form_data = { key : value for key , value in parm . __dict__ . items ( ) }
response = requests . post ( url , headers = headers , data = form_data )
text = response . text
responsJson = json . loads ( text )
if responsJson [ ' success ' ] is True :
return " 运行成功! "
else :
raise ServiceException ( message = ' 运行失败! ' )
@classmethod
async def get_metatask_logs_services (
cls , request : Request , current_user : CurrentUserModel , query_db : AsyncSession , id : str
) :
metatask_info = await cls . metatask_detail_services ( query_db , id )
metatask_info . update_by = current_user . user . user_name
metatask_info . update_time = datetime . now ( )
# 运行中
metatask_info = ' 3 '
edit_metatask = metatask_info . model_dump ( exclude_unset = True )
try :
await MetataskDao . edit_metatask_dao ( query_db , edit_metatask )
await query_db . commit ( )
return CrudResponseModel ( is_success = True , message = metatask_info . metatask_name + ' 任务: ' ' 日志获取成功 ' )
except Exception as e :
await query_db . rollback ( )
raise e
else :
raise ServiceException ( message = ' 更新失败 ' )
@classmethod
async def delete_metatask_services ( cls , request : Request , query_db : AsyncSession , page_object : DeleteMetataskModel , current_user : CurrentUserModel ) :
"""
删除参数配置信息service
: param request : Request对象
: param query_db : orm对象
: param page_object : 删除参数配置对象
: return : 删除参数配置校验结果
"""
if page_object . metatask_ids and page_object . ds_ids :
metatask_id_list = page_object . metatask_ids . split ( ' , ' )
try :
projectCode = await request . app . state . redis . get ( f ' { RedisInitKeyConfig . SYS_CONFIG . key } :sys.ds.projectcode ' )
# 查询接口
url = f ' { AppConfig . ds_server_url } /dolphinscheduler/projects/ ' + projectCode + ' /process-definition/batch-delete '
form_data = { ' codes ' : page_object . ds_ids }
headers = { ' dashUserName ' : current_user . user . user_name , ' dashPassword ' : current_user . user . password , ' Content-Type ' : ' application/x-www-form-urlencoded ' }
response = requests . post ( url , headers = headers , data = form_data )
text = response . text
responsJson = json . loads ( text )
if responsJson [ ' success ' ] is True :
for metatask_id in metatask_id_list :
await MetataskDao . delete_metatask_dao ( query_db , MetataskModel ( metataskId = int ( metatask_id ) ) )
await query_db . commit ( )
return CrudResponseModel ( is_success = True , message = ' 删除成功 ' )
else :
raise ServiceException ( message = ' ds删除失败 ' )
except Exception as e :
await query_db . rollback ( )
raise e
else :
raise ServiceException ( message = ' 传入参数配置id为空 ' )
@classmethod
async def metatask_detail_services ( cls , query_db : AsyncSession , metatask_id : int ) :
"""
获取参数配置详细信息service
: param query_db : orm对象
: param metatask_id : 参数配置id
: return : 参数配置id对应的信息
"""
metatask = await MetataskDao . get_metatask_detail_by_id ( query_db , metatask_id = metatask_id )
if metatask :
result = MetataskModel ( * * CamelCaseUtil . transform_result ( metatask ) )
else :
result = MetataskModel ( * * dict ( ) )
return result
@classmethod
async def get_process_instances_services (
cls , request : Request , query_object : ProcessInstancePage , current_user : CurrentUserModel
) :
projectCode = await request . app . state . redis . get ( f ' { RedisInitKeyConfig . SYS_CONFIG . key } :sys.ds.projectcode ' )
url = f ' { AppConfig . ds_server_url } /dolphinscheduler/projects/ { projectCode } /process-instances?pageNo= { query_object . page_num } &pageSize= { query_object . page_size } &searchVal= { query_object . searchVal } '
headers = { ' dashUserName ' : current_user . user . user_name , ' dashPassword ' : current_user . user . password }
response = requests . get ( url , headers = headers )
try :
if response . reason == ' OK ' :
response_text = response . text
data = json . loads ( response_text )
total_list = data [ " data " ] [ " totalList " ]
# data_sources = [ProcessInstance(**item) for item in total_list]
pageData = PageResponseModel ( rows = total_list , total = data [ " data " ] [ " total " ] )
return pageData
else :
return { ' error ' : f ' Request failed with status code { response . status_code } ' }
except Exception as e :
raise e
@classmethod
async def get_task_nodes_services (
cls , request : Request , id : int , current_user : CurrentUserModel
) :
projectCode = await request . app . state . redis . get ( f ' { RedisInitKeyConfig . SYS_CONFIG . key } :sys.ds.projectcode ' )
url = f ' { AppConfig . ds_server_url } /dolphinscheduler/projects/ { projectCode } /process-instances/ { id } /tasks '
headers = { ' dashUserName ' : current_user . user . user_name , ' dashPassword ' : current_user . user . password }
response = requests . get ( url , headers = headers )
try :
response = requests . get ( url , headers = headers )
if response . reason == ' OK ' :
response_text = response . text
data = json . loads ( response_text )
total_list = data [ " data " ] [ " taskList " ]
data_sources = [ TaskNode ( * * item ) for item in total_list ]
return data_sources
else :
return { ' error ' : f ' Request failed with status code { response . status_code } ' }
except Exception as e :
raise e
@classmethod
async def get_log_details_services (
cls , request : Request , id : int , current_user : CurrentUserModel
) :
url = f ' { AppConfig . ds_server_url } /dolphinscheduler/log/detail?taskInstanceId= { id } &limit=1000&skipLineNum=0 '
headers = { ' dashUserName ' : current_user . user . user_name , ' dashPassword ' : current_user . user . password }
response = requests . get ( url , headers = headers )
try :
response = requests . get ( url , headers = headers )
if response . reason == ' OK ' :
response_text = response . text
data = json . loads ( response_text )
logMessage = data [ " data " ] [ " message " ]
return logMessage
else :
return { ' error ' : f ' Request failed with status code { response . status_code } ' }
except Exception as e :
raise e