You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
249 lines
12 KiB
249 lines
12 KiB
2 weeks ago
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
from module_admin.entity.vo.metaprocessconfig_vo import MetaprocessconfigQueryModel
|
||
|
from module_admin.dao.metaprocessconfig_dao import MetaprocessconfigDao
|
||
|
|
||
|
|
||
|
class MetaprocessconfigService:
    """
    Service layer of the parameter-configuration management module.
    """

    @classmethod
    async def get_metaprocessconfig_list_services(
        cls, query_db: AsyncSession, query_object: MetaprocessconfigQueryModel, is_page: bool = False
    ):
        """
        Fetch the parameter-configuration list by delegating to the DAO layer.

        :param query_db: ORM session object, handed to the DAO unchanged
        :param query_object: query-parameter object, handed to the DAO unchanged
        :param is_page: forwarded as-is to the DAO
            (presumably switches paginated output on/off -- confirm against the DAO)
        :return: the result of ``MetaprocessconfigDao.get_metaprocessconfig_list_all``
        """
        # Pure pass-through: no filtering or reshaping happens at the service level.
        return await MetaprocessconfigDao.get_metaprocessconfig_list_all(query_db, query_object, is_page)
|
||
|
|
||
|
# @classmethod
|
||
|
# async def get_data_source_tree(cls,request: Request):
|
||
|
# url = 'http://47.121.207.11:12345/dolphinscheduler/datasources?pageNo=1&pageSize=100'
|
||
|
# token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
|
||
|
# headers = {'token': token}
|
||
|
# response = requests.get(url, headers=headers)
|
||
|
# if response.reason == 'OK':
|
||
|
# response_text = response.text
|
||
|
# data = json.loads(response_text)
|
||
|
# total_list = data["data"]["totalList"]
|
||
|
# # 解析 connectionParams 字符串为字典
|
||
|
# for item in total_list:
|
||
|
# item["connectionParams"] = json.loads(item["connectionParams"])
|
||
|
# # 使用 Pydantic 创建 DataSource 对象列表
|
||
|
# data_sources = [DataSource(**item) for item in total_list]
|
||
|
# return data_sources
|
||
|
# else:
|
||
|
# return {'error': f'Request failed with status code {response.status_code}'}
|
||
|
|
||
|
# @classmethod
|
||
|
# async def check_metaprocessconfig_name_unique_services(cls, query_db: AsyncSession, page_object: MetaprocessconfigModel):
|
||
|
# """
|
||
|
# 校验参数键名是否唯一service
|
||
|
|
||
|
# :param query_db: orm对象
|
||
|
# :param page_object: 参数配置对象
|
||
|
# :return: 校验结果
|
||
|
# """
|
||
|
# metaprocessconfig_id = -1 if page_object.metaprocessconfig_id is None else page_object.metaprocessconfig_id
|
||
|
|
||
|
# metaprocessconfig = await MetaprocessconfigDao.get_metaprocessconfig_detail_by_info(
|
||
|
# query_db, MetaprocessconfigModel(metaprocessconfigName=page_object.metaprocessconfig_name)
|
||
|
# )
|
||
|
# if metaprocessconfig and metaprocessconfig.metaprocessconfig_id != metaprocessconfig_id:
|
||
|
# return CommonConstant.NOT_UNIQUE
|
||
|
# return CommonConstant.UNIQUE
|
||
|
|
||
|
# @classmethod
|
||
|
# async def add_metaprocessconfig_services(cls, request: Request, query_db: AsyncSession, page_object: MetaprocessconfigModel):
|
||
|
# """
|
||
|
# 新增参数配置信息service
|
||
|
# :param request: Request对象
|
||
|
# :param query_db: orm对象
|
||
|
# :param page_object: 新增参数配置对象
|
||
|
# :return: 新增参数配置校验结果
|
||
|
# """
|
||
|
# if not await cls.check_metaprocessconfig_name_unique_services(query_db, page_object):
|
||
|
# raise ServiceException(message=f'新增元数据任务{page_object.metaprocessconfig_name}失败,任务名已存在')
|
||
|
# else:
|
||
|
# try:
|
||
|
# await MetaprocessconfigDao.add_metaprocessconfig_dao(query_db, page_object)
|
||
|
# await query_db.commit()
|
||
|
|
||
|
# return CrudResponseModel(is_success=True, message='新增成功')
|
||
|
# except Exception as e:
|
||
|
# await query_db.rollback()
|
||
|
# raise e
|
||
|
|
||
|
# @classmethod
|
||
|
# async def edit_metaprocessconfig_services(cls, request: Request, query_db: AsyncSession, page_object: MetaprocessconfigModel):
|
||
|
# """
|
||
|
# 编辑参数配置信息service
|
||
|
# :param request: Request对象
|
||
|
# :param query_db: orm对象
|
||
|
# :param page_object: 编辑参数配置对象
|
||
|
# :return: 编辑参数配置校验结果
|
||
|
# """
|
||
|
# edit_metaprocessconfig = page_object.model_dump(exclude_unset=True)
|
||
|
# metaprocessconfig_info = await cls.metaprocessconfig_detail_services(query_db, page_object.metaprocessconfig_id)
|
||
|
# if metaprocessconfig_info.metaprocessconfig_id:
|
||
|
# if not await cls.check_metaprocessconfig_name_unique_services(query_db, page_object):
|
||
|
# raise ServiceException(message=f'修改任务{page_object.metaprocessconfig_name}失败,任务名称已存在')
|
||
|
# else:
|
||
|
# try:
|
||
|
# await MetaprocessconfigDao.edit_metaprocessconfig_dao(query_db, edit_metaprocessconfig)
|
||
|
# await query_db.commit()
|
||
|
# return CrudResponseModel(is_success=True, message='更新成功')
|
||
|
# except Exception as e:
|
||
|
# await query_db.rollback()
|
||
|
# raise e
|
||
|
# else:
|
||
|
# raise ServiceException(message='更新失败')
|
||
|
|
||
|
# @classmethod
|
||
|
# async def up_or_down_metaprocessconfig_services(
|
||
|
# cls, request: Request, query_db: AsyncSession, current_user: CurrentUserModel, id: str, type: str
|
||
|
# ):
|
||
|
# metaprocessconfig_info = await cls.metaprocessconfig_detail_services(query_db, id)
|
||
|
# metaprocessconfig_info.update_by = current_user.user.user_name
|
||
|
# metaprocessconfig_info.update_time = datetime.now()
|
||
|
# type_str: str
|
||
|
# if type == 'down':
|
||
|
# # 下线
|
||
|
# type_str = '下线'
|
||
|
# metaprocessconfig_info.status = '1'
|
||
|
# else:
|
||
|
# # 上线
|
||
|
# type_str = '上线'
|
||
|
# metaprocessconfig_info.status = '2'
|
||
|
|
||
|
# edit_metaprocessconfig = metaprocessconfig_info.model_dump(exclude_unset=True)
|
||
|
# try:
|
||
|
# await MetaprocessconfigDao.edit_metaprocessconfig_dao(query_db, edit_metaprocessconfig)
|
||
|
# await query_db.commit()
|
||
|
# return CrudResponseModel(is_success=True, message=metaprocessconfig_info.metaprocessconfig_name + '任务' + type_str + '成功')
|
||
|
# except Exception as e:
|
||
|
# await query_db.rollback()
|
||
|
# raise e
|
||
|
# else:
|
||
|
# raise ServiceException(message='更新失败')
|
||
|
|
||
|
# @classmethod
|
||
|
# async def run_metaprocessconfig_services(
|
||
|
# cls, request: Request, query_db: AsyncSession, current_user: CurrentUserModel, id: str
|
||
|
# ):
|
||
|
# metaprocessconfig_info = await cls.metaprocessconfig_detail_services(query_db, id)
|
||
|
# metaprocessconfig_info.update_by = current_user.user.user_name
|
||
|
# metaprocessconfig_info.update_time = datetime.now()
|
||
|
# # 运行中
|
||
|
# metaprocessconfig_info.status = '3'
|
||
|
# edit_metaprocessconfig = metaprocessconfig_info.model_dump(exclude_unset=True)
|
||
|
# try:
|
||
|
# await MetaprocessconfigDao.edit_metaprocessconfig_dao(query_db, edit_metaprocessconfig)
|
||
|
# await query_db.commit()
|
||
|
# return CrudResponseModel(is_success=True, message=metaprocessconfig_info.metaprocessconfig_name + '任务:' '运行成功')
|
||
|
# except Exception as e:
|
||
|
# await query_db.rollback()
|
||
|
# raise e
|
||
|
# else:
|
||
|
# raise ServiceException(message='更新失败')
|
||
|
|
||
|
# @classmethod
|
||
|
# async def ds_metaprocessconfig_services(
|
||
|
# cls, request: Request, query_db: AsyncSession, current_user: CurrentUserModel, id: str
|
||
|
# ):
|
||
|
# metaprocessconfig_info = await cls.metaprocessconfig_detail_services(query_db, id)
|
||
|
# metaprocessconfig_info.update_by = current_user.user.user_name
|
||
|
# metaprocessconfig_info.update_time = datetime.now()
|
||
|
# # 运行中
|
||
|
# metaprocessconfig_info.status = '4'
|
||
|
# edit_metaprocessconfig = metaprocessconfig_info.model_dump(exclude_unset=True)
|
||
|
# try:
|
||
|
# await MetaprocessconfigDao.edit_metaprocessconfig_dao(query_db, edit_metaprocessconfig)
|
||
|
# await query_db.commit()
|
||
|
# return CrudResponseModel(is_success=True, message=metaprocessconfig_info.metaprocessconfig_name + '任务:' '调度启动成功')
|
||
|
# except Exception as e:
|
||
|
# await query_db.rollback()
|
||
|
# raise e
|
||
|
# else:
|
||
|
# raise ServiceException(message='更新失败')
|
||
|
|
||
|
# @classmethod
|
||
|
# async def get_metaprocessconfig_logs_services(
|
||
|
# cls, request: Request, current_user: CurrentUserModel, query_db: AsyncSession, id: str
|
||
|
# ):
|
||
|
# metaprocessconfig_info = await cls.metaprocessconfig_detail_services(query_db, id)
|
||
|
# metaprocessconfig_info.update_by = current_user.user.user_name
|
||
|
# metaprocessconfig_info.update_time = datetime.now()
|
||
|
# # 运行中
|
||
|
# metaprocessconfig_info.status = '3'
|
||
|
# edit_metaprocessconfig = metaprocessconfig_info.model_dump(exclude_unset=True)
|
||
|
# try:
|
||
|
# await MetaprocessconfigDao.edit_metaprocessconfig_dao(query_db, edit_metaprocessconfig)
|
||
|
# await query_db.commit()
|
||
|
# return CrudResponseModel(is_success=True, message=metaprocessconfig_info.metaprocessconfig_name + '任务:' '日志获取成功')
|
||
|
# except Exception as e:
|
||
|
# await query_db.rollback()
|
||
|
# raise e
|
||
|
# else:
|
||
|
# raise ServiceException(message='更新失败')
|
||
|
|
||
|
# @classmethod
|
||
|
# async def delete_metaprocessconfig_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteMetaprocessconfigModel):
|
||
|
# """
|
||
|
# 删除参数配置信息service
|
||
|
# :param request: Request对象
|
||
|
# :param query_db: orm对象
|
||
|
# :param page_object: 删除参数配置对象
|
||
|
# :return: 删除参数配置校验结果
|
||
|
# """
|
||
|
# # if page_object.metaprocessconfig_ids.split(','):
|
||
|
# # metaprocessconfig_id_list = page_object.metaprocessconfig_ids.split(',')
|
||
|
# # try:
|
||
|
# # for metaprocessconfig_id in metaprocessconfig_id_list:
|
||
|
# # metaprocessconfig_id_dict = dict(metaprocessconfig_id=metaprocessconfig_id)
|
||
|
# # MetaprocessconfigDao.delete_metaprocessconfig_dao(query_db, MetaprocessconfigModel(**metaprocessconfig_id_dict))
|
||
|
# # query_db.commit()
|
||
|
# # # await cls.init_cache_sys_metaprocessconfig_services(query_db, request.app.state.redis)
|
||
|
# # result = dict(is_success=True, message='删除成功')
|
||
|
# # except Exception as e:
|
||
|
# # query_db.rollback()
|
||
|
# # result = dict(is_success=False, message=str(e))
|
||
|
# # else:
|
||
|
# # result = dict(is_success=False, message='传入字典数据id为空')
|
||
|
# # return CrudResponseModel(**result)
|
||
|
# if page_object.metaprocessconfig_ids:
|
||
|
# metaprocessconfig_id_list = page_object.metaprocessconfig_ids.split(',')
|
||
|
# try:
|
||
|
# for metaprocessconfig_id in metaprocessconfig_id_list:
|
||
|
# await MetaprocessconfigDao.delete_metaprocessconfig_dao(query_db, MetaprocessconfigModel(metaprocessconfigId=int(metaprocessconfig_id)))
|
||
|
# await query_db.commit()
|
||
|
# return CrudResponseModel(is_success=True, message='删除成功')
|
||
|
# except Exception as e:
|
||
|
# await query_db.rollback()
|
||
|
# raise e
|
||
|
# else:
|
||
|
# raise ServiceException(message='传入参数配置id为空')
|
||
|
|
||
|
# @classmethod
|
||
|
# async def metaprocessconfig_detail_services(cls, query_db: AsyncSession, metaprocessconfig_id: int):
|
||
|
# """
|
||
|
# 获取参数配置详细信息service
|
||
|
|
||
|
# :param query_db: orm对象
|
||
|
# :param metaprocessconfig_id: 参数配置id
|
||
|
# :return: 参数配置id对应的信息
|
||
|
# """
|
||
|
# metaprocessconfig = await MetaprocessconfigDao.get_metaprocessconfig_detail_by_id(query_db, metaprocessconfig_id=metaprocessconfig_id)
|
||
|
# if metaprocessconfig:
|
||
|
# result = MetaprocessconfigModel(**CamelCaseUtil.transform_result(metaprocessconfig))
|
||
|
# else:
|
||
|
# result = MetaprocessconfigModel(**dict())
|
||
|
|
||
|
# return result
|