Browse Source

ds接口可配置

master
si@aidatagov.com 2 weeks ago
parent
commit
6cce2560ee
  1. 5
      vue-fastapi-backend/.env.dev
  2. 4
      vue-fastapi-backend/config/env.py
  3. 36
      vue-fastapi-backend/module_admin/controller/metatask_controller.py
  4. 96
      vue-fastapi-backend/module_admin/service/metatask_service.py
  5. 3
      vue-fastapi-frontend/src/views/meta/metatask/index.vue

5
vue-fastapi-backend/.env.dev

@ -80,4 +80,7 @@ MINIO_ADDRESS = '192.168.0.3:9000'
# minio用户
MINIO_ADMIN = 'admin'
# minio密码
MINIO_PASSWORD = 'admin123'
MINIO_PASSWORD = 'admin123'
# 访问dolphinscheduler的接口地址
DS_SERVER_URL= http://47.121.207.11:12345

4
vue-fastapi-backend/config/env.py

@ -21,8 +21,8 @@ class AppSettings(BaseSettings):
app_reload: bool = True
app_ip_location_query: bool = True
app_same_time_login: bool = True
ds_server_url: str = 'http://47.121.207.11:12345'
ds_task_id: str = '15081964614112'
class JwtSettings(BaseSettings):
"""
Jwt配置

36
vue-fastapi-backend/module_admin/controller/metatask_controller.py

@ -38,10 +38,10 @@ async def get_system_metatask_list(
)
async def get_process_instances_list(
request: Request,
metatask_page_query: ProcessInstancePage = Depends(ProcessInstancePage),
metatask_page_query: ProcessInstancePage = Depends(ProcessInstancePage),current_user: CurrentUserModel = Depends(LoginService.get_current_user)
):
# 获取分页数据
config_page_query_result = await MetataskService.get_process_instances_services(request, metatask_page_query)
config_page_query_result = await MetataskService.get_process_instances_services(request, metatask_page_query,current_user)
logger.info('获取成功')
return ResponseUtil.success(model_content=config_page_query_result)
@ -51,10 +51,10 @@ async def get_process_instances_list(
)
async def get_task_nodes_list(
request: Request,
id:int,
id:int,current_user: CurrentUserModel = Depends(LoginService.get_current_user)
):
config_page_query_result = await MetataskService.get_task_nodes_services(request, id)
config_page_query_result = await MetataskService.get_task_nodes_services(request, id,current_user)
logger.info('获取成功')
return ResponseUtil.success(rows=config_page_query_result)
@ -64,10 +64,10 @@ async def get_task_nodes_list(
)
async def get_log_details(
request: Request,
id:int,
id:int,current_user: CurrentUserModel = Depends(LoginService.get_current_user)
):
# 获取分页数据
config_page_query_result = await MetataskService.get_log_details_services(request,id)
config_page_query_result = await MetataskService.get_log_details_services(request,id,current_user)
logger.info('获取成功')
return ResponseUtil.success(data=config_page_query_result)
@ -75,20 +75,20 @@ async def get_log_details(
@metataskController.get(
'/tree', response_model=PageResponseModel, dependencies=[Depends(CheckUserInterfaceAuth('meta:metatask:list'))]
)
async def get_data_source_tree( request: Request,):
async def get_data_source_tree( request: Request,current_user: CurrentUserModel = Depends(LoginService.get_current_user)):
# 获取分页数据
data_tree_result = await MetataskService.get_data_source_tree( request)
data_tree_result = await MetataskService.get_data_source_tree( request,current_user)
logger.info('获取成功')
return ResponseUtil.success(rows=data_tree_result)
@metataskController.get(
'/sourceall', response_model=PageResponseModel, dependencies=[Depends(CheckUserInterfaceAuth('meta:metatask:list'))]
)
async def get_data_source_all( request: Request,):
async def get_data_source_all( request: Request,current_user: CurrentUserModel = Depends(LoginService.get_current_user)):
# 获取分页数据
data_source_result = await MetataskService.get_data_source_all( request)
data_source_result = await MetataskService.get_data_source_all( request,current_user)
logger.info('获取成功')
return ResponseUtil.success(data=data_source_result)
@ -105,7 +105,7 @@ async def add_meta_metatask(
add_metatask.create_time = datetime.now()
add_metatask.update_by = current_user.user.user_name
add_metatask.update_time = datetime.now()
add_metatask_result = await MetataskService.add_metatask_services(request, query_db, add_metatask)
add_metatask_result = await MetataskService.add_metatask_services(request, query_db, add_metatask,current_user)
logger.info(add_metatask_result.message)
return ResponseUtil.success(msg=add_metatask_result.message)
@ -123,7 +123,7 @@ async def edit_meta_metatask(
):
edit_metatask.update_by = current_user.user.user_name
edit_metatask.update_time = datetime.now()
edit_config_result = await MetataskService.edit_metatask_services(request, query_db, edit_metatask)
edit_config_result = await MetataskService.edit_metatask_services(request, query_db, edit_metatask,current_user)
logger.info(edit_config_result.message)
return ResponseUtil.success(msg=edit_config_result.message)
@ -148,10 +148,10 @@ async def run_meta_metatask(
request: Request,
process: ProcessDefinition,
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user)
):
edit_config_result = await MetataskService.run_metatask_services(request, query_db, process)
edit_config_result = await MetataskService.run_metatask_services(request, query_db, process,current_user)
return ResponseUtil.success(msg=edit_config_result)
# 元数据任务调度
@ -161,10 +161,10 @@ async def DS_meta_metatask(
request: Request,
process: ParmScheduleVo,
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user)
):
edit_config_result = await MetataskService.ds_metatask_services(request, query_db, process)
edit_config_result = await MetataskService.ds_metatask_services(request, query_db, process,current_user)
return ResponseUtil.success(msg=edit_config_result)
@ -185,9 +185,9 @@ async def get_metatask_logs(
@metataskController.delete('/{metatask_ids}/{ds_ids}', dependencies=[Depends(CheckUserInterfaceAuth('meta:metatask:delete'))])
@Log(title='元数据任务', business_type=BusinessType.DELETE)
async def delete_system_config(request: Request, metatask_ids: str,ds_ids:str, query_db: AsyncSession = Depends(get_db)):
async def delete_system_config(request: Request, metatask_ids: str,ds_ids:str, query_db: AsyncSession = Depends(get_db),current_user: CurrentUserModel = Depends(LoginService.get_current_user)):
delete_config = DeleteMetataskModel(metatask_ids=metatask_ids,ds_ids=ds_ids)
delete_config_result = await MetataskService.delete_metatask_services(request, query_db, delete_config)
delete_config_result = await MetataskService.delete_metatask_services(request, query_db, delete_config,current_user)
logger.info(delete_config_result.message)
return ResponseUtil.success(msg=delete_config_result.message)

96
vue-fastapi-backend/module_admin/service/metatask_service.py

@ -17,6 +17,7 @@ import json
import re
from config.enums import RedisInitKeyConfig
from module_admin.service.metaprocessconfig_service import MetaprocessconfigService
from config.env import AppConfig
class MetataskService:
@ -39,10 +40,10 @@ class MetataskService:
return metatask_list_result
@classmethod
async def get_data_source_tree(cls,request: Request):
url = 'http://47.121.207.11:12345/dolphinscheduler/datasources?pageNo=1&pageSize=100'
token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
headers = {'token': token}
async def get_data_source_tree(cls,request: Request, current_user: CurrentUserModel):
url = f'{AppConfig.ds_server_url}/dolphinscheduler/datasources?pageNo=1&pageSize=100'
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password,}
response = requests.get(url, headers=headers)
if response.reason == 'OK':
response_text = response.text
@ -57,16 +58,15 @@ class MetataskService:
else:
return {'error': f'Request failed with status code {response.status_code}'}
@classmethod
async def get_data_source_all(cls,request: Request):
async def get_data_source_all(cls,request: Request,current_user: CurrentUserModel):
# Worker分组
url1 = 'http://47.121.207.11:12345/dolphinscheduler/worker-groups/all'
url1 = f'{AppConfig.ds_server_url}/dolphinscheduler/worker-groups/all'
# 警告组
url2= 'http://47.121.207.11:12345/dolphinscheduler/alert-groups/list'
url2= f'{AppConfig.ds_server_url}/dolphinscheduler/alert-groups/list'
# 工作环境
url3 = 'http://47.121.207.11:12345/dolphinscheduler/environment/query-environment-list'
url3 = f'{AppConfig.ds_server_url}/dolphinscheduler/environment/query-environment-list'
dataspurceVo=Datasouceall()
token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
headers = {'token': token}
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
response1 = requests.get(url1, headers=headers)
response2 = requests.get(url2, headers=headers)
response3 = requests.get(url3, headers=headers)
@ -110,7 +110,7 @@ class MetataskService:
return CommonConstant.UNIQUE
@classmethod
async def add_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: MetataskModel):
async def add_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: MetataskModel,current_user: CurrentUserModel):
"""
新增参数配置信息service
:param request: Request对象
@ -129,7 +129,7 @@ class MetataskService:
processconfig.ac_target=page_object.ac_target
processconfig.db_type=page_object.dbCode
processConfigList =await MetaprocessconfigService.get_metaprocessconfig_list_services(query_db,processconfig,False)
message= await cls.mysql_process_defind_change_add(request,processConfigList,page_object)
message= await cls.mysql_process_defind_change_add(request,processConfigList,page_object,current_user)
if "成功" not in message:
await query_db.rollback()
raise ServiceException(message=f'新增元数据任务{page_object.metatask_name}失败,dolphinscheduler创建失败')
@ -141,17 +141,17 @@ class MetataskService:
await query_db.rollback()
raise e
@classmethod
async def mysql_process_defind_change_add(cls,request: Request, processConfigList: list[MetaprocessconfigModel],page_object:MetataskModel):
async def mysql_process_defind_change_add(cls,request: Request, processConfigList: list[MetaprocessconfigModel],page_object:MetataskModel,current_user: CurrentUserModel):
"""
mysql类型
"""
token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
url = 'http://47.121.207.11:12345/dolphinscheduler/projects/'+projectCode+'/task-definition/gen-task-codes?genNum=5'
headers = {'token': token}
url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/task-definition/gen-task-codes?genNum=5'
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
# 新增接口
url2='http://47.121.207.11:12345/dolphinscheduler/projects/'+projectCode+'/process-definition'
headers2 = {'token': token, 'Content-Type': 'application/x-www-form-urlencoded'}
url2=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
response = requests.get(url, headers=headers)
if response.reason == 'OK':
intdsids=[]
@ -229,20 +229,19 @@ class MetataskService:
return message
@classmethod
async def mysql_process_defind_change_update(cls,request: Request,page_object:MetataskModel,metatask_old:MetataskModel):
async def mysql_process_defind_change_update(cls,request: Request,page_object:MetataskModel,metatask_old:MetataskModel,current_user: CurrentUserModel):
"""
mysql类型
"""
token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
dstypes=page_object.ds_types.split(",")
dsids=page_object.ds_ids.split(",")
result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dstypes, dsids)]
message=''
# 查询接口
url='http://47.121.207.11:12345/dolphinscheduler/projects/'+projectCode+'/process-definition'
headers = {'token': token, }
headers2 = {'token': token, 'Content-Type': 'application/x-www-form-urlencoded'}
url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, }
headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
for config in result_list:
# mysql表字段
if config['dstype']=='0' :
@ -326,7 +325,7 @@ class MetataskService:
message += page_object.metatask_name + "-存储过程采集修改失败"
return message
@classmethod
async def edit_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: MetataskModel):
async def edit_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: MetataskModel,current_user: CurrentUserModel):
"""
编辑参数配置信息service
:param request: Request对象
@ -342,7 +341,7 @@ class MetataskService:
else:
try:
metatask_old = await cls.metatask_detail_services(query_db, metatask_info.metatask_id)
message= await cls.mysql_process_defind_change_update(request,page_object,metatask_old)
message= await cls.mysql_process_defind_change_update(request,page_object,metatask_old,current_user)
if "成功" not in message:
await query_db.rollback()
raise ServiceException(message=f'修改元数据任务{page_object.metatask_name}失败,dolphinscheduler修改失败')
@ -363,15 +362,14 @@ class MetataskService:
metatask_info.update_by = current_user.user.user_name
metatask_info.update_time = datetime.now()
type_str: str
token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
dstypes=metatask_info.ds_types.split(",")
dsids=metatask_info.ds_ids.split(",")
result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dstypes, dsids)]
message=''
# 查询接口
url='http://47.121.207.11:12345/dolphinscheduler/projects/'+projectCode+'/process-definition'
headers = {'token': token, 'Content-Type': 'application/x-www-form-urlencoded'}
url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
for config in result_list:
# mysql表字段
if config['dstype']=='0' :
@ -423,7 +421,7 @@ class MetataskService:
@classmethod
async def run_metatask_services(
cls, request: Request, query_db: AsyncSession, process: ProcessDefinition
cls, request: Request, query_db: AsyncSession, process: ProcessDefinition,current_user: CurrentUserModel
):
process.failureStrategy='CONTINUE'
process.warningType='NONE'
@ -434,10 +432,9 @@ class MetataskService:
process.processInstancePriority='MEDIUM'
process.dryRun=0
process.scheduleTime="{complementStartDate:'2025-01-12 00:00:00',complementEndDate:'2025-01-12 00:00:00'}"
token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
url='http://47.121.207.11:12345/dolphinscheduler/projects/'+projectCode+'/executors/start-process-instance'
headers = {'token': token, 'Content-Type': 'application/x-www-form-urlencoded'}
url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/executors/start-process-instance'
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
# form_data = {key: str(value) for key, value in process.__dict__.items()}
form_data = {key: value for key, value in process.__dict__.items()}
@ -451,7 +448,7 @@ class MetataskService:
raise ServiceException(message='运行失败!')
@classmethod
async def ds_metatask_services(
cls, request: Request, query_db: AsyncSession, process: ParmScheduleVo
cls, request: Request, query_db: AsyncSession, process: ParmScheduleVo,current_user: CurrentUserModel
):
parm =ParmSchedule(
@ -469,10 +466,9 @@ class MetataskService:
'", "endTime":"' + process.endTime.strftime('%Y-%m-%d %H:%M:%S') +
'", "crontab":"' + process.crontab +
'", "timezoneId":"Asia/Shanghai"}')
token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
url='http://47.121.207.11:12345/dolphinscheduler/projects/'+projectCode+'/schedules'
headers = {'token': token, 'Content-Type': 'application/x-www-form-urlencoded'}
url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/schedules'
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
# form_data = {key: str(value) for key, value in process.__dict__.items()}
form_data = {key: value for key, value in parm.__dict__.items()}
@ -507,7 +503,7 @@ class MetataskService:
raise ServiceException(message='更新失败')
@classmethod
async def delete_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteMetataskModel):
async def delete_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteMetataskModel,current_user: CurrentUserModel):
"""
删除参数配置信息service
:param request: Request对象
@ -519,13 +515,12 @@ class MetataskService:
if page_object.metatask_ids and page_object.ds_ids:
metatask_id_list = page_object.metatask_ids.split(',')
try:
token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
# 查询接口
url='http://47.121.207.11:12345/dolphinscheduler/projects/'+projectCode+'/process-definition/batch-delete'
url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition/batch-delete'
form_data={'codes':page_object.ds_ids}
headers = {'token': token, 'Content-Type': 'application/x-www-form-urlencoded'}
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
response = requests.post(url, headers=headers,data=form_data)
text= response.text
responsJson = json.loads(text)
@ -560,12 +555,11 @@ class MetataskService:
return result
@classmethod
async def get_process_instances_services(
cls, request: Request, query_object: ProcessInstancePage
cls, request: Request, query_object: ProcessInstancePage,current_user: CurrentUserModel
):
token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
url = f'http://47.121.207.11:12345/dolphinscheduler/projects/{projectCode}/process-instances?pageNo={query_object.page_num}&pageSize={query_object.page_size}&searchVal={query_object.searchVal}'
headers = {'token': token}
url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/process-instances?pageNo={query_object.page_num}&pageSize={query_object.page_size}&searchVal={query_object.searchVal}'
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
response = requests.get(url, headers=headers)
try:
@ -583,12 +577,11 @@ class MetataskService:
@classmethod
async def get_task_nodes_services(
cls, request: Request,id:int
cls, request: Request,id:int,current_user: CurrentUserModel
):
token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
url = f'http://47.121.207.11:12345/dolphinscheduler/projects/{projectCode}/process-instances/{id}/tasks'
headers = {'token': token}
url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/process-instances/{id}/tasks'
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
response = requests.get(url, headers=headers)
try:
response = requests.get(url, headers=headers)
@ -606,11 +599,10 @@ cls, request: Request,id:int
@classmethod
async def get_log_details_services(
cls, request: Request,id:int
cls, request: Request,id:int,current_user: CurrentUserModel
):
token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
url = f'http://47.121.207.11:12345/dolphinscheduler/log/detail?taskInstanceId={id}&limit=1000&skipLineNum=0'
headers = {'token': token}
url = f'{AppConfig.ds_server_url}/dolphinscheduler/log/detail?taskInstanceId={id}&limit=1000&skipLineNum=0'
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
response = requests.get(url, headers=headers)
try:
response = requests.get(url, headers=headers)

3
vue-fastapi-frontend/src/views/meta/metatask/index.vue

@ -884,7 +884,8 @@ const submitForm = async () => {
const submitForm = { ...form.value };
submitForm.dbCode = clickNode.value.type;
submitForm.dbRName = clickNode.value.name;
if (form.metataskId !== undefined) {
if (submitForm.metataskId !== undefined) {
await updatemetatask(submitForm);
proxy.$modal.msgSuccess("修改成功");
open.value = false;

Loading…
Cancel
Save