diff --git a/vue-fastapi-backend/module_admin/controller/metadata_config_controller.py b/vue-fastapi-backend/module_admin/controller/metadata_config_controller.py
index fef3fd3..6f7ff72 100644
--- a/vue-fastapi-backend/module_admin/controller/metadata_config_controller.py
+++ b/vue-fastapi-backend/module_admin/controller/metadata_config_controller.py
@@ -492,9 +492,9 @@ async def get_datasec_config_list(
query: DatasecConfigModel = Depends(DatasecConfigPageQueryModel.as_query),
query_db: AsyncSession = Depends(get_db),
):
- result = await DatasecConfigService.get_datasec_list_services(query_db, query, False)
+ result = await DatasecConfigService.get_datasec_list_services(query_db, query, True)
logger.info("获取数据安全参数配置列表成功")
- return ResponseUtil.success(data=result)
+ return ResponseUtil.success(model_content=result)
@metadataConfigController.post("/datasecConfig")
@@ -507,7 +507,7 @@ async def add_datasec_config(
):
model.create_by = current_user.user.user_name
model.create_time = datetime.now()
- result = await DatasecConfigService.add_datasec_services(query_db, model)
+ result = await DatasecConfigService.add_datasec_services(query_db, model,current_user,request)
logger.info(result.message)
return ResponseUtil.success(msg=result.message)
@@ -522,7 +522,7 @@ async def edit_datasec_config(
):
model.update_by = current_user.user.user_name
model.update_time = datetime.now()
- result = await DatasecConfigService.edit_datasec_services(query_db, model)
+ result = await DatasecConfigService.edit_datasec_services(query_db, model,current_user,request)
logger.info(result.message)
return ResponseUtil.success(msg=result.message)
diff --git a/vue-fastapi-backend/module_admin/dao/datasec_config_dao.py b/vue-fastapi-backend/module_admin/dao/datasec_config_dao.py
index 324ee3b..3a52ad8 100644
--- a/vue-fastapi-backend/module_admin/dao/datasec_config_dao.py
+++ b/vue-fastapi-backend/module_admin/dao/datasec_config_dao.py
@@ -64,3 +64,16 @@ class DatasecConfigDao:
批量删除任务配置
"""
await db.execute(delete(DatasecConfig).where(DatasecConfig.onum.in_(onum_list)))
+ @classmethod
+ async def check_name_or_param_exist(cls, db: AsyncSession, metatask_name: str, metatask_param: str, exclude_onum: int = None):
+ """
+ 检查是否存在相同的任务名称或参数字段,排除指定 onum(用于编辑)
+ """
+ stmt = select(DatasecConfig).where(
+ (DatasecConfig.metatask_name == metatask_name) | (DatasecConfig.metatask_param == metatask_param)
+ )
+ if exclude_onum:
+ stmt = stmt.where(DatasecConfig.onum != exclude_onum)
+
+ result = await db.execute(stmt)
+ return result.scalars().first() is not None
diff --git a/vue-fastapi-backend/module_admin/entity/vo/metadata_config_vo.py b/vue-fastapi-backend/module_admin/entity/vo/metadata_config_vo.py
index 6818329..333b743 100644
--- a/vue-fastapi-backend/module_admin/entity/vo/metadata_config_vo.py
+++ b/vue-fastapi-backend/module_admin/entity/vo/metadata_config_vo.py
@@ -335,4 +335,6 @@ class DatasecConfigModel(BaseModel):
@as_query
class DatasecConfigPageQueryModel(DatasecConfigModel):
page_num: int = Field(default=1, description='当前页码')
- page_size: int = Field(default=10, description='每页记录数')
\ No newline at end of file
+ page_size: int = Field(default=10, description='每页记录数')
+ begin_time: Optional[str]= Field(default=None, description='开始时间')
+ end_time: Optional[str]= Field(default=None, description='结束时间')
\ No newline at end of file
diff --git a/vue-fastapi-backend/module_admin/service/datasec_config_service.py b/vue-fastapi-backend/module_admin/service/datasec_config_service.py
index a9d5b3a..5db5010 100644
--- a/vue-fastapi-backend/module_admin/service/datasec_config_service.py
+++ b/vue-fastapi-backend/module_admin/service/datasec_config_service.py
@@ -4,7 +4,15 @@ from module_admin.entity.vo.metadata_config_vo import DatasecConfigModel
from module_admin.entity.vo.common_vo import CrudResponseModel
from exceptions.exception import ServiceException
from utils.common_util import CamelCaseUtil
-
+from module_admin.entity.vo.metaprocessconfig_vo import MetaprocessconfigQueryModel,MetaprocessconfigModel
+from module_admin.service.metaprocessconfig_service import MetaprocessconfigService
+import requests
+import json
+import re
+from config.enums import RedisInitKeyConfig
+from fastapi import Request,Depends
+from config.env import AppConfig
+from module_admin.entity.vo.user_vo import CurrentUserModel
class DatasecConfigService:
"""
@@ -19,27 +27,115 @@ class DatasecConfigService:
return await DatasecConfigDao.get_list(db, query_object, is_page)
@classmethod
- async def add_datasec_services(cls, db: AsyncSession, page_object: DatasecConfigModel):
+ async def add_datasec_services(cls, db: AsyncSession, page_object: DatasecConfigModel, current_user: CurrentUserModel, request: Request):
"""
新增配置
"""
try:
+ # 检查任务名称或参数是否已存在
+ exists = await DatasecConfigDao.check_name_or_param_exist(db, page_object.metatask_name, page_object.metatask_param)
+ if exists:
+ raise ServiceException(message="任务名称或参数字段已存在")
+
+ # 获取流程配置
+ processconfig = MetaprocessconfigQueryModel()
+ processconfig.db_type = "SecConfig"
+ processConfigList = await MetaprocessconfigService.get_metaprocessconfig_list_services(db, processconfig, False)
+
+ # 调用流程定义创建逻辑
+ message = await cls.sec_process_defind_change_add(request, processConfigList, page_object, current_user)
+ if "成功" not in message:
+ await db.rollback()
+ raise ServiceException(message=f'新增数据安全_批次标签任务 {page_object.metatask_name} 失败,dolphinscheduler 创建失败'+message)
+
+ # 添加记录并提交
await DatasecConfigDao.add(db, page_object)
await db.commit()
return CrudResponseModel(is_success=True, message="新增成功")
+
except Exception as e:
await db.rollback()
raise e
@classmethod
- async def edit_datasec_services(cls, db: AsyncSession, page_object: DatasecConfigModel):
+ async def sec_process_defind_change_add(cls,request: Request, processConfigList: list[MetaprocessconfigModel],page_object:DatasecConfigModel,current_user: CurrentUserModel):
+
+ projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
+ url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/task-definition/gen-task-codes?genNum=5'
+ headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
+
+ # 新增接口
+ url2=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
+ headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
+ response = requests.get(url, headers=headers, verify=False)
+ if response.reason == 'OK':
+ intdsids=[]
+ message=''
+ dstypes=[]
+ response_text = response.text
+ data = json.loads(response_text)
+ code_list = data["data"]
+ str_list = list(map(str, code_list))
+ for config in processConfigList:
+ modified_json_str = config.taskDefinitionJson.replace("18093081592672", str_list[0]).replace("sh /home/xx/code/remote_python_sec.sh", "sh /home/xx/code/remote_python_sec.sh "+page_object.metatask_param)
+ modified_json_str2=config.taskRelationJson.replace("18093081592672", str_list[0])
+ modified_json_str3=config.locations.replace("18093081592672", str_list[0])
+ metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
+ description="", # 替换工作流备注
+ locations=modified_json_str3,# 替换locations
+ name =page_object.metatask_name,# 替换工作流名称
+ timeout=config.timeout,
+ globalParams =config.globalParams ,
+ tenantCode =config.tenantCode ,
+ taskRelationJson =modified_json_str2,# 替换taskRelationJson
+ executionType =config.executionType ,
+ releaseState=config.releaseState
+ ).model_dump(exclude_unset=True, by_alias=True)
+ form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
+ response_post0 = requests.post(url2, headers=headers2,data=form_data,verify=False)
+ text= response_post0.text
+ responsJson = json.loads(text)
+ if responsJson['msg'] == 'success':
+ dstypes.append('1')
+ intdsids.append(responsJson['data']['code'])
+ if message:
+ message += ", "
+ message += page_object.metatask_name + "-数据安全_批次标签新增成功"
+ else:
+ if message:
+ message += ", "
+ message += page_object.metatask_name + "-数据安全_批次标签新增失败"
+ if len(intdsids)>0:
+ page_object.ds_ids=','.join([str(i) for i in intdsids])
+ return message
+
+
+ @classmethod
+ async def edit_datasec_services(cls, db: AsyncSession, page_object: DatasecConfigModel,current_user: CurrentUserModel,request: Request,):
"""
编辑配置
"""
edit_data = page_object.model_dump(exclude_unset=True)
+
+ # 获取原始数据
info = await cls.get_datasec_detail_services(db, page_object.onum)
if info and info.onum:
+ # 检查任务名称或参数是否重复(排除自身)
+ exists = await DatasecConfigDao.check_name_or_param_exist(
+ db,
+ metatask_name=page_object.metatask_name,
+ metatask_param=page_object.metatask_param,
+ exclude_onum=page_object.onum
+ )
+ if exists:
+ raise ServiceException(message="任务名称或参数字段已存在")
+
try:
+ # 调用流程定义创建逻辑
+ message= await cls.sec_process_defind_change_update(request,page_object,info,current_user)
+ if "成功" not in message:
+ await db.rollback()
+ raise ServiceException(message=f'更新数据安全_批次标签任务 {page_object.metatask_name} 失败,dolphinscheduler 创建失败'+message)
await DatasecConfigDao.edit(db, page_object.onum, edit_data)
await db.commit()
return CrudResponseModel(is_success=True, message="更新成功")
@@ -48,6 +144,72 @@ class DatasecConfigService:
raise e
else:
raise ServiceException(message="配置数据不存在")
+ @classmethod
+ async def sec_process_defind_change_update(cls,request: Request,page_object:DatasecConfigModel,metatask_old:DatasecConfigModel,current_user: CurrentUserModel):
+
+ projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
+ dsids=page_object.ds_ids.split(",")
+ result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dsids, dsids)]
+ message=''
+ # 查询接口
+ url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
+ headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, }
+ headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
+ for config in result_list:
+ response = requests.get(f"{url}/{config['dsid']}", headers=headers, verify=False)
+ text= response.text
+ responsJson = json.loads(text)
+ if responsJson['msg'] == 'success':
+ task_def_list = responsJson['data']['taskDefinitionList']
+
+ # 遍历修改 rawScript
+ task_def_list = responsJson['data']['taskDefinitionList']
+
+ for task in task_def_list:
+ task_params = task.get("taskParams")
+ if task_params and isinstance(task_params, dict):
+ raw_script = task_params.get("rawScript")
+ if raw_script and raw_script.startswith("sh /home/xx/code/remote_python_sec.sh"):
+ task_params["rawScript"] = f"sh /home/xx/code/remote_python_sec.sh {page_object.metatask_param}"
+
+
+ # 再序列化为 JSON 字符串
+ modified_json_str = json.dumps(task_def_list, ensure_ascii=False, indent=0)
+ getTaskRelationList=responsJson['data']['processTaskRelationList']
+ putTaskRelationList=[]
+ for item in getTaskRelationList:
+ new_item = {
+ "name": item['name'],
+ "preTaskCode":item['preTaskCode'] ,
+ "preTaskVersion":item['preTaskVersion'] ,
+ "postTaskCode":item['postTaskCode'] ,
+ "conditionType":item['conditionType'] ,
+ "conditionParams":item['conditionParams']
+ }
+ putTaskRelationList.append(new_item)
+
+ modified_json_str2= json.dumps(putTaskRelationList, ensure_ascii=False, indent=0)
+ modified_json_str2=re.sub(r'\s+', '', modified_json_str2)
+ metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
+ description="", # 替换工作流备注
+ locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations
+ name =page_object.metatask_name,# 替换工作流名称
+ tenantCode =responsJson['data']['processDefinition']['tenantCode'] ,
+ taskRelationJson =modified_json_str2,# 替换taskRelationJson
+ ).model_dump(exclude_unset=True, by_alias=True)
+ form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
+ response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data,verify=False)
+ putText= response_put0.text
+ responsPutJson=json.loads(putText)
+ if responsPutJson['msg'] == 'success':
+ if message:
+ message += ", "
+ message += page_object.metatask_name + "-数据安全_批次标签成功"
+ else:
+ if message:
+ message += ", "
+ message += page_object.metatask_name + "-数据安全_批次标签修改失败"
+ return message
@classmethod
async def delete_datasec_services(cls, db: AsyncSession, onum_list: str):
diff --git a/vue-fastapi-frontend/src/views/meta/metatask/index.vue b/vue-fastapi-frontend/src/views/meta/metatask/index.vue
index 70305e1..4fd0825 100644
--- a/vue-fastapi-frontend/src/views/meta/metatask/index.vue
+++ b/vue-fastapi-frontend/src/views/meta/metatask/index.vue
@@ -297,7 +297,7 @@
width="100"
>
- {{ parseTime(scope.row.createTime) }}
+ {{ parseTime(scope.row.updateTime) }}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 搜索
+ 重置
+
+
+
+
+
+ 新建任务
+
+
+ 修改
+
+
+ 上线
+
+
+ 下线
+
+
+ 调度
+
+
+ 运行
+
+
+ 日志
+
+
+ 删除
+
+
+ 删除调度
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ {{scope.row.schId}}
+
+
+
+
+ {{ parseTime(scope.row.createTime) }}
+
+
+
+
+ {{ parseTime(scope.row.updateTime) }}
+
+
+
+
+
+
+
+
+
+
+ 修改
+
+
+
+ 删除
+
+
+ handleCommand(command, row)"
+ >
+
+ 更多
+
+
+
+
+
+ 上线
+
+
+
+ 下线
+
+
+
+ 调度
+
+
+
+ 运行
+
+
+
+ 日志
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+