From ac8ecb73929a9064742b22a4b41d29bc91e40c5d Mon Sep 17 00:00:00 2001 From: "si@aidatagov.com" Date: Wed, 25 Jun 2025 02:17:18 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E6=8F=90=E4=BA=A4ds=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=AE=89=E5=85=A8=E6=A0=87=E7=AD=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/metadata_config_controller.py | 8 +- .../module_admin/dao/datasec_config_dao.py | 13 + .../entity/vo/metadata_config_vo.py | 4 +- .../service/datasec_config_service.py | 168 +++- .../src/views/meta/metatask/index.vue | 2 +- .../src/views/meta/metatask/secConfig.vue | 749 ++++++++++++++++++ 6 files changed, 935 insertions(+), 9 deletions(-) create mode 100644 vue-fastapi-frontend/src/views/meta/metatask/secConfig.vue diff --git a/vue-fastapi-backend/module_admin/controller/metadata_config_controller.py b/vue-fastapi-backend/module_admin/controller/metadata_config_controller.py index fef3fd3..6f7ff72 100644 --- a/vue-fastapi-backend/module_admin/controller/metadata_config_controller.py +++ b/vue-fastapi-backend/module_admin/controller/metadata_config_controller.py @@ -492,9 +492,9 @@ async def get_datasec_config_list( query: DatasecConfigModel = Depends(DatasecConfigPageQueryModel.as_query), query_db: AsyncSession = Depends(get_db), ): - result = await DatasecConfigService.get_datasec_list_services(query_db, query, False) + result = await DatasecConfigService.get_datasec_list_services(query_db, query, True) logger.info("获取数据安全参数配置列表成功") - return ResponseUtil.success(data=result) + return ResponseUtil.success(model_content=result) @metadataConfigController.post("/datasecConfig") @@ -507,7 +507,7 @@ async def add_datasec_config( ): model.create_by = current_user.user.user_name model.create_time = datetime.now() - result = await DatasecConfigService.add_datasec_services(query_db, model) + result = await DatasecConfigService.add_datasec_services(query_db, model,current_user,request) logger.info(result.message) return ResponseUtil.success(msg=result.message) @@ -522,7 +522,7 @@ async def edit_datasec_config( ): model.update_by = current_user.user.user_name model.update_time = datetime.now() - result = await DatasecConfigService.edit_datasec_services(query_db, model) + result = await DatasecConfigService.edit_datasec_services(query_db, model,current_user,request) logger.info(result.message) return ResponseUtil.success(msg=result.message) diff --git a/vue-fastapi-backend/module_admin/dao/datasec_config_dao.py b/vue-fastapi-backend/module_admin/dao/datasec_config_dao.py index 324ee3b..3a52ad8 100644 --- a/vue-fastapi-backend/module_admin/dao/datasec_config_dao.py +++ b/vue-fastapi-backend/module_admin/dao/datasec_config_dao.py @@ -64,3 +64,16 @@ class DatasecConfigDao: 批量删除任务配置 """ await db.execute(delete(DatasecConfig).where(DatasecConfig.onum.in_(onum_list))) + @classmethod + async def check_name_or_param_exist(cls, db: AsyncSession, metatask_name: str, metatask_param: str, exclude_onum: int = None): + """ + 检查是否存在相同的任务名称或参数字段,排除指定 onum(用于编辑) + """ + stmt = select(DatasecConfig).where( + (DatasecConfig.metatask_name == metatask_name) | (DatasecConfig.metatask_param == metatask_param) + ) + if exclude_onum: + stmt = stmt.where(DatasecConfig.onum != exclude_onum) + + result = await db.execute(stmt) + return result.scalars().first() is not None diff --git a/vue-fastapi-backend/module_admin/entity/vo/metadata_config_vo.py b/vue-fastapi-backend/module_admin/entity/vo/metadata_config_vo.py index 6818329..333b743 100644 --- a/vue-fastapi-backend/module_admin/entity/vo/metadata_config_vo.py +++ b/vue-fastapi-backend/module_admin/entity/vo/metadata_config_vo.py @@ -335,4 +335,6 @@ class DatasecConfigModel(BaseModel): @as_query class DatasecConfigPageQueryModel(DatasecConfigModel): page_num: int = Field(default=1, description='当前页码') - page_size: int = Field(default=10, description='每页记录数') \ No newline at end of file + page_size: int = Field(default=10, description='每页记录数') + begin_time: Optional[str]= Field(default=None, description='开始时间') + end_time: Optional[str]= Field(default=None, description='结束时间') \ No newline at end of file diff --git a/vue-fastapi-backend/module_admin/service/datasec_config_service.py b/vue-fastapi-backend/module_admin/service/datasec_config_service.py index a9d5b3a..5db5010 100644 --- a/vue-fastapi-backend/module_admin/service/datasec_config_service.py +++ b/vue-fastapi-backend/module_admin/service/datasec_config_service.py @@ -4,7 +4,15 @@ from module_admin.entity.vo.metadata_config_vo import DatasecConfigModel from module_admin.entity.vo.common_vo import CrudResponseModel from exceptions.exception import ServiceException from utils.common_util import CamelCaseUtil - +from module_admin.entity.vo.metaprocessconfig_vo import MetaprocessconfigQueryModel,MetaprocessconfigModel +from module_admin.service.metaprocessconfig_service import MetaprocessconfigService +import requests +import json +import re +from config.enums import RedisInitKeyConfig +from fastapi import Request,Depends +from config.env import AppConfig +from module_admin.entity.vo.user_vo import CurrentUserModel class DatasecConfigService: """ @@ -19,27 +27,115 @@ class DatasecConfigService: return await DatasecConfigDao.get_list(db, query_object, is_page) @classmethod - async def add_datasec_services(cls, db: AsyncSession, page_object: DatasecConfigModel): + async def add_datasec_services(cls, db: AsyncSession, page_object: DatasecConfigModel, current_user: CurrentUserModel, request: Request): """ 新增配置 """ try: + # 检查任务名称或参数是否已存在 + exists = await DatasecConfigDao.check_name_or_param_exist(db, page_object.metatask_name, page_object.metatask_param) + if exists: + raise ServiceException(message="任务名称或参数字段已存在") + + # 获取流程配置 + processconfig = MetaprocessconfigQueryModel() + processconfig.db_type = "SecConfig" + processConfigList = await MetaprocessconfigService.get_metaprocessconfig_list_services(db, processconfig, False) + + # 调用流程定义创建逻辑 + message = await cls.sec_process_defind_change_add(request, processConfigList, page_object, current_user) + if "成功" not in message: + await db.rollback() + raise ServiceException(message=f'新增数据安全_批次标签任务 {page_object.metatask_name} 失败,dolphinscheduler 创建失败'+message) + + # 添加记录并提交 await DatasecConfigDao.add(db, page_object) await db.commit() return CrudResponseModel(is_success=True, message="新增成功") + except Exception as e: await db.rollback() raise e @classmethod - async def edit_datasec_services(cls, db: AsyncSession, page_object: DatasecConfigModel): + async def sec_process_defind_change_add(cls,request: Request, processConfigList: list[MetaprocessconfigModel],page_object:DatasecConfigModel,current_user: CurrentUserModel): + + projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') + url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/task-definition/gen-task-codes?genNum=5' + headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password} + + # 新增接口 + url2=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition' + headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'} + response = requests.get(url, headers=headers, verify=False) + if response.reason == 'OK': + intdsids=[] + message='' + dstypes=[] + response_text = response.text + data = json.loads(response_text) + code_list = data["data"] + str_list = list(map(str, code_list)) + for config in processConfigList: + modified_json_str = config.taskDefinitionJson.replace("18093081592672", str_list[0]).replace("sh /home/xx/code/remote_python_sec.sh", "sh /home/xx/code/remote_python_sec.sh "+page_object.metatask_param) + modified_json_str2=config.taskRelationJson.replace("18093081592672", str_list[0]) + modified_json_str3=config.locations.replace("18093081592672", str_list[0]) + metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson + description="", # 替换工作流备注 + locations=modified_json_str3,# 替换locations + name =page_object.metatask_name,# 替换工作流名称 + timeout=config.timeout, + globalParams =config.globalParams , + tenantCode =config.tenantCode , + taskRelationJson =modified_json_str2,# 替换taskRelationJson + executionType =config.executionType , + releaseState=config.releaseState + ).model_dump(exclude_unset=True, by_alias=True) + form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()} + response_post0 = requests.post(url2, headers=headers2,data=form_data,verify=False) + text= response_post0.text + responsJson = json.loads(text) + if responsJson['msg'] == 'success': + dstypes.append('1') + intdsids.append(responsJson['data']['code']) + if message: + message += ", " + message += page_object.metatask_name + "-数据安全_批次标签新增成功" + else: + if message: + message += ", " + message += page_object.metatask_name + "-数据安全_批次标签新增失败" + if len(intdsids)>0: + page_object.ds_ids=','.join([str(i) for i in intdsids]) + return message + + + @classmethod + async def edit_datasec_services(cls, db: AsyncSession, page_object: DatasecConfigModel,current_user: CurrentUserModel,request: Request,): """ 编辑配置 """ edit_data = page_object.model_dump(exclude_unset=True) + + # 获取原始数据 info = await cls.get_datasec_detail_services(db, page_object.onum) if info and info.onum: + # 检查任务名称或参数是否重复(排除自身) + exists = await DatasecConfigDao.check_name_or_param_exist( + db, + metatask_name=page_object.metatask_name, + metatask_param=page_object.metatask_param, + exclude_onum=page_object.onum + ) + if exists: + raise ServiceException(message="任务名称或参数字段已存在") + try: + # 调用流程定义创建逻辑 + message= await cls.sec_process_defind_change_update(request,page_object,info,current_user) + if "成功" not in message: + await db.rollback() + raise ServiceException(message=f'更新数据安全_批次标签任务 {page_object.metatask_name} 失败,dolphinscheduler 创建失败'+message) await DatasecConfigDao.edit(db, page_object.onum, edit_data) await db.commit() return CrudResponseModel(is_success=True, message="更新成功") @@ -48,6 +144,72 @@ class DatasecConfigService: raise e else: raise ServiceException(message="配置数据不存在") + @classmethod + async def sec_process_defind_change_update(cls,request: Request,page_object:DatasecConfigModel,metatask_old:DatasecConfigModel,current_user: CurrentUserModel): + + projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode') + dsids=page_object.ds_ids.split(",") + result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dsids, dsids)] + message='' + # 查询接口 + url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition' + headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, } + headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'} + for config in result_list: + response = requests.get(f"{url}/{config['dsid']}", headers=headers, verify=False) + text= response.text + responsJson = json.loads(text) + if responsJson['msg'] == 'success': + task_def_list = responsJson['data']['taskDefinitionList'] + + # 遍历修改 rawScript + task_def_list = responsJson['data']['taskDefinitionList'] + + for task in task_def_list: + task_params = task.get("taskParams") + if task_params and isinstance(task_params, dict): + raw_script = task_params.get("rawScript") + if raw_script and raw_script.startswith("sh /home/xx/code/remote_python_sec.sh"): + task_params["rawScript"] = f"sh /home/xx/code/remote_python_sec.sh {page_object.metatask_param}" + + + # 再序列化为 JSON 字符串 + modified_json_str = json.dumps(task_def_list, ensure_ascii=False, indent=0) + getTaskRelationList=responsJson['data']['processTaskRelationList'] + putTaskRelationList=[] + for item in getTaskRelationList: + new_item = { + "name": item['name'], + "preTaskCode":item['preTaskCode'] , + "preTaskVersion":item['preTaskVersion'] , + "postTaskCode":item['postTaskCode'] , + "conditionType":item['conditionType'] , + "conditionParams":item['conditionParams'] + } + putTaskRelationList.append(new_item) + + modified_json_str2= json.dumps(putTaskRelationList, ensure_ascii=False, indent=0) + modified_json_str2=re.sub(r'\s+', '', modified_json_str2) + metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson + description="", # 替换工作流备注 + locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations + name =page_object.metatask_name,# 替换工作流名称 + tenantCode =responsJson['data']['processDefinition']['tenantCode'] , + taskRelationJson =modified_json_str2,# 替换taskRelationJson + ).model_dump(exclude_unset=True, by_alias=True) + form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()} + response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data,verify=False) + putText= response_put0.text + responsPutJson=json.loads(putText) + if responsPutJson['msg'] == 'success': + if message: + message += ", " + message += page_object.metatask_name + "-数据安全_批次标签成功" + else: + if message: + message += ", " + message += page_object.metatask_name + "-数据安全_批次标签修改失败" + return message @classmethod async def delete_datasec_services(cls, db: AsyncSession, onum_list: str): diff --git a/vue-fastapi-frontend/src/views/meta/metatask/index.vue b/vue-fastapi-frontend/src/views/meta/metatask/index.vue index 70305e1..4fd0825 100644 --- a/vue-fastapi-frontend/src/views/meta/metatask/index.vue +++ b/vue-fastapi-frontend/src/views/meta/metatask/index.vue @@ -297,7 +297,7 @@ width="100" > +
+ + + + + + + + + + + + + + + + + 搜索 + 重置 + + + + + + 新建任务 + + + 修改 + + + 上线 + + + 下线 + + + 调度 + + + 运行 + + + 日志 + + + 删除 + + + 删除调度 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ + + +