import asyncio
import json
import re
import uuid
from datetime import datetime
from typing import List

import requests
from fastapi import Request, Depends
from sqlalchemy.ext.asyncio import AsyncSession

from module_admin.dao.metadata_config_dao import MetadataConfigDao
from module_admin.entity.vo.common_vo import CrudResponseModel
from module_admin.entity.vo.metadata_config_vo import (
    MetadataClasModel,
    SecuBizConfigAddModel,
    SecuBizConfigRelaModel,
    SecuBizPermiConfigBatchModel,
    MetadataClasPageQueryModel,
    MetadataSecModel,
    MetadataSecPageQueryModel,
    SecuBizConfigModel,
    SecuBizConfigQueryModel,
    SecuBizPermiConfigModel,
    SecuBizPermiConfigQueryModel,
    TaskBizConfigAddModel,
    TaskBizConfigQueryModel,
    TaskBizConfigModel,
    TaskBizConfigRelaModel,
)
from utils.common_util import CamelCaseUtil
from module_admin.entity.do.metadata_config_do import SecuBizConfigRela, TaskBizConfigRela  # ORM classes
from exceptions.exception import ServiceException
from module_admin.entity.vo.dataSource_vo import ProcessDefinition, ParmScheduleVo, ProcessInstancePage, ParmSchedule
from module_admin.entity.vo.metaprocessconfig_vo import MetaprocessconfigQueryModel, MetaprocessconfigModel
from module_admin.service.metaprocessconfig_service import MetaprocessconfigService
from config.enums import RedisInitKeyConfig
from config.env import AppConfig
from module_admin.entity.vo.user_vo import CurrentUserModel
from module_admin.entity.vo.metatask_vo import DeleteMetataskModel


