|
@ -9,11 +9,13 @@ from module_admin.service.metaprocessconfig_service import MetaprocessconfigServ |
|
|
import requests |
|
|
import requests |
|
|
import json |
|
|
import json |
|
|
import re |
|
|
import re |
|
|
|
|
|
from module_admin.entity.vo.dataSource_vo import ProcessDefinition,ParmScheduleVo,ProcessInstancePage,ParmSchedule |
|
|
from config.enums import RedisInitKeyConfig |
|
|
from config.enums import RedisInitKeyConfig |
|
|
from fastapi import Request,Depends |
|
|
from fastapi import Request,Depends |
|
|
from config.env import AppConfig |
|
|
from config.env import AppConfig |
|
|
from module_admin.entity.vo.user_vo import CurrentUserModel |
|
|
from module_admin.entity.vo.user_vo import CurrentUserModel |
|
|
|
|
|
from module_admin.entity.vo.metatask_vo import DeleteMetataskModel |
|
|
|
|
|
from datetime import datetime |
|
|
class DatasecConfigService: |
|
|
class DatasecConfigService: |
|
|
""" |
|
|
""" |
|
|
数据安全参数配置表 Service 层 |
|
|
数据安全参数配置表 Service 层 |
|
@ -211,23 +213,56 @@ class DatasecConfigService: |
|
|
message += page_object.metatask_name + "-数据安全_批次标签修改失败" |
|
|
message += page_object.metatask_name + "-数据安全_批次标签修改失败" |
|
|
return message |
|
|
return message |
|
|
|
|
|
|
|
|
|
|
|
# @classmethod |
|
|
|
|
|
# async def delete_datasec_services(cls, db: AsyncSession, onum_list: str): |
|
|
|
|
|
# """ |
|
|
|
|
|
# 批量删除配置 |
|
|
|
|
|
# """ |
|
|
|
|
|
# id_list = [int(x.strip()) for x in onum_list.split(",") if x.strip().isdigit()] |
|
|
|
|
|
# if not id_list: |
|
|
|
|
|
# raise ServiceException(message="无效的编号列表") |
|
|
|
|
|
|
|
|
|
|
|
# try: |
|
|
|
|
|
# await DatasecConfigDao.delete(db, id_list) |
|
|
|
|
|
# await db.commit() |
|
|
|
|
|
# return CrudResponseModel(is_success=True, message="删除成功") |
|
|
|
|
|
# except Exception as e: |
|
|
|
|
|
# await db.rollback() |
|
|
|
|
|
# raise e |
|
|
@classmethod |
|
|
@classmethod |
|
|
async def delete_datasec_services(cls, db: AsyncSession, onum_list: str): |
|
|
async def delete_datasec_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteMetataskModel,current_user: CurrentUserModel): |
|
|
""" |
|
|
""" |
|
|
批量删除配置 |
|
|
删除参数配置信息service |
|
|
|
|
|
:param request: Request对象 |
|
|
|
|
|
:param query_db: orm对象 |
|
|
|
|
|
:param page_object: 删除参数配置对象 |
|
|
|
|
|
:return: 删除参数配置校验结果 |
|
|
""" |
|
|
""" |
|
|
id_list = [int(x.strip()) for x in onum_list.split(",") if x.strip().isdigit()] |
|
|
|
|
|
if not id_list: |
|
|
|
|
|
raise ServiceException(message="无效的编号列表") |
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
if page_object.metatask_ids and page_object.ds_ids: |
|
|
await DatasecConfigDao.delete(db, id_list) |
|
|
metatask_id_list = page_object.metatask_ids.split(',') |
|
|
await db.commit() |
|
|
try: |
|
|
return CrudResponseModel(is_success=True, message="删除成功") |
|
|
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') |
|
|
except Exception as e: |
|
|
# 查询接口 |
|
|
await db.rollback() |
|
|
url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition/batch-delete' |
|
|
raise e |
|
|
form_data={'codes':page_object.ds_ids} |
|
|
|
|
|
|
|
|
|
|
|
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'} |
|
|
|
|
|
response = requests.post(url, headers=headers, data=form_data, verify=False) |
|
|
|
|
|
text= response.text |
|
|
|
|
|
responsJson = json.loads(text) |
|
|
|
|
|
if responsJson['success'] is True: |
|
|
|
|
|
for metatask_id in metatask_id_list: |
|
|
|
|
|
await DatasecConfigDao.delete(query_db, DatasecConfigModel(onum=int(metatask_id))) |
|
|
|
|
|
await query_db.commit() |
|
|
|
|
|
return CrudResponseModel(is_success=True, message='删除成功') |
|
|
|
|
|
else : |
|
|
|
|
|
raise ServiceException(message='ds删除失败') |
|
|
|
|
|
except Exception as e: |
|
|
|
|
|
await query_db.rollback() |
|
|
|
|
|
raise e |
|
|
|
|
|
else: |
|
|
|
|
|
raise ServiceException(message='传入参数配置id为空') |
|
|
@classmethod |
|
|
@classmethod |
|
|
async def get_datasec_detail_services(cls, db: AsyncSession, onum: int): |
|
|
async def get_datasec_detail_services(cls, db: AsyncSession, onum: int): |
|
|
""" |
|
|
""" |
|
@ -237,3 +272,178 @@ class DatasecConfigService: |
|
|
if result: |
|
|
if result: |
|
|
return DatasecConfigModel(**CamelCaseUtil.transform_result(result)) |
|
|
return DatasecConfigModel(**CamelCaseUtil.transform_result(result)) |
|
|
return DatasecConfigModel(**dict()) |
|
|
return DatasecConfigModel(**dict()) |
|
|
|
|
|
@classmethod |
|
|
|
|
|
async def up_or_down_metatask_services( |
|
|
|
|
|
cls, request: Request, query_db: AsyncSession, current_user: CurrentUserModel, id: str, type: str |
|
|
|
|
|
): |
|
|
|
|
|
metatask_info = await cls.get_datasec_detail_services(query_db, id) |
|
|
|
|
|
metatask_info.update_by = current_user.user.user_name |
|
|
|
|
|
metatask_info.update_time = datetime.now() |
|
|
|
|
|
type_str: str |
|
|
|
|
|
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') |
|
|
|
|
|
dsids=metatask_info.ds_ids.split(",") |
|
|
|
|
|
result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dsids, dsids)] |
|
|
|
|
|
message='' |
|
|
|
|
|
# 查询接口 |
|
|
|
|
|
url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition' |
|
|
|
|
|
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'} |
|
|
|
|
|
for config in result_list: |
|
|
|
|
|
metaprocessconfig_dict = { |
|
|
|
|
|
'name': metatask_info.metatask_name, |
|
|
|
|
|
'releaseState': type |
|
|
|
|
|
} |
|
|
|
|
|
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()} |
|
|
|
|
|
response = requests.post(f"{url}/{config['dsid']}/release", headers=headers, data=form_data, verify=False) |
|
|
|
|
|
text= response.text |
|
|
|
|
|
responsJson = json.loads(text) |
|
|
|
|
|
if responsJson['success'] is True: |
|
|
|
|
|
message='成功!' |
|
|
|
|
|
else: |
|
|
|
|
|
raise ServiceException(message='失败'+responsJson['msg']) |
|
|
|
|
|
if type == 'OFFLINE': |
|
|
|
|
|
# 下线 |
|
|
|
|
|
type_str = '下线' |
|
|
|
|
|
metatask_info.status = 'OFFLINE' |
|
|
|
|
|
else: |
|
|
|
|
|
# 上线 |
|
|
|
|
|
type_str = '上线' |
|
|
|
|
|
metatask_info.status = 'ONLINE' |
|
|
|
|
|
|
|
|
|
|
|
edit_metatask = metatask_info.model_dump(exclude_unset=True) |
|
|
|
|
|
try: |
|
|
|
|
|
await DatasecConfigDao.edit(query_db,metatask_info.onum, edit_metatask) |
|
|
|
|
|
await query_db.commit() |
|
|
|
|
|
return CrudResponseModel(is_success=True, message=message) |
|
|
|
|
|
except Exception as e: |
|
|
|
|
|
await query_db.rollback() |
|
|
|
|
|
raise e |
|
|
|
|
|
else: |
|
|
|
|
|
raise ServiceException(message='更新失败') |
|
|
|
|
|
@classmethod |
|
|
|
|
|
async def datasec_detail_services(cls, query_db: AsyncSession, metatask_id: int): |
|
|
|
|
|
""" |
|
|
|
|
|
获取参数配置详细信息service |
|
|
|
|
|
|
|
|
|
|
|
:param query_db: orm对象 |
|
|
|
|
|
:param metatask_id: 参数配置id |
|
|
|
|
|
:return: 参数配置id对应的信息 |
|
|
|
|
|
""" |
|
|
|
|
|
metatask = await DatasecConfigDao.get_detail_by_id(query_db, metatask_id=metatask_id) |
|
|
|
|
|
if metatask: |
|
|
|
|
|
result = DatasecConfigModel(**CamelCaseUtil.transform_result(metatask)) |
|
|
|
|
|
else: |
|
|
|
|
|
result = DatasecConfigModel(**dict()) |
|
|
|
|
|
return result |
|
|
|
|
|
@classmethod |
|
|
|
|
|
async def ds_metatask_services( |
|
|
|
|
|
cls, request: Request, query_db: AsyncSession, process: ParmScheduleVo,current_user: CurrentUserModel |
|
|
|
|
|
): |
|
|
|
|
|
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') |
|
|
|
|
|
# 先查询是否建立定时任务 |
|
|
|
|
|
getdsurl=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/schedules?pageSize=10&pageNo=1&processDefinitionCode='+str(process.processDefinitionCode) |
|
|
|
|
|
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'} |
|
|
|
|
|
getdsresponse = requests.get(getdsurl, headers=headers, verify=False) |
|
|
|
|
|
getdstext= getdsresponse.text |
|
|
|
|
|
responsJson = json.loads(getdstext) |
|
|
|
|
|
if responsJson['msg'] == 'success': |
|
|
|
|
|
if responsJson['data']['total']>0: |
|
|
|
|
|
# getds_json_list = json.dumps(responsJson['data']['totalList'], ensure_ascii=False, indent=4) |
|
|
|
|
|
getds_json_list = responsJson['data']['totalList'] |
|
|
|
|
|
for item in getds_json_list: |
|
|
|
|
|
if item['releaseState']=='ONLINE': |
|
|
|
|
|
# 先下线在删除 |
|
|
|
|
|
offdsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{item['id']}/offline" |
|
|
|
|
|
offresponse = requests.post(offdsurl, headers=headers, verify=False) |
|
|
|
|
|
# 删除对应的调度 |
|
|
|
|
|
deldsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{item['id']}?scheduleId={item['id']}" |
|
|
|
|
|
delresponse = requests.delete(deldsurl, headers=headers, verify=False) |
|
|
|
|
|
|
|
|
|
|
|
parm =ParmSchedule() |
|
|
|
|
|
parm.failureStrategy='CONTINUE' |
|
|
|
|
|
parm.warningType='NONE' |
|
|
|
|
|
parm.warningGroupId=process.warningGroupId |
|
|
|
|
|
parm.workerGroup=process.workerGroup |
|
|
|
|
|
parm.processDefinitionCode =process.processDefinitionCode |
|
|
|
|
|
parm.environmentCode=process.environmentCode |
|
|
|
|
|
parm.processInstancePriority='MEDIUM' |
|
|
|
|
|
parm.schedule = ( |
|
|
|
|
|
'{"startTime":"' + process.beginTime.strftime('%Y-%m-%d %H:%M:%S') + |
|
|
|
|
|
'", "endTime":"' + process.endTime.strftime('%Y-%m-%d %H:%M:%S') + |
|
|
|
|
|
'", "crontab":"' + process.crontab + |
|
|
|
|
|
'", "timezoneId":"Asia/Shanghai"}') |
|
|
|
|
|
url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/schedules' |
|
|
|
|
|
# form_data = {key: str(value) for key, value in process.__dict__.items()} |
|
|
|
|
|
form_data = {key: value for key, value in parm.__dict__.items()} |
|
|
|
|
|
response = requests.post(url, headers=headers, data=form_data, verify=False) |
|
|
|
|
|
text= response.text |
|
|
|
|
|
responsJson = json.loads(text) |
|
|
|
|
|
if responsJson['msg'] == 'success': |
|
|
|
|
|
scheduleId= responsJson['data']['id'] |
|
|
|
|
|
ondsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{scheduleId}/online" |
|
|
|
|
|
ondsurl = requests.post(ondsurl, headers=headers, verify=False) |
|
|
|
|
|
metatask_info = await cls.get_datasec_detail_services(query_db, process.metaTaskId) |
|
|
|
|
|
metatask_info.schId=scheduleId |
|
|
|
|
|
metatask_info2 = metatask_info.model_dump(exclude_unset=True) |
|
|
|
|
|
await DatasecConfigDao.edit(query_db,metatask_info.onum, metatask_info2) |
|
|
|
|
|
await query_db.commit() |
|
|
|
|
|
return "调度运行成功!" |
|
|
|
|
|
else: |
|
|
|
|
|
raise ServiceException(message='运行失败!') |
|
|
|
|
|
@classmethod |
|
|
|
|
|
async def ds_metatask_detail( |
|
|
|
|
|
cls, request: Request, query_db: AsyncSession, process: ParmScheduleVo,current_user: CurrentUserModel |
|
|
|
|
|
): |
|
|
|
|
|
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') |
|
|
|
|
|
# 先查询是否建立定时任务 |
|
|
|
|
|
getdsurl=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/schedules?pageSize=10&pageNo=1&processDefinitionCode='+str(process.processDefinitionCode) |
|
|
|
|
|
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'} |
|
|
|
|
|
getdsresponse = requests.get(getdsurl, headers=headers, verify=False) |
|
|
|
|
|
getdstext= getdsresponse.text |
|
|
|
|
|
processVo=ParmScheduleVo() |
|
|
|
|
|
responsJson = json.loads(getdstext) |
|
|
|
|
|
if responsJson['msg'] == 'success': |
|
|
|
|
|
if responsJson['data']['total']>0: |
|
|
|
|
|
# getds_json_list = json.dumps(responsJson['data']['totalList'], ensure_ascii=False, indent=4) |
|
|
|
|
|
getds_json_list = responsJson['data']['totalList'] |
|
|
|
|
|
for item in getds_json_list: |
|
|
|
|
|
processVo.crontab=item['crontab'] |
|
|
|
|
|
processVo.beginTime=item['startTime'] |
|
|
|
|
|
processVo.endTime=item['endTime'] |
|
|
|
|
|
processVo.workerGroup=item['workerGroup'] |
|
|
|
|
|
processVo.warningGroupId=item['warningGroupId'] |
|
|
|
|
|
processVo.environmentCode=item['environmentCode'] |
|
|
|
|
|
return processVo |
|
|
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
|
async def ds_metatask_delete( |
|
|
|
|
|
cls, request: Request, query_db: AsyncSession, process: ParmScheduleVo,current_user: CurrentUserModel |
|
|
|
|
|
): |
|
|
|
|
|
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') |
|
|
|
|
|
# 先查询是否建立定时任务 |
|
|
|
|
|
getdsurl=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/schedules?pageSize=10&pageNo=1&processDefinitionCode='+str(process.processDefinitionCode) |
|
|
|
|
|
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'} |
|
|
|
|
|
getdsresponse = requests.get(getdsurl, headers=headers, verify=False) |
|
|
|
|
|
getdstext= getdsresponse.text |
|
|
|
|
|
responsJson = json.loads(getdstext) |
|
|
|
|
|
if responsJson['msg'] == 'success': |
|
|
|
|
|
if responsJson['data']['total']>0: |
|
|
|
|
|
# getds_json_list = json.dumps(responsJson['data']['totalList'], ensure_ascii=False, indent=4) |
|
|
|
|
|
getds_json_list = responsJson['data']['totalList'] |
|
|
|
|
|
for item in getds_json_list: |
|
|
|
|
|
if item['releaseState']=='ONLINE': |
|
|
|
|
|
# 先下线在删除 |
|
|
|
|
|
offdsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{item['id']}/offline" |
|
|
|
|
|
offresponse = requests.post(offdsurl, headers=headers, verify=False) |
|
|
|
|
|
# 删除对应的调度 |
|
|
|
|
|
deldsurl=f"{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/schedules/{item['id']}?scheduleId={item['id']}" |
|
|
|
|
|
delresponse = requests.delete(deldsurl, headers=headers, verify=False) |
|
|
|
|
|
deldstext= delresponse.text |
|
|
|
|
|
delresponsJson = json.loads(deldstext) |
|
|
|
|
|
if delresponsJson['msg'] == 'success': |
|
|
|
|
|
metatask_info = await cls.get_datasec_detail_services(query_db, process.metaTaskId) |
|
|
|
|
|
metatask_info.schId="" |
|
|
|
|
|
metatask_info2 = metatask_info.model_dump(exclude_unset=True) |
|
|
|
|
|
await DatasecConfigDao.edit(query_db,metatask_info.onum, metatask_info2) |
|
|
|
|
|
await query_db.commit() |
|
|
|
|
|
return "调度删除成功!" |
|
|
|
|
|
return "调度删除成功!" |