from sqlalchemy.ext.asyncio import AsyncSession

from module_admin.entity.vo.metaprocessconfig_vo import MetaprocessconfigQueryModel
from module_admin.dao.metaprocessconfig_dao import MetaprocessconfigDao


class MetaprocessconfigService:
    """
    Service layer of the parameter configuration management module.
    """

    @classmethod
    async def get_metaprocessconfig_list_services(
        cls, query_db: AsyncSession, query_object: MetaprocessconfigQueryModel, is_page: bool = False
    ):
        """
        Fetch the parameter configuration list.

        This is a pass-through: the three arguments are handed positionally,
        in order, to ``MetaprocessconfigDao.get_metaprocessconfig_list_all``
        and its awaited result is returned untouched.

        :param query_db: ORM session object
        :param query_object: query parameter object
        :param is_page: forwarded to the DAO as-is (presumably toggles
            pagination -- confirm against the DAO implementation)
        :return: whatever the DAO call returns
        """
        return await MetaprocessconfigDao.get_metaprocessconfig_list_all(query_db, query_object, is_page)

    # ------------------------------------------------------------------
    # Everything below this line is commented-out code; it is not executed
    # and is not part of the class's active API.
    # ------------------------------------------------------------------

    # @classmethod
    # async def get_data_source_tree(cls,request: Request):
    #     url = 'http://47.121.207.11:12345/dolphinscheduler/datasources?pageNo=1&pageSize=100'
    #     token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
    #     headers = {'token': token}
    #     response = requests.get(url, headers=headers)
    #     if response.reason == 'OK':
    #         response_text = response.text
    #         data = json.loads(response_text)
    #         total_list = data["data"]["totalList"]
    #         # Parse the connectionParams string into a dict
    #         for item in total_list:
    #             item["connectionParams"] = json.loads(item["connectionParams"])
    #         # Build the list of DataSource objects with Pydantic
    #         data_sources = [DataSource(**item) for item in total_list]
    #         return data_sources
    #     else:
    #         return {'error': f'Request failed with status code {response.status_code}'}

    # @classmethod
    # async def check_metaprocessconfig_name_unique_services(cls, query_db: AsyncSession, page_object: MetaprocessconfigModel):
    #     """
    #     Service: check whether the parameter key name is unique
    #     :param query_db: ORM object
    #     :param page_object: parameter configuration object
    #     :return: validation result
    #     """
    #     metaprocessconfig_id = -1 if page_object.metaprocessconfig_id is None else page_object.metaprocessconfig_id
    #     metaprocessconfig = await MetaprocessconfigDao.get_metaprocessconfig_detail_by_info(
    #         query_db, MetaprocessconfigModel(metaprocessconfigName=page_object.metaprocessconfig_name)
    #     )
    #     if metaprocessconfig and metaprocessconfig.metaprocessconfig_id != metaprocessconfig_id:
    #         return CommonConstant.NOT_UNIQUE
    #     return CommonConstant.UNIQUE

    # @classmethod
    # async def add_metaprocessconfig_services(cls, request: Request, query_db: AsyncSession, page_object: MetaprocessconfigModel):
    #     """
    #     Service: add a parameter configuration record
    #     :param request: Request object
    #     :param query_db: ORM object
    #     :param page_object: parameter configuration object to add
    #     :return: validation result of the add operation
    #     """
    #     if not await cls.check_metaprocessconfig_name_unique_services(query_db, page_object):
    #         raise ServiceException(message=f'新增元数据任务{page_object.metaprocessconfig_name}失败,任务名已存在')
    #     else:
    #         try:
    #             await MetaprocessconfigDao.add_metaprocessconfig_dao(query_db, page_object)
    #             await query_db.commit()
    #             return CrudResponseModel(is_success=True, message='新增成功')
    #         except Exception as e:
    #             await query_db.rollback()
    #             raise e

    # @classmethod
    # async def edit_metaprocessconfig_services(cls, request: Request, query_db: AsyncSession, page_object: MetaprocessconfigModel):
    #     """
    #     Service: edit a parameter configuration record
    #     :param request: Request object
    #     :param query_db: ORM object
    #     :param page_object: parameter configuration object to edit
    #     :return: validation result of the edit operation
    #     """
    #     edit_metaprocessconfig = page_object.model_dump(exclude_unset=True)
    #     metaprocessconfig_info = await cls.metaprocessconfig_detail_services(query_db, page_object.metaprocessconfig_id)
    #     if metaprocessconfig_info.metaprocessconfig_id:
    #         if not await cls.check_metaprocessconfig_name_unique_services(query_db, page_object):
    #             raise ServiceException(message=f'修改任务{page_object.metaprocessconfig_name}失败,任务名称已存在')
    #         else:
    #             try:
    #                 await MetaprocessconfigDao.edit_metaprocessconfig_dao(query_db, edit_metaprocessconfig)
    #                 await query_db.commit()
    #                 return CrudResponseModel(is_success=True, message='更新成功')
    #             except Exception as e:
    #                 await query_db.rollback()
    #                 raise e
    #     else:
    #         raise ServiceException(message='更新失败')

    # @classmethod
    # async def up_or_down_metaprocessconfig_services(
    #     cls, request: Request, query_db: AsyncSession, current_user: CurrentUserModel, id: str, type: str
    # ):
    #     metaprocessconfig_info = await cls.metaprocessconfig_detail_services(query_db, id)
    #     metaprocessconfig_info.update_by = current_user.user.user_name
    #     metaprocessconfig_info.update_time = datetime.now()
    #     type_str: str
    #     if type == 'down':
    #         # Take offline
    #         type_str = '下线'
    #         metaprocessconfig_info.status = '1'
    #     else:
    #         # Bring online
    #         type_str = '上线'
    #         metaprocessconfig_info.status = '2'
    #     edit_metaprocessconfig = metaprocessconfig_info.model_dump(exclude_unset=True)
    #     try:
    #         await MetaprocessconfigDao.edit_metaprocessconfig_dao(query_db, edit_metaprocessconfig)
    #         await query_db.commit()
    #         return CrudResponseModel(is_success=True, message=metaprocessconfig_info.metaprocessconfig_name + '任务' + type_str + '成功')
    #     except Exception as e:
    #         await query_db.rollback()
    #         raise e
    #     else:
    #         raise ServiceException(message='更新失败')

    # @classmethod
    # async def run_metaprocessconfig_services(
    #     cls, request: Request, query_db: AsyncSession, current_user: CurrentUserModel, id: str
    # ):
    #     metaprocessconfig_info = await cls.metaprocessconfig_detail_services(query_db, id)
    #     metaprocessconfig_info.update_by = current_user.user.user_name
    #     metaprocessconfig_info.update_time = datetime.now()
    #     # Running
    #     metaprocessconfig_info.status = '3'
    #     edit_metaprocessconfig = metaprocessconfig_info.model_dump(exclude_unset=True)
    #     try:
    #         await MetaprocessconfigDao.edit_metaprocessconfig_dao(query_db, edit_metaprocessconfig)
    #         await query_db.commit()
    #         return CrudResponseModel(is_success=True, message=metaprocessconfig_info.metaprocessconfig_name + '任务:' '运行成功')
    #     except Exception as e:
    #         await query_db.rollback()
    #         raise e
    #     else:
    #         raise ServiceException(message='更新失败')

    # @classmethod
    # async def ds_metaprocessconfig_services(
    #     cls, request: Request, query_db: AsyncSession, current_user: CurrentUserModel, id: str
    # ):
    #     metaprocessconfig_info = await cls.metaprocessconfig_detail_services(query_db, id)
    #     metaprocessconfig_info.update_by = current_user.user.user_name
    #     metaprocessconfig_info.update_time = datetime.now()
    #     # Running
    #     metaprocessconfig_info.status = '4'
    #     edit_metaprocessconfig = metaprocessconfig_info.model_dump(exclude_unset=True)
    #     try:
    #         await MetaprocessconfigDao.edit_metaprocessconfig_dao(query_db, edit_metaprocessconfig)
    #         await query_db.commit()
    #         return CrudResponseModel(is_success=True, message=metaprocessconfig_info.metaprocessconfig_name + '任务:' '调度启动成功')
    #     except Exception as e:
    #         await query_db.rollback()
    #         raise e
    #     else:
    #         raise ServiceException(message='更新失败')

    # @classmethod
    # async def get_metaprocessconfig_logs_services(
    #     cls, request: Request, current_user: CurrentUserModel, query_db: AsyncSession, id: str
    # ):
    #     metaprocessconfig_info = await cls.metaprocessconfig_detail_services(query_db, id)
    #     metaprocessconfig_info.update_by = current_user.user.user_name
    #     metaprocessconfig_info.update_time = datetime.now()
    #     # Running
    #     metaprocessconfig_info = '3'
    #     edit_metaprocessconfig = metaprocessconfig_info.model_dump(exclude_unset=True)
    #     try:
    #         await MetaprocessconfigDao.edit_metaprocessconfig_dao(query_db, edit_metaprocessconfig)
    #         await query_db.commit()
    #         return CrudResponseModel(is_success=True, message=metaprocessconfig_info.metaprocessconfig_name + '任务:' '日志获取成功')
    #     except Exception as e:
    #         await query_db.rollback()
    #         raise e
    #     else:
    #         raise ServiceException(message='更新失败')

    # @classmethod
    # async def delete_metaprocessconfig_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteMetaprocessconfigModel):
    #     """
    #     Service: delete parameter configuration records
    #     :param request: Request object
    #     :param query_db: ORM object
    #     :param page_object: parameter configuration object to delete
    #     :return: validation result of the delete operation
    #     """
    #     # if page_object.metaprocessconfig_ids.split(','):
    #     #     metaprocessconfig_id_list = page_object.metaprocessconfig_ids.split(',')
    #     #     try:
    #     #         for metaprocessconfig_id in metaprocessconfig_id_list:
    #     #             metaprocessconfig_id_dict = dict(metaprocessconfig_id=metaprocessconfig_id)
    #     #             MetaprocessconfigDao.delete_metaprocessconfig_dao(query_db, MetaprocessconfigModel(**metaprocessconfig_id_dict))
    #     #         query_db.commit()
    #     #         # await cls.init_cache_sys_metaprocessconfig_services(query_db, request.app.state.redis)s
    #     #         result = dict(is_success=True, message='删除成功')
    #     #     except Exception as e:
    #     #         query_db.rollback()
    #     #         result = dict(is_success=False, message=str(e))
    #     # else:
    #     #     result = dict(is_success=False, message='传入字典数据id为空')
    #     # return CrudResponseModel(**result)
    #     if page_object.metaprocessconfig_ids:
    #         metaprocessconfig_id_list = page_object.metaprocessconfig_ids.split(',')
    #         try:
    #             for metaprocessconfig_id in metaprocessconfig_id_list:
    #                 await MetaprocessconfigDao.delete_metaprocessconfig_dao(query_db, MetaprocessconfigModel(metaprocessconfigId=int(metaprocessconfig_id)))
    #             await query_db.commit()
    #             return CrudResponseModel(is_success=True, message='删除成功')
    #         except Exception as e:
    #             await query_db.rollback()
    #             raise e
    #     else:
    #         raise ServiceException(message='传入参数配置id为空')

    # @classmethod
    # async def metaprocessconfig_detail_services(cls, query_db: AsyncSession, metaprocessconfig_id: int):
    #     """
    #     Service: fetch the details of one parameter configuration
    #     :param query_db: ORM object
    #     :param metaprocessconfig_id: parameter configuration id
    #     :return: information for the given parameter configuration id
    #     """
    #     metaprocessconfig = await MetaprocessconfigDao.get_metaprocessconfig_detail_by_id(query_db, metaprocessconfig_id=metaprocessconfig_id)
    #     if metaprocessconfig:
    #         result = MetaprocessconfigModel(**CamelCaseUtil.transform_result(metaprocessconfig))
    #     else:
    #         result = MetaprocessconfigModel(**dict())
    #     return result