You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							1490 lines
						
					
					
						
							103 KiB
						
					
					
				
			
		
		
		
			
			
			
				
					
				
				
					
				
			
		
		
	
	
							1490 lines
						
					
					
						
							103 KiB
						
					
					
				
								from fastapi import Request,Depends
							 | 
						|
								from sqlalchemy.ext.asyncio import AsyncSession
							 | 
						|
								from module_admin.entity.vo.metatask_vo import MetataskQueryModel, MetataskModel, DeleteMetataskModel
							 | 
						|
								from module_admin.dao.metatask_dao import MetataskDao
							 | 
						|
								from utils.page_util import PageResponseModel
							 | 
						|
								from module_admin.entity.vo.dataSource_vo import *
							 | 
						|
								from module_admin.entity.vo.metaprocessconfig_vo import MetaprocessconfigQueryModel,MetaprocessconfigModel
							 | 
						|
								from config.constant import CommonConstant
							 | 
						|
								from exceptions.exception import ServiceException
							 | 
						|
								from module_admin.entity.vo.common_vo import CrudResponseModel
							 | 
						|
								from utils.common_util import CamelCaseUtil
							 | 
						|
								from module_admin.entity.vo.user_vo import CurrentUserModel
							 | 
						|
								from datetime import datetime
							 | 
						|
								import requests
							 | 
						|
								import json
							 | 
						|
								import re
							 | 
						|
								from config.enums import RedisInitKeyConfig
							 | 
						|
								from module_admin.service.metaprocessconfig_service import MetaprocessconfigService
							 | 
						|
								from config.env import AppConfig
							 | 
						|
								
							 | 
						|
								
							 | 
						|
								class MetataskService:
							 | 
						|
								    """
							 | 
						|
								    参数配置管理模块服务层
							 | 
						|
								    """
							 | 
						|
								
							 | 
						|
								    @classmethod
							 | 
						|
								    async def get_metatask_list_services(
							 | 
						|
								        cls, query_db: AsyncSession, query_object: MetataskQueryModel, is_page: bool = False
							 | 
						|
								    ):
							 | 
						|
								        """
							 | 
						|
								        获取参数配置列表信息service
							 | 
						|
								        :param query_db: orm对象
							 | 
						|
								        :param query_object: 查询参数对象
							 | 
						|
								        :return: 参数配置列表信息对象
							 | 
						|
								        """
							 | 
						|
								        metatask_list_result = await MetataskDao.get_metatask_list(query_db, query_object, is_page)
							 | 
						|
								
							 | 
						|
								        return metatask_list_result
							 | 
						|
								
							 | 
						|
								    @classmethod
							 | 
						|
								    async def get_data_source_tree(cls,request: Request, current_user: CurrentUserModel):
							 | 
						|
								        
							 | 
						|
								       url = f'{AppConfig.ds_server_url}/dolphinscheduler/datasources?pageNo=1&pageSize=100'
							 | 
						|
								       headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password,}
							 | 
						|
								       response = requests.get(url, headers=headers, verify=False)
							 | 
						|
								       if response.reason == 'OK':
							 | 
						|
								           response_text = response.text
							 | 
						|
								           data = json.loads(response_text)
							 | 
						|
								           total_list = data["data"]["totalList"]
							 | 
						|
								           # 解析 connectionParams 字符串为字典
							 | 
						|
								           for item in total_list:
							 | 
						|
								               item["connectionParams"] = json.loads(item["connectionParams"])
							 | 
						|
								             # 使用 Pydantic 创建 DataSource 对象列表
							 | 
						|
								           data_sources = [DataSource(**item) for item in total_list]
							 | 
						|
								           return data_sources
							 | 
						|
								       else:
							 | 
						|
								           return {'error': f'Request failed with status code {response.status_code}'}
							 | 
						|
								    @classmethod
							 | 
						|
								    async def get_data_source_all(cls,request: Request,current_user: CurrentUserModel):
							 | 
						|
								    # Worker分组
							 | 
						|
								       url1 = f'{AppConfig.ds_server_url}/dolphinscheduler/worker-groups/all'
							 | 
						|
								    # 警告组 
							 | 
						|
								       url2= f'{AppConfig.ds_server_url}/dolphinscheduler/alert-groups/list'
							 | 
						|
								       # 工作环境 
							 | 
						|
								       url3 = f'{AppConfig.ds_server_url}/dolphinscheduler/environment/query-environment-list'
							 | 
						|
								       dataspurceVo=Datasouceall()
							 | 
						|
								       headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
							 | 
						|
								       response1 = requests.get(url1, headers=headers, verify=False)
							 | 
						|
								       response2 = requests.get(url2, headers=headers, verify=False)
							 | 
						|
								       response3 = requests.get(url3, headers=headers, verify=False)
							 | 
						|
								       if response1.reason == 'OK':
							 | 
						|
								           response_text = response1.text
							 | 
						|
								           data = json.loads(response_text)
							 | 
						|
								           total_list = data["data"]
							 | 
						|
								             # 使用 Pydantic 创建 DataSource 对象列表
							 | 
						|
								           data_sources = [WorkerGroup(name=item) for item in total_list]
							 | 
						|
								           dataspurceVo.workerGroup=data_sources
							 | 
						|
								       if response2.reason == 'OK':
							 | 
						|
								           response_text = response2.text
							 | 
						|
								           data = json.loads(response_text)
							 | 
						|
								           total_list = data["data"]
							 | 
						|
								             # 使用 Pydantic 创建 DataSource 对象列表
							 | 
						|
								           alertGroups = [AlertGroups(**item) for item in total_list]
							 | 
						|
								           dataspurceVo.alertGroups=alertGroups
							 | 
						|
								       if response3.reason == 'OK':
							 | 
						|
								           response_text = response3.text
							 | 
						|
								           data = json.loads(response_text)
							 | 
						|
								           total_list = data["data"] 
							 | 
						|
								           data_sources = [Environment(**item) for item in total_list]
							 | 
						|
								           dataspurceVo.environment=data_sources
							 | 
						|
								       return dataspurceVo
							 | 
						|
								    @classmethod
							 | 
						|
								    async def check_metatask_name_unique_services(cls, query_db: AsyncSession, page_object: MetataskModel):
							 | 
						|
								        """
							 | 
						|
								        校验参数键名是否唯一service
							 | 
						|
								
							 | 
						|
								        :param query_db: orm对象
							 | 
						|
								        :param page_object: 参数配置对象
							 | 
						|
								        :return: 校验结果
							 | 
						|
								        """
							 | 
						|
								        metatask_id = -1 if page_object.metatask_id is None else page_object.metatask_id
							 | 
						|
								
							 | 
						|
								        metatask = await MetataskDao.get_metatask_detail_by_info(
							 | 
						|
								            query_db, MetataskModel(metataskName=page_object.metatask_name)
							 | 
						|
								        )
							 | 
						|
								        if metatask and metatask.metatask_id != metatask_id:
							 | 
						|
								            return CommonConstant.NOT_UNIQUE
							 | 
						|
								        return CommonConstant.UNIQUE
							 | 
						|
								
							 | 
						|
								    @classmethod
							 | 
						|
								    async def add_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: MetataskModel,current_user: CurrentUserModel):
							 | 
						|
								        """
							 | 
						|
								        新增参数配置信息service
							 | 
						|
								        :param request: Request对象
							 | 
						|
								        :param query_db: orm对象
							 | 
						|
								        :param page_object: 新增参数配置对象
							 | 
						|
								        :return: 新增参数配置校验结果
							 | 
						|
								        """
							 | 
						|
								
							 | 
						|
								        
							 | 
						|
								        if not await cls.check_metatask_name_unique_services(query_db, page_object):
							 | 
						|
								            raise ServiceException(message=f'新增元数据任务{page_object.metatask_name}失败,任务名已存在')
							 | 
						|
								        else:
							 | 
						|
								            try:
							 | 
						|
								                # 查询模板
							 | 
						|
								                processconfig = MetaprocessconfigQueryModel()
							 | 
						|
								                processconfig.ac_target=page_object.ac_target
							 | 
						|
								                processconfig.db_type=page_object.dbCode
							 | 
						|
								                processConfigList =await MetaprocessconfigService.get_metaprocessconfig_list_services(query_db,processconfig,False)
							 | 
						|
								                if processconfig.db_type=="MYSQL":
							 | 
						|
								                    message= await cls.mysql_process_defind_change_add(request,processConfigList,page_object,current_user)
							 | 
						|
								                elif processconfig.db_type=="POSTGRESQL":  
							 | 
						|
								                    message= await cls.pg_process_defind_change_add(request,processConfigList,page_object,current_user)
							 | 
						|
								                elif processconfig.db_type=="DB2":  
							 | 
						|
								                    message= await cls.db2_process_defind_change_add(request,processConfigList,page_object,current_user) 
							 | 
						|
								                elif processconfig.db_type=="ORACLE":  
							 | 
						|
								                    message= await cls.oracle_process_defind_change_add(request,processConfigList,page_object,current_user)                     
							 | 
						|
								                elif processconfig.db_type=="HIVE":  
							 | 
						|
								                    message= await cls.hive_process_defind_change_add(request,processConfigList,page_object,current_user) 
							 | 
						|
								                else:
							 | 
						|
								                    raise ServiceException(message=f"不支持的数据库类型: {processconfig.db_type}")                                     
							 | 
						|
								                if "成功" not in message:
							 | 
						|
								                    await query_db.rollback()
							 | 
						|
								                    raise ServiceException(message=f'新增元数据任务{page_object.metatask_name}失败,dolphinscheduler创建失败')
							 | 
						|
								                await MetataskDao.add_metatask_dao(query_db, page_object)
							 | 
						|
								                await query_db.commit()
							 | 
						|
								
							 | 
						|
								                return CrudResponseModel(is_success=True, message=message)
							 | 
						|
								            except Exception as e:
							 | 
						|
								                await query_db.rollback()
							 | 
						|
								                raise e
							 | 
						|
								    @classmethod
							 | 
						|
								    async def edit_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: MetataskModel,current_user: CurrentUserModel):
							 | 
						|
								        """
							 | 
						|
								        编辑参数配置信息service
							 | 
						|
								        :param request: Request对象
							 | 
						|
								        :param query_db: orm对象
							 | 
						|
								        :param page_object: 编辑参数配置对象
							 | 
						|
								        :return: 编辑参数配置校验结果
							 | 
						|
								        """
							 | 
						|
								        edit_metatask = page_object.model_dump(exclude_unset=True)
							 | 
						|
								        metatask_info = await cls.metatask_detail_services(query_db, page_object.metatask_id)
							 | 
						|
								        if metatask_info.metatask_id:
							 | 
						|
								            if not await cls.check_metatask_name_unique_services(query_db, page_object):
							 | 
						|
								                raise ServiceException(message=f'修改任务{page_object.metatask_name}失败,任务名称已存在')
							 | 
						|
								            else:
							 | 
						|
								                try:
							 | 
						|
								                    metatask_old = await cls.metatask_detail_services(query_db, metatask_info.metatask_id)
							 | 
						|
								                    if metatask_old.dbCode=="MYSQL": 
							 | 
						|
								                        message= await cls.mysql_process_defind_change_update(request,page_object,metatask_old,current_user)
							 | 
						|
								                    elif metatask_old.dbCode=="POSTGRESQL": 
							 | 
						|
								                        message= await cls.mysql_process_defind_change_update(request,page_object,metatask_old,current_user)
							 | 
						|
								                    elif metatask_old.dbCode=="DB2": 
							 | 
						|
								                        message= await cls.mysql_process_defind_change_update(request,page_object,metatask_old,current_user)
							 | 
						|
								                    elif metatask_old.dbCode=="ORACLE": 
							 | 
						|
								                        message= await cls.mysql_process_defind_change_update(request,page_object,metatask_old,current_user)
							 | 
						|
								                    elif metatask_old.dbCode=="HIVE": 
							 | 
						|
								                        message= await cls.mysql_process_defind_change_update(request,page_object,metatask_old,current_user)
							 | 
						|
								                    else:
							 | 
						|
								                        raise ServiceException(message=f"不支持的数据库类型: {metatask_old.dbCode}")
							 | 
						|
								                    if "成功" not in message:
							 | 
						|
								                        await query_db.rollback()
							 | 
						|
								                        raise ServiceException(message=f'修改元数据任务{page_object.metatask_name}失败,dolphinscheduler修改失败')
							 | 
						|
								                    await MetataskDao.edit_metatask_dao(query_db, edit_metatask)
							 | 
						|
								                    await query_db.commit()
							 | 
						|
								                    return CrudResponseModel(is_success=True, message='更新成功')
							 | 
						|
								                except Exception as e:
							 | 
						|
								                    await query_db.rollback()
							 | 
						|
								                    raise e
							 | 
						|
								        else:
							 | 
						|
								            raise ServiceException(message='更新失败')
							 | 
						|
								    @classmethod
							 | 
						|
								    async def mysql_process_defind_change_add(cls,request: Request, processConfigList: list[MetaprocessconfigModel],page_object:MetataskModel,current_user: CurrentUserModel):
							 | 
						|
								        """
							 | 
						|
								        mysql类型
							 | 
						|
								        """
							 | 
						|
								        projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
							 | 
						|
								        url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/task-definition/gen-task-codes?genNum=5'
							 | 
						|
								        headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
							 | 
						|
								
							 | 
						|
								        # 新增接口
							 | 
						|
								        url2=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
							 | 
						|
								        headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
							 | 
						|
								        response = requests.get(url, headers=headers, verify=False)
							 | 
						|
								        if response.reason == 'OK':
							 | 
						|
								           intdsids=[]
							 | 
						|
								           message=''
							 | 
						|
								           dstypes=[]
							 | 
						|
								           response_text = response.text
							 | 
						|
								           data = json.loads(response_text)
							 | 
						|
								           code_list = data["data"]      
							 | 
						|
								           str_list = list(map(str, code_list))            
							 | 
						|
								           for config in processConfigList:
							 | 
						|
								            # mysql表字段   
							 | 
						|
								            if config.ac_target=='0':
							 | 
						|
								              modified_json_str = config.taskDefinitionJson.replace("16199683466336", str_list[0]).replace("16199683466337", str_list[1]).replace("16199683466338", str_list[2]).replace("'mysql_conn'",  f"'{page_object.dbRName}'")
							 | 
						|
								              if page_object.dbSName:
							 | 
						|
								                    modified_json_str=modified_json_str.replace("'dash_test_w'", f"'{page_object.dbSName}'")
							 | 
						|
								              else:
							 | 
						|
								                    modified_json_str=modified_json_str.replace("t.table_schema = 'dash_test_w'",  "1=1")
							 | 
						|
								              modified_json_str2=config.taskRelationJson.replace("16199683466336", str_list[0]).replace("16199683466337", str_list[1]).replace("16199683466338", str_list[2])
							 | 
						|
								              modified_json_str3=config.locations.replace("16199683466336", str_list[0]).replace("16199683466337", str_list[1]).replace("16199683466338", str_list[2])
							 | 
						|
								              metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
							 | 
						|
								                                                            description=page_object.remark, # 替换工作流备注
							 | 
						|
								                                                            locations=modified_json_str3,# 替换locations
							 | 
						|
								                                                            name =page_object.metatask_name+"-表字段采集" ,# 替换工作流名称
							 | 
						|
								                                                            timeout=config.timeout,
							 | 
						|
								                                                            globalParams =config.globalParams ,
							 | 
						|
								                                                            tenantCode =config.tenantCode ,
							 | 
						|
								                                                            taskRelationJson  =modified_json_str2,# 替换taskRelationJson
							 | 
						|
								                                                            executionType =config.executionType ,
							 | 
						|
								                                                            releaseState=config.releaseState
							 | 
						|
								                                                            ).model_dump(exclude_unset=True, by_alias=True)
							 | 
						|
								              form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
							 | 
						|
								              response_post0 = requests.post(url2, headers=headers2, data=form_data, verify=False)
							 | 
						|
								            #   text=  '{"code":0,"msg":"success","data":{"id":null,"code":16244510692320,"name":"测试2250-表字段采集","version":0,"releaseState":null,"projectCode":15094503753824,"description":"111","globalParams":"[]","globalParamList":null,"globalParamMap":{},"createTime":"2025-01-08 12:50:39","updateTime":"2025-01-08 12:50:39","flag":"YES","userId":1,"userName":null,"projectName":null,"locations":"[{\\"taskCode\\":16244510678624,\\"x\\":334,\\"y\\":265},{\\"taskCode\\":16244510678625,\\"x\\":334,\\"y\\":390},{\\"taskCode\\":16244510678626,\\"x\\":687,\\"y\\":335}]","scheduleReleaseState":null,"timeout":0,"tenantId":1,"tenantCode":null,"modifyBy":null,"warningGroupId":0,"executionType":"PARALLEL"},"failed":false,"success":true}'
							 | 
						|
								              text= response_post0.text 
							 | 
						|
								              responsJson = json.loads(text)
							 | 
						|
								              if responsJson['msg'] == 'success':
							 | 
						|
								                intdsids.append(responsJson['data']['code'])
							 | 
						|
								                dstypes.append('0')
							 | 
						|
								                if message:
							 | 
						|
								                    message += ", "
							 | 
						|
								                message += page_object.metatask_name + "-表字段采集新增成功"
							 | 
						|
								              else: 
							 | 
						|
								                if message:
							 | 
						|
								                    message += ", "
							 | 
						|
								                message += page_object.metatask_name + "-表字段采集新增失败"
							 | 
						|
								            if config.ac_target=='1':
							 | 
						|
								               modified_json_str = config.taskDefinitionJson.replace("16286410625888", str_list[0]).replace("16286410625889", str_list[1]).replace("16286410625890", str_list[2]).replace("16286410625891", str_list[3]).replace("'mysql_conn'",  f"'{page_object.dbRName}'").replace("mysql_conn dash_test_w", f"{page_object.dbRName} {page_object.dbSName}")
							 | 
						|
								               if page_object.dbSName:
							 | 
						|
								                    modified_json_str=modified_json_str.replace("'dash_test_w'", f"'{page_object.dbSName}'")
							 | 
						|
								               else:
							 | 
						|
								                    modified_json_str=modified_json_str.replace("t.table_schema = 'dash_test_w'",  "1=1")
							 | 
						|
								               modified_json_str2=config.taskRelationJson.replace("16286410625888", str_list[0]).replace("16286410625889", str_list[1]).replace("16286410625890", str_list[2]).replace("16286410625891", str_list[3])
							 | 
						|
								               modified_json_str3=config.locations.replace("16286410625888", str_list[0]).replace("16286410625889", str_list[1]).replace("16286410625890", str_list[2]).replace("16286410625891", str_list[3])
							 | 
						|
								               metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
							 | 
						|
								                                                            description=page_object.remark, # 替换工作流备注
							 | 
						|
								                                                            locations=modified_json_str3,# 替换locations
							 | 
						|
								                                                            name =page_object.metatask_name+"-存储过程采集" ,# 替换工作流名称
							 | 
						|
								                                                            timeout=config.timeout,
							 | 
						|
								                                                            globalParams =config.globalParams ,
							 | 
						|
								                                                            tenantCode =config.tenantCode ,
							 | 
						|
								                                                            taskRelationJson  =modified_json_str2,# 替换taskRelationJson
							 | 
						|
								                                                            executionType =config.executionType ,
							 | 
						|
								                                                            releaseState=config.releaseState
							 | 
						|
								                                                            ).model_dump(exclude_unset=True, by_alias=True)
							 | 
						|
								               form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
							 | 
						|
								               response_post0 = requests.post(url2, headers=headers2,data=form_data,verify=False)
							 | 
						|
								               #text=  '{"code":0,"msg":"success","data":{"id":null,"code":16244510692320,"name":"测试2250-表字段采集","version":0,"releaseState":null,"projectCode":15094503753824,"description":"111","globalParams":"[]","globalParamList":null,"globalParamMap":{},"createTime":"2025-01-08 12:50:39","updateTime":"2025-01-08 12:50:39","flag":"YES","userId":1,"userName":null,"projectName":null,"locations":"[{\\"taskCode\\":16244510678624,\\"x\\":334,\\"y\\":265},{\\"taskCode\\":16244510678625,\\"x\\":334,\\"y\\":390},{\\"taskCode\\":16244510678626,\\"x\\":687,\\"y\\":335}]","scheduleReleaseState":null,"timeout":0,"tenantId":1,"tenantCode":null,"modifyBy":null,"warningGroupId":0,"executionType":"PARALLEL"},"failed":false,"success":true}'
							 | 
						|
								               text= response_post0.text
							 | 
						|
								               responsJson = json.loads(text) 
							 | 
						|
								               if responsJson['msg'] == 'success':
							 | 
						|
								                dstypes.append('1')
							 | 
						|
								                intdsids.append(responsJson['data']['code'])
							 | 
						|
								                if message:
							 | 
						|
								                    message += ", "
							 | 
						|
								                message += page_object.metatask_name + "-存储过程采集新增成功"
							 | 
						|
								               else: 
							 | 
						|
								                if message:
							 | 
						|
								                    message += ", "
							 | 
						|
								                message += page_object.metatask_name + "-存储过程采集新增失败"
							 | 
						|
								           if len(intdsids)>0:
							 | 
						|
								              page_object.ds_ids=','.join([str(i) for i in intdsids])
							 | 
						|
								              page_object.ds_types=','.join([str(i) for i in dstypes])
							 | 
						|
								           return message  
							 | 
						|
								
							 | 
						|
								    @classmethod
							 | 
						|
								    async def mysql_process_defind_change_update(cls,request: Request,page_object:MetataskModel,metatask_old:MetataskModel,current_user: CurrentUserModel):
							 | 
						|
								        """
							 | 
						|
								        mysql类型
							 | 
						|
								        """
							 | 
						|
								        projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
							 | 
						|
								        dstypes=page_object.ds_types.split(",")
							 | 
						|
								        dsids=page_object.ds_ids.split(",")
							 | 
						|
								        result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dstypes, dsids)]
							 | 
						|
								        message=''
							 | 
						|
								        # 查询接口
							 | 
						|
								        url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
							 | 
						|
								        headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, }
							 | 
						|
								        headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
							 | 
						|
								        for config in result_list:
							 | 
						|
								            # mysql表字段
							 | 
						|
								            if config['dstype']=='0' :
							 | 
						|
								                response = requests.get(f"{url}/{config['dsid']}", headers=headers, verify=False)
							 | 
						|
								                text= response.text
							 | 
						|
								                responsJson = json.loads(text) 
							 | 
						|
								                if responsJson['msg'] == 'success':
							 | 
						|
								                    # modified_json_str = json.dumps(responsJson['data']['taskDefinitionList'], ensure_ascii=False, indent=0).replace( f"'{metatask_old.dbSName}'",  f"'{page_object.dbSName}'")
							 | 
						|
								                    modified_json_str = json.dumps(responsJson['data']['taskDefinitionList'], ensure_ascii=False, indent=0)
							 | 
						|
								                    getTaskRelationList=responsJson['data']['processTaskRelationList']
							 | 
						|
								                    putTaskRelationList=[]
							 | 
						|
								                    for item in getTaskRelationList:
							 | 
						|
								                        new_item = {
							 | 
						|
								                    "name": item['name'],
							 | 
						|
								                    "preTaskCode":item['preTaskCode'] ,
							 | 
						|
								                    "preTaskVersion":item['preTaskVersion'] ,
							 | 
						|
								                    "postTaskCode":item['postTaskCode'] ,
							 | 
						|
								                    "conditionType":item['conditionType'] ,
							 | 
						|
								                    "conditionParams":item['conditionParams'] 
							 | 
						|
								                    }
							 | 
						|
								                        putTaskRelationList.append(new_item)
							 | 
						|
								                        
							 | 
						|
								                    modified_json_str2=  json.dumps(putTaskRelationList, ensure_ascii=False, indent=0)
							 | 
						|
								                    modified_json_str2=re.sub(r'\s+', '', modified_json_str2)
							 | 
						|
								                    metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
							 | 
						|
								                                                            description=page_object.remark, # 替换工作流备注
							 | 
						|
								                                                            locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations
							 | 
						|
								                                                            name =page_object.metatask_name+"-表字段采集" ,# 替换工作流名称
							 | 
						|
								                                                            tenantCode =responsJson['data']['processDefinition']['tenantCode'] ,
							 | 
						|
								                                                            taskRelationJson  =modified_json_str2,# 替换taskRelationJson
							 | 
						|
								                                                            ).model_dump(exclude_unset=True, by_alias=True)
							 | 
						|
								                    form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
							 | 
						|
								                    response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data,verify=False)
							 | 
						|
								                    putText= response_put0.text
							 | 
						|
								                    responsPutJson=json.loads(putText) 
							 | 
						|
								                    if responsPutJson['msg'] == 'success':
							 | 
						|
								                        if message:
							 | 
						|
								                            message += ", "
							 | 
						|
								                        message += page_object.metatask_name + "-表字段采集修改成功"
							 | 
						|
								                    else:
							 | 
						|
								                        if message:
							 | 
						|
								                            message += ", "
							 | 
						|
								                        message += page_object.metatask_name + "-表字段采集修改失败" 
							 | 
						|
								            if config['dstype']=='1':
							 | 
						|
								                response = requests.get(f"{url}/{config['dsid']}", headers=headers)
							 | 
						|
								                text= response.text
							 | 
						|
								                responsJson = json.loads(text) 
							 | 
						|
								                if responsJson['msg'] == 'success':
							 | 
						|
								                    modified_json_str = json.dumps(responsJson['data']['taskDefinitionList'], ensure_ascii=False, indent=4).replace( f"'{metatask_old.dbSName}'",  f"'{page_object.dbSName}'")
							 | 
						|
								                    getTaskRelationList=responsJson['data']['processTaskRelationList']
							 | 
						|
								                    putTaskRelationList=[]
							 | 
						|
								                    for item in getTaskRelationList:
							 | 
						|
								                        new_item = {
							 | 
						|
								                                "name": item['name'],
							 | 
						|
								                                "preTaskCode":item['preTaskCode'] ,
							 | 
						|
								                                "preTaskVersion":item['preTaskVersion'] ,
							 | 
						|
								                                "postTaskCode":item['postTaskCode'] ,
							 | 
						|
								                                "conditionType":item['conditionType'] ,
							 | 
						|
								                                "conditionParams":item['conditionParams'] 
							 | 
						|
								                                    }
							 | 
						|
								                        putTaskRelationList.append(new_item)
							 | 
						|
								                    modified_json_str2=  json.dumps(putTaskRelationList, ensure_ascii=False, indent=4)
							 | 
						|
								                    modified_json_str2=re.sub(r'\s+', '', modified_json_str2)
							 | 
						|
								                    metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
							 | 
						|
								                                                            description=page_object.remark, # 替换工作流备注
							 | 
						|
								                                                            locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations
							 | 
						|
								                                                            name =page_object.metatask_name+"-存储过程采集" ,# 替换工作流名称
							 | 
						|
								                                                            tenantCode =responsJson['data']['processDefinition']['tenantCode'] ,
							 | 
						|
								                                                            taskRelationJson  =modified_json_str2,# 替换taskRelationJson
							 | 
						|
								                                                            ).model_dump(exclude_unset=True, by_alias=True)
							 | 
						|
								                    form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
							 | 
						|
								                    response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data,verify=False)
							 | 
						|
								                    putText= response_put0.text
							 | 
						|
								                    responsPutJson=json.loads(putText) 
							 | 
						|
								                    if responsPutJson['msg'] == 'success':
							 | 
						|
								                        if message:
							 | 
						|
								                            message += ", "
							 | 
						|
								                        message += page_object.metatask_name + "-存储过程采集修改成功"
							 | 
						|
								                    else:
							 | 
						|
								                        if message:
							 | 
						|
								                            message += ", "
							 | 
						|
								                        message += page_object.metatask_name + "-存储过程采集修改失败"   
							 | 
						|
								        return message  
							 | 
						|
								    @classmethod
							 | 
						|
								    async def oracle_process_defind_change_update(cls,request: Request,page_object:MetataskModel,metatask_old:MetataskModel,current_user: CurrentUserModel):
							 | 
						|
								        """
							 | 
						|
								        mysql类型
							 | 
						|
								        """
							 | 
						|
								        projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
							 | 
						|
								        dstypes=page_object.ds_types.split(",")
							 | 
						|
								        dsids=page_object.ds_ids.split(",")
							 | 
						|
								        result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dstypes, dsids)]
							 | 
						|
								        message=''
							 | 
						|
								        # 查询接口
							 | 
						|
								        url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
							 | 
						|
								        headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, }
							 | 
						|
								        headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
							 | 
						|
								        for config in result_list:
							 | 
						|
								            # mysql表字段
							 | 
						|
								            if config['dstype']=='0' :
							 | 
						|
								                response = requests.get(f"{url}/{config['dsid']}", headers=headers, verify=False)
							 | 
						|
								                text= response.text
							 | 
						|
								                responsJson = json.loads(text) 
							 | 
						|
								                if responsJson['msg'] == 'success':
							 | 
						|
								                    modified_json_str = json.dumps(responsJson['data']['taskDefinitionList'], ensure_ascii=False, indent=0).replace( f"'{metatask_old.dbSName}'",  f"'{page_object.dbSName}'")
							 | 
						|
								                    getTaskRelationList=responsJson['data']['processTaskRelationList']
							 | 
						|
								                    putTaskRelationList=[]
							 | 
						|
								                    for item in getTaskRelationList:
							 | 
						|
								                        new_item = {
							 | 
						|
								                    "name": item['name'],
							 | 
						|
								                    "preTaskCode":item['preTaskCode'] ,
							 | 
						|
								                    "preTaskVersion":item['preTaskVersion'] ,
							 | 
						|
								                    "postTaskCode":item['postTaskCode'] ,
							 | 
						|
								                    "conditionType":item['conditionType'] ,
							 | 
						|
								                    "conditionParams":item['conditionParams'] 
							 | 
						|
								                    }
							 | 
						|
								                        putTaskRelationList.append(new_item)
							 | 
						|
								                        
							 | 
						|
								                    modified_json_str2=  json.dumps(putTaskRelationList, ensure_ascii=False, indent=0)
							 | 
						|
								                    modified_json_str2=re.sub(r'\s+', '', modified_json_str2)
							 | 
						|
								                    metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
							 | 
						|
								                                                            description=page_object.remark, # 替换工作流备注
							 | 
						|
								                                                            locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations
							 | 
						|
								                                                            name =page_object.metatask_name+"-表字段采集" ,# 替换工作流名称
							 | 
						|
								                                                            tenantCode =responsJson['data']['processDefinition']['tenantCode'] ,
							 | 
						|
								                                                            taskRelationJson  =modified_json_str2,# 替换taskRelationJson
							 | 
						|
								                                                            ).model_dump(exclude_unset=True, by_alias=True)
							 | 
						|
								                    form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
							 | 
						|
								                    response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data,verify=False)
							 | 
						|
								                    putText= response_put0.text
							 | 
						|
								                    responsPutJson=json.loads(putText) 
							 | 
						|
								                    if responsPutJson['msg'] == 'success':
							 | 
						|
								                        if message:
							 | 
						|
								                            message += ", "
							 | 
						|
								                        message += page_object.metatask_name + "-表字段采集修改成功"
							 | 
						|
								                    else:
							 | 
						|
								                        if message:
							 | 
						|
								                            message += ", "
							 | 
						|
								                        message += page_object.metatask_name + "-表字段采集修改失败" 
							 | 
						|
								            if config['dstype']=='1':
							 | 
						|
								                response = requests.get(f"{url}/{config['dsid']}", headers=headers)
							 | 
						|
								                text= response.text
							 | 
						|
								                responsJson = json.loads(text) 
							 | 
						|
								                if responsJson['msg'] == 'success':
							 | 
						|
								                    modified_json_str = json.dumps(responsJson['data']['taskDefinitionList'], ensure_ascii=False, indent=4).replace( f"'{metatask_old.dbSName}'",  f"'{page_object.dbSName}'")
							 | 
						|
								                    getTaskRelationList=responsJson['data']['processTaskRelationList']
							 | 
						|
								                    putTaskRelationList=[]
							 | 
						|
								                    for item in getTaskRelationList:
							 | 
						|
								                        new_item = {
							 | 
						|
								                                "name": item['name'],
							 | 
						|
								                                "preTaskCode":item['preTaskCode'] ,
							 | 
						|
								                                "preTaskVersion":item['preTaskVersion'] ,
							 | 
						|
								                                "postTaskCode":item['postTaskCode'] ,
							 | 
						|
								                                "conditionType":item['conditionType'] ,
							 | 
						|
								                                "conditionParams":item['conditionParams'] 
							 | 
						|
								                                    }
							 | 
						|
								                        putTaskRelationList.append(new_item)
							 | 
						|
								                    modified_json_str2=  json.dumps(putTaskRelationList, ensure_ascii=False, indent=4)
							 | 
						|
								                    modified_json_str2=re.sub(r'\s+', '', modified_json_str2)
							 | 
						|
								                    metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
							 | 
						|
								                                                            description=page_object.remark, # 替换工作流备注
							 | 
						|
								                                                            locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations
							 | 
						|
								                                                            name =page_object.metatask_name+"-存储过程采集" ,# 替换工作流名称
							 | 
						|
								                                                            tenantCode =responsJson['data']['processDefinition']['tenantCode'] ,
							 | 
						|
								                                                            taskRelationJson  =modified_json_str2,# 替换taskRelationJson
							 | 
						|
								                                                            ).model_dump(exclude_unset=True, by_alias=True)
							 | 
						|
								                    form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
							 | 
						|
								                    response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data,verify=False)
							 | 
						|
								                    putText= response_put0.text
							 | 
						|
								                    responsPutJson=json.loads(putText) 
							 | 
						|
								                    if responsPutJson['msg'] == 'success':
							 | 
						|
								                        if message:
							 | 
						|
								                            message += ", "
							 | 
						|
								                        message += page_object.metatask_name + "-存储过程采集修改成功"
							 | 
						|
								                    else:
							 | 
						|
								                        if message:
							 | 
						|
								                            message += ", "
							 | 
						|
								                        message += page_object.metatask_name + "-存储过程采集修改失败"   
							 | 
						|
								        return message  
							 | 
						|
								
							 | 
						|
								    @classmethod
							 | 
						|
								    async def oracle_process_defind_change_add(cls,request: Request, processConfigList: list[MetaprocessconfigModel],page_object:MetataskModel,current_user: CurrentUserModel):
							 | 
						|
								        """
							 | 
						|
								        mysql类型
							 | 
						|
								        """
							 | 
						|
								        projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
							 | 
						|
								        url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/task-definition/gen-task-codes?genNum=5'
							 | 
						|
								        headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
							 | 
						|
								
							 | 
						|
								        # 新增接口
							 | 
						|
								        url2=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
							 | 
						|
								        headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
							 | 
						|
								        response = requests.get(url, headers=headers, verify=False)
							 | 
						|
								        if response.reason == 'OK':
							 | 
						|
								           intdsids=[]
							 | 
						|
								           message=''
							 | 
						|
								           dstypes=[]
							 | 
						|
								           response_text = response.text
							 | 
						|
								           data = json.loads(response_text)
							 | 
						|
								           code_list = data["data"]      
							 | 
						|
								           str_list = list(map(str, code_list))            
							 | 
						|
								           for config in processConfigList:
							 | 
						|
								            # oracl表字段   
							 | 
						|
								            if config.ac_target=='0':
							 | 
						|
								              modified_json_str = config.taskDefinitionJson.replace("16699723296864", str_list[0]).replace("16699723296865", str_list[1]).replace("16699723296866", str_list[2]).replace("'ORCL_USER'",  f"'{page_object.dbSName}'").replace("'orcl_conn'",  f"'{page_object.dbRName}'")
							 | 
						|
								              if page_object.dbSName:
							 | 
						|
								                    modified_json_str=modified_json_str.replace("'ORCL_USER'", f"'{page_object.dbSName}'")
							 | 
						|
								              else:
							 | 
						|
								                    modified_json_str=modified_json_str.replace("t.owner = 'ORCL_USER'",  "1=1")
							 | 
						|
								              modified_json_str2=config.taskRelationJson.replace("16699723296864", str_list[0]).replace("16699723296865", str_list[1]).replace("16699723296866", str_list[2])
							 | 
						|
								              modified_json_str3=config.locations.replace("16699723296864", str_list[0]).replace("16699723296865", str_list[1]).replace("16699723296866", str_list[2])
							 | 
						|
								              metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
							 | 
						|
								                                                            description=page_object.remark, # 替换工作流备注
							 | 
						|
								                                                            locations=modified_json_str3,# 替换locations
							 | 
						|
								                                                            name =page_object.metatask_name+"-表字段采集" ,# 替换工作流名称
							 | 
						|
								                                                            timeout=config.timeout,
							 | 
						|
								                                                            globalParams =config.globalParams ,
							 | 
						|
								                                                            tenantCode =config.tenantCode ,
							 | 
						|
								                                                            taskRelationJson  =modified_json_str2,# 替换taskRelationJson
							 | 
						|
								                                                            executionType =config.executionType ,
							 | 
						|
								                                                            releaseState=config.releaseState
							 | 
						|
								                                                            ).model_dump(exclude_unset=True, by_alias=True)
							 | 
						|
								              form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
							 | 
						|
								              response_post0 = requests.post(url2, headers=headers2, data=form_data, verify=False)
							 | 
						|
								            #   text=  '{"code":0,"msg":"success","data":{"id":null,"code":16244510692320,"name":"测试2250-表字段采集","version":0,"releaseState":null,"projectCode":15094503753824,"description":"111","globalParams":"[]","globalParamList":null,"globalParamMap":{},"createTime":"2025-01-08 12:50:39","updateTime":"2025-01-08 12:50:39","flag":"YES","userId":1,"userName":null,"projectName":null,"locations":"[{\\"taskCode\\":16244510678624,\\"x\\":334,\\"y\\":265},{\\"taskCode\\":16244510678625,\\"x\\":334,\\"y\\":390},{\\"taskCode\\":16244510678626,\\"x\\":687,\\"y\\":335}]","scheduleReleaseState":null,"timeout":0,"tenantId":1,"tenantCode":null,"modifyBy":null,"warningGroupId":0,"executionType":"PARALLEL"},"failed":false,"success":true}'
							 | 
						|
								              text= response_post0.text 
							 | 
						|
								              responsJson = json.loads(text)
							 | 
						|
								              if responsJson['msg'] == 'success':
							 | 
						|
								                intdsids.append(responsJson['data']['code'])
							 | 
						|
								                dstypes.append('0')
							 | 
						|
								                if message:
							 | 
						|
								                    message += ", "
							 | 
						|
								                message += page_object.metatask_name + "-表字段采集新增成功"
							 | 
						|
								              else: 
							 | 
						|
								                if message:
							 | 
						|
								                    message += ", "
							 | 
						|
								                message += page_object.metatask_name + "-表字段采集新增失败"
							 | 
						|
								            if config.ac_target=='1':
							 | 
						|
								               modified_json_str = config.taskDefinitionJson.replace("16699841738592", str_list[0]).replace("16699841738593", str_list[1]).replace("16699841738594", str_list[2]).replace("16699841738595", str_list[3]).replace("'orcl_conn'",  f"'{page_object.dbRName}'")
							 | 
						|
								               if page_object.dbSName:
							 | 
						|
								                    modified_json_str=modified_json_str.replace("'ORCL_USER'", f"'{page_object.dbSName}'")
							 | 
						|
								               else:
							 | 
						|
								                    modified_json_str=modified_json_str.replace("t.owner = 'ORCL_USER'",  "1=1")
							 | 
						|
								               modified_json_str2=config.taskRelationJson.replace("16699841738592", str_list[0]).replace("16699841738593", str_list[1]).replace("16699841738594", str_list[2]).replace("16699841738595", str_list[3])
							 | 
						|
								               modified_json_str3=config.locations.replace("16699841738592", str_list[0]).replace("16699841738593", str_list[1]).replace("16699841738594", str_list[2]).replace("16699841738595", str_list[3])
							 | 
						|
								               metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
							 | 
						|
								                                                            description=page_object.remark, # 替换工作流备注
							 | 
						|
								                                                            locations=modified_json_str3,# 替换locations
							 | 
						|
								                                                            name =page_object.metatask_name+"-存储过程采集" ,# 替换工作流名称
							 | 
						|
								                                                            timeout=config.timeout,
							 | 
						|
								                                                            globalParams =config.globalParams ,
							 | 
						|
								                                                            tenantCode =config.tenantCode ,
							 | 
						|
								                                                            taskRelationJson  =modified_json_str2,# 替换taskRelationJson
							 | 
						|
								                                                            executionType =config.executionType ,
							 | 
						|
								                                                            releaseState=config.releaseState
							 | 
						|
								                                                            ).model_dump(exclude_unset=True, by_alias=True)
							 | 
						|
								               form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
							 | 
						|
								               response_post0 = requests.post(url2, headers=headers2,data=form_data,verify=False)
							 | 
						|
								               #text=  '{"code":0,"msg":"success","data":{"id":null,"code":16244510692320,"name":"测试2250-表字段采集","version":0,"releaseState":null,"projectCode":15094503753824,"description":"111","globalParams":"[]","globalParamList":null,"globalParamMap":{},"createTime":"2025-01-08 12:50:39","updateTime":"2025-01-08 12:50:39","flag":"YES","userId":1,"userName":null,"projectName":null,"locations":"[{\\"taskCode\\":16244510678624,\\"x\\":334,\\"y\\":265},{\\"taskCode\\":16244510678625,\\"x\\":334,\\"y\\":390},{\\"taskCode\\":16244510678626,\\"x\\":687,\\"y\\":335}]","scheduleReleaseState":null,"timeout":0,"tenantId":1,"tenantCode":null,"modifyBy":null,"warningGroupId":0,"executionType":"PARALLEL"},"failed":false,"success":true}'
							 | 
						|
								               text= response_post0.text
							 | 
						|
								               responsJson = json.loads(text) 
							 | 
						|
								               if responsJson['msg'] == 'success':
							 | 
						|
								                dstypes.append('1')
							 | 
						|
								                intdsids.append(responsJson['data']['code'])
							 | 
						|
								                if message:
							 | 
						|
								                    message += ", "
							 | 
						|
								                message += page_object.metatask_name + "-存储过程采集新增成功"
							 | 
						|
								               else: 
							 | 
						|
								                if message:
							 | 
						|
								                    message += ", "
							 | 
						|
								                message += page_object.metatask_name + "-存储过程采集新增失败"
							 | 
						|
								           if len(intdsids)>0:
							 | 
						|
								              page_object.ds_ids=','.join([str(i) for i in intdsids])
							 | 
						|
								              page_object.ds_types=','.join([str(i) for i in dstypes])
							 | 
						|
								           return message  
							 | 
						|
								
							 | 
						|
								
							 | 
						|
								    @classmethod
							 | 
						|
								    async def db2_process_defind_change_add(cls,request: Request, processConfigList: list[MetaprocessconfigModel],page_object:MetataskModel,current_user: CurrentUserModel):
							 | 
						|
								        """
							 | 
						|
								        mysql类型
							 | 
						|
								        """
							 | 
						|
								        projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
							 | 
						|
								        url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/task-definition/gen-task-codes?genNum=5'
							 | 
						|
								        headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
							 | 
						|
								
							 | 
						|
								        # 新增接口
							 | 
						|
								        url2=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
							 | 
						|
								        headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
							 | 
						|
								        response = requests.get(url, headers=headers, verify=False)
							 | 
						|
								        if response.reason == 'OK':
							 | 
						|
								           intdsids=[]
							 | 
						|
								           message=''
							 | 
						|
								           dstypes=[]
							 | 
						|
								           response_text = response.text
							 | 
						|
								           data = json.loads(response_text)
							 | 
						|
								           code_list = data["data"]      
							 | 
						|
								           str_list = list(map(str, code_list))            
							 | 
						|
								           for config in processConfigList:
							 | 
						|
								            # db2表字段   
							 | 
						|
								            if config.ac_target=='0':
							 | 
						|
								              modified_json_str = config.taskDefinitionJson.replace("16699625480160", str_list[0]).replace("16699625480161", str_list[1]).replace("16699625480162", str_list[2]).replace("'db2_conn'",  f"'{page_object.dbRName}'")
							 | 
						|
								              if page_object.dbSName:
							 | 
						|
								                    modified_json_str=modified_json_str.replace("'MYSCHEMA'", f"'{page_object.dbSName}'")
							 | 
						|
								              else:
							 | 
						|
								                    modified_json_str=modified_json_str.replace("TABSCHEMA = 'MYSCHEMA'",  "1=1")
							 | 
						|
								              modified_json_str2=config.taskRelationJson.replace("16699625480160", str_list[0]).replace("16699625480161", str_list[1]).replace("16699625480162", str_list[2])
							 | 
						|
								              modified_json_str3=config.locations.replace("16699625480160", str_list[0]).replace("16699625480161", str_list[1]).replace("16699625480162", str_list[2])
							 | 
						|
								              metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
							 | 
						|
								                                                            description=page_object.remark, # 替换工作流备注
							 | 
						|
								                                                            locations=modified_json_str3,# 替换locations
							 | 
						|
								                                                            name =page_object.metatask_name+"-表字段采集" ,# 替换工作流名称
							 | 
						|
								                                                            timeout=config.timeout,
							 | 
						|
								                                                            globalParams =config.globalParams ,
							 | 
						|
								                                                            tenantCode =config.tenantCode ,
							 | 
						|
								                                                            taskRelationJson  =modified_json_str2,# 替换taskRelationJson
							 | 
						|
								                                                            executionType =config.executionType ,
							 | 
						|
								                                                            releaseState=config.releaseState
							 | 
						|
								                                                            ).model_dump(exclude_unset=True, by_alias=True)
							 | 
						|
								              form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
							 | 
						|
								              response_post0 = requests.post(url2, headers=headers2, data=form_data, verify=False)
							 | 
						|
								              text= response_post0.text 
							 | 
						|
								              responsJson = json.loads(text)
							 | 
						|
								              if responsJson['msg'] == 'success':
							 | 
						|
								                intdsids.append(responsJson['data']['code'])
							 | 
						|
								                dstypes.append('0')
							 | 
						|
								                if message:
							 | 
						|
								                    message += ", "
							 | 
						|
								                message += page_object.metatask_name + "-表字段采集新增成功"
							 | 
						|
								              else: 
							 | 
						|
								                if message:
							 | 
						|
								                    message += ", "
							 | 
						|
								                message += page_object.metatask_name + "-表字段采集新增失败"
							 | 
						|
								            if config.ac_target=='1':
							 | 
						|
								               modified_json_str = config.taskDefinitionJson.replace("16699623866592", str_list[0]).replace("16699623866593", str_list[1]).replace("16699623866594", str_list[2]).replace("16699623866595", str_list[3]).replace("'db2_conn'",  f"'{page_object.dbRName}'")
							 | 
						|
								               if page_object.dbSName:
							 | 
						|
								                    modified_json_str=modified_json_str.replace("'MYSCHEMA'", f"'{page_object.dbSName}'")
							 | 
						|
								               else:
							 | 
						|
								                    modified_json_str=modified_json_str.replace("TABSCHEMA = 'MYSCHEMA'",  "1=1")
							 | 
						|
								               modified_json_str2=config.taskRelationJson.replace("16699623866592", str_list[0]).replace("16699623866593", str_list[1]).replace("16699623866594", str_list[2]).replace("16699623866595", str_list[3])
							 | 
						|
								               modified_json_str3=config.locations.replace("16699623866592", str_list[0]).replace("16699623866593", str_list[1]).replace("16699623866594", str_list[2]).replace("16699623866595", str_list[3])
							 | 
						|
								               metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
							 | 
						|
								                                                            description=page_object.remark, # 替换工作流备注
							 | 
						|
								                                                            locations=modified_json_str3,# 替换locations
							 | 
						|
								                                                            name =page_object.metatask_name+"-存储过程采集" ,# 替换工作流名称
							 | 
						|
								                                                            timeout=config.timeout,
							 | 
						|
								                                                            globalParams =config.globalParams ,
							 | 
						|
								                                                            tenantCode =config.tenantCode ,
							 | 
						|
								                                                            taskRelationJson  =modified_json_str2,# 替换taskRelationJson
							 | 
						|
								                                                            executionType =config.executionType ,
							 | 
						|
								                                                            releaseState=config.releaseState
							 | 
						|
								                                                            ).model_dump(exclude_unset=True, by_alias=True)
							 | 
						|
								               form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
							 | 
						|
								               response_post0 = requests.post(url2, headers=headers2,data=form_data,verify=False)
							 | 
						|
								               #text=  '{"code":0,"msg":"success","data":{"id":null,"code":16244510692320,"name":"测试2250-表字段采集","version":0,"releaseState":null,"projectCode":15094503753824,"description":"111","globalParams":"[]","globalParamList":null,"globalParamMap":{},"createTime":"2025-01-08 12:50:39","updateTime":"2025-01-08 12:50:39","flag":"YES","userId":1,"userName":null,"projectName":null,"locations":"[{\\"taskCode\\":16244510678624,\\"x\\":334,\\"y\\":265},{\\"taskCode\\":16244510678625,\\"x\\":334,\\"y\\":390},{\\"taskCode\\":16244510678626,\\"x\\":687,\\"y\\":335}]","scheduleReleaseState":null,"timeout":0,"tenantId":1,"tenantCode":null,"modifyBy":null,"warningGroupId":0,"executionType":"PARALLEL"},"failed":false,"success":true}'
							 | 
						|
								               text= response_post0.text
							 | 
						|
								               responsJson = json.loads(text) 
							 | 
						|
								               if responsJson['msg'] == 'success':
							 | 
						|
								                dstypes.append('1')
							 | 
						|
								                intdsids.append(responsJson['data']['code'])
							 | 
						|
								                if message:
							 | 
						|
								                    message += ", "
							 | 
						|
								                message += page_object.metatask_name + "-存储过程采集新增成功"
							 | 
						|
								               else: 
							 | 
						|
								                if message:
							 | 
						|
								                    message += ", "
							 | 
						|
								                message += page_object.metatask_name + "-存储过程采集新增失败"
							 | 
						|
								           if len(intdsids)>0:
							 | 
						|
								              page_object.ds_ids=','.join([str(i) for i in intdsids])
							 | 
						|
								              page_object.ds_types=','.join([str(i) for i in dstypes])
							 | 
						|
								           return message  
							 | 
						|
								
							 | 
						|
								    @classmethod
							 | 
						|
								    async def db2_process_defind_change_update(cls,request: Request,page_object:MetataskModel,metatask_old:MetataskModel,current_user: CurrentUserModel):
							 | 
						|
								        """
							 | 
						|
								        mysql类型
							 | 
						|
								        """
							 | 
						|
								        projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
							 | 
						|
								        dstypes=page_object.ds_types.split(",")
							 | 
						|
								        dsids=page_object.ds_ids.split(",")
							 | 
						|
								        result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dstypes, dsids)]
							 | 
						|
								        message=''
							 | 
						|
								        # 查询接口
							 | 
						|
								        url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
							 | 
						|
								        headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, }
							 | 
						|
								        headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
							 | 
						|
								        for config in result_list:
							 | 
						|
								            # mysql表字段
							 | 
						|
								            if config['dstype']=='0' :
							 | 
						|
								                response = requests.get(f"{url}/{config['dsid']}", headers=headers, verify=False)
							 | 
						|
								                text= response.text
							 | 
						|
								                responsJson = json.loads(text) 
							 | 
						|
								                if responsJson['msg'] == 'success':
							 | 
						|
								                    modified_json_str = json.dumps(responsJson['data']['taskDefinitionList'], ensure_ascii=False, indent=0).replace( f"'{metatask_old.dbSName}'",  f"'{page_object.dbSName}'")
							 | 
						|
								                    getTaskRelationList=responsJson['data']['processTaskRelationList']
							 | 
						|
								                    putTaskRelationList=[]
							 | 
						|
								                    for item in getTaskRelationList:
							 | 
						|
								                        new_item = {
							 | 
						|
								                    "name": item['name'],
							 | 
						|
								                    "preTaskCode":item['preTaskCode'] ,
							 | 
						|
								                    "preTaskVersion":item['preTaskVersion'] ,
							 | 
						|
								                    "postTaskCode":item['postTaskCode'] ,
							 | 
						|
								                    "conditionType":item['conditionType'] ,
							 | 
						|
								                    "conditionParams":item['conditionParams'] 
							 | 
						|
								                    }
							 | 
						|
								                        putTaskRelationList.append(new_item)
							 | 
						|
								                        
							 | 
						|
								                    modified_json_str2=  json.dumps(putTaskRelationList, ensure_ascii=False, indent=0)
							 | 
						|
								                    modified_json_str2=re.sub(r'\s+', '', modified_json_str2)
							 | 
						|
								                    metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
							 | 
						|
								                                                            description=page_object.remark, # 替换工作流备注
							 | 
						|
								                                                            locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations
							 | 
						|
								                                                            name =page_object.metatask_name+"-表字段采集" ,# 替换工作流名称
							 | 
						|
								                                                            tenantCode =responsJson['data']['processDefinition']['tenantCode'] ,
							 | 
						|
								                                                            taskRelationJson  =modified_json_str2,# 替换taskRelationJson
							 | 
						|
								                                                            ).model_dump(exclude_unset=True, by_alias=True)
							 | 
						|
								                    form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
							 | 
						|
								                    response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data,verify=False)
							 | 
						|
								                    putText= response_put0.text
							 | 
						|
								                    responsPutJson=json.loads(putText) 
							 | 
						|
								                    if responsPutJson['msg'] == 'success':
							 | 
						|
								                        if message:
							 | 
						|
								                            message += ", "
							 | 
						|
								                        message += page_object.metatask_name + "-表字段采集修改成功"
							 | 
						|
								                    else:
							 | 
						|
								                        if message:
							 | 
						|
								                            message += ", "
							 | 
						|
								                        message += page_object.metatask_name + "-表字段采集修改失败" 
							 | 
						|
								            if config['dstype']=='1':
							 | 
						|
								                response = requests.get(f"{url}/{config['dsid']}", headers=headers)
							 | 
						|
								                text= response.text
							 | 
						|
								                responsJson = json.loads(text) 
							 | 
						|
								                if responsJson['msg'] == 'success':
							 | 
						|
								                    modified_json_str = json.dumps(responsJson['data']['taskDefinitionList'], ensure_ascii=False, indent=4).replace( f"'{metatask_old.dbSName}'",  f"'{page_object.dbSName}'")
							 | 
						|
								                    getTaskRelationList=responsJson['data']['processTaskRelationList']
							 | 
						|
								                    putTaskRelationList=[]
							 | 
						|
								                    for item in getTaskRelationList:
							 | 
						|
								                        new_item = {
							 | 
						|
								                                "name": item['name'],
							 | 
						|
								                                "preTaskCode":item['preTaskCode'] ,
							 | 
						|
								                                "preTaskVersion":item['preTaskVersion'] ,
							 | 
						|
								                                "postTaskCode":item['postTaskCode'] ,
							 | 
						|
								                                "conditionType":item['conditionType'] ,
							 | 
						|
								                                "conditionParams":item['conditionParams'] 
							 | 
						|
								                                    }
							 | 
						|
								                        putTaskRelationList.append(new_item)
							 | 
						|
								                    modified_json_str2=  json.dumps(putTaskRelationList, ensure_ascii=False, indent=4)
							 | 
						|
								                    modified_json_str2=re.sub(r'\s+', '', modified_json_str2)
							 | 
						|
								                    metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
							 | 
						|
								                                                            description=page_object.remark, # 替换工作流备注
							 | 
						|
								                                                            locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations
							 | 
						|
								                                                            name =page_object.metatask_name+"-存储过程采集" ,# 替换工作流名称
							 | 
						|
								                                                            tenantCode =responsJson['data']['processDefinition']['tenantCode'] ,
							 | 
						|
								                                                            taskRelationJson  =modified_json_str2,# 替换taskRelationJson
							 | 
						|
								                                                            ).model_dump(exclude_unset=True, by_alias=True)
							 | 
						|
								                    form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
							 | 
						|
								                    response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data,verify=False)
							 | 
						|
								                    putText= response_put0.text
							 | 
						|
								                    responsPutJson=json.loads(putText) 
							 | 
						|
								                    if responsPutJson['msg'] == 'success':
							 | 
						|
								                        if message:
							 | 
						|
								                            message += ", "
							 | 
						|
								                        message += page_object.metatask_name + "-存储过程采集修改成功"
							 | 
						|
								                    else:
							 | 
						|
								                        if message:
							 | 
						|
								                            message += ", "
							 | 
						|
								                        message += page_object.metatask_name + "-存储过程采集修改失败"   
							 | 
						|
								        return message  
							 | 
						|
								
							 | 
						|
								
							 | 
						|
								    @classmethod
							 | 
						|
								    async def pg_process_defind_change_add(cls,request: Request, processConfigList: list[MetaprocessconfigModel],page_object:MetataskModel,current_user: CurrentUserModel):
							 | 
						|
								        """
							 | 
						|
								        POSTGRESQL类型
							 | 
						|
								        """
							 | 
						|
								        projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
							 | 
						|
								        url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/task-definition/gen-task-codes?genNum=5'
							 | 
						|
								        headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
							 | 
						|
								
							 | 
						|
								        # 新增接口
							 | 
						|
								        url2=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
							 | 
						|
								        headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
							 | 
						|
								        response = requests.get(url, headers=headers, verify=False)
							 | 
						|
								        if response.reason == 'OK':
							 | 
						|
								           intdsids=[]
							 | 
						|
								           message=''
							 | 
						|
								           dstypes=[]
							 | 
						|
								           response_text = response.text
							 | 
						|
								           data = json.loads(response_text)
							 | 
						|
								           code_list = data["data"]      
							 | 
						|
								           str_list = list(map(str, code_list))            
							 | 
						|
								           for config in processConfigList:
							 | 
						|
								            # POSTGRESQL表字段   
							 | 
						|
								            if config.ac_target=='0':
							 | 
						|
								              modified_json_str = config.taskDefinitionJson.replace("16688213802592", str_list[0]).replace("16688213802593", str_list[1]).replace("16688213802594", str_list[2]).replace("'pg_conn'",  f"'{page_object.dbRName}'")
							 | 
						|
								              if page_object.dbSName:
							 | 
						|
								                    modified_json_str=modified_json_str.replace("'public'", f"'{page_object.dbSName}'")
							 | 
						|
								              else:
							 | 
						|
								                    modified_json_str=modified_json_str.replace("n.nspname = 'public'",  "1=1")             
							 | 
						|
								              modified_json_str2=config.taskRelationJson.replace("16688213802592", str_list[0]).replace("16688213802593", str_list[1]).replace("16688213802594", str_list[2])
							 | 
						|
								              modified_json_str3=config.locations.replace("16688213802592", str_list[0]).replace("16688213802593", str_list[1]).replace("16688213802594", str_list[2])
							 | 
						|
								              metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
							 | 
						|
								                                                            description=page_object.remark, # 替换工作流备注
							 | 
						|
								                                                            locations=modified_json_str3,# 替换locations
							 | 
						|
								                                                            name =page_object.metatask_name+"-表字段采集" ,# 替换工作流名称
							 | 
						|
								                                                            timeout=config.timeout,
							 | 
						|
								                                                            globalParams =config.globalParams ,
							 | 
						|
								                                                            tenantCode =config.tenantCode ,
							 | 
						|
								                                                            taskRelationJson  =modified_json_str2,# 替换taskRelationJson
							 | 
						|
								                                                            executionType =config.executionType ,
							 | 
						|
								                                                            releaseState=config.releaseState
							 | 
						|
								                                                            ).model_dump(exclude_unset=True, by_alias=True)
							 | 
						|
								              form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
							 | 
						|
								              response_post0 = requests.post(url2, headers=headers2, data=form_data, verify=False)
							 | 
						|
								              text= response_post0.text 
							 | 
						|
								              responsJson = json.loads(text)
							 | 
						|
								              if responsJson['msg'] == 'success':
							 | 
						|
								                intdsids.append(responsJson['data']['code'])
							 | 
						|
								                dstypes.append('0')
							 | 
						|
								                if message:
							 | 
						|
								                    message += ", "
							 | 
						|
								                message += page_object.metatask_name + "-表字段采集新增成功"
							 | 
						|
								              else: 
							 | 
						|
								                if message:
							 | 
						|
								                    message += ", "
							 | 
						|
								                message += page_object.metatask_name + "-表字段采集新增失败"
							 | 
						|
								            elif config.ac_target=='1':
							 | 
						|
								               modified_json_str = config.taskDefinitionJson.replace("16688477124320", str_list[0]).replace("16688477124321", str_list[1]).replace("16688477124322", str_list[2]).replace("16688477124323", str_list[3]).replace("'pg_conn'",  f"'{page_object.dbRName}'")
							 | 
						|
								               if page_object.dbSName:
							 | 
						|
								                    modified_json_str=modified_json_str.replace("'public'", f"'{page_object.dbSName}'")
							 | 
						|
								               else:
							 | 
						|
								                    modified_json_str=modified_json_str.replace("n.nspname = 'public'",  "1=1")    
							 | 
						|
								               modified_json_str2=config.taskRelationJson.replace("16688477124320", str_list[0]).replace("16688477124321", str_list[1]).replace("16688477124322", str_list[2]).replace("16688477124323", str_list[3])
							 | 
						|
								               modified_json_str3=config.locations.replace("16688477124320", str_list[0]).replace("16688477124321", str_list[1]).replace("16688477124322", str_list[2]).replace("16688477124323", str_list[3])
							 | 
						|
								               metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
							 | 
						|
								                                                            description=page_object.remark, # 替换工作流备注
							 | 
						|
								                                                            locations=modified_json_str3,# 替换locations
							 | 
						|
								                                                            name =page_object.metatask_name+"-存储过程采集" ,# 替换工作流名称
							 | 
						|
								                                                            timeout=config.timeout,
							 | 
						|
								                                                            globalParams =config.globalParams ,
							 | 
						|
								                                                            tenantCode =config.tenantCode ,
							 | 
						|
								                                                            taskRelationJson  =modified_json_str2,# 替换taskRelationJson
							 | 
						|
								                                                            executionType =config.executionType ,
							 | 
						|
								                                                            releaseState=config.releaseState
							 | 
						|
								                                                            ).model_dump(exclude_unset=True, by_alias=True)
							 | 
						|
								               form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
							 | 
						|
								               response_post0 = requests.post(url2, headers=headers2,data=form_data,verify=False)
							 | 
						|
								               text= response_post0.text
							 | 
						|
								               responsJson = json.loads(text) 
							 | 
						|
								               if responsJson['msg'] == 'success':
							 | 
						|
								                dstypes.append('1')
							 | 
						|
								                intdsids.append(responsJson['data']['code'])
							 | 
						|
								                if message:
							 | 
						|
								                    message += ", "
							 | 
						|
								                message += page_object.metatask_name + "-存储过程采集新增成功"
							 | 
						|
								               else: 
							 | 
						|
								                if message:
							 | 
						|
								                    message += ", "
							 | 
						|
								                message += page_object.metatask_name + "-存储过程采集新增失败"
							 | 
						|
								           if len(intdsids)>0:
							 | 
						|
								              page_object.ds_ids=','.join([str(i) for i in intdsids])
							 | 
						|
								              page_object.ds_types=','.join([str(i) for i in dstypes])
							 | 
						|
								           return message  
							 | 
						|
								
							 | 
						|
								    @classmethod
							 | 
						|
								    async def pg_process_defind_change_update(cls,request: Request,page_object:MetataskModel,metatask_old:MetataskModel,current_user: CurrentUserModel):
							 | 
						|
								        """
							 | 
						|
								        mysql类型
							 | 
						|
								        """
							 | 
						|
								        projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
							 | 
						|
								        dstypes=page_object.ds_types.split(",")
							 | 
						|
								        dsids=page_object.ds_ids.split(",")
							 | 
						|
								        result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dstypes, dsids)]
							 | 
						|
								        message=''
							 | 
						|
								        # 查询接口
							 | 
						|
								        url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
							 | 
						|
								        headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, }
							 | 
						|
								        headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
							 | 
						|
								        for config in result_list:
							 | 
						|
								            # mysql表字段
							 | 
						|
								            if config['dstype']=='0' :
							 | 
						|
								                response = requests.get(f"{url}/{config['dsid']}", headers=headers, verify=False)
							 | 
						|
								                text= response.text
							 | 
						|
								                responsJson = json.loads(text) 
							 | 
						|
								                if responsJson['msg'] == 'success':
							 | 
						|
								                    modified_json_str = json.dumps(responsJson['data']['taskDefinitionList'], ensure_ascii=False, indent=0).replace( f"'{metatask_old.dbSName}'",  f"'{page_object.dbSName}'")
							 | 
						|
								                    getTaskRelationList=responsJson['data']['processTaskRelationList']
							 | 
						|
								                    putTaskRelationList=[]
							 | 
						|
								                    for item in getTaskRelationList:
							 | 
						|
								                        new_item = {
							 | 
						|
								                    "name": item['name'],
							 | 
						|
								                    "preTaskCode":item['preTaskCode'] ,
							 | 
						|
								                    "preTaskVersion":item['preTaskVersion'] ,
							 | 
						|
								                    "postTaskCode":item['postTaskCode'] ,
							 | 
						|
								                    "conditionType":item['conditionType'] ,
							 | 
						|
								                    "conditionParams":item['conditionParams'] 
							 | 
						|
								                    }
							 | 
						|
								                        putTaskRelationList.append(new_item)
							 | 
						|
								                        
							 | 
						|
								                    modified_json_str2=  json.dumps(putTaskRelationList, ensure_ascii=False, indent=0)
							 | 
						|
								                    modified_json_str2=re.sub(r'\s+', '', modified_json_str2)
							 | 
						|
								                    metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
							 | 
						|
								                                                            description=page_object.remark, # 替换工作流备注
							 | 
						|
								                                                            locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations
							 | 
						|
								                                                            name =page_object.metatask_name+"-表字段采集" ,# 替换工作流名称
							 | 
						|
								                                                            tenantCode =responsJson['data']['processDefinition']['tenantCode'] ,
							 | 
						|
								                                                            taskRelationJson  =modified_json_str2,# 替换taskRelationJson
							 | 
						|
								                                                            ).model_dump(exclude_unset=True, by_alias=True)
							 | 
						|
								                    form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
							 | 
						|
								                    response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data,verify=False)
							 | 
						|
								                    putText= response_put0.text
							 | 
						|
								                    responsPutJson=json.loads(putText) 
							 | 
						|
								                    if responsPutJson['msg'] == 'success':
							 | 
						|
								                        if message:
							 | 
						|
								                            message += ", "
							 | 
						|
								                        message += page_object.metatask_name + "-表字段采集修改成功"
							 | 
						|
								                    else:
							 | 
						|
								                        if message:
							 | 
						|
								                            message += ", "
							 | 
						|
								                        message += page_object.metatask_name + "-表字段采集修改失败" 
							 | 
						|
								            if config['dstype']=='1':
							 | 
						|
								                response = requests.get(f"{url}/{config['dsid']}", headers=headers)
							 | 
						|
								                text= response.text
							 | 
						|
								                responsJson = json.loads(text) 
							 | 
						|
								                if responsJson['msg'] == 'success':
							 | 
						|
								                    modified_json_str = json.dumps(responsJson['data']['taskDefinitionList'], ensure_ascii=False, indent=4).replace( f"'{metatask_old.dbSName}'",  f"'{page_object.dbSName}'")
							 | 
						|
								                    getTaskRelationList=responsJson['data']['processTaskRelationList']
							 | 
						|
								                    putTaskRelationList=[]
							 | 
						|
								                    for item in getTaskRelationList:
							 | 
						|
								                        new_item = {
							 | 
						|
								                                "name": item['name'],
							 | 
						|
								                                "preTaskCode":item['preTaskCode'] ,
							 | 
						|
								                                "preTaskVersion":item['preTaskVersion'] ,
							 | 
						|
								                                "postTaskCode":item['postTaskCode'] ,
							 | 
						|
								                                "conditionType":item['conditionType'] ,
							 | 
						|
								                                "conditionParams":item['conditionParams'] 
							 | 
						|
								                                    }
							 | 
						|
								                        putTaskRelationList.append(new_item)
							 | 
						|
								                    modified_json_str2=  json.dumps(putTaskRelationList, ensure_ascii=False, indent=4)
							 | 
						|
								                    modified_json_str2=re.sub(r'\s+', '', modified_json_str2)
							 | 
						|
								                    metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
							 | 
						|
								                                                            description=page_object.remark, # 替换工作流备注
							 | 
						|
								                                                            locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations
							 | 
						|
								                                                            name =page_object.metatask_name+"-存储过程采集" ,# 替换工作流名称
							 | 
						|
								                                                            tenantCode =responsJson['data']['processDefinition']['tenantCode'] ,
							 | 
						|
								                                                            taskRelationJson  =modified_json_str2,# 替换taskRelationJson
							 | 
						|
								                                                            ).model_dump(exclude_unset=True, by_alias=True)
							 | 
						|
								                    form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
							 | 
						|
								                    response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data,verify=False)
							 | 
						|
								                    putText= response_put0.text
							 | 
						|
								                    responsPutJson=json.loads(putText) 
							 | 
						|
								                    if responsPutJson['msg'] == 'success':
							 | 
						|
								                        if message:
							 | 
						|
								                            message += ", "
							 | 
						|
								                        message += page_object.metatask_name + "-存储过程采集修改成功"
							 | 
						|
								                    else:
							 | 
						|
								                        if message:
							 | 
						|
								                            message += ", "
							 | 
						|
								                        message += page_object.metatask_name + "-存储过程采集修改失败"   
							 | 
						|
								        return message  
							 | 
						|
								
							 | 
						|
								
							 | 
						|
								    @classmethod
							 | 
						|
								    async def hive_process_defind_change_add(cls,request: Request, processConfigList: list[MetaprocessconfigModel],page_object:MetataskModel,current_user: CurrentUserModel):
							 | 
						|
								        """
							 | 
						|
								        mysql类型
							 | 
						|
								        """
							 | 
						|
								        projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
							 | 
						|
								        url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/task-definition/gen-task-codes?genNum=5'
							 | 
						|
								        headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
							 | 
						|
								
							 | 
						|
								        # 新增接口
							 | 
						|
								        url2=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
							 | 
						|
								        headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
							 | 
						|
								        response = requests.get(url, headers=headers, verify=False)
							 | 
						|
								        if response.reason == 'OK':
							 | 
						|
								           intdsids=[]
							 | 
						|
								           message=''
							 | 
						|
								           dstypes=[]
							 | 
						|
								           response_text = response.text
							 | 
						|
								           data = json.loads(response_text)
							 | 
						|
								           code_list = data["data"]      
							 | 
						|
								           str_list = list(map(str, code_list))            
							 | 
						|
								           for config in processConfigList:
							 | 
						|
								            # mysql表字段   
							 | 
						|
								            if config.ac_target=='0':
							 | 
						|
								              modified_json_str = config.taskDefinitionJson.replace("16199683466336", str_list[0]).replace("16199683466337", str_list[1]).replace("16199683466338", str_list[2]).replace("'dash_test_w'",  f"'{page_object.dbSName}'").replace("'mysql_conn'",  f"'{page_object.dbRName}'")
							 | 
						|
								              modified_json_str2=config.taskRelationJson.replace("16199683466336", str_list[0]).replace("16199683466337", str_list[1]).replace("16199683466338", str_list[2])
							 | 
						|
								              modified_json_str3=config.locations.replace("16199683466336", str_list[0]).replace("16199683466337", str_list[1]).replace("16199683466338", str_list[2])
							 | 
						|
								              metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
							 | 
						|
								                                                            description=page_object.remark, # 替换工作流备注
							 | 
						|
								                                                            locations=modified_json_str3,# 替换locations
							 | 
						|
								                                                            name =page_object.metatask_name+"-表字段采集" ,# 替换工作流名称
							 | 
						|
								                                                            timeout=config.timeout,
							 | 
						|
								                                                            globalParams =config.globalParams ,
							 | 
						|
								                                                            tenantCode =config.tenantCode ,
							 | 
						|
								                                                            taskRelationJson  =modified_json_str2,# 替换taskRelationJson
							 | 
						|
								                                                            executionType =config.executionType ,
							 | 
						|
								                                                            releaseState=config.releaseState
							 | 
						|
								                                                            ).model_dump(exclude_unset=True, by_alias=True)
							 | 
						|
								              form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
							 | 
						|
								              response_post0 = requests.post(url2, headers=headers2, data=form_data, verify=False)
							 | 
						|
								            #   text=  '{"code":0,"msg":"success","data":{"id":null,"code":16244510692320,"name":"测试2250-表字段采集","version":0,"releaseState":null,"projectCode":15094503753824,"description":"111","globalParams":"[]","globalParamList":null,"globalParamMap":{},"createTime":"2025-01-08 12:50:39","updateTime":"2025-01-08 12:50:39","flag":"YES","userId":1,"userName":null,"projectName":null,"locations":"[{\\"taskCode\\":16244510678624,\\"x\\":334,\\"y\\":265},{\\"taskCode\\":16244510678625,\\"x\\":334,\\"y\\":390},{\\"taskCode\\":16244510678626,\\"x\\":687,\\"y\\":335}]","scheduleReleaseState":null,"timeout":0,"tenantId":1,"tenantCode":null,"modifyBy":null,"warningGroupId":0,"executionType":"PARALLEL"},"failed":false,"success":true}'
							 | 
						|
								              text= response_post0.text 
							 | 
						|
								              responsJson = json.loads(text)
							 | 
						|
								              if responsJson['msg'] == 'success':
							 | 
						|
								                intdsids.append(responsJson['data']['code'])
							 | 
						|
								                dstypes.append('0')
							 | 
						|
								                if message:
							 | 
						|
								                    message += ", "
							 | 
						|
								                message += page_object.metatask_name + "-表字段采集新增成功"
							 | 
						|
								              else: 
							 | 
						|
								                if message:
							 | 
						|
								                    message += ", "
							 | 
						|
								                message += page_object.metatask_name + "-表字段采集新增失败"
							 | 
						|
								            if config.ac_target=='1':
							 | 
						|
								               modified_json_str = config.taskDefinitionJson.replace("16286410625888", str_list[0]).replace("16286410625889", str_list[1]).replace("16286410625890", str_list[2]).replace("16286410625891", str_list[3]).replace("'dash_test_w'",  f"'{page_object.dbSName}'").replace("'mysql_conn'",  f"'{page_object.dbRName}'").replace("mysql_conn dash_test_w", f"{page_object.dbRName} {page_object.dbSName}")
							 | 
						|
								               modified_json_str2=config.taskRelationJson.replace("16286410625888", str_list[0]).replace("16286410625889", str_list[1]).replace("16286410625890", str_list[2]).replace("16286410625891", str_list[3])
							 | 
						|
								               modified_json_str3=config.locations.replace("16286410625888", str_list[0]).replace("16286410625889", str_list[1]).replace("16286410625890", str_list[2]).replace("16286410625891", str_list[3])
							 | 
						|
								               metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
							 | 
						|
								                                                            description=page_object.remark, # 替换工作流备注
							 | 
						|
								                                                            locations=modified_json_str3,# 替换locations
							 | 
						|
								                                                            name =page_object.metatask_name+"-存储过程采集" ,# 替换工作流名称
							 | 
						|
								                                                            timeout=config.timeout,
							 | 
						|
								                                                            globalParams =config.globalParams ,
							 | 
						|
								                                                            tenantCode =config.tenantCode ,
							 | 
						|
								                                                            taskRelationJson  =modified_json_str2,# 替换taskRelationJson
							 | 
						|
								                                                            executionType =config.executionType ,
							 | 
						|
								                                                            releaseState=config.releaseState
							 | 
						|
								                                                            ).model_dump(exclude_unset=True, by_alias=True)
							 | 
						|
								               form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
							 | 
						|
								               response_post0 = requests.post(url2, headers=headers2,data=form_data,verify=False)
							 | 
						|
								               #text=  '{"code":0,"msg":"success","data":{"id":null,"code":16244510692320,"name":"测试2250-表字段采集","version":0,"releaseState":null,"projectCode":15094503753824,"description":"111","globalParams":"[]","globalParamList":null,"globalParamMap":{},"createTime":"2025-01-08 12:50:39","updateTime":"2025-01-08 12:50:39","flag":"YES","userId":1,"userName":null,"projectName":null,"locations":"[{\\"taskCode\\":16244510678624,\\"x\\":334,\\"y\\":265},{\\"taskCode\\":16244510678625,\\"x\\":334,\\"y\\":390},{\\"taskCode\\":16244510678626,\\"x\\":687,\\"y\\":335}]","scheduleReleaseState":null,"timeout":0,"tenantId":1,"tenantCode":null,"modifyBy":null,"warningGroupId":0,"executionType":"PARALLEL"},"failed":false,"success":true}'
							 | 
						|
								               text= response_post0.text
							 | 
						|
								               responsJson = json.loads(text) 
							 | 
						|
								               if responsJson['msg'] == 'success':
							 | 
						|
								                dstypes.append('1')
							 | 
						|
								                intdsids.append(responsJson['data']['code'])
							 | 
						|
								                if message:
							 | 
						|
								                    message += ", "
							 | 
						|
								                message += page_object.metatask_name + "-存储过程采集新增成功"
							 | 
						|
								               else: 
							 | 
						|
								                if message:
							 | 
						|
								                    message += ", "
							 | 
						|
								                message += page_object.metatask_name + "-存储过程采集新增失败"
							 | 
						|
								           if len(intdsids)>0:
							 | 
						|
								              page_object.ds_ids=','.join([str(i) for i in intdsids])
							 | 
						|
								              page_object.ds_types=','.join([str(i) for i in dstypes])
							 | 
						|
								           return message  
							 | 
						|
								
							 | 
						|
								    @classmethod
							 | 
						|
								    async def hive_process_defind_change_update(cls,request: Request,page_object:MetataskModel,metatask_old:MetataskModel,current_user: CurrentUserModel):
							 | 
						|
								        """
							 | 
						|
								        mysql类型
							 | 
						|
								        """
							 | 
						|
								        projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
							 | 
						|
								        dstypes=page_object.ds_types.split(",")
							 | 
						|
								        dsids=page_object.ds_ids.split(",")
							 | 
						|
								        result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dstypes, dsids)]
							 | 
						|
								        message=''
							 | 
						|
								        # 查询接口
							 | 
						|
								        url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
							 | 
						|
								        headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, }
							 | 
						|
								        headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
							 | 
						|
								        for config in result_list:
							 | 
						|
								            # mysql表字段
							 | 
						|
								            if config['dstype']=='0' :
							 | 
						|
								                response = requests.get(f"{url}/{config['dsid']}", headers=headers, verify=False)
							 | 
						|
								                text= response.text
							 | 
						|
								                responsJson = json.loads(text) 
							 | 
						|
								                if responsJson['msg'] == 'success':
							 | 
						|
								                    modified_json_str = json.dumps(responsJson['data']['taskDefinitionList'], ensure_ascii=False, indent=0).replace( f"'{metatask_old.dbSName}'",  f"'{page_object.dbSName}'")
							 | 
						|
								                    getTaskRelationList=responsJson['data']['processTaskRelationList']
							 | 
						|
								                    putTaskRelationList=[]
							 | 
						|
								                    for item in getTaskRelationList:
							 | 
						|
								                        new_item = {
							 | 
						|
								                    "name": item['name'],
							 | 
						|
								                    "preTaskCode":item['preTaskCode'] ,
							 | 
						|
								                    "preTaskVersion":item['preTaskVersion'] ,
							 | 
						|
								                    "postTaskCode":item['postTaskCode'] ,
							 | 
						|
								                    "conditionType":item['conditionType'] ,
							 | 
						|
								                    "conditionParams":item['conditionParams'] 
							 | 
						|
								                    }
							 | 
						|
								                        putTaskRelationList.append(new_item)
							 | 
						|
								                        
							 | 
						|
								                    modified_json_str2=  json.dumps(putTaskRelationList, ensure_ascii=False, indent=0)
							 | 
						|
								                    modified_json_str2=re.sub(r'\s+', '', modified_json_str2)
							 | 
						|
								                    metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
							 | 
						|
								                                                            description=page_object.remark, # 替换工作流备注
							 | 
						|
								                                                            locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations
							 | 
						|
								                                                            name =page_object.metatask_name+"-表字段采集" ,# 替换工作流名称
							 | 
						|
								                                                            tenantCode =responsJson['data']['processDefinition']['tenantCode'] ,
							 | 
						|
								                                                            taskRelationJson  =modified_json_str2,# 替换taskRelationJson
							 | 
						|
								                                                            ).model_dump(exclude_unset=True, by_alias=True)
							 | 
						|
								                    form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
							 | 
						|
								                    response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data,verify=False)
							 | 
						|
								                    putText= response_put0.text
							 | 
						|
								                    responsPutJson=json.loads(putText) 
							 | 
						|
								                    if responsPutJson['msg'] == 'success':
							 | 
						|
								                        if message:
							 | 
						|
								                            message += ", "
							 | 
						|
								                        message += page_object.metatask_name + "-表字段采集修改成功"
							 | 
						|
								                    else:
							 | 
						|
								                        if message:
							 | 
						|
								                            message += ", "
							 | 
						|
								                        message += page_object.metatask_name + "-表字段采集修改失败" 
							 | 
						|
								            if config['dstype']=='1':
							 | 
						|
								                response = requests.get(f"{url}/{config['dsid']}", headers=headers)
							 | 
						|
								                text= response.text
							 | 
						|
								                responsJson = json.loads(text) 
							 | 
						|
								                if responsJson['msg'] == 'success':
							 | 
						|
								                    modified_json_str = json.dumps(responsJson['data']['taskDefinitionList'], ensure_ascii=False, indent=4).replace( f"'{metatask_old.dbSName}'",  f"'{page_object.dbSName}'")
							 | 
						|
								                    getTaskRelationList=responsJson['data']['processTaskRelationList']
							 | 
						|
								                    putTaskRelationList=[]
							 | 
						|
								                    for item in getTaskRelationList:
							 | 
						|
								                        new_item = {
							 | 
						|
								                                "name": item['name'],
							 | 
						|
								                                "preTaskCode":item['preTaskCode'] ,
							 | 
						|
								                                "preTaskVersion":item['preTaskVersion'] ,
							 | 
						|
								                                "postTaskCode":item['postTaskCode'] ,
							 | 
						|
								                                "conditionType":item['conditionType'] ,
							 | 
						|
								                                "conditionParams":item['conditionParams'] 
							 | 
						|
								                                    }
							 | 
						|
								                        putTaskRelationList.append(new_item)
							 | 
						|
								                    modified_json_str2=  json.dumps(putTaskRelationList, ensure_ascii=False, indent=4)
							 | 
						|
								                    modified_json_str2=re.sub(r'\s+', '', modified_json_str2)
							 | 
						|
								                    metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
							 | 
						|
								                                                            description=page_object.remark, # 替换工作流备注
							 | 
						|
								                                                            locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations
							 | 
						|
								                                                            name =page_object.metatask_name+"-存储过程采集" ,# 替换工作流名称
							 | 
						|
								                                                            tenantCode =responsJson['data']['processDefinition']['tenantCode'] ,
							 | 
						|
								                                                            taskRelationJson  =modified_json_str2,# 替换taskRelationJson
							 | 
						|
								                                                            ).model_dump(exclude_unset=True, by_alias=True)
							 | 
						|
								                    form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
							 | 
						|
								                    response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data,verify=False)
							 | 
						|
								                    putText= response_put0.text
							 | 
						|
								                    responsPutJson=json.loads(putText) 
							 | 
						|
								                    if responsPutJson['msg'] == 'success':
							 | 
						|
								                        if message:
							 | 
						|
								                            message += ", "
							 | 
						|
								                        message += page_object.metatask_name + "-存储过程采集修改成功"
							 | 
						|
								                    else:
							 | 
						|
								                        if message:
							 | 
						|
								                            message += ", "
							 | 
						|
								                        message += page_object.metatask_name + "-存储过程采集修改失败"   
							 | 
						|
								        return message  
							 | 
						|
								
							 | 
						|
								
							 | 
						|
								    @classmethod
							 | 
						|
								    async def up_or_down_metatask_services(
							 | 
						|
								        cls, request: Request, query_db: AsyncSession, current_user: CurrentUserModel, id: str, type: str
							 | 
						|
								    ):
							 | 
						|
								        metatask_info = await cls.metatask_detail_services(query_db, id)
							 | 
						|
								        metatask_info.update_by = current_user.user.user_name
							 | 
						|
								        metatask_info.update_time = datetime.now()
							 | 
						|
								        type_str: str
							 | 
						|
								        projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
							 | 
						|
								        dstypes=metatask_info.ds_types.split(",")
							 | 
						|
								        dsids=metatask_info.ds_ids.split(",")
							 | 
						|
								        result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dstypes, dsids)]
							 | 
						|
								        message=''
							 | 
						|
								        # 查询接口
							 | 
						|
								        url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
							 | 
						|
								        headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
							 | 
						|
								        for config in result_list:
							 | 
						|
								            # mysql表字段
							 | 
						|
								            if config['dstype']=='0' :
							 | 
						|
								                metaprocessconfig_dict = {
							 | 
						|
								                'name': metatask_info.metatask_name+'-表字段采集',
							 | 
						|
								                'releaseState': type
							 | 
						|
								                }
							 | 
						|
								                form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
							 | 
						|
								                response = requests.post(f"{url}/{config['dsid']}/release", headers=headers, data=form_data, verify=False)
							 | 
						|
								                text= response.text
							 | 
						|
								                responsJson = json.loads(text)
							 | 
						|
								                if responsJson['success'] is True:
							 | 
						|
								                    message='成功!'
							 | 
						|
								                else:
							 | 
						|
								                    raise ServiceException(message='失败'+responsJson['msg'])
							 | 
						|
								                 # mysql表字段
							 | 
						|
								            if config['dstype']=='1' :
							 | 
						|
								                metaprocessconfig_dict = {
							 | 
						|
								                'name': metatask_info.metatask_name+'-存储过程采集',
							 | 
						|
								                'releaseState': type
							 | 
						|
								                }
							 | 
						|
								                form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
							 | 
						|
								                response = requests.post(f"{url}/{config['dsid']}/release", headers=headers,data=form_data,verify=False)
							 | 
						|
								                text= response.text
							 | 
						|
								                responsJson = json.loads(text)
							 | 
						|
								                if responsJson['success'] is True:
							 | 
						|
								                    message='成功!'
							 | 
						|
								                else:
							 | 
						|
								                    raise ServiceException(message='失败'+responsJson['msg'])
							 | 
						|
								        if type == 'OFFLINE':
							 | 
						|
								            # 下线
							 | 
						|
								            type_str = '下线'
							 | 
						|
								            metatask_info.status = 'OFFLINE'
							 | 
						|
								        else:
							 | 
						|
								            # 上线
							 | 
						|
								            type_str = '上线'
							 | 
						|
								            metatask_info.status = 'ONLINE'
							 | 
						|
								
							 | 
						|
								        edit_metatask = metatask_info.model_dump(exclude_unset=True)
							 | 
						|
								        try:
							 | 
						|
								            await MetataskDao.edit_metatask_dao(query_db, edit_metatask)
							 | 
						|
								            await query_db.commit()
							 | 
						|
								            return CrudResponseModel(is_success=True, message=message)
							 | 
						|
								        except Exception as e:
							 | 
						|
								            await query_db.rollback()
							 | 
						|
								            raise e
							 | 
						|
								        else:
							 | 
						|
								            raise ServiceException(message='更新失败')
							 | 
						|
								
							 | 
						|
								    @classmethod
							 | 
						|
								    async def run_metatask_services(
							 | 
						|
								        cls, request: Request, query_db: AsyncSession,  process: ProcessDefinition,current_user: CurrentUserModel
							 | 
						|
								    ):
							 | 
						|
								        process.failureStrategy='CONTINUE'
							 | 
						|
								        process.warningType='NONE'
							 | 
						|
								        process.execType='START_PROCESS'
							 | 
						|
								        process.taskDependType='TASK_POST'
							 | 
						|
								        process.complementDependentMode ='OFF_MODE'
							 | 
						|
								        process.runMode='RUN_MODE_SERIAL'
							 | 
						|
								        process.processInstancePriority='MEDIUM'
							 | 
						|
								        process.dryRun=0
							 | 
						|
								        process.scheduleTime="{complementStartDate:'2025-01-12 00:00:00',complementEndDate:'2025-01-12 00:00:00'}"
							 | 
						|
								        projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
							 | 
						|
								        url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/executors/start-process-instance'
							 | 
						|
								        headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
							 | 
						|
								        # form_data = {key: str(value) for key, value in process.__dict__.items()}
							 | 
						|
								        form_data = {key: value for key, value in process.__dict__.items()}
							 | 
						|
								
							 | 
						|
								        response = requests.post(url, headers=headers, data=form_data, verify=False)
							 | 
						|
								        text= response.text
							 | 
						|
								        responsJson = json.loads(text)
							 | 
						|
								        if responsJson['success'] is True:
							 | 
						|
								           return "运行成功!"
							 | 
						|
								       
							 | 
						|
								        else:
							 | 
						|
								            raise ServiceException(message='运行失败!')
							 | 
						|
								    @classmethod
							 | 
						|
								    async def ds_metatask_services(
							 | 
						|
								        cls, request: Request, query_db: AsyncSession,  process: ParmScheduleVo,current_user: CurrentUserModel
							 | 
						|
								    ):
							 | 
						|
								        projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
							 | 
						|
								        # 先查询是否建立定时任务
							 | 
						|
								        getdsurl=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/schedules?pageSize=10&pageNo=1&processDefinitionCode='+str(process.processDefinitionCode)
							 | 
						|
								        headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
							 | 
						|
								        getdsresponse = requests.get(getdsurl, headers=headers, verify=False)
							 | 
						|
								        getdstext= getdsresponse.text
							 | 
						|
								        responsJson = json.loads(getdstext) 
							 | 
						|
								        if responsJson['msg'] == 'success':
							 | 
						|
								            if responsJson['data']['total']>0:
							 | 
						|
								            #    getds_json_list = json.dumps(responsJson['data']['totalList'], ensure_ascii=False, indent=4)
							 | 
						|
								               getds_json_list = responsJson['data']['totalList']
							 | 
						|
								               for item in getds_json_list:
							 | 
						|
								                  if item['releaseState']=='ONLINE':
							 | 
						|
								                    # 先下线在删除
							 | 
						|
								                        offdsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{item['id']}/offline"
							 | 
						|
								                        offresponse = requests.post(offdsurl, headers=headers, verify=False)
							 | 
						|
								                #   删除对应的调度
							 | 
						|
								                  deldsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{item['id']}?scheduleId={item['id']}"
							 | 
						|
								                  delresponse = requests.delete(deldsurl, headers=headers, verify=False)
							 | 
						|
								            
							 | 
						|
								        parm =ParmSchedule()
							 | 
						|
								        parm.failureStrategy='CONTINUE'
							 | 
						|
								        parm.warningType='NONE'
							 | 
						|
								        parm.warningGroupId=process.warningGroupId
							 | 
						|
								        parm.workerGroup=process.workerGroup
							 | 
						|
								        parm.processDefinitionCode =process.processDefinitionCode
							 | 
						|
								        parm.environmentCode=process.environmentCode
							 | 
						|
								        parm.processInstancePriority='MEDIUM'
							 | 
						|
								        parm.schedule = (
							 | 
						|
								    '{"startTime":"' + process.beginTime.strftime('%Y-%m-%d %H:%M:%S') +
							 | 
						|
								    '", "endTime":"' + process.endTime.strftime('%Y-%m-%d %H:%M:%S') +
							 | 
						|
								    '", "crontab":"' + process.crontab +
							 | 
						|
								    '", "timezoneId":"Asia/Shanghai"}') 
							 | 
						|
								        url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/schedules'
							 | 
						|
								        # form_data = {key: str(value) for key, value in process.__dict__.items()}
							 | 
						|
								        form_data = {key: value for key, value in parm.__dict__.items()}
							 | 
						|
								        response = requests.post(url, headers=headers, data=form_data, verify=False)
							 | 
						|
								        text= response.text
							 | 
						|
								        responsJson = json.loads(text)
							 | 
						|
								        if responsJson['msg'] == 'success':
							 | 
						|
								            scheduleId= responsJson['data']['id']
							 | 
						|
								            ondsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{scheduleId}/online"
							 | 
						|
								            ondsurl = requests.post(ondsurl, headers=headers, verify=False)
							 | 
						|
								            metatask_info = await cls.metatask_detail_services(query_db, process.metaTaskId)
							 | 
						|
								            metatask_info.schId=scheduleId
							 | 
						|
								            metatask_info = metatask_info.model_dump(exclude_unset=True)
							 | 
						|
								            await MetataskDao.edit_metatask_dao(query_db, metatask_info)
							 | 
						|
								            await query_db.commit()
							 | 
						|
								            return "调度运行成功!"
							 | 
						|
								        else:
							 | 
						|
								            raise ServiceException(message='运行失败!')
							 | 
						|
								    @classmethod
							 | 
						|
								    async def ds_metatask_detail(
							 | 
						|
								        cls, request: Request, query_db: AsyncSession,  process: ParmScheduleVo,current_user: CurrentUserModel
							 | 
						|
								    ):
							 | 
						|
								        projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
							 | 
						|
								        # 先查询是否建立定时任务
							 | 
						|
								        getdsurl=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/schedules?pageSize=10&pageNo=1&processDefinitionCode='+str(process.processDefinitionCode)
							 | 
						|
								        headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
							 | 
						|
								        getdsresponse = requests.get(getdsurl, headers=headers, verify=False)
							 | 
						|
								        getdstext= getdsresponse.text
							 | 
						|
								        processVo=ParmScheduleVo()
							 | 
						|
								        responsJson = json.loads(getdstext) 
							 | 
						|
								        if responsJson['msg'] == 'success':
							 | 
						|
								            if responsJson['data']['total']>0:
							 | 
						|
								            #    getds_json_list = json.dumps(responsJson['data']['totalList'], ensure_ascii=False, indent=4)
							 | 
						|
								               getds_json_list = responsJson['data']['totalList']
							 | 
						|
								               for item in getds_json_list:
							 | 
						|
								                      processVo.crontab=item['crontab']
							 | 
						|
								                      processVo.beginTime=item['startTime']
							 | 
						|
								                      processVo.endTime=item['endTime']
							 | 
						|
								                      processVo.workerGroup=item['workerGroup']
							 | 
						|
								                      processVo.warningGroupId=item['warningGroupId']
							 | 
						|
								                      processVo.environmentCode=item['environmentCode']
							 | 
						|
								        return processVo
							 | 
						|
								 
							 | 
						|
								    @classmethod
							 | 
						|
								    async def ds_metatask_delete(
							 | 
						|
								        cls, request: Request, query_db: AsyncSession,  process: ParmScheduleVo,current_user: CurrentUserModel
							 | 
						|
								    ):
							 | 
						|
								        projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
							 | 
						|
								        # 先查询是否建立定时任务
							 | 
						|
								        getdsurl=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/schedules?pageSize=10&pageNo=1&processDefinitionCode='+str(process.processDefinitionCode)
							 | 
						|
								        headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
							 | 
						|
								        getdsresponse = requests.get(getdsurl, headers=headers, verify=False)
							 | 
						|
								        getdstext= getdsresponse.text
							 | 
						|
								        responsJson = json.loads(getdstext) 
							 | 
						|
								        if responsJson['msg'] == 'success':
							 | 
						|
								            if responsJson['data']['total']>0:
							 | 
						|
								            #    getds_json_list = json.dumps(responsJson['data']['totalList'], ensure_ascii=False, indent=4)
							 | 
						|
								               getds_json_list = responsJson['data']['totalList']
							 | 
						|
								               for item in getds_json_list:
							 | 
						|
								                  if item['releaseState']=='ONLINE':
							 | 
						|
								                    # 先下线在删除
							 | 
						|
								                        offdsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{item['id']}/offline"
							 | 
						|
								                        offresponse = requests.post(offdsurl, headers=headers, verify=False)
							 | 
						|
								                #   删除对应的调度
							 | 
						|
								                  deldsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{item['id']}?scheduleId={item['id']}"
							 | 
						|
								                  delresponse = requests.delete(deldsurl, headers=headers, verify=False)
							 | 
						|
								                  deldstext= delresponse.text
							 | 
						|
								                  delresponsJson = json.loads(deldstext) 
							 | 
						|
								                  if delresponsJson['msg'] == 'success':
							 | 
						|
								                        metatask_info = await cls.metatask_detail_services(query_db, process.metaTaskId)
							 | 
						|
								                        metatask_info.schId=""
							 | 
						|
								                        metatask_info = metatask_info.model_dump(exclude_unset=True)
							 | 
						|
								                        await MetataskDao.edit_metatask_dao(query_db, metatask_info)
							 | 
						|
								                        await query_db.commit()   
							 | 
						|
								                        return "调度删除成功!"    
							 | 
						|
								        return "调度删除成功!" 
							 | 
						|
								
							 | 
						|
								    @classmethod
							 | 
						|
								    async def get_metatask_logs_services(
							 | 
						|
								        cls, request: Request, current_user: CurrentUserModel, query_db: AsyncSession, id: str
							 | 
						|
								    ):
							 | 
						|
								        metatask_info = await cls.metatask_detail_services(query_db, id)
							 | 
						|
								        metatask_info.update_by = current_user.user.user_name
							 | 
						|
								        metatask_info.update_time = datetime.now()
							 | 
						|
								        # 运行中
							 | 
						|
								        metatask_info = '3'
							 | 
						|
								        edit_metatask = metatask_info.model_dump(exclude_unset=True)
							 | 
						|
								        try:
							 | 
						|
								            await MetataskDao.edit_metatask_dao(query_db, edit_metatask)
							 | 
						|
								            await query_db.commit()
							 | 
						|
								            return CrudResponseModel(is_success=True, message=metatask_info.metatask_name + '任务:' '日志获取成功')
							 | 
						|
								        except Exception as e:
							 | 
						|
								            await query_db.rollback()
							 | 
						|
								            raise e
							 | 
						|
								        else:
							 | 
						|
								            raise ServiceException(message='更新失败')
							 | 
						|
								
							 | 
						|
								    @classmethod
							 | 
						|
								    async def delete_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteMetataskModel,current_user: CurrentUserModel):
							 | 
						|
								        """
							 | 
						|
								        删除参数配置信息service
							 | 
						|
								        :param request: Request对象
							 | 
						|
								        :param query_db: orm对象
							 | 
						|
								        :param page_object: 删除参数配置对象
							 | 
						|
								        :return: 删除参数配置校验结果
							 | 
						|
								        """
							 | 
						|
								     
							 | 
						|
								        if page_object.metatask_ids and page_object.ds_ids:
							 | 
						|
								            metatask_id_list = page_object.metatask_ids.split(',')
							 | 
						|
								            try:
							 | 
						|
								                projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
							 | 
						|
								                # 查询接口
							 | 
						|
								                url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition/batch-delete'
							 | 
						|
								                form_data={'codes':page_object.ds_ids}
							 | 
						|
								
							 | 
						|
								                headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}                   
							 | 
						|
								                response = requests.post(url, headers=headers, data=form_data, verify=False)
							 | 
						|
								                text= response.text
							 | 
						|
								                responsJson = json.loads(text)
							 | 
						|
								                if responsJson['success'] is True:  
							 | 
						|
								                    for metatask_id in metatask_id_list:
							 | 
						|
								                        await MetataskDao.delete_metatask_dao(query_db, MetataskModel(metataskId=int(metatask_id)))
							 | 
						|
								                    await query_db.commit()
							 | 
						|
								                    return CrudResponseModel(is_success=True, message='删除成功')
							 | 
						|
								                else :
							 | 
						|
								                        raise  ServiceException(message='ds删除失败')
							 | 
						|
								            except Exception as e:
							 | 
						|
								                await query_db.rollback()
							 | 
						|
								                raise e
							 | 
						|
								        else:
							 | 
						|
								            raise ServiceException(message='传入参数配置id为空')
							 | 
						|
								
							 | 
						|
								    @classmethod
							 | 
						|
								    async def metatask_detail_services(cls, query_db: AsyncSession, metatask_id: int):
							 | 
						|
								        """
							 | 
						|
								        获取参数配置详细信息service
							 | 
						|
								
							 | 
						|
								        :param query_db: orm对象
							 | 
						|
								        :param metatask_id: 参数配置id
							 | 
						|
								        :return: 参数配置id对应的信息
							 | 
						|
								        """
							 | 
						|
								        metatask = await MetataskDao.get_metatask_detail_by_id(query_db, metatask_id=metatask_id)
							 | 
						|
								        if metatask:
							 | 
						|
								            result = MetataskModel(**CamelCaseUtil.transform_result(metatask))
							 | 
						|
								        else:
							 | 
						|
								            result = MetataskModel(**dict())
							 | 
						|
								
							 | 
						|
								        return result
							 | 
						|
								    @classmethod
							 | 
						|
								    async def get_process_instances_services(
							 | 
						|
								        cls, request: Request, query_object: ProcessInstancePage,current_user: CurrentUserModel
							 | 
						|
								    ):
							 | 
						|
								        projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
							 | 
						|
								        url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/process-instances?pageNo={query_object.page_num}&pageSize={query_object.page_size}&searchVal={query_object.searchVal}'
							 | 
						|
								        headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
							 | 
						|
								        response = requests.get(url, headers=headers, verify=False)
							 | 
						|
								        try:
							 | 
						|
								            
							 | 
						|
								            if response.reason == 'OK':
							 | 
						|
								                response_text = response.text
							 | 
						|
								                data = json.loads(response_text)
							 | 
						|
								                total_list = data["data"]["totalList"]       
							 | 
						|
								                # data_sources = [ProcessInstance(**item) for item in total_list]
							 | 
						|
								                pageData = PageResponseModel(rows=total_list,total=data["data"]["total"])
							 | 
						|
								                return pageData
							 | 
						|
								            else:
							 | 
						|
								                return {'error': f'Request failed with status code {response.status_code}'}
							 | 
						|
								        except Exception as e:
							 | 
						|
								            raise e
							 | 
						|
								
							 | 
						|
								    @classmethod
							 | 
						|
								    async def get_task_nodes_services(
							 | 
						|
								cls, request: Request,id:int,current_user: CurrentUserModel
							 | 
						|
								    ):
							 | 
						|
								        projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
							 | 
						|
								        url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/process-instances/{id}/tasks'
							 | 
						|
								        headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
							 | 
						|
								        response = requests.get(url, headers=headers, verify=False)
							 | 
						|
								        try:
							 | 
						|
								            response = requests.get(url, headers=headers,verify=False)
							 | 
						|
								            
							 | 
						|
								            if response.reason == 'OK':
							 | 
						|
								                response_text = response.text
							 | 
						|
								                data = json.loads(response_text)
							 | 
						|
								                total_list = data["data"]["taskList"]       
							 | 
						|
								                data_sources = [TaskNode(**item) for item in total_list]
							 | 
						|
								                return data_sources
							 | 
						|
								            else:
							 | 
						|
								                return {'error': f'Request failed with status code {response.status_code}'}
							 | 
						|
								        except Exception as e:
							 | 
						|
								            raise e
							 | 
						|
								
							 | 
						|
								    @classmethod
							 | 
						|
								    async def get_log_details_services(
							 | 
						|
								    cls, request: Request,id:int,current_user: CurrentUserModel
							 | 
						|
								    ):
							 | 
						|
								        url = f'{AppConfig.ds_server_url}/dolphinscheduler/log/detail?taskInstanceId={id}&limit=1000&skipLineNum=0'
							 | 
						|
								        headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
							 | 
						|
								        # response = requests.get(url, headers=headers, verify=False)
							 | 
						|
								        try:
							 | 
						|
								            response = requests.get(url, headers=headers, verify=False)
							 | 
						|
								            
							 | 
						|
								            if response.reason == 'OK':
							 | 
						|
								                response_text = response.text
							 | 
						|
								                data = json.loads(response_text)
							 | 
						|
								                logMessage = data["data"]["message"]       
							 | 
						|
								                return logMessage
							 | 
						|
								            else:
							 | 
						|
								                return {'error': f'Request failed with status code {response.status_code}'}
							 | 
						|
								        except Exception as e:
							 | 
						|
								            raise e 
							 | 
						|
								
							 |