class MetadataConfigService:
    """
    Service layer for metadata configuration.

    Groups the operations for metadata classifications, data-security
    configuration, security business configuration (plus its permission
    entries) and the task business configuration whose workflows are
    created, updated, released and scheduled through the DolphinScheduler
    (DS) REST API.
    """

    # Task code embedded in the stored process templates; it is replaced with a
    # code freshly generated by DS when a workflow is created.
    _TEMPLATE_TASK_CODE = "18093081592672"
    # Shell command embedded in the stored templates; the business config
    # number is appended to it as an argument.
    _TEMPLATE_SCRIPT_CMD = "sh /home/xx/code/remote_python.sh"
    # Fields copied from an incoming "add" model onto the persisted config model.
    _CONFIG_FIELDS = (
        "onum",
        "biz_name",
        "risk_lvl",
        "isStop",
        "create_by",
        "create_time",
        "update_by",
        "update_time",
    )

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _split_ids(raw: str) -> list:
        """Split a comma separated string into stripped, non-empty tokens."""
        return [token.strip() for token in (raw or "").split(",") if token.strip()]

    @classmethod
    def _copy_config_fields(cls, source, target):
        """Copy the shared business-config fields from ``source`` onto ``target`` and return ``target``."""
        for field_name in cls._CONFIG_FIELDS:
            setattr(target, field_name, getattr(source, field_name))
        return target

    @staticmethod
    def _build_rela_rows(rela_cls, biz_onum, tab_onum_list, create_by, create_time) -> list:
        """Build one relation row of type ``rela_cls`` per table number (``None`` is treated as empty)."""
        rows = []
        for tab_onum in tab_onum_list or []:
            row = rela_cls()
            row.biz_onum = biz_onum
            row.tab_onum = tab_onum
            row.create_by = create_by
            row.create_time = create_time
            rows.append(row)
        return rows

    @staticmethod
    async def _ds_project_url(request: Request) -> str:
        """
        Return the DS base URL of the configured project.

        :raises ServiceException: when no project code is stored in Redis.
        """
        project_code = await request.app.state.redis.get(
            f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode'
        )
        if not project_code:
            # Without this guard the URL would be built from ``None``.
            raise ServiceException(message='dolphinscheduler 项目编码未配置')
        return f'{AppConfig.ds_server_url}/dolphinscheduler/projects/{project_code}'

    @staticmethod
    def _ds_headers(current_user: CurrentUserModel, form_encoded: bool = True) -> dict:
        """Build the DS authentication headers, optionally declaring a form-encoded body."""
        headers = {
            'dashUserName': current_user.user.user_name,
            'dashPassword': current_user.user.password,
        }
        if form_encoded:
            headers['Content-Type'] = 'application/x-www-form-urlencoded'
        return headers

    @staticmethod
    async def _ds_request(method: str, url: str, **kwargs):
        """
        Send an HTTP request to DS without blocking the event loop.

        ``requests`` is synchronous, so the call runs in a worker thread.
        NOTE(review): TLS verification is disabled (``verify=False``) and no
        timeout is set; confirm both are acceptable for the deployment.
        """
        return await asyncio.to_thread(requests.request, method, url, verify=False, **kwargs)

    @classmethod
    async def _remove_process_schedules(cls, project_url: str, headers: dict, process_definition_code) -> tuple:
        """
        Take offline and delete the schedules DS lists for a process definition.

        Only the first page (10 entries) returned by DS is processed.

        :return: ``(query_ok, deleted, failed)`` where ``query_ok`` tells whether
            the listing call succeeded and the counters report delete outcomes.
        """
        query_url = (
            f'{project_url}/schedules?pageSize=10&pageNo=1'
            f'&processDefinitionCode={process_definition_code}'
        )
        listing = json.loads((await cls._ds_request('get', query_url, headers=headers)).text)
        if listing['msg'] != 'success':
            return False, 0, 0

        deleted = 0
        failed = 0
        if listing['data']['total'] > 0:
            for schedule in listing['data']['totalList']:
                schedule_url = f"{project_url}/schedules/{schedule['id']}"
                # An online schedule has to be taken offline before it can be deleted.
                if schedule['releaseState'] == 'ONLINE':
                    await cls._ds_request('post', f'{schedule_url}/offline', headers=headers)
                delete_response = await cls._ds_request(
                    'delete', f"{schedule_url}?scheduleId={schedule['id']}", headers=headers
                )
                try:
                    removed = json.loads(delete_response.text).get('msg') == 'success'
                except (ValueError, AttributeError):
                    # A body that is not a JSON object counts as a failed delete.
                    removed = False
                if removed:
                    deleted += 1
                else:
                    failed += 1
        return True, deleted, failed

    # ------------------------------------------------------------------
    # Metadata classification
    # ------------------------------------------------------------------
    @classmethod
    async def get_metadata_clas_list_services(
        cls, query_db: AsyncSession, query_object: MetadataClasPageQueryModel, is_page: bool = False
    ):
        """
        Query the metadata classification list.

        :param query_db: database session
        :param query_object: query/paging parameters
        :param is_page: whether the DAO should paginate the result
        :return: whatever the DAO returns for the query
        """
        return await MetadataConfigDao.get_metadata_clas_list(query_db, query_object, is_page)

    @classmethod
    async def add_metadata_clas_services(
        cls, query_db: AsyncSession, page_object: MetadataClasModel
    ) -> CrudResponseModel:
        """
        Create a metadata classification.

        The transaction is rolled back and the error re-raised on failure.
        """
        try:
            await MetadataConfigDao.add_metadata_clas_dao(query_db, page_object)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message="新增成功")
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def edit_metadata_clas_services(
        cls, query_db: AsyncSession, page_object: MetadataClasModel
    ) -> CrudResponseModel:
        """
        Update a metadata classification.

        :raises ServiceException: when the classification does not exist.
        """
        edit_data = page_object.model_dump(exclude_unset=True)
        info = await cls.get_metadata_clas_detail_services(query_db, page_object.clas_onum)
        if not info.clas_onum:
            raise ServiceException(message="元数据标签不存在")
        try:
            await MetadataConfigDao.edit_metadata_clas_dao(query_db, page_object.clas_onum, edit_data)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message="更新成功")
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def delete_metadata_clas_services(cls, query_db: AsyncSession, clas_ids: str) -> CrudResponseModel:
        """
        Delete metadata classifications.

        :param clas_ids: comma separated numeric IDs; surrounding whitespace is
            ignored and non-numeric tokens are skipped
        :raises ServiceException: when the input is empty or holds no numeric ID.
        """
        if not clas_ids:
            raise ServiceException(message="传入的分类ID为空")
        # ``isdecimal`` only accepts characters that ``int`` can parse.
        id_list = [int(token) for token in cls._split_ids(clas_ids) if token.isdecimal()]
        if not id_list:
            raise ServiceException(message="无效的分类ID列表")
        try:
            await MetadataConfigDao.delete_metadata_clas_dao(query_db, id_list)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message="删除成功")
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def get_metadata_clas_detail_services(cls, query_db: AsyncSession, clas_id: int) -> MetadataClasModel:
        """Return the classification with the given ID, or an empty model when it is not found."""
        result = await MetadataConfigDao.get_clas_detail_by_id(query_db, clas_id)
        if result:
            return MetadataClasModel(**CamelCaseUtil.transform_result(result))
        return MetadataClasModel()

    # ------------------------------------------------------------------
    # Data-security configuration
    # ------------------------------------------------------------------
    @classmethod
    async def get_metadata_sec_list_services(
        cls, query_db: AsyncSession, query_object: MetadataSecPageQueryModel, is_page: bool = False
    ):
        """Query the data-security configuration list (optionally paginated by the DAO)."""
        return await MetadataConfigDao.get_metadata_sec_list(query_db, query_object, is_page)

    @classmethod
    async def add_metadata_sec_services(
        cls, query_db: AsyncSession, page_object: MetadataSecModel
    ) -> CrudResponseModel:
        """Create a data-security configuration; its ``onum`` is a freshly generated UUID string."""
        try:
            page_object.onum = str(uuid.uuid4())
            await MetadataConfigDao.add_metadata_sec_dao(query_db, page_object)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message="新增成功")
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def edit_metadata_sec_services(
        cls, query_db: AsyncSession, page_object: MetadataSecModel
    ) -> CrudResponseModel:
        """
        Update a data-security configuration.

        :raises ServiceException: when the configuration does not exist.
        """
        edit_data = page_object.model_dump(exclude_unset=True)
        info = await cls.get_metadata_sec_detail_services(query_db, page_object.onum)
        if not info.onum:
            raise ServiceException(message="数据安全配置不存在")
        try:
            await MetadataConfigDao.edit_metadata_sec_dao(query_db, page_object.onum, edit_data)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message="更新成功")
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def delete_metadata_sec_services(cls, query_db: AsyncSession, onum_list: str) -> CrudResponseModel:
        """
        Delete data-security configurations.

        :param onum_list: comma separated configuration numbers
        :raises ServiceException: when the input is empty or holds no usable number.
        """
        if not onum_list:
            raise ServiceException(message="传入的编号为空")
        id_list = cls._split_ids(onum_list)
        if not id_list:
            raise ServiceException(message="无效的编号列表")
        try:
            await MetadataConfigDao.delete_metadata_sec_dao(query_db, id_list)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message="删除成功")
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def get_metadata_sec_detail_services(cls, query_db: AsyncSession, onum: str) -> MetadataSecModel:
        """Return the data-security configuration with the given number, or an empty model."""
        result = await MetadataConfigDao.get_sec_detail_by_id(query_db, onum)
        if result:
            return MetadataSecModel(**CamelCaseUtil.transform_result(result))
        return MetadataSecModel()

    # ------------------------------------------------------------------
    # t_secu_biz_config service methods
    # ------------------------------------------------------------------
    @classmethod
    async def get_biz_config_list_services(
        cls, db: AsyncSession, query_object: SecuBizConfigQueryModel, is_page: bool = False
    ):
        """Query the security business configuration list."""
        return await MetadataConfigDao.get_biz_config_list(db, query_object, is_page)

    @classmethod
    async def add_biz_config_services(cls, db: AsyncSession, page_object: SecuBizConfigAddModel) -> CrudResponseModel:
        """
        Create a security business configuration together with its table relations.

        One relation row is written per entry of ``page_object.tab_onum_list``,
        using the number and audit fields of the newly persisted configuration.
        """
        try:
            config = cls._copy_config_fields(page_object, SecuBizConfigModel())
            created = await MetadataConfigDao.add_biz_config(db, config)
            records: List[SecuBizConfigRela] = cls._build_rela_rows(
                SecuBizConfigRela, created.onum, page_object.tab_onum_list, created.create_by, created.create_time
            )
            await MetadataConfigDao.add_batch_biz_rela_dao(db, records)
            await db.commit()
            return CrudResponseModel(is_success=True, message="新增成功")
        except Exception:
            await db.rollback()
            raise

    @classmethod
    async def edit_biz_config_services(cls, db: AsyncSession, page_object: SecuBizConfigAddModel) -> CrudResponseModel:
        """
        Update a security business configuration and replace its table relations.

        Existing relations are deleted and rebuilt from ``tab_onum_list``; the
        new rows carry the updater and update time as their creation audit.

        :raises ServiceException: when the configuration does not exist.
        """
        config = cls._copy_config_fields(page_object, SecuBizConfigModel())
        edit_data = config.model_dump(exclude_unset=True)
        info = await cls.get_biz_config_detail_services(db, page_object.onum)
        if not info.onum:
            raise ServiceException(message="业务配置不存在")
        try:
            await MetadataConfigDao.edit_biz_config(db, page_object.onum, edit_data)
            await MetadataConfigDao.delete_biz_rela_dao(db, page_object.onum)
            records: List[SecuBizConfigRela] = cls._build_rela_rows(
                SecuBizConfigRela,
                page_object.onum,
                page_object.tab_onum_list,
                page_object.update_by,
                page_object.update_time,
            )
            await MetadataConfigDao.add_batch_biz_rela_dao(db, records)
            await db.commit()
            return CrudResponseModel(is_success=True, message="更新成功")
        except Exception:
            await db.rollback()
            raise

    @classmethod
    async def delete_biz_config_services(cls, db: AsyncSession, onum_list: str) -> CrudResponseModel:
        """
        Delete security business configurations and their table relations.

        :param onum_list: comma separated configuration numbers
        :raises ServiceException: when the input is empty or holds no usable number.
        """
        if not onum_list:
            raise ServiceException(message="传入的编号为空")
        id_list = cls._split_ids(onum_list)
        if not id_list:
            raise ServiceException(message="无效的编号列表")
        try:
            await MetadataConfigDao.delete_biz_config(db, id_list)
            await MetadataConfigDao.delete_biz_rela_batch(db, id_list)
            await db.commit()
            return CrudResponseModel(is_success=True, message="删除成功")
        except Exception:
            await db.rollback()
            raise

    @classmethod
    async def get_biz_config_detail_services(cls, db: AsyncSession, onum: str) -> SecuBizConfigModel:
        """Return the security business configuration with the given number, or an empty model."""
        result = await MetadataConfigDao.get_biz_config_detail_by_id(db, onum)
        if result:
            return SecuBizConfigModel(**CamelCaseUtil.transform_result(result))
        return SecuBizConfigModel()

    # ------------------------------------------------------------------
    # t_secu_biz_permi_config service methods
    # ------------------------------------------------------------------
    @classmethod
    async def get_biz_permi_config_list_services(
        cls, db: AsyncSession, query_object: SecuBizPermiConfigQueryModel, is_page: bool = False
    ):
        """Query the business permission configuration list."""
        return await MetadataConfigDao.get_biz_permi_config_list(db, query_object, is_page)

    @classmethod
    async def add_biz_permi_config_services(
        cls, db: AsyncSession, batch_model: SecuBizPermiConfigBatchModel, create_by: str, create_time: datetime
    ) -> CrudResponseModel:
        """
        Create one permission entry per business number in the batch.

        :param batch_model: shared object fields plus the list of business numbers
        :param create_by: creator written to every entry
        :param create_time: creation time written to every entry
        """
        try:
            records: List[SecuBizPermiConfigModel] = []
            for biz_onum in batch_model.biz_onum_list or []:
                record = SecuBizPermiConfigModel()
                record.biz_onum = biz_onum
                record.obj_type = batch_model.obj_type
                record.obj_value = batch_model.obj_value
                record.obj_name = batch_model.obj_name
                record.isStop = batch_model.isStop
                record.create_by = create_by
                record.create_time = create_time
                records.append(record)
            await MetadataConfigDao.add_biz_permi_config_batch(db, records)
            await db.commit()
            return CrudResponseModel(is_success=True, message="新增成功")
        except Exception:
            await db.rollback()
            raise

    @classmethod
    async def edit_biz_permi_config_services(
        cls, db: AsyncSession, page_object: SecuBizPermiConfigModel
    ) -> CrudResponseModel:
        """
        Update a business permission entry.

        :raises ServiceException: when the entry does not exist.
        """
        edit_data = page_object.model_dump(exclude_unset=True)
        info = await cls.get_biz_permi_config_detail_services(db, page_object.onum)
        if not info.onum:
            raise ServiceException(message="权限配置不存在")
        try:
            await MetadataConfigDao.edit_biz_permi_config(db, page_object.onum, edit_data)
            await db.commit()
            return CrudResponseModel(is_success=True, message="更新成功")
        except Exception:
            await db.rollback()
            raise

    @classmethod
    async def delete_biz_permi_config_services(cls, db: AsyncSession, onum_list: str) -> CrudResponseModel:
        """
        Delete business permission entries.

        :param onum_list: comma separated entry numbers
        :raises ServiceException: when the input is empty or holds no usable number.
        """
        if not onum_list:
            raise ServiceException(message="传入的编号为空")
        id_list = cls._split_ids(onum_list)
        if not id_list:
            raise ServiceException(message="无效的编号列表")
        try:
            await MetadataConfigDao.delete_biz_permi_config(db, id_list)
            await db.commit()
            return CrudResponseModel(is_success=True, message="删除成功")
        except Exception:
            await db.rollback()
            raise

    @classmethod
    async def get_biz_permi_config_detail_services(cls, db: AsyncSession, onum: str) -> SecuBizPermiConfigModel:
        """Return the business permission entry with the given number, or an empty model."""
        result = await MetadataConfigDao.get_biz_permi_config_detail_by_id(db, onum)
        if result:
            return SecuBizPermiConfigModel(**CamelCaseUtil.transform_result(result))
        return SecuBizPermiConfigModel()

    @classmethod
    async def get_biz_config_rela_list_services(cls, result_db: AsyncSession, biz_onum: int):
        """Return the table relations of a security business configuration in camel-case form."""
        rela_rows = await MetadataConfigDao.get_biz_rela_by_biz_id(result_db, biz_onum)
        return CamelCaseUtil.transform_result(rela_rows)

    # ------------------------------------------------------------------
    # Metadata tag scheduling tasks (task business configuration)
    # ------------------------------------------------------------------
    @classmethod
    async def get_task_biz_config_list_services(
        cls, db: AsyncSession, query_object: TaskBizConfigQueryModel, is_page: bool = False
    ):
        """Query the task business configuration list."""
        return await MetadataConfigDao.get_task_biz_config_list(db, query_object, is_page)

    @classmethod
    async def add_task_biz_config_services(
        cls, db: AsyncSession, page_object: TaskBizConfigAddModel, current_user: CurrentUserModel, request: Request
    ) -> CrudResponseModel:
        """
        Create a task business configuration and its DS workflow(s).

        Steps: persist the configuration, persist its table relations, create
        one DS workflow per "BizConfig" process template, then store the
        resulting workflow codes (``ds_ids``) on the configuration. Everything
        is rolled back locally when any step fails.

        :raises ServiceException: when no DS workflow could be created.
        """
        try:
            config = cls._copy_config_fields(page_object, TaskBizConfigModel())
            created = await MetadataConfigDao.add_task_biz_config(db, config)
            # The workflow template needs the persisted number.
            page_object.onum = created.onum

            records: List[TaskBizConfigRela] = cls._build_rela_rows(
                TaskBizConfigRela, created.onum, page_object.tab_onum_list, created.create_by, created.create_time
            )
            await MetadataConfigDao.add_batch_task_rela_dao(db, records)

            # Load the process templates registered for business configurations.
            template_query = MetaprocessconfigQueryModel()
            template_query.db_type = "BizConfig"
            templates = await MetaprocessconfigService.get_metaprocessconfig_list_services(
                db, template_query, False
            )
            message = await cls.biz_process_defind_change_add(request, templates, page_object, current_user)
            # NOTE(review): a partially failed batch still contains "成功" and is
            # accepted here; confirm that this is intended.
            if "成功" not in message:
                raise ServiceException(
                    message=f'新增批次标签任务 {page_object.biz_name} 失败,dolphinscheduler 创建失败' + message
                )

            config.onum = page_object.onum
            config.ds_ids = page_object.ds_ids
            edit_data = config.model_dump(exclude_unset=True)
            await MetadataConfigDao.edit_task_biz_config(db, page_object.onum, edit_data)
            await db.commit()
            return CrudResponseModel(is_success=True, message="新增成功")
        except Exception:
            await db.rollback()
            raise

    @classmethod
    async def biz_process_defind_change_add(
        cls,
        request: Request,
        processConfigList: list[MetaprocessconfigModel],
        page_object: TaskBizConfigAddModel,
        current_user: CurrentUserModel,
    ) -> str:
        """
        Create one DS workflow per process template for a task configuration.

        A fresh task code is requested from DS and substituted for the code
        embedded in each template; the configuration number is appended to the
        templated shell command. On success the created workflow codes are
        written to ``page_object.ds_ids`` as a comma separated string.

        :return: comma separated per-template result messages; a message
            without "成功" means nothing was created.
        """
        success_text = page_object.biz_name + "-批次标签新增成功"
        failure_text = page_object.biz_name + "-批次标签新增失败"

        project_url = await cls._ds_project_url(request)
        codes_response = await cls._ds_request(
            'get',
            f'{project_url}/task-definition/gen-task-codes?genNum=5',
            headers=cls._ds_headers(current_user, form_encoded=False),
        )
        if codes_response.status_code != 200:
            return failure_text
        # ``data`` may be missing or null when DS reports an error in the body.
        task_codes = [str(code) for code in (json.loads(codes_response.text).get("data") or [])]
        if not task_codes:
            return failure_text
        # NOTE(review): every template receives the same generated code.
        new_task_code = task_codes[0]

        form_headers = cls._ds_headers(current_user)
        outcomes = []
        created_codes = []
        for config in processConfigList:
            task_definition_json = config.taskDefinitionJson.replace(
                cls._TEMPLATE_TASK_CODE, new_task_code
            ).replace(cls._TEMPLATE_SCRIPT_CMD, cls._TEMPLATE_SCRIPT_CMD + " " + str(page_object.onum))
            task_relation_json = config.taskRelationJson.replace(cls._TEMPLATE_TASK_CODE, new_task_code)
            locations = config.locations.replace(cls._TEMPLATE_TASK_CODE, new_task_code)

            payload = MetaprocessconfigModel(
                taskDefinitionJson=task_definition_json,
                description="",
                locations=locations,
                name=page_object.biz_name,  # the workflow is named after the business config
                timeout=config.timeout,
                globalParams=config.globalParams,
                tenantCode=config.tenantCode,
                taskRelationJson=task_relation_json,
                executionType=config.executionType,
                releaseState=config.releaseState,
            ).model_dump(exclude_unset=True, by_alias=True)
            form_data = {key: str(value) for key, value in payload.items()}

            create_response = await cls._ds_request(
                'post', f'{project_url}/process-definition', headers=form_headers, data=form_data
            )
            create_result = json.loads(create_response.text)
            if create_result['msg'] == 'success':
                created_codes.append(create_result['data']['code'])
                outcomes.append(success_text)
            else:
                outcomes.append(failure_text)

        if created_codes:
            page_object.ds_ids = ','.join(str(code) for code in created_codes)
        return ", ".join(outcomes)

    @classmethod
    async def edit_task_biz_config_services(
        cls, db: AsyncSession, page_object: TaskBizConfigAddModel, current_user: CurrentUserModel, request: Request
    ) -> CrudResponseModel:
        """
        Update a task business configuration, its relations and its DS workflow(s).

        :raises ServiceException: when the configuration does not exist or no DS
            workflow could be updated.
        """
        config = cls._copy_config_fields(page_object, TaskBizConfigModel())
        edit_data = config.model_dump(exclude_unset=True)
        info = await cls.get_task_biz_config_detail_services(db, page_object.onum)
        if not info.onum:
            raise ServiceException(message="任务业务配置不存在")
        try:
            await MetadataConfigDao.edit_task_biz_config(db, page_object.onum, edit_data)
            await MetadataConfigDao.delete_task_rela_dao(db, page_object.onum)
            records: List[TaskBizConfigRela] = cls._build_rela_rows(
                TaskBizConfigRela,
                page_object.onum,
                page_object.tab_onum_list,
                page_object.update_by,
                page_object.update_time,
            )
            await MetadataConfigDao.add_batch_task_rela_dao(db, records)

            message = await cls.sec_process_defind_change_update(request, page_object, info, current_user)
            if "成功" not in message:
                raise ServiceException(
                    message=f'更新批次标签任务 {page_object.biz_name} 失败,dolphinscheduler 创建失败' + message
                )
            await db.commit()
            return CrudResponseModel(is_success=True, message="更新成功")
        except Exception:
            await db.rollback()
            raise

    @classmethod
    async def sec_process_defind_change_update(
        cls,
        request: Request,
        page_object: TaskBizConfigAddModel,
        metatask_old: TaskBizConfigAddModel,
        current_user: CurrentUserModel,
    ) -> str:
        """
        Rename the DS workflow(s) of a task configuration to the current business name.

        Each workflow is fetched from DS and written back with its existing task
        definitions, relations, locations and tenant, and the new name.

        :param page_object: incoming configuration; its ``ds_ids`` are used when set
        :param metatask_old: stored configuration; supplies ``ds_ids`` as a fallback
        :return: comma separated per-workflow result messages; a message
            without "成功" means nothing was updated.
        """
        success_text = page_object.biz_name + "-批次标签成功"
        failure_text = page_object.biz_name + "-批次标签修改失败"

        # Fall back to the stored workflow codes when the request omits them.
        ds_ids = cls._split_ids(page_object.ds_ids or getattr(metatask_old, 'ds_ids', None) or '')
        definition_url = f'{await cls._ds_project_url(request)}/process-definition'
        plain_headers = cls._ds_headers(current_user, form_encoded=False)
        form_headers = cls._ds_headers(current_user)

        outcomes = []
        for ds_id in ds_ids:
            detail_response = await cls._ds_request('get', f"{definition_url}/{ds_id}", headers=plain_headers)
            detail = json.loads(detail_response.text)
            if detail['msg'] != 'success':
                # A workflow that cannot be read cannot be updated either.
                outcomes.append(failure_text)
                continue

            task_definition_json = json.dumps(
                detail['data']['taskDefinitionList'], ensure_ascii=False, indent=0
            )
            # Only these relation fields are sent back to DS.
            relations = [
                {
                    "name": item['name'],
                    "preTaskCode": item['preTaskCode'],
                    "preTaskVersion": item['preTaskVersion'],
                    "postTaskCode": item['postTaskCode'],
                    "conditionType": item['conditionType'],
                    "conditionParams": item['conditionParams'],
                }
                for item in detail['data']['processTaskRelationList']
            ]
            # All whitespace is stripped from the serialized relations.
            task_relation_json = re.sub(r'\s+', '', json.dumps(relations, ensure_ascii=False, indent=0))

            payload = MetaprocessconfigModel(
                taskDefinitionJson=task_definition_json,
                description="",
                locations=detail['data']['processDefinition']['locations'],
                name=page_object.biz_name,  # the new workflow name
                tenantCode=detail['data']['processDefinition']['tenantCode'],
                taskRelationJson=task_relation_json,
            ).model_dump(exclude_unset=True, by_alias=True)
            form_data = {key: str(value) for key, value in payload.items()}

            update_response = await cls._ds_request(
                'put', f"{definition_url}/{ds_id}", headers=form_headers, data=form_data
            )
            if json.loads(update_response.text)['msg'] == 'success':
                outcomes.append(success_text)
            else:
                outcomes.append(failure_text)
        return ", ".join(outcomes)

    @classmethod
    async def delete_task_biz_config_services(
        cls, request: Request, query_db: AsyncSession, page_object: DeleteMetataskModel, current_user: CurrentUserModel
    ) -> CrudResponseModel:
        """
        Delete task business configurations and their DS workflows.

        The DS workflows are batch-deleted first; the local configurations and
        relations are removed only when DS reports success.

        :raises ServiceException: when IDs are missing or DS refuses the deletion.
        """
        if not (page_object.metatask_ids and page_object.ds_ids):
            raise ServiceException(message='传入参数配置id为空')
        metatask_id_list = page_object.metatask_ids.split(',')
        try:
            project_url = await cls._ds_project_url(request)
            response = await cls._ds_request(
                'post',
                f'{project_url}/process-definition/batch-delete',
                headers=cls._ds_headers(current_user),
                data={'codes': page_object.ds_ids},
            )
            if json.loads(response.text)['success'] is not True:
                raise ServiceException(message='ds删除失败')
            # DS accepted the deletion; remove the local records as well.
            await MetadataConfigDao.delete_task_biz_config(query_db, metatask_id_list)
            await MetadataConfigDao.delete_task_rela_batch(query_db, metatask_id_list)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='删除成功')
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def up_or_down_metatask_services(
        cls, request: Request, query_db: AsyncSession, current_user: CurrentUserModel, id: str, type: str
    ) -> CrudResponseModel:
        """
        Bring the DS workflow(s) of a task configuration online or offline.

        :param id: task configuration number
        :param type: release state sent to DS; ``'OFFLINE'`` takes the task
            offline, any other value is stored as ``'ONLINE'``
        :raises ServiceException: when the configuration is missing, has no
            workflow codes, or DS rejects a release request.
        """
        metatask_info = await cls.get_task_biz_config_detail_services(query_db, id)
        ds_ids = cls._split_ids(metatask_info.ds_ids or '') if metatask_info.onum else []
        if not ds_ids:
            # Unknown configuration, or nothing to release in DS.
            raise ServiceException(message='更新失败')

        definition_url = f'{await cls._ds_project_url(request)}/process-definition'
        headers = cls._ds_headers(current_user)
        form_data = {'name': str(metatask_info.biz_name), 'releaseState': str(type)}
        for ds_id in ds_ids:
            response = await cls._ds_request(
                'post', f"{definition_url}/{ds_id}/release", headers=headers, data=form_data
            )
            release_result = json.loads(response.text)
            if release_result['success'] is not True:
                raise ServiceException(message='失败' + str(release_result.get('msg') or ''))

        metatask_info.update_by = current_user.user.user_name
        metatask_info.update_time = datetime.now()
        metatask_info.status = 'OFFLINE' if type == 'OFFLINE' else 'ONLINE'
        edit_metatask = metatask_info.model_dump(exclude_unset=True)
        try:
            await MetadataConfigDao.edit_task_biz_config(query_db, metatask_info.onum, edit_metatask)
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='成功!')
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def ds_metatask_services(
        cls, request: Request, query_db: AsyncSession, process: ParmScheduleVo, current_user: CurrentUserModel
    ) -> str:
        """
        (Re)create and enable the DS schedule of a workflow.

        Schedules already listed for the workflow are taken offline and deleted,
        a new schedule is created from ``process``, brought online, and its ID
        is stored on the task configuration.

        :raises ServiceException: when DS does not create the schedule.
        """
        project_url = await cls._ds_project_url(request)
        headers = cls._ds_headers(current_user)

        # Best effort: creation proceeds whatever the outcome of the cleanup.
        await cls._remove_process_schedules(project_url, headers, process.processDefinitionCode)

        parm = ParmSchedule()
        parm.failureStrategy = 'CONTINUE'
        parm.warningType = 'NONE'
        parm.warningGroupId = process.warningGroupId
        parm.workerGroup = process.workerGroup
        parm.processDefinitionCode = process.processDefinitionCode
        parm.environmentCode = process.environmentCode
        parm.processInstancePriority = 'MEDIUM'
        # Serialized with json.dumps so that special characters are escaped.
        parm.schedule = json.dumps(
            {
                "startTime": process.beginTime.strftime('%Y-%m-%d %H:%M:%S'),
                "endTime": process.endTime.strftime('%Y-%m-%d %H:%M:%S'),
                "crontab": process.crontab,
                "timezoneId": "Asia/Shanghai",
            },
            ensure_ascii=False,
        )

        response = await cls._ds_request(
            'post', f'{project_url}/schedules', headers=headers, data=dict(parm.__dict__)
        )
        create_result = json.loads(response.text)
        if create_result['msg'] != 'success':
            raise ServiceException(message='运行失败!')

        schedule_id = create_result['data']['id']
        await cls._ds_request('post', f"{project_url}/schedules/{schedule_id}/online", headers=headers)

        metatask_info = await cls.get_task_biz_config_detail_services(query_db, process.metaTaskId)
        metatask_info.schId = schedule_id
        edit_data = metatask_info.model_dump(exclude_unset=True)
        try:
            await MetadataConfigDao.edit_task_biz_config(query_db, metatask_info.onum, edit_data)
            await query_db.commit()
        except Exception:
            await query_db.rollback()
            raise
        return "调度运行成功!"

    @classmethod
    async def ds_metatask_delete(
        cls, request: Request, query_db: AsyncSession, process: ParmScheduleVo, current_user: CurrentUserModel
    ) -> str:
        """
        Delete the DS schedule(s) of a workflow and clear the stored schedule ID.

        Having no schedule in DS counts as success.

        :raises ServiceException: when DS cannot list the schedules or rejects a deletion.
        """
        project_url = await cls._ds_project_url(request)
        headers = cls._ds_headers(current_user)
        query_ok, deleted, failed = await cls._remove_process_schedules(
            project_url, headers, process.processDefinitionCode
        )

        if deleted:
            metatask_info = await cls.get_task_biz_config_detail_services(query_db, process.metaTaskId)
            # Clear the ID on the model before dumping; the dump is a plain dict.
            metatask_info.schId = ""
            edit_data = metatask_info.model_dump(exclude_unset=True)
            try:
                await MetadataConfigDao.edit_task_biz_config(query_db, metatask_info.onum, edit_data)
                await query_db.commit()
            except Exception:
                await query_db.rollback()
                raise

        if not query_ok or failed:
            raise ServiceException(message='调度删除失败!')
        return "调度删除成功!"

    @classmethod
    async def get_task_biz_config_detail_services(cls, db: AsyncSession, onum: str) -> TaskBizConfigModel:
        """Return the task business configuration with the given number, or an empty model."""
        result = await MetadataConfigDao.get_task_biz_config_detail_by_id(db, onum)
        if result:
            return TaskBizConfigModel(**CamelCaseUtil.transform_result(result))
        return TaskBizConfigModel()

    @classmethod
    async def get_task_biz_config_rela_list_services(cls, result_db: AsyncSession, biz_onum: int):
        """Return the table relations of a task business configuration in camel-case form."""
        rela_rows = await MetadataConfigDao.get_task_rela_by_biz_id(result_db, biz_onum)
        return CamelCaseUtil.transform_result(rela_rows)