You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

240 lines
13 KiB

from sqlalchemy.ext.asyncio import AsyncSession
from module_admin.dao.datasec_config_dao import DatasecConfigDao
from module_admin.entity.vo.metadata_config_vo import DatasecConfigModel
from module_admin.entity.vo.common_vo import CrudResponseModel
from exceptions.exception import ServiceException
from utils.common_util import CamelCaseUtil
from module_admin.entity.vo.metaprocessconfig_vo import MetaprocessconfigQueryModel,MetaprocessconfigModel
from module_admin.service.metaprocessconfig_service import MetaprocessconfigService
import requests
import json
import re
from config.enums import RedisInitKeyConfig
from fastapi import Request,Depends
from config.env import AppConfig
from module_admin.entity.vo.user_vo import CurrentUserModel
class DatasecConfigService:
"""
数据安全参数配置表 Service
"""
@classmethod
async def get_datasec_list_services(cls, db: AsyncSession, query_object, is_page: bool = False):
"""
获取配置列表支持分页
"""
return await DatasecConfigDao.get_list(db, query_object, is_page)
@classmethod
async def add_datasec_services(cls, db: AsyncSession, page_object: DatasecConfigModel, current_user: CurrentUserModel, request: Request):
"""
新增配置
"""
try:
# 检查任务名称或参数是否已存在
exists = await DatasecConfigDao.check_name_or_param_exist(db, page_object.metatask_name, page_object.metatask_param)
if exists:
raise ServiceException(message="任务名称或参数字段已存在")
# 获取流程配置
processconfig = MetaprocessconfigQueryModel()
processconfig.db_type = "SecConfig"
processConfigList = await MetaprocessconfigService.get_metaprocessconfig_list_services(db, processconfig, False)
# 调用流程定义创建逻辑
message = await cls.sec_process_defind_change_add(request, processConfigList, page_object, current_user)
if "成功" not in message:
await db.rollback()
raise ServiceException(message=f'新增数据安全_批次标签任务 {page_object.metatask_name} 失败,dolphinscheduler 创建失败'+message)
# 添加记录并提交
await DatasecConfigDao.add(db, page_object)
await db.commit()
return CrudResponseModel(is_success=True, message="新增成功")
except Exception as e:
await db.rollback()
raise e
@classmethod
async def sec_process_defind_change_add(cls,request: Request, processConfigList: list[MetaprocessconfigModel],page_object:DatasecConfigModel,current_user: CurrentUserModel):
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
url = f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/task-definition/gen-task-codes?genNum=5'
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
# 新增接口
url2=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
response = requests.get(url, headers=headers, verify=False)
if response.reason == 'OK':
intdsids=[]
message=''
dstypes=[]
response_text = response.text
data = json.loads(response_text)
code_list = data["data"]
str_list = list(map(str, code_list))
for config in processConfigList:
modified_json_str = config.taskDefinitionJson.replace("18093081592672", str_list[0]).replace("sh /home/xx/code/remote_python_sec.sh", "sh /home/xx/code/remote_python_sec.sh "+page_object.metatask_param)
modified_json_str2=config.taskRelationJson.replace("18093081592672", str_list[0])
modified_json_str3=config.locations.replace("18093081592672", str_list[0])
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description="", # 替换工作流备注
locations=modified_json_str3,# 替换locations
name =page_object.metatask_name,# 替换工作流名称
timeout=config.timeout,
globalParams =config.globalParams ,
tenantCode =config.tenantCode ,
taskRelationJson =modified_json_str2,# 替换taskRelationJson
executionType =config.executionType ,
releaseState=config.releaseState
).model_dump(exclude_unset=True, by_alias=True)
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response_post0 = requests.post(url2, headers=headers2,data=form_data,verify=False)
text= response_post0.text
responsJson = json.loads(text)
if responsJson['msg'] == 'success':
dstypes.append('1')
intdsids.append(responsJson['data']['code'])
if message:
message += ", "
message += page_object.metatask_name + "-数据安全_批次标签新增成功"
else:
if message:
message += ", "
message += page_object.metatask_name + "-数据安全_批次标签新增失败"
if len(intdsids)>0:
page_object.ds_ids=','.join([str(i) for i in intdsids])
return message
@classmethod
async def edit_datasec_services(cls, db: AsyncSession, page_object: DatasecConfigModel,current_user: CurrentUserModel,request: Request,):
"""
编辑配置
"""
edit_data = page_object.model_dump(exclude_unset=True)
# 获取原始数据
info = await cls.get_datasec_detail_services(db, page_object.onum)
if info and info.onum:
# 检查任务名称或参数是否重复(排除自身)
exists = await DatasecConfigDao.check_name_or_param_exist(
db,
metatask_name=page_object.metatask_name,
metatask_param=page_object.metatask_param,
exclude_onum=page_object.onum
)
if exists:
raise ServiceException(message="任务名称或参数字段已存在")
try:
# 调用流程定义创建逻辑
message= await cls.sec_process_defind_change_update(request,page_object,info,current_user)
if "成功" not in message:
await db.rollback()
raise ServiceException(message=f'更新数据安全_批次标签任务 {page_object.metatask_name} 失败,dolphinscheduler 创建失败'+message)
await DatasecConfigDao.edit(db, page_object.onum, edit_data)
await db.commit()
return CrudResponseModel(is_success=True, message="更新成功")
except Exception as e:
await db.rollback()
raise e
else:
raise ServiceException(message="配置数据不存在")
@classmethod
async def sec_process_defind_change_update(cls,request: Request,page_object:DatasecConfigModel,metatask_old:DatasecConfigModel,current_user: CurrentUserModel):
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
dsids=page_object.ds_ids.split(",")
result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dsids, dsids)]
message=''
# 查询接口
url=f'{AppConfig.ds_server_url}/dolphinscheduler/projects/'+projectCode+'/process-definition'
headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, }
headers2 = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password, 'Content-Type': 'application/x-www-form-urlencoded'}
for config in result_list:
response = requests.get(f"{url}/{config['dsid']}", headers=headers, verify=False)
text= response.text
responsJson = json.loads(text)
if responsJson['msg'] == 'success':
task_def_list = responsJson['data']['taskDefinitionList']
# 遍历修改 rawScript
task_def_list = responsJson['data']['taskDefinitionList']
for task in task_def_list:
task_params = task.get("taskParams")
if task_params and isinstance(task_params, dict):
raw_script = task_params.get("rawScript")
if raw_script and raw_script.startswith("sh /home/xx/code/remote_python_sec.sh"):
task_params["rawScript"] = f"sh /home/xx/code/remote_python_sec.sh {page_object.metatask_param}"
# 再序列化为 JSON 字符串
modified_json_str = json.dumps(task_def_list, ensure_ascii=False, indent=0)
getTaskRelationList=responsJson['data']['processTaskRelationList']
putTaskRelationList=[]
for item in getTaskRelationList:
new_item = {
"name": item['name'],
"preTaskCode":item['preTaskCode'] ,
"preTaskVersion":item['preTaskVersion'] ,
"postTaskCode":item['postTaskCode'] ,
"conditionType":item['conditionType'] ,
"conditionParams":item['conditionParams']
}
putTaskRelationList.append(new_item)
modified_json_str2= json.dumps(putTaskRelationList, ensure_ascii=False, indent=0)
modified_json_str2=re.sub(r'\s+', '', modified_json_str2)
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description="", # 替换工作流备注
locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations
name =page_object.metatask_name,# 替换工作流名称
tenantCode =responsJson['data']['processDefinition']['tenantCode'] ,
taskRelationJson =modified_json_str2,# 替换taskRelationJson
).model_dump(exclude_unset=True, by_alias=True)
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data,verify=False)
putText= response_put0.text
responsPutJson=json.loads(putText)
if responsPutJson['msg'] == 'success':
if message:
message += ", "
message += page_object.metatask_name + "-数据安全_批次标签成功"
else:
if message:
message += ", "
message += page_object.metatask_name + "-数据安全_批次标签修改失败"
return message
@classmethod
async def delete_datasec_services(cls, db: AsyncSession, onum_list: str):
"""
批量删除配置
"""
id_list = [int(x.strip()) for x in onum_list.split(",") if x.strip().isdigit()]
if not id_list:
raise ServiceException(message="无效的编号列表")
try:
await DatasecConfigDao.delete(db, id_list)
await db.commit()
return CrudResponseModel(is_success=True, message="删除成功")
except Exception as e:
await db.rollback()
raise e
@classmethod
async def get_datasec_detail_services(cls, db: AsyncSession, onum: int):
"""
获取配置详情
"""
result = await DatasecConfigDao.get_detail_by_id(db, onum)
if result:
return DatasecConfigModel(**CamelCaseUtil.transform_result(result))
return DatasecConfigModel(**dict())