# NOTE: import order is kept as in the original file on purpose; the wildcard
# import below can shadow or be shadowed by neighbouring names, so reordering
# is not guaranteed to be behaviour-neutral.
from fastapi import Request,Depends
from sqlalchemy.ext.asyncio import AsyncSession
from module_admin.entity.vo.metatask_vo import MetataskQueryModel, MetataskModel, DeleteMetataskModel
from module_admin.dao.metatask_dao import MetataskDao
from utils.page_util import PageResponseModel
# Names used from the wildcard import: DataSource, Datasouceall, AlertGroups,
# Environment, WorkerGroup, ProcessDefinition, ParmScheduleVo, ParmSchedule,
# ProcessInstancePage, TaskNode.
from module_admin.entity.vo.dataSource_vo import *
from module_admin.entity.vo.metaprocessconfig_vo import MetaprocessconfigQueryModel,MetaprocessconfigModel
from config.constant import CommonConstant
from exceptions.exception import ServiceException
from module_admin.entity.vo.common_vo import CrudResponseModel
from utils.common_util import CamelCaseUtil
from module_admin.entity.vo.user_vo import CurrentUserModel
from datetime import datetime
import asyncio
import requests
import json
import re
from config.enums import RedisInitKeyConfig
from module_admin.service.metaprocessconfig_service import MetaprocessconfigService
from config.env import AppConfig


