@ -3,7 +3,6 @@ from sqlalchemy.ext.asyncio import AsyncSession
from module_admin . entity . vo . metatask_vo import MetataskQueryModel , MetataskModel , DeleteMetataskModel
from module_admin . dao . metatask_dao import MetataskDao
from utils . page_util import PageResponseModel
# from module_admin.entity.vo.dataSource_vo import DataSource,Datasouceall,AlertGroups,Environment,WorkerGroup,ProcessDefinition,ParmScheduleVo,ParmSchedule,ProcessInstancePage
from module_admin . entity . vo . dataSource_vo import *
from module_admin . entity . vo . metaprocessconfig_vo import MetaprocessconfigQueryModel , MetaprocessconfigModel
from config . constant import CommonConstant
@ -1245,10 +1244,27 @@ class MetataskService:
async def ds_metatask_services (
cls , request : Request , query_db : AsyncSession , process : ParmScheduleVo , current_user : CurrentUserModel
) :
parm = ParmSchedule (
projectCode = await request . app . state . redis . get ( f ' { RedisInitKeyConfig . SYS_CONFIG . key } :sys.ds.projectcode ' )
# 先查询是否建立定时任务
getdsurl = f ' { AppConfig . ds_server_url } /dolphinscheduler/projects/ ' + projectCode + ' /schedules?pageSize=10&pageNo=1&processDefinitionCode= ' + str ( process . processDefinitionCode )
headers = { ' dashUserName ' : current_user . user . user_name , ' dashPassword ' : current_user . user . password , ' Content-Type ' : ' application/x-www-form-urlencoded ' }
getdsresponse = requests . get ( getdsurl , headers = headers , verify = False )
getdstext = getdsresponse . text
responsJson = json . loads ( getdstext )
if responsJson [ ' msg ' ] == ' success ' :
if responsJson [ ' data ' ] [ ' total ' ] > 0 :
# getds_json_list = json.dumps(responsJson['data']['totalList'], ensure_ascii=False, indent=4)
getds_json_list = responsJson [ ' data ' ] [ ' totalList ' ]
for item in getds_json_list :
if item [ ' releaseState ' ] == ' ONLINE ' :
# 先下线在删除
offdsurl = f " { AppConfig . ds_server_url } /dolphinscheduler/projects/ { projectCode } /schedules/ { item [ ' id ' ] } /offline "
offresponse = requests . post ( offdsurl , headers = headers , verify = False )
# 删除对应的调度
deldsurl = f " { AppConfig . ds_server_url } /dolphinscheduler/projects/ { projectCode } /schedules/ { item [ ' id ' ] } ?scheduleId= { item [ ' id ' ] } "
delresponse = requests . delete ( deldsurl , headers = headers , verify = False )
)
parm = ParmSchedule ( )
parm . failureStrategy = ' CONTINUE '
parm . warningType = ' NONE '
parm . warningGroupId = process . warningGroupId
@ -1261,18 +1277,17 @@ class MetataskService:
' " , " endTime " : " ' + process . endTime . strftime ( ' % Y- % m- %d % H: % M: % S ' ) +
' " , " crontab " : " ' + process . crontab +
' " , " timezoneId " : " Asia/Shanghai " } ' )
projectCode = await request . app . state . redis . get ( f ' { RedisInitKeyConfig . SYS_CONFIG . key } :sys.ds.projectcode ' )
url = f ' { AppConfig . ds_server_url } /dolphinscheduler/projects/ ' + projectCode + ' /schedules '
headers = { ' dashUserName ' : current_user . user . user_name , ' dashPassword ' : current_user . user . password , ' Content-Type ' : ' application/x-www-form-urlencoded ' }
# form_data = {key: str(value) for key, value in process.__dict__.items()}
form_data = { key : value for key , value in parm . __dict__ . items ( ) }
response = requests . post ( url , headers = headers , data = form_data , verify = False )
text = response . text
responsJson = json . loads ( text )
if responsJson [ ' success ' ] is True :
return " 运行成功! "
if responsJson [ ' msg ' ] == ' success ' :
scheduleId = responsJson [ ' data ' ] [ ' id ' ]
ondsurl = f " { AppConfig . ds_server_url } /dolphinscheduler/projects/ { projectCode } /schedules/ { scheduleId } /online "
ondsurl = requests . post ( ondsurl , headers = headers , verify = False )
return " 调度运行成功! "
else :
raise ServiceException ( message = ' 运行失败! ' )