diff --git a/vue-fastapi-backend/.env.dev b/vue-fastapi-backend/.env.dev index a3e8bcc..54276fc 100644 --- a/vue-fastapi-backend/.env.dev +++ b/vue-fastapi-backend/.env.dev @@ -80,4 +80,7 @@ MINIO_ADDRESS = '192.168.0.3:9000' # minio用户 MINIO_ADMIN = 'admin' # minio密码 -MINIO_PASSWORD = 'admin123' \ No newline at end of file +MINIO_PASSWORD = 'admin123' + +# 访问dolphinscheduler的接口地址 +DS_SERVER_URL= http://47.121.207.11:12345 \ No newline at end of file diff --git a/vue-fastapi-backend/config/env.py b/vue-fastapi-backend/config/env.py index a5fd047..6af5783 100644 --- a/vue-fastapi-backend/config/env.py +++ b/vue-fastapi-backend/config/env.py @@ -21,8 +21,8 @@ class AppSettings(BaseSettings): app_reload: bool = True app_ip_location_query: bool = True app_same_time_login: bool = True - - + ds_server_url: str = 'http://47.121.207.11:12345' + ds_task_id: str = '15081964614112' class JwtSettings(BaseSettings): """ Jwt配置 diff --git a/vue-fastapi-backend/module_admin/controller/metatask_controller.py b/vue-fastapi-backend/module_admin/controller/metatask_controller.py index 301548e..d0079d4 100644 --- a/vue-fastapi-backend/module_admin/controller/metatask_controller.py +++ b/vue-fastapi-backend/module_admin/controller/metatask_controller.py @@ -38,10 +38,10 @@ async def get_system_metatask_list( ) async def get_process_instances_list( request: Request, - metatask_page_query: ProcessInstancePage = Depends(ProcessInstancePage), + metatask_page_query: ProcessInstancePage = Depends(ProcessInstancePage),current_user: CurrentUserModel = Depends(LoginService.get_current_user) ): # 获取分页数据 - config_page_query_result = await MetataskService.get_process_instances_services(request, metatask_page_query) + config_page_query_result = await MetataskService.get_process_instances_services(request, metatask_page_query,current_user) logger.info('获取成功') return ResponseUtil.success(model_content=config_page_query_result) @@ -51,10 +51,10 @@ async def get_process_instances_list( ) async def get_task_nodes_list( request: Request, - id:int, + id:int,current_user: CurrentUserModel = Depends(LoginService.get_current_user) ): - config_page_query_result = await MetataskService.get_task_nodes_services(request, id) + config_page_query_result = await MetataskService.get_task_nodes_services(request, id,current_user) logger.info('获取成功') return ResponseUtil.success(rows=config_page_query_result) @@ -64,10 +64,10 @@ async def get_task_nodes_list( ) async def get_log_details( request: Request, - id:int, + id:int,current_user: CurrentUserModel = Depends(LoginService.get_current_user) ): # 获取分页数据 - config_page_query_result = await MetataskService.get_log_details_services(request,id) + config_page_query_result = await MetataskService.get_log_details_services(request,id,current_user) logger.info('获取成功') return ResponseUtil.success(data=config_page_query_result) @@ -75,20 +75,20 @@ async def get_log_details( @metataskController.get( '/tree', response_model=PageResponseModel, dependencies=[Depends(CheckUserInterfaceAuth('meta:metatask:list'))] ) -async def get_data_source_tree( request: Request,): +async def get_data_source_tree( request: Request,current_user: CurrentUserModel = Depends(LoginService.get_current_user)): # 获取分页数据 - data_tree_result = await MetataskService.get_data_source_tree( request) + data_tree_result = await MetataskService.get_data_source_tree( request,current_user) logger.info('获取成功') return ResponseUtil.success(rows=data_tree_result) @metataskController.get( '/sourceall', response_model=PageResponseModel, dependencies=[Depends(CheckUserInterfaceAuth('meta:metatask:list'))] ) -async def get_data_source_all( request: Request,): +async def get_data_source_all( request: Request,current_user: CurrentUserModel = Depends(LoginService.get_current_user)): # 获取分页数据 - data_source_result = await MetataskService.get_data_source_all( request) + data_source_result = await MetataskService.get_data_source_all( request,current_user) logger.info('获取成功') return ResponseUtil.success(data=data_source_result) @@ -105,7 +105,7 @@ async def add_meta_metatask( add_metatask.create_time = datetime.now() add_metatask.update_by = current_user.user.user_name add_metatask.update_time = datetime.now() - add_metatask_result = await MetataskService.add_metatask_services(request, query_db, add_metatask) + add_metatask_result = await MetataskService.add_metatask_services(request, query_db, add_metatask,current_user) logger.info(add_metatask_result.message) return ResponseUtil.success(msg=add_metatask_result.message) @@ -123,7 +123,7 @@ async def edit_meta_metatask( ): edit_metatask.update_by = current_user.user.user_name edit_metatask.update_time = datetime.now() - edit_config_result = await MetataskService.edit_metatask_services(request, query_db, edit_metatask) + edit_config_result = await MetataskService.edit_metatask_services(request, query_db, edit_metatask,current_user) logger.info(edit_config_result.message) return ResponseUtil.success(msg=edit_config_result.message) @@ -148,10 +148,10 @@ async def run_meta_metatask( request: Request, process: ProcessDefinition, query_db: AsyncSession = Depends(get_db), - + current_user: CurrentUserModel = Depends(LoginService.get_current_user) ): - edit_config_result = await MetataskService.run_metatask_services(request, query_db, process) + edit_config_result = await MetataskService.run_metatask_services(request, query_db, process,current_user) return ResponseUtil.success(msg=edit_config_result) # 元数据任务调度 @@ -161,10 +161,10 @@ async def DS_meta_metatask( request: Request, process: ParmScheduleVo, query_db: AsyncSession = Depends(get_db), - + current_user: CurrentUserModel = Depends(LoginService.get_current_user) ): - edit_config_result = await MetataskService.ds_metatask_services(request, query_db, process) + edit_config_result = await MetataskService.ds_metatask_services(request, query_db, process,current_user) return ResponseUtil.success(msg=edit_config_result) @@ -185,9 +185,9 @@ async def get_metatask_logs( @metataskController.delete('/{metatask_ids}/{ds_ids}', dependencies=[Depends(CheckUserInterfaceAuth('meta:metatask:delete'))]) @Log(title='元数据任务', business_type=BusinessType.DELETE) -async def delete_system_config(request: Request, metatask_ids: str,ds_ids:str, query_db: AsyncSession = Depends(get_db)): +async def delete_system_config(request: Request, metatask_ids: str,ds_ids:str, query_db: AsyncSession = Depends(get_db),current_user: CurrentUserModel = Depends(LoginService.get_current_user)): delete_config = DeleteMetataskModel(metatask_ids=metatask_ids,ds_ids=ds_ids) - delete_config_result = await MetataskService.delete_metatask_services(request, query_db, delete_config) + delete_config_result = await MetataskService.delete_metatask_services(request, query_db, delete_config,current_user) logger.info(delete_config_result.message) return ResponseUtil.success(msg=delete_config_result.message) diff --git a/vue-fastapi-backend/module_admin/service/metatask_service.py b/vue-fastapi-backend/module_admin/service/metatask_service.py index c47a712..369965a 100644 --- a/vue-fastapi-backend/module_admin/service/metatask_service.py +++ b/vue-fastapi-backend/module_admin/service/metatask_service.py @@ -17,6 +17,7 @@ import json import re from config.enums import RedisInitKeyConfig from module_admin.service.metaprocessconfig_service import MetaprocessconfigService +from config.env import AppConfig class MetataskService: @@ -39,10 +40,10 @@ class MetataskService: return metatask_list_result @classmethod - async def get_data_source_tree(cls,request: Request): - url = 'http://47.121.207.11:12345/dolphinscheduler/datasources?pageNo=1&pageSize=100' - token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token') - headers = {'token': token} + async def get_data_source_tree(cls,request: Request, current_user: CurrentUserModel): + + url = f'{AppConfig.ds_server_url}/dolphinscheduler/datasources?pageNo=1&pageSize=100' + headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password,} response = requests.get(url, headers=headers) if response.reason == 'OK': response_text = response.text @@ -57,16 +58,15 @@ class MetataskService: else: return {'error': f'Request failed with status code {response.status_code}'} @classmethod - async def get_data_source_all(cls,request: Request): + async def get_data_source_all(cls,request: Request,current_user: CurrentUserModel): # Worker分组 - url1 = 'http://47.121.207.11:12345/dolphinscheduler/worker-groups/all' + url1 = f'{AppConfig.ds_server_url}/dolphinscheduler/worker-groups/all' # 警告组 - url2= 'http://47.121.207.11:12345/dolphinscheduler/alert-groups/list' + url2= f'{AppConfig.ds_server_url}/dolphinscheduler/alert-groups/list' # 工作环境 - url3 = 'http://47.121.207.11:12345/dolphinscheduler/environment/query-environment-list' + url3 = f'{AppConfig.ds_server_url}/dolphinscheduler/environment/query-environment-list' dataspurceVo=Datasouceall() - token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token') - headers = {'token': token} + headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password} response1 = requests.get(url1, headers=headers) response2 = requests.get(url2, headers=headers) response3 = requests.get(url3, headers=headers) @@ -110,7 +110,7 @@ class MetataskService: return CommonConstant.UNIQUE @classmethod - async def add_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: MetataskModel): + async def add_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: MetataskModel,current_user: CurrentUserModel): """ 新增参数配置信息service :param request: Request对象 @@ -129,7 +129,7 @@ class MetataskService: processconfig.ac_target=page_object.ac_target processconfig.db_type=page_object.dbCode processConfigList =await MetaprocessconfigService.get_metaprocessconfig_list_services(query_db,processconfig,False) - message= await cls.mysql_process_defind_change_add(request,processConfigList,page_object) + message= await cls.mysql_process_defind_change_add(request,processConfigList,page_object,current_user) if "成功" not in message: await query_db.rollback() raise ServiceException(message=f'新增元数据任务{page_object.metatask_name}失败,dolphinscheduler创建失败') @@ -141,17 +141,17 @@ class MetataskService: await query_db.rollback() raise e @classmethod - async def mysql_process_defind_change_add(cls,request: Request, processConfigList: list[MetaprocessconfigModel],page_object:MetataskModel): + async def mysql_process_defind_change_add(cls,request: Request, processConfigList: list[MetaprocessconfigModel],page_object:MetataskModel,current_user: CurrentUserModel): """ mysql类型 """ - token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token') projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') - url = 'http://47.121.207.11:12345/dolphinscheduler/projects/'+projectCode+'/task-definition/gen-task-codes?genNum=5' - headers = {'token': token} + url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/task-definition/gen-task-codes?genNum=5' + headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password} + # 新增接口 - url2='http://47.121.207.11:12345/dolphinscheduler/projects/'+projectCode+'/process-definition' - headers2 = {'token': token, 'Content-Type': 'application/x-www-form-urlencoded'} + url2=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition' + headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'} response = requests.get(url, headers=headers) if response.reason == 'OK': intdsids=[] @@ -229,20 +229,19 @@ class MetataskService: return message @classmethod - async def mysql_process_defind_change_update(cls,request: Request,page_object:MetataskModel,metatask_old:MetataskModel): + async def mysql_process_defind_change_update(cls,request: Request,page_object:MetataskModel,metatask_old:MetataskModel,current_user: CurrentUserModel): """ mysql类型 """ - token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token') projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') dstypes=page_object.ds_types.split(",") dsids=page_object.ds_ids.split(",") result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dstypes, dsids)] message='' # 查询接口 - url='http://47.121.207.11:12345/dolphinscheduler/projects/'+projectCode+'/process-definition' - headers = {'token': token, } - headers2 = {'token': token, 'Content-Type': 'application/x-www-form-urlencoded'} + url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition' + headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, } + headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'} for config in result_list: # mysql表字段 if config['dstype']=='0' : @@ -326,7 +325,7 @@ class MetataskService: message += page_object.metatask_name + "-存储过程采集修改失败" return message @classmethod - async def edit_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: MetataskModel): + async def edit_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: MetataskModel,current_user: CurrentUserModel): """ 编辑参数配置信息service :param request: Request对象 @@ -342,7 +341,7 @@ class MetataskService: else: try: metatask_old = await cls.metatask_detail_services(query_db, metatask_info.metatask_id) - message= await cls.mysql_process_defind_change_update(request,page_object,metatask_old) + message= await cls.mysql_process_defind_change_update(request,page_object,metatask_old,current_user) if "成功" not in message: await query_db.rollback() raise ServiceException(message=f'修改元数据任务{page_object.metatask_name}失败,dolphinscheduler修改失败') @@ -363,15 +362,14 @@ class MetataskService: metatask_info.update_by = current_user.user.user_name metatask_info.update_time = datetime.now() type_str: str - token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token') projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') dstypes=metatask_info.ds_types.split(",") dsids=metatask_info.ds_ids.split(",") result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dstypes, dsids)] message='' # 查询接口 - url='http://47.121.207.11:12345/dolphinscheduler/projects/'+projectCode+'/process-definition' - headers = {'token': token, 'Content-Type': 'application/x-www-form-urlencoded'} + url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition' + headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'} for config in result_list: # mysql表字段 if config['dstype']=='0' : @@ -423,7 +421,7 @@ class MetataskService: @classmethod async def run_metatask_services( - cls, request: Request, query_db: AsyncSession, process: ProcessDefinition + cls, request: Request, query_db: AsyncSession, process: ProcessDefinition,current_user: CurrentUserModel ): process.failureStrategy='CONTINUE' process.warningType='NONE' @@ -434,10 +432,9 @@ class MetataskService: process.processInstancePriority='MEDIUM' process.dryRun=0 process.scheduleTime="{complementStartDate:'2025-01-12 00:00:00',complementEndDate:'2025-01-12 00:00:00'}" - token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token') projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') - url='http://47.121.207.11:12345/dolphinscheduler/projects/'+projectCode+'/executors/start-process-instance' - headers = {'token': token, 'Content-Type': 'application/x-www-form-urlencoded'} + url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/executors/start-process-instance' + headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'} # form_data = {key: str(value) for key, value in process.__dict__.items()} form_data = {key: value for key, value in process.__dict__.items()} @@ -451,7 +448,7 @@ class MetataskService: raise ServiceException(message='运行失败!') @classmethod async def ds_metatask_services( - cls, request: Request, query_db: AsyncSession, process: ParmScheduleVo + cls, request: Request, query_db: AsyncSession, process: ParmScheduleVo,current_user: CurrentUserModel ): parm =ParmSchedule( @@ -469,10 +466,9 @@ class MetataskService: '", "endTime":"' + process.endTime.strftime('%Y-%m-%d %H:%M:%S') + '", "crontab":"' + process.crontab + '", "timezoneId":"Asia/Shanghai"}') - token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token') projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') - url='http://47.121.207.11:12345/dolphinscheduler/projects/'+projectCode+'/schedules' - headers = {'token': token, 'Content-Type': 'application/x-www-form-urlencoded'} + url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/schedules' + headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'} # form_data = {key: str(value) for key, value in process.__dict__.items()} form_data = {key: value for key, value in parm.__dict__.items()} @@ -507,7 +503,7 @@ class MetataskService: raise ServiceException(message='更新失败') @classmethod - async def delete_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteMetataskModel): + async def delete_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteMetataskModel,current_user: CurrentUserModel): """ 删除参数配置信息service :param request: Request对象 @@ -519,13 +515,12 @@ class MetataskService: if page_object.metatask_ids and page_object.ds_ids: metatask_id_list = page_object.metatask_ids.split(',') try: - token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token') projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') # 查询接口 - url='http://47.121.207.11:12345/dolphinscheduler/projects/'+projectCode+'/process-definition/batch-delete' + url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition/batch-delete' form_data={'codes':page_object.ds_ids} - headers = {'token': token, 'Content-Type': 'application/x-www-form-urlencoded'} + headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'} response = requests.post(url, headers=headers,data=form_data) text= response.text responsJson = json.loads(text) @@ -560,12 +555,11 @@ class MetataskService: return result @classmethod async def get_process_instances_services( - cls, request: Request, query_object: ProcessInstancePage + cls, request: Request, query_object: ProcessInstancePage,current_user: CurrentUserModel ): - token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token') projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') - url = f'http://47.121.207.11:12345/dolphinscheduler/projects/{projectCode}/process-instances?pageNo={query_object.page_num}&pageSize={query_object.page_size}&searchVal={query_object.searchVal}' - headers = {'token': token} + url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/process-instances?pageNo={query_object.page_num}&pageSize={query_object.page_size}&searchVal={query_object.searchVal}' + headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password} response = requests.get(url, headers=headers) try: @@ -583,12 +577,11 @@ class MetataskService: @classmethod async def get_task_nodes_services( -cls, request: Request,id:int +cls, request: Request,id:int,current_user: CurrentUserModel ): - token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token') projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') - url = f'http://47.121.207.11:12345/dolphinscheduler/projects/{projectCode}/process-instances/{id}/tasks' - headers = {'token': token} + url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/{projectCode}/process-instances/{id}/tasks' + headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password} response = requests.get(url, headers=headers) try: response = requests.get(url, headers=headers) @@ -606,11 +599,10 @@ cls, request: Request,id:int @classmethod async def get_log_details_services( - cls, request: Request,id:int + cls, request: Request,id:int,current_user: CurrentUserModel ): - token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token') - url = f'http://47.121.207.11:12345/dolphinscheduler/log/detail?taskInstanceId={id}&limit=1000&skipLineNum=0' - headers = {'token': token} + url = f'{AppConfig.ds_server_url}/dolphinscheduler/log/detail?taskInstanceId={id}&limit=1000&skipLineNum=0' + headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password} response = requests.get(url, headers=headers) try: response = requests.get(url, headers=headers) diff --git a/vue-fastapi-frontend/src/views/meta/metatask/index.vue b/vue-fastapi-frontend/src/views/meta/metatask/index.vue index ac4071e..4d206b5 100644 --- a/vue-fastapi-frontend/src/views/meta/metatask/index.vue +++ b/vue-fastapi-frontend/src/views/meta/metatask/index.vue @@ -884,7 +884,8 @@ const submitForm = async () => { const submitForm = { ...form.value }; submitForm.dbCode = clickNode.value.type; submitForm.dbRName = clickNode.value.name; - if (form.metataskId !== undefined) { + if (submitForm.metataskId !== undefined) { + await updatemetatask(submitForm); proxy.$modal.msgSuccess("修改成功"); open.value = false;