class MetataskService:
    """
    Service layer for metadata-collection tasks.

    Each task row in the local database is backed by one or two workflow
    definitions on a DolphinScheduler server; the methods here keep both
    sides in step by calling the DolphinScheduler HTTP API.
    """

    # Seconds to wait for a single DolphinScheduler HTTP call before failing.
    _DS_TIMEOUT = 30

    # Workflow-name suffix per collection type ('0' = table/column collection,
    # '1' = stored-procedure collection).
    _WORKFLOW_SUFFIX = {'0': '-表字段采集', '1': '-存储过程采集'}

    # Task codes hard-wired into the stored workflow templates; they are
    # swapped for freshly generated codes when a workflow is created.
    _TEMPLATE_TASK_CODES = {
        '0': ('16199683466336', '16199683466337', '16199683466338'),
        '1': ('16286410625888', '16286410625889', '16286410625890', '16286410625891'),
    }

    # json.dumps indent used for taskDefinitionJson on update, per collection
    # type (kept as the original code had it).
    _UPDATE_JSON_INDENT = {'0': 0, '1': 4}

    # ------------------------------------------------------------------ #
    # private helpers
    # ------------------------------------------------------------------ #

    @staticmethod
    def _ds_headers(current_user: CurrentUserModel, form: bool = False) -> dict:
        """Build the DolphinScheduler auth headers for the current user.

        :param current_user: logged-in user whose name/password are forwarded
        :param form: when True, also declare a url-encoded form body
        :return: header dict
        """
        headers = {
            'dashUserName': current_user.user.user_name,
            'dashPassword': current_user.user.password,
        }
        if form:
            headers['Content-Type'] = 'application/x-www-form-urlencoded'
        return headers

    @classmethod
    async def _ds_request(cls, method: str, url: str, **kwargs):
        """Send one HTTP request without blocking the event loop.

        ``requests`` is synchronous, so the call is pushed to a worker thread.
        A default timeout is applied so an unresponsive server cannot hang the
        request forever.

        :param method: HTTP verb, e.g. 'GET'
        :param url: absolute URL
        :param kwargs: forwarded to ``requests.request``
        :return: the ``requests.Response``
        """
        kwargs.setdefault('timeout', cls._DS_TIMEOUT)
        return await asyncio.to_thread(requests.request, method, url, **kwargs)

    @classmethod
    async def _project_base_url(cls, request: Request) -> str:
        """Return the base URL of the configured DolphinScheduler project.

        The project code is read from redis under the ``sys.ds.projectcode``
        system-config key.

        :param request: Request object (gives access to the redis client)
        :return: ``<ds_server_url>/dolphinscheduler/projects/<projectCode>``
        :raises ServiceException: when no project code is stored
        """
        project_code = await request.app.state.redis.get(
            f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode'
        )
        if not project_code:
            raise ServiceException(message='未配置dolphinscheduler项目编码(sys.ds.projectcode)')
        if isinstance(project_code, bytes):
            project_code = project_code.decode()
        return f'{AppConfig.ds_server_url}/dolphinscheduler/projects/{project_code}'

    @staticmethod
    def _pair_workflows(ds_types, ds_ids) -> list:
        """Zip the comma-separated type/id columns into (type, id) pairs."""
        return list(zip((ds_types or '').split(','), (ds_ids or '').split(',')))

    @staticmethod
    def _swap_task_codes(template: str, placeholders, new_codes) -> str:
        """Replace each template task code with its newly generated code."""
        for placeholder, new_code in zip(placeholders, new_codes):
            template = template.replace(placeholder, new_code)
        return template

    # ------------------------------------------------------------------ #
    # queries
    # ------------------------------------------------------------------ #

    @classmethod
    async def get_metatask_list_services(
        cls, query_db: AsyncSession, query_object: MetataskQueryModel, is_page: bool = False
    ):
        """
        Return the task list produced by the DAO.

        :param query_db: orm session
        :param query_object: query filter object
        :param is_page: whether the DAO should paginate
        :return: whatever ``MetataskDao.get_metatask_list`` returns
        """
        return await MetataskDao.get_metatask_list(query_db, query_object, is_page)

    @classmethod
    async def get_data_source_tree(cls, request: Request, current_user: CurrentUserModel):
        """
        Fetch the first 100 data sources from DolphinScheduler.

        :param request: Request object (unused here)
        :param current_user: current user, used for the auth headers
        :return: list of ``DataSource``; on a non-200 reply a dict with an
                 ``error`` key
        """
        url = f'{AppConfig.ds_server_url}/dolphinscheduler/datasources?pageNo=1&pageSize=100'
        response = await cls._ds_request('GET', url, headers=cls._ds_headers(current_user))
        if response.status_code != 200:
            return {'error': f'Request failed with status code {response.status_code}'}
        total_list = json.loads(response.text)["data"]["totalList"]
        for item in total_list:
            # connectionParams arrives as a JSON string; expand it to a dict
            item["connectionParams"] = json.loads(item["connectionParams"])
        return [DataSource(**item) for item in total_list]

    @classmethod
    async def get_data_source_all(cls, request: Request, current_user: CurrentUserModel):
        """
        Collect worker groups, alert groups and environments in one object.

        The three lookups are independent, so they are issued concurrently.
        A lookup that does not return HTTP 200 leaves its field untouched.

        :param request: Request object (unused here)
        :param current_user: current user, used for the auth headers
        :return: populated ``Datasouceall``
        """
        base = f'{AppConfig.ds_server_url}/dolphinscheduler'
        headers = cls._ds_headers(current_user)
        worker_resp, alert_resp, env_resp = await asyncio.gather(
            cls._ds_request('GET', f'{base}/worker-groups/all', headers=headers),
            cls._ds_request('GET', f'{base}/alert-groups/list', headers=headers),
            cls._ds_request('GET', f'{base}/environment/query-environment-list', headers=headers),
        )
        dataspurceVo = Datasouceall()
        if worker_resp.status_code == 200:
            names = json.loads(worker_resp.text)["data"]
            dataspurceVo.workerGroup = [WorkerGroup(name=item) for item in names]
        if alert_resp.status_code == 200:
            groups = json.loads(alert_resp.text)["data"]
            dataspurceVo.alertGroups = [AlertGroups(**item) for item in groups]
        if env_resp.status_code == 200:
            environments = json.loads(env_resp.text)["data"]
            dataspurceVo.environment = [Environment(**item) for item in environments]
        return dataspurceVo

    @classmethod
    async def check_metatask_name_unique_services(cls, query_db: AsyncSession, page_object: MetataskModel):
        """
        Check that no other task already uses this task name.

        :param query_db: orm session
        :param page_object: task being created or edited
        :return: ``CommonConstant.UNIQUE`` or ``CommonConstant.NOT_UNIQUE``
        """
        # -1 never matches a stored id, so a new task clashes with any hit
        metatask_id = -1 if page_object.metatask_id is None else page_object.metatask_id
        metatask = await MetataskDao.get_metatask_detail_by_info(
            query_db, MetataskModel(metataskName=page_object.metatask_name)
        )
        if metatask and metatask.metatask_id != metatask_id:
            return CommonConstant.NOT_UNIQUE
        return CommonConstant.UNIQUE

    # ------------------------------------------------------------------ #
    # create / update
    # ------------------------------------------------------------------ #

    @classmethod
    async def add_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: MetataskModel,current_user: CurrentUserModel):
        """
        Create the DolphinScheduler workflows for a task, then store the task.

        :param request: Request object
        :param query_db: orm session
        :param page_object: task to create
        :param current_user: current user, used for the auth headers
        :return: ``CrudResponseModel`` carrying the per-workflow result text
        :raises ServiceException: duplicate name, or no workflow was created
        """
        if not await cls.check_metatask_name_unique_services(query_db, page_object):
            raise ServiceException(message=f'新增元数据任务{page_object.metatask_name}失败,任务名已存在')
        try:
            # look up the workflow templates matching target and db type
            processconfig = MetaprocessconfigQueryModel()
            processconfig.ac_target = page_object.ac_target
            processconfig.db_type = page_object.dbCode
            process_config_list = await MetaprocessconfigService.get_metaprocessconfig_list_services(
                query_db, processconfig, False
            )
            message = await cls.mysql_process_defind_change_add(
                request, process_config_list, page_object, current_user
            )
            if "成功" not in message:
                raise ServiceException(message=f'新增元数据任务{page_object.metatask_name}失败,dolphinscheduler创建失败')
            await MetataskDao.add_metatask_dao(query_db, page_object)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message=message)
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def mysql_process_defind_change_add(cls,request: Request, processConfigList: list[MetaprocessconfigModel],page_object:MetataskModel,current_user: CurrentUserModel):
        """
        Create one workflow per template on DolphinScheduler (mysql type).

        Fresh task codes are requested first and substituted for the codes
        baked into each template, together with the schema/connection names.
        On success ``page_object.ds_ids`` and ``page_object.ds_types`` are set
        to the comma-joined codes and types of the workflows created.

        :param request: Request object
        :param processConfigList: workflow templates to instantiate
        :param page_object: task being created (mutated, see above)
        :param current_user: current user, used for the auth headers
        :return: comma-separated per-workflow result text; empty string when
                 the task codes could not be generated
        """
        project_url = await cls._project_base_url(request)
        codes_response = await cls._ds_request(
            'GET',
            f'{project_url}/task-definition/gen-task-codes?genNum=5',
            headers=cls._ds_headers(current_user),
        )
        if codes_response.status_code != 200:
            # caller treats a message without "成功" as a failed creation
            return ''
        new_codes = [str(code) for code in json.loads(codes_response.text)["data"]]

        create_url = f'{project_url}/process-definition'
        form_headers = cls._ds_headers(current_user, form=True)
        created_codes = []
        created_types = []
        outcomes = []
        for config in processConfigList:
            placeholders = cls._TEMPLATE_TASK_CODES.get(config.ac_target)
            if placeholders is None:
                continue
            workflow_name = page_object.metatask_name + cls._WORKFLOW_SUFFIX[config.ac_target]
            if len(new_codes) < len(placeholders):
                # not enough generated codes to fill this template
                outcomes.append(workflow_name + "新增失败")
                continue

            task_definition_json = (
                cls._swap_task_codes(config.taskDefinitionJson, placeholders, new_codes)
                .replace("'dash_test_w'", f"'{page_object.dbSName}'")
                .replace("'mysql_conn'", f"'{page_object.dbRName}'")
            )
            if config.ac_target == '1':
                # the stored-procedure template also carries the pair unquoted
                task_definition_json = task_definition_json.replace(
                    "mysql_conn dash_test_w", f"{page_object.dbRName} {page_object.dbSName}"
                )
            metaprocessconfig_dict = MetaprocessconfigModel(
                taskDefinitionJson=task_definition_json,
                description=page_object.remark,
                locations=cls._swap_task_codes(config.locations, placeholders, new_codes),
                name=workflow_name,
                timeout=config.timeout,
                globalParams=config.globalParams,
                tenantCode=config.tenantCode,
                taskRelationJson=cls._swap_task_codes(config.taskRelationJson, placeholders, new_codes),
                executionType=config.executionType,
                releaseState=config.releaseState,
            ).model_dump(exclude_unset=True, by_alias=True)
            form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}

            create_response = await cls._ds_request('POST', create_url, headers=form_headers, data=form_data)
            result = json.loads(create_response.text)
            if result['msg'] == 'success':
                created_codes.append(result['data']['code'])
                created_types.append(config.ac_target)
                outcomes.append(workflow_name + "新增成功")
            else:
                outcomes.append(workflow_name + "新增失败")

        if created_codes:
            page_object.ds_ids = ','.join(str(code) for code in created_codes)
            page_object.ds_types = ','.join(created_types)
        return ", ".join(outcomes)

    @classmethod
    async def mysql_process_defind_change_update(cls,request: Request,page_object:MetataskModel,metatask_old:MetataskModel,current_user: CurrentUserModel):
        """
        Push the edited task onto its existing workflows (mysql type).

        For every workflow listed in ``page_object.ds_types``/``ds_ids`` the
        current definition is fetched, the old schema name is swapped for the
        new one inside the task definitions, and the result is PUT back.

        :param request: Request object
        :param page_object: edited task
        :param metatask_old: task as currently stored (source of the old schema name)
        :param current_user: current user, used for the auth headers
        :return: comma-separated per-workflow result text
        """
        url = f'{await cls._project_base_url(request)}/process-definition'
        headers = cls._ds_headers(current_user)
        form_headers = cls._ds_headers(current_user, form=True)
        outcomes = []
        for dstype, dsid in cls._pair_workflows(page_object.ds_types, page_object.ds_ids):
            suffix = cls._WORKFLOW_SUFFIX.get(dstype)
            if suffix is None:
                continue
            workflow_name = page_object.metatask_name + suffix

            get_response = await cls._ds_request('GET', f"{url}/{dsid}", headers=headers)
            current = json.loads(get_response.text)
            if current['msg'] != 'success':
                outcomes.append(workflow_name + "修改失败")
                continue

            data = current['data']
            task_definition_json = json.dumps(
                data['taskDefinitionList'], ensure_ascii=False, indent=cls._UPDATE_JSON_INDENT[dstype]
            ).replace(f"'{metatask_old.dbSName}'", f"'{page_object.dbSName}'")
            relations = [
                {
                    "name": item['name'],
                    "preTaskCode": item['preTaskCode'],
                    "preTaskVersion": item['preTaskVersion'],
                    "postTaskCode": item['postTaskCode'],
                    "conditionType": item['conditionType'],
                    "conditionParams": item['conditionParams'],
                }
                for item in data['processTaskRelationList']
            ]
            # compact separators give whitespace-free JSON without touching
            # whitespace that lives inside string values
            task_relation_json = json.dumps(relations, ensure_ascii=False, separators=(',', ':'))

            metaprocessconfig_dict = MetaprocessconfigModel(
                taskDefinitionJson=task_definition_json,
                description=page_object.remark,
                locations=data['processDefinition']['locations'],
                name=workflow_name,
                tenantCode=data['processDefinition']['tenantCode'],
                taskRelationJson=task_relation_json,
            ).model_dump(exclude_unset=True, by_alias=True)
            form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}

            put_response = await cls._ds_request('PUT', f"{url}/{dsid}", headers=form_headers, data=form_data)
            if json.loads(put_response.text)['msg'] == 'success':
                outcomes.append(workflow_name + "修改成功")
            else:
                outcomes.append(workflow_name + "修改失败")
        return ", ".join(outcomes)

    @classmethod
    async def edit_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: MetataskModel,current_user: CurrentUserModel):
        """
        Update the task's workflows on DolphinScheduler, then the task row.

        :param request: Request object
        :param query_db: orm session
        :param page_object: edited task
        :param current_user: current user, used for the auth headers
        :return: ``CrudResponseModel`` on success
        :raises ServiceException: unknown task, duplicate name, or no workflow
                                  could be updated
        """
        edit_metatask = page_object.model_dump(exclude_unset=True)
        metatask_info = await cls.metatask_detail_services(query_db, page_object.metatask_id)
        if not metatask_info.metatask_id:
            raise ServiceException(message='更新失败')
        if not await cls.check_metatask_name_unique_services(query_db, page_object):
            raise ServiceException(message=f'修改任务{page_object.metatask_name}失败,任务名称已存在')
        try:
            # metatask_info already is the stored row; no second lookup needed
            message = await cls.mysql_process_defind_change_update(
                request, page_object, metatask_info, current_user
            )
            if "成功" not in message:
                raise ServiceException(message=f'修改元数据任务{page_object.metatask_name}失败,dolphinscheduler修改失败')
            await MetataskDao.edit_metatask_dao(query_db, edit_metatask)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='更新成功')
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def up_or_down_metatask_services(
        cls, request: Request, query_db: AsyncSession, current_user: CurrentUserModel, id: str, type: str
    ):
        """
        Release (online) or withdraw (offline) every workflow of a task.

        :param request: Request object
        :param query_db: orm session
        :param current_user: current user, used for auth headers and audit fields
        :param id: task id
        :param type: release state sent to DolphinScheduler; 'OFFLINE' marks
                     the task offline, any other value marks it online
        :return: ``CrudResponseModel`` on success
        :raises ServiceException: unknown task, or DolphinScheduler rejected a release call
        """
        metatask_info = await cls.metatask_detail_services(query_db, id)
        if not metatask_info.metatask_id:
            raise ServiceException(message='更新失败')
        metatask_info.update_by = current_user.user.user_name
        metatask_info.update_time = datetime.now()

        url = f'{await cls._project_base_url(request)}/process-definition'
        headers = cls._ds_headers(current_user, form=True)
        message = ''
        for dstype, dsid in cls._pair_workflows(metatask_info.ds_types, metatask_info.ds_ids):
            suffix = cls._WORKFLOW_SUFFIX.get(dstype)
            if suffix is None:
                continue
            form_data = {'name': str(metatask_info.metatask_name + suffix), 'releaseState': str(type)}
            response = await cls._ds_request('POST', f"{url}/{dsid}/release", headers=headers, data=form_data)
            result = json.loads(response.text)
            if result['success'] is not True:
                raise ServiceException(message=f"失败{result['msg']}")
            message = '成功!'

        metatask_info.status = 'OFFLINE' if type == 'OFFLINE' else 'ONLINE'
        edit_metatask = metatask_info.model_dump(exclude_unset=True)
        try:
            await MetataskDao.edit_metatask_dao(query_db, edit_metatask)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message=message)
        except Exception:
            await query_db.rollback()
            raise

    # ------------------------------------------------------------------ #
    # run / schedule
    # ------------------------------------------------------------------ #

    @classmethod
    async def run_metatask_services(
        cls, request: Request, query_db: AsyncSession, process: ProcessDefinition,current_user: CurrentUserModel
    ):
        """
        Start a process instance on DolphinScheduler with fixed run options.

        :param request: Request object
        :param query_db: orm session (unused here)
        :param process: process definition to run; its run options are overwritten
        :param current_user: current user, used for the auth headers
        :return: success text
        :raises ServiceException: when DolphinScheduler reports failure
        """
        process.failureStrategy = 'CONTINUE'
        process.warningType = 'NONE'
        process.execType = 'START_PROCESS'
        process.taskDependType = 'TASK_POST'
        process.complementDependentMode = 'OFF_MODE'
        process.runMode = 'RUN_MODE_SERIAL'
        process.processInstancePriority = 'MEDIUM'
        process.dryRun = 0
        # NOTE(review): the complement window is a hard-coded date — confirm
        # the server ignores it for START_PROCESS runs.
        process.scheduleTime = "{complementStartDate:'2025-01-12 00:00:00',complementEndDate:'2025-01-12 00:00:00'}"

        url = f'{await cls._project_base_url(request)}/executors/start-process-instance'
        response = await cls._ds_request(
            'POST', url, headers=cls._ds_headers(current_user, form=True), data=dict(process.__dict__)
        )
        if json.loads(response.text)['success'] is True:
            return "运行成功!"
        raise ServiceException(message='运行失败!')

    @classmethod
    async def ds_metatask_services(
        cls, request: Request, query_db: AsyncSession, process: ParmScheduleVo,current_user: CurrentUserModel
    ):
        """
        Create a cron schedule for a process definition on DolphinScheduler.

        :param request: Request object
        :param query_db: orm session (unused here)
        :param process: schedule parameters (time window, crontab, codes)
        :param current_user: current user, used for the auth headers
        :return: success text
        :raises ServiceException: when DolphinScheduler reports failure
        """
        parm = ParmSchedule()
        parm.failureStrategy = 'CONTINUE'
        parm.warningType = 'NONE'
        parm.warningGroupId = process.warningGroupId
        # NOTE(review): 'TASK_POST' looks like a task-depend type rather than a
        # worker group name — confirm against the caller; kept as-is.
        parm.workerGroup = 'TASK_POST'
        parm.processDefinitionCode = process.processDefinitionCode
        parm.environmentCode = process.environmentCode
        parm.processInstancePriority = 'MEDIUM'
        # json.dumps escapes the crontab text instead of splicing it in raw
        parm.schedule = json.dumps(
            {
                "startTime": process.beginTime.strftime('%Y-%m-%d %H:%M:%S'),
                "endTime": process.endTime.strftime('%Y-%m-%d %H:%M:%S'),
                "crontab": process.crontab,
                "timezoneId": "Asia/Shanghai",
            },
            ensure_ascii=False,
        )

        url = f'{await cls._project_base_url(request)}/schedules'
        response = await cls._ds_request(
            'POST', url, headers=cls._ds_headers(current_user, form=True), data=dict(parm.__dict__)
        )
        if json.loads(response.text)['success'] is True:
            return "运行成功!"
        raise ServiceException(message='运行失败!')

    @classmethod
    async def get_metatask_logs_services(
        cls, request: Request, current_user: CurrentUserModel, query_db: AsyncSession, id: str
    ):
        """
        Mark a task as running and persist the audit fields.

        :param request: Request object (unused here)
        :param current_user: current user, used for the audit fields
        :param query_db: orm session
        :param id: task id
        :return: ``CrudResponseModel`` on success
        :raises ServiceException: unknown task
        """
        metatask_info = await cls.metatask_detail_services(query_db, id)
        if not metatask_info.metatask_id:
            raise ServiceException(message='更新失败')
        metatask_info.update_by = current_user.user.user_name
        metatask_info.update_time = datetime.now()
        # '3' = running. The original rebound the whole model to the string
        # '3', which made the following model_dump() fail on every call.
        # NOTE(review): status is assumed to be the intended field — confirm.
        metatask_info.status = '3'
        edit_metatask = metatask_info.model_dump(exclude_unset=True)
        try:
            await MetataskDao.edit_metatask_dao(query_db, edit_metatask)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message=metatask_info.metatask_name + '任务:' '日志获取成功')
        except Exception:
            await query_db.rollback()
            raise

    # ------------------------------------------------------------------ #
    # delete / detail
    # ------------------------------------------------------------------ #

    @classmethod
    async def delete_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteMetataskModel,current_user: CurrentUserModel):
        """
        Delete the workflows on DolphinScheduler, then the task rows.

        :param request: Request object
        :param query_db: orm session
        :param page_object: comma-separated task ids and workflow codes to delete
        :param current_user: current user, used for the auth headers
        :return: ``CrudResponseModel`` on success
        :raises ServiceException: empty ids, or DolphinScheduler refused the delete
        """
        if not (page_object.metatask_ids and page_object.ds_ids):
            raise ServiceException(message='传入参数配置id为空')
        metatask_id_list = page_object.metatask_ids.split(',')
        try:
            url = f'{await cls._project_base_url(request)}/process-definition/batch-delete'
            response = await cls._ds_request(
                'POST',
                url,
                headers=cls._ds_headers(current_user, form=True),
                data={'codes': page_object.ds_ids},
            )
            if json.loads(response.text)['success'] is not True:
                raise ServiceException(message='ds删除失败')
            for metatask_id in metatask_id_list:
                await MetataskDao.delete_metatask_dao(query_db, MetataskModel(metataskId=int(metatask_id)))
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='删除成功')
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def metatask_detail_services(cls, query_db: AsyncSession, metatask_id: int):
        """
        Load one task by id.

        :param query_db: orm session
        :param metatask_id: task id
        :return: the task as ``MetataskModel``; an empty model when not found
        """
        metatask = await MetataskDao.get_metatask_detail_by_id(query_db, metatask_id=metatask_id)
        if not metatask:
            return MetataskModel()
        return MetataskModel(**CamelCaseUtil.transform_result(metatask))

    # ------------------------------------------------------------------ #
    # process instances / logs
    # ------------------------------------------------------------------ #

    @classmethod
    async def get_process_instances_services(
        cls, request: Request, query_object: ProcessInstancePage,current_user: CurrentUserModel
    ):
        """
        Page through the project's process instances.

        :param request: Request object
        :param query_object: page number, page size and search value
        :param current_user: current user, used for the auth headers
        :return: ``PageResponseModel``; on a non-200 reply a dict with an
                 ``error`` key
        """
        url = f'{await cls._project_base_url(request)}/process-instances'
        # params= url-encodes the values and omits a search value of None
        params = {
            'pageNo': query_object.page_num,
            'pageSize': query_object.page_size,
            'searchVal': query_object.searchVal,
        }
        response = await cls._ds_request('GET', url, headers=cls._ds_headers(current_user), params=params)
        if response.status_code != 200:
            return {'error': f'Request failed with status code {response.status_code}'}
        data = json.loads(response.text)
        return PageResponseModel(rows=data["data"]["totalList"], total=data["data"]["total"])

    @classmethod
    async def get_task_nodes_services(
        cls, request: Request,id:int,current_user: CurrentUserModel
    ):
        """
        List the task nodes of one process instance.

        :param request: Request object
        :param id: process instance id
        :param current_user: current user, used for the auth headers
        :return: list of ``TaskNode``; on a non-200 reply a dict with an
                 ``error`` key
        """
        url = f'{await cls._project_base_url(request)}/process-instances/{id}/tasks'
        response = await cls._ds_request('GET', url, headers=cls._ds_headers(current_user))
        if response.status_code != 200:
            return {'error': f'Request failed with status code {response.status_code}'}
        task_list = json.loads(response.text)["data"]["taskList"]
        return [TaskNode(**item) for item in task_list]

    @classmethod
    async def get_log_details_services(
        cls, request: Request,id:int,current_user: CurrentUserModel
    ):
        """
        Fetch the first 1000 log lines of one task instance.

        :param request: Request object (unused here)
        :param id: task instance id
        :param current_user: current user, used for the auth headers
        :return: log text; on a non-200 reply a dict with an ``error`` key
        """
        url = f'{AppConfig.ds_server_url}/dolphinscheduler/log/detail?taskInstanceId={id}&limit=1000&skipLineNum=0'
        response = await cls._ds_request('GET', url, headers=cls._ds_headers(current_user))
        if response.status_code != 200:
            return {'error': f'Request failed with status code {response.status_code}'}
        return json.loads(response.text)["data"]["message"]