Browse Source

功能同步

master
si@aidatagov.com 3 months ago
parent
commit
bd5d95239c
  1. 18
      vue-fastapi-backend/.env.dev
  2. 206
      vue-fastapi-backend/module_admin/controller/metatask_controller.py
  3. 109
      vue-fastapi-backend/module_admin/dao/metaprocessconfig_dao.py
  4. 107
      vue-fastapi-backend/module_admin/dao/metatask_dao.py
  5. 23
      vue-fastapi-backend/module_admin/entity/do/metaprocessconfig_do.py
  6. 31
      vue-fastapi-backend/module_admin/entity/do/metatask_do.py
  7. 142
      vue-fastapi-backend/module_admin/entity/vo/dataSource_vo.py
  8. 97
      vue-fastapi-backend/module_admin/entity/vo/metaprocessconfig_vo.py
  9. 96
      vue-fastapi-backend/module_admin/entity/vo/metatask_vo.py
  10. 248
      vue-fastapi-backend/module_admin/service/metaprocessconfig_service.py
  11. 626
      vue-fastapi-backend/module_admin/service/metatask_service.py
  12. 2
      vue-fastapi-backend/server.py
  13. 115
      vue-fastapi-frontend/src/api/meta/metatask.js
  14. 168
      vue-fastapi-frontend/src/views/meta/metatask/dsDialog.vue
  15. 1011
      vue-fastapi-frontend/src/views/meta/metatask/index.vue
  16. 140
      vue-fastapi-frontend/src/views/meta/metatask/logsDialog.vue
  17. 103
      vue-fastapi-frontend/src/views/meta/metatask/runDialog.vue

18
vue-fastapi-backend/.env.dev

@ -33,15 +33,21 @@ JWT_REDIS_EXPIRE_MINUTES = 30
# 数据库类型,可选的有'mysql'、'postgresql',默认为'mysql'
DB_TYPE = 'mysql'
# 数据库主机
DB_HOST = '192.168.0.3'
# DB_HOST = '192.168.0.3'
DB_HOST = '127.0.0.1'
# 数据库端口
DB_PORT = 3306
# 数据库用户名
DB_USERNAME = 'admin'
# DB_USERNAME = 'admin'
DB_USERNAME = 'root'
# 数据库密码
DB_PASSWORD = '123456'
# DB_PASSWORD = '123456'
DB_PASSWORD = 'root'
# 数据库名称
DB_DATABASE = 'vue_faseapi'
# DB_DATABASE = 'vue_faseapi'
DB_DATABASE = 'ruoyi-fastapi'
# 是否开启sqlalchemy日志
DB_ECHO = true
# 允许溢出连接池大小的最大连接数
@ -55,7 +61,9 @@ DB_POOL_TIMEOUT = 30
# -------- Redis配置 --------
# Redis主机
REDIS_HOST = '192.168.0.3'
# REDIS_HOST = '192.168.0.3'
REDIS_HOST = '127.0.0.1'
# Redis端口
REDIS_PORT = 6379
# Redis用户名

206
vue-fastapi-backend/module_admin/controller/metatask_controller.py

@ -0,0 +1,206 @@
from fastapi import APIRouter, Depends, Request
from config.get_db import get_db
from module_admin.service.login_service import LoginService
from module_admin.service.metatask_service import MetataskService
from module_admin.entity.vo.metatask_vo import DeleteMetataskModel,MetataskPageObject,MetataskModel,OperaMetataskModel
from module_admin.entity.vo.dataSource_vo import ProcessDefinition,ParmScheduleVo,ProcessInstancePage
from utils.page_util import PageResponseModel
from utils.response_util import ResponseUtil
from utils.log_util import logger
from module_admin.aspect.interface_auth import CheckUserInterfaceAuth
from sqlalchemy.ext.asyncio import AsyncSession
from config.enums import BusinessType
from datetime import datetime
from module_admin.entity.vo.user_vo import CurrentUserModel
from pydantic_validation_decorator import ValidateFields
from module_admin.annotation.log_annotation import Log
from module_admin.service.config_service import ConfigService
metataskController = APIRouter(prefix='/meta/metatask', dependencies=[Depends(LoginService.get_current_user)])
@metataskController.get(
'/list', response_model=PageResponseModel, dependencies=[Depends(CheckUserInterfaceAuth('meta:metatask:list'))]
)
async def get_system_metatask_list(
request: Request,
metatask_page_query: MetataskPageObject = Depends(MetataskPageObject.as_query),
query_db: AsyncSession = Depends(get_db),
):
# 获取分页数据
config_page_query_result = await MetataskService.get_metatask_list_services(query_db, metatask_page_query, is_page=True)
logger.info('获取成功')
return ResponseUtil.success(model_content=config_page_query_result)
# 获取工作流实例
@metataskController.get(
'/process_instances', dependencies=[Depends(CheckUserInterfaceAuth('meta:metatask:log'))]
)
async def get_process_instances_list(
request: Request,
metatask_page_query: ProcessInstancePage = Depends(ProcessInstancePage),
):
# 获取分页数据
config_page_query_result = await MetataskService.get_process_instances_services(request, metatask_page_query)
logger.info('获取成功')
return ResponseUtil.success(model_content=config_page_query_result)
# 获取工作流实例节点
@metataskController.get(
'/task_nodes/{id}', dependencies=[Depends(CheckUserInterfaceAuth('meta:metatask:log'))]
)
async def get_task_nodes_list(
request: Request,
id:int,
):
config_page_query_result = await MetataskService.get_task_nodes_services(request, id)
logger.info('获取成功')
return ResponseUtil.success(rows=config_page_query_result)
# 获取日志详情
@metataskController.get(
'/log_details/{id}', dependencies=[Depends(CheckUserInterfaceAuth('meta:metatask:log'))]
)
async def get_log_details(
request: Request,
id:int,
):
# 获取分页数据
config_page_query_result = await MetataskService.get_log_details_services(request,id)
logger.info('获取成功')
return ResponseUtil.success(data=config_page_query_result)
@metataskController.get(
'/tree', response_model=PageResponseModel, dependencies=[Depends(CheckUserInterfaceAuth('meta:metatask:list'))]
)
async def get_data_source_tree( request: Request,):
# 获取分页数据
data_tree_result = await MetataskService.get_data_source_tree( request)
logger.info('获取成功')
return ResponseUtil.success(rows=data_tree_result)
@metataskController.get(
'/sourceall', response_model=PageResponseModel, dependencies=[Depends(CheckUserInterfaceAuth('meta:metatask:list'))]
)
async def get_data_source_all( request: Request,):
# 获取分页数据
data_source_result = await MetataskService.get_data_source_all( request)
logger.info('获取成功')
return ResponseUtil.success(data=data_source_result)
@metataskController.post('', dependencies=[Depends(CheckUserInterfaceAuth('meta:metatask:add'))])
@ValidateFields(validate_model='add_metatask')
@Log(title='元数据任务', business_type=BusinessType.INSERT)
async def add_meta_metatask(
request: Request,
add_metatask: MetataskModel,
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
add_metatask.create_by = current_user.user.user_name
add_metatask.create_time = datetime.now()
add_metatask.update_by = current_user.user.user_name
add_metatask.update_time = datetime.now()
add_metatask_result = await MetataskService.add_metatask_services(request, query_db, add_metatask)
logger.info(add_metatask_result.message)
return ResponseUtil.success(msg=add_metatask_result.message)
@metataskController.put('', dependencies=[Depends(CheckUserInterfaceAuth('meta:metatask:edit'))])
@ValidateFields(validate_model='edit_metatask')
@Log(title='元数据任务', business_type=BusinessType.UPDATE)
async def edit_meta_metatask(
request: Request,
edit_metatask: MetataskModel,
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
edit_metatask.update_by = current_user.user.user_name
edit_metatask.update_time = datetime.now()
edit_config_result = await MetataskService.edit_metatask_services(request, query_db, edit_metatask)
logger.info(edit_config_result.message)
return ResponseUtil.success(msg=edit_config_result.message)
# 元数据任务上下线
@metataskController.put('/upOrdown', dependencies=[Depends(CheckUserInterfaceAuth('meta:metatask:up',"meta:metatask:down"))])
@Log(title='元数据任务', business_type=BusinessType.UPDATE)
async def up_or_down_meta_metatask(
request: Request,
DownOrUpdate: OperaMetataskModel,
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
edit_config_result = await MetataskService.up_or_down_metatask_services(request, query_db,current_user, DownOrUpdate.id,DownOrUpdate.type)
logger.info(edit_config_result.message)
return ResponseUtil.success(msg=edit_config_result.message)
# 元数据任务运行
@metataskController.put('/Run', dependencies=[Depends(CheckUserInterfaceAuth('meta:metatask:run'))])
@Log(title='元数据任务', business_type=BusinessType.UPDATE)
async def run_meta_metatask(
request: Request,
process: ProcessDefinition,
query_db: AsyncSession = Depends(get_db),
):
edit_config_result = await MetataskService.run_metatask_services(request, query_db, process)
return ResponseUtil.success(msg=edit_config_result)
# 元数据任务调度
@metataskController.put('/DS', dependencies=[Depends(CheckUserInterfaceAuth('meta:metatask:ds'))])
@Log(title='元数据任务', business_type=BusinessType.UPDATE)
async def DS_meta_metatask(
request: Request,
process: ParmScheduleVo,
query_db: AsyncSession = Depends(get_db),
):
edit_config_result = await MetataskService.ds_metatask_services(request, query_db, process)
return ResponseUtil.success(msg=edit_config_result)
# 元数据任务日志
@metataskController.get('/logs', dependencies=[Depends(CheckUserInterfaceAuth('meta:metatask:up',"meta:metatask:down"))])
@Log(title='元数据任务', business_type=BusinessType.UPDATE)
async def get_metatask_logs(
request: Request,
id: str,
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
edit_config_result = await MetataskService.get_metatask_logs_services(request, query_db,current_user, id)
logger.info(edit_config_result.message)
return ResponseUtil.success(msg=edit_config_result.message)
@metataskController.delete('/{metatask_ids}/{ds_ids}', dependencies=[Depends(CheckUserInterfaceAuth('meta:metatask:delete'))])
@Log(title='元数据任务', business_type=BusinessType.DELETE)
async def delete_system_config(request: Request, metatask_ids: str,ds_ids:str, query_db: AsyncSession = Depends(get_db)):
delete_config = DeleteMetataskModel(metatask_ids=metatask_ids,ds_ids=ds_ids)
delete_config_result = await MetataskService.delete_metatask_services(request, query_db, delete_config)
logger.info(delete_config_result.message)
return ResponseUtil.success(msg=delete_config_result.message)
@metataskController.get(
'/{metatask_id}', response_model=MetataskModel, dependencies=[Depends(CheckUserInterfaceAuth('meta:metatask:query'))]
)
async def query_detail_system_config(request: Request, metatask_id: int, query_db: AsyncSession = Depends(get_db)):
metatask_detail_result = await MetataskService.metatask_detail_services(query_db, metatask_id)
logger.info(f'获取config_id为{metatask_id}的信息成功')
return ResponseUtil.success(data=metatask_detail_result)

109
vue-fastapi-backend/module_admin/dao/metaprocessconfig_dao.py

@ -0,0 +1,109 @@
from sqlalchemy import delete, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from module_admin.entity.do.metaprocessconfig_do import Metaprocessconfig
from module_admin.entity.vo.metaprocessconfig_vo import MetaprocessconfigModel, MetaprocessconfigPageObject
from datetime import datetime, time
from utils.page_util import PageUtil
class MetaprocessconfigDao:
"""
参数配置管理模块数据库操作层
"""
# @classmethod
# async def get_metaprocessconfig_detail_by_id(cls, db: AsyncSession, metaprocessconfig_id: int):
# """
# 根据参数配置id获取参数配置详细信息
# :param db: orm对象
# :param metaprocessconfig_id: 参数配置id
# :return: 参数配置信息对象
# """
# metaprocessconfig_info = (await db.execute(select(Metaprocessconfig).where(Metaprocessconfig.metaprocessconfig_id == metaprocessconfig_id))).scalars().first()
# return metaprocessconfig_info
# @classmethod
# async def get_metaprocessconfig_detail_by_info(cls, db: AsyncSession, metaprocessconfig: MetaprocessconfigModel):
# metaprocessconfig_info = (
# (
# await db.execute(
# select(Metaprocessconfig).where(
# Metaprocessconfig.metaprocessconfig_name == metaprocessconfig.metaprocessconfig_name if metaprocessconfig.metaprocessconfig_name else True,
# )
# )
# )
# .scalars()
# .first()
# )
# return metaprocessconfig_info
@classmethod
async def get_metaprocessconfig_list(cls, db: AsyncSession, query_object: MetaprocessconfigPageObject , is_page: bool = False):
"""
根据查询参数获取参数配置列表信息
:param db: orm对象
:param query_object: 查询参数对象
:param is_page: 是否开启分页
:return: 参数配置列表信息对象
"""
query = (
select(Metaprocessconfig)
.where(
Metaprocessconfig.db_type == query_object.db_type if query_object.db_type else True,
Metaprocessconfig.ac_target == query_object.ac_target if query_object.ac_target and len(query_object.ac_target)==1 else True,
)
.order_by(Metaprocessconfig.pdc_id.desc())
.distinct()
)
metaprocessconfig_list = await PageUtil.paginate(db, query, 1,10, is_page)
return metaprocessconfig_list
@classmethod
async def get_metaprocessconfig_list_all(cls, db: AsyncSession, query_object: MetaprocessconfigPageObject , is_page: bool = False):
query = (
select(Metaprocessconfig)
.where(
Metaprocessconfig.db_type == query_object.db_type if query_object.db_type else True,
Metaprocessconfig.ac_target == query_object.ac_target if query_object.ac_target and len(query_object.ac_target) == 1 else True,
).order_by(Metaprocessconfig.ac_target.asc()).distinct())
result = await db.execute(query)
metaprocessconfig_list = result.scalars().all()
return list(metaprocessconfig_list)
# @classmethod
# async def add_metaprocessconfig_dao(cls, db: AsyncSession, metaprocessconfig: MetaprocessconfigModel):
# """
# 新增参数配置数据库操作
# :param db: orm对象
# :param metaprocessconfig: 参数配置对象
# :return:
# """
# db_metaprocessconfig = Metaprocessconfig(**metaprocessconfig.model_dump())
# db.add(db_metaprocessconfig)
# await db.flush()
# return db_metaprocessconfig
# @classmethod
# async def edit_metaprocessconfig_dao(cls, db: AsyncSession, metaprocessconfig: dict):
# await db.execute(update(Metaprocessconfig), [metaprocessconfig])
# @classmethod
# async def delete_metaprocessconfig_dao(cls, db: AsyncSession, metaprocessconfig: MetaprocessconfigModel):
# """
# 删除参数配置数据库操作
# :param db: orm对象
# :param config: 参数配置对象
# :return:
# """
# await db.execute(delete(Metaprocessconfig).where(Metaprocessconfig.metaprocessconfig_id.in_([metaprocessconfig.metaprocessconfig_id])))

107
vue-fastapi-backend/module_admin/dao/metatask_dao.py

@ -0,0 +1,107 @@
from sqlalchemy import delete, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from module_admin.entity.do.metatask_do import Metatask
from module_admin.entity.vo.metatask_vo import MetataskModel, MetataskPageObject
from datetime import datetime, time
from utils.page_util import PageUtil
class MetataskDao:
"""
参数配置管理模块数据库操作层
"""
@classmethod
async def get_metatask_detail_by_id(cls, db: AsyncSession, metatask_id: int):
"""
根据参数配置id获取参数配置详细信息
:param db: orm对象
:param metatask_id: 参数配置id
:return: 参数配置信息对象
"""
metatask_info = (await db.execute(select(Metatask).where(Metatask.metatask_id == metatask_id))).scalars().first()
return metatask_info
@classmethod
async def get_metatask_detail_by_info(cls, db: AsyncSession, metatask: MetataskModel):
metatask_info = (
(
await db.execute(
select(Metatask).where(
Metatask.metatask_name == metatask.metatask_name if metatask.metatask_name else True,
)
)
)
.scalars()
.first()
)
return metatask_info
@classmethod
async def get_metatask_list(cls, db: AsyncSession, query_object: MetataskPageObject , is_page: bool = False):
"""
根据查询参数获取参数配置列表信息
:param db: orm对象
:param query_object: 查询参数对象
:param is_page: 是否开启分页
:return: 参数配置列表信息对象
"""
query = (
select(Metatask)
.where(
Metatask.metatask_name.like(f'%{query_object.metatask_name}%') if query_object.metatask_name else True,
Metatask.metatask_id == query_object.metatask_id if query_object.metatask_id else True,
Metatask.metatask_type == query_object.metatask_type if query_object.metatask_type else True,
Metatask.dbRCode == query_object.dbRCode if query_object.dbRCode else False,
Metatask.create_time.between(
datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time(00, 00, 00)),
datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time(23, 59, 59)),
)
if query_object.begin_time and query_object.end_time
else True,
)
.order_by(Metatask.create_time.desc())
.distinct()
)
metatask_list = await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
return metatask_list
@classmethod
async def add_metatask_dao(cls, db: AsyncSession, metatask: MetataskModel):
"""
新增参数配置数据库操作
:param db: orm对象
:param metatask: 参数配置对象
:return:
"""
db_metatask = Metatask(**metatask.model_dump())
db.add(db_metatask)
await db.flush()
return db_metatask
@classmethod
async def edit_metatask_dao(cls, db: AsyncSession, metatask: dict):
await db.execute(update(Metatask), [metatask])
@classmethod
async def delete_metatask_dao(cls, db: AsyncSession, metatask: MetataskModel):
"""
删除参数配置数据库操作
:param db: orm对象
:param config: 参数配置对象
:return:
"""
await db.execute(delete(Metatask).where(Metatask.metatask_id.in_([metatask.metatask_id])))

23
vue-fastapi-backend/module_admin/entity/do/metaprocessconfig_do.py

@ -0,0 +1,23 @@
from sqlalchemy import Column, Integer, String, Text
from config.database import Base
class Metaprocessconfig(Base):
"""
流程配置表
"""
__tablename__ = 'meta_process_config'
pdc_id = Column(Integer, primary_key=True, autoincrement=True, comment='流程定义主键')
db_type = Column(String(10), nullable=False, default='', comment='数据库类型')
ac_target = Column(String(1), nullable=False, default='0', comment='采集对象(0 字表 1过程)')
taskDefinitionJson = Column(Text, nullable=False, comment='节点数据')
taskRelationJson = Column(String(2000), nullable=True, comment='节点位置')
locations = Column(String(500), nullable=False, default='', comment='更新者')
name = Column(String(50), nullable=True, comment='流程定义名')
tenantCode = Column(String(50), nullable=True, comment='租户代码')
executionType = Column(String(10), nullable=False, default='N', comment='执行类型')
description = Column(String(200), nullable=True, comment='描述')
globalParams = Column(String(50), nullable=True, comment='全局参数')
timeout = Column(String(50), nullable=True, comment='超时设置')
releaseState = Column(String(50), nullable=True, comment='发布状态')

31
vue-fastapi-backend/module_admin/entity/do/metatask_do.py

@ -0,0 +1,31 @@
from sqlalchemy import Column, Integer, String, DateTime
from config.database import Base
from datetime import datetime
class Metatask(Base):
"""
元数据任务表
"""
__tablename__ = 'meta_metatask'
metatask_id = Column(Integer, primary_key=True, autoincrement=True, comment='任务主键')
metatask_name = Column(String(50), nullable=True, default='', comment='任务名称')
metatask_type = Column(String(5), nullable=True, default='N', comment='任务类型(0 采集 1加工)')
create_by = Column(String(64), nullable=True, default='', comment='创建者')
create_time = Column(DateTime, nullable=True, default=datetime.now(), comment='创建时间')
update_by = Column(String(64), nullable=True, default='', comment='更新者')
update_time = Column(DateTime, nullable=True, default=datetime.now(), comment='更新时间')
remark = Column(String(500), nullable=True, default=None, comment='备注')
status = Column(String(10), nullable=True, default='OFFLINE', comment='状态')
ds_time = Column(DateTime, nullable=True, default=None, comment='调度时间')
dbRName = Column(String(50), nullable=True, default=None, comment='数据源名称')
dbRCode = Column(Integer, nullable=True, default=None, comment='数据源ID')
dbName = Column(String(50), nullable=True, default=None, comment='数据库名称')
dbCode = Column(String(50), nullable=True, default=None, comment='数据库ID')
dbSName = Column(String(50), nullable=True, default=None, comment='模式名称')
dbSCode = Column(String(50), nullable=True, default=None, comment='模式ID')
acquisitionType = Column(String(10), nullable=True, default=None, comment='采集方式')
ac_target = Column(String(5), nullable=True, default=None, comment='采集目标')
ds_ids = Column(String(50), nullable=True, default=None, comment='任务Id')
ds_types = Column(String(5), nullable=True, default=None, comment='ds任务类型')

142
vue-fastapi-backend/module_admin/entity/vo/dataSource_vo.py

@ -0,0 +1,142 @@
from typing import Union, Optional, List
from pydantic import BaseModel, ConfigDict, Field
from datetime import datetime
from pydantic.alias_generators import to_camel
from module_admin.annotation.pydantic_annotation import as_query
class ConnectionParams(BaseModel):
model_config = ConfigDict(alias_generator=to_camel, from_attributes=True)
user: str = Field(description="Database user")
password: str = Field(description="Database password")
address: str = Field(description="Database address")
database: str = Field(description="Database name")
jdbc_url: str = Field(alias="jdbcUrl", description="JDBC URL")
driver_class_name: str = Field(alias="driverClassName", description="JDBC driver class name")
validation_query: str = Field(description="Validation query")
class DataSource(BaseModel):
model_config = ConfigDict(alias_generator=to_camel, from_attributes=True)
id: int = Field(description="Primary key ID")
user_id: int = Field(alias="userId", description="User ID")
user_name: str = Field(alias="userName", description="Username")
name: str = Field(description="Datasource name")
note: Optional[str] = Field(default=None, description="Datasource note")
type: str = Field(description="Datasource type")
connection_params: ConnectionParams = Field(alias="connectionParams", description="Connection parameters")
create_time: datetime = Field(alias="createTime", description="Creation time")
update_time: datetime = Field(alias="updateTime", description="Update time")
class WorkerGroup(BaseModel):
name: str= Field(description="Datasource name")
class AlertGroups(BaseModel):
id: int = Field(description="Primary key ID")
description: str = Field(description="Datasource name")
alertInstanceIds: str = Field(description="Datasource name")
groupName: str = Field(description="Datasource name")
class Environment(BaseModel):
id: int = Field(description="Primary key ID")
code: int = Field(description="Primary key ID")
operator: int = Field(description="Primary key ID")
description: str = Field(description="Datasource name")
config: str = Field(description="Datasource name")
name: str = Field(description="Datasource name")
workerGroups: list[str]= Field(description="Datasource name")
class Datasouceall(BaseModel):
workerGroup:List[WorkerGroup]=[]
alertGroups:List[AlertGroups]=[]
environment:List[Environment]=[]
class ProcessDefinition(BaseModel):
processDefinitionCode: Optional[int] = Field(None, description="Process definition code")
failureStrategy: Optional[str] = Field(None, description="Failure strategy")
warningType: Optional[str] = Field(None, description="Warning type")
warningGroupId: Optional[int] = Field(None, description="Warning group ID")
execType: Optional[str] = Field(None, description="Execution type")
# startNodeList: Optional[List[str]] = Field(default_factory=list, description="Start node list")
startNodeList: Optional[str] = Field(default_factory=list, description="Start node list")
taskDependType: Optional[str] = Field(None, description="Task depend type")
complementDependentMode: Optional[str] = Field(None, description="Complement dependent mode")
runMode: Optional[str] = Field(None, description="Run mode")
processInstancePriority: Optional[str] = Field(None, description="Process instance priority")
workerGroup: Optional[str] = Field(None, description="Worker group")
startParams: Optional[str] = Field(None, description="Worker group")
environmentCode: Optional[int] = Field(None, description="Environment code")
expectedParallelismNumber: Optional[int] = Field(None, description="Expected parallelism number")
dryRun: Optional[int] = Field(None, description="Dry run flag")
scheduleTime: Optional[str] = Field(None, description="Schedule time")
# class Schedule(BaseModel):
# startTime: Optional[datetime] = Field(default=None, description='更新时间')
# endTime: Optional[datetime] = Field(default=None, description='更新时间')
# crontab: Optional[str] = Field(default=None, description='更新时间')
# timezoneId: Optional[str] = Field(default=None, description='更新时间')
class ParmSchedule(BaseModel):
schedule: Optional[str]= Field(default=None, description='更新时间')
failureStrategy: Optional[str] = Field(None, description="Failure strategy")
warningType: Optional[str] = Field(None, description="Warning type")
warningGroupId: Optional[int] = Field(None, description="Warning group ID")
workerGroup: Optional[str] = Field(None, description="Worker group")
environmentCode: Optional[int] = Field(None, description="Environment code")
processInstancePriority: Optional[str] = Field(None, description="Process instance priority")
processDefinitionCode: Optional[int] = Field(None, description="Process definition code")
class ParmScheduleVo(BaseModel):
warningGroupId: Optional[int] = Field(None, description="Warning group ID")
workerGroup: Optional[str] = Field(None, description="Worker group")
environmentCode: Optional[int] = Field(None, description="Environment code")
processDefinitionCode: Optional[int] = Field(None, description="Process definition code")
crontab: Optional[str] = Field(default=None, description='更新时间')
beginTime: Optional[datetime] = Field(default=None, description='更新时间')
endTime: Optional[datetime] = Field(default=None, description='更新时间')
class ProcessInstancePage(BaseModel):
searchVal: Optional[str] = Field(default=None, description="Name of the task node")
page_num: int = Field(default=1, description='当前页码')
page_size: int = Field(default=10, description='每页记录数')
class TaskNode(BaseModel):
id: int = Field(..., description="Unique identifier for the task node")
name: Optional[str] = Field(None, description="Name of the task node")
taskCode: Optional[int] = Field(None, description="Task code identifier")
taskType: Optional[str] = Field(None, description="Type of the task")
processInstanceId: Optional[int] = Field(None, description="Associated process instance ID")
processInstanceName: Optional[str] = Field(None, description="Name of the associated process instance")
startTime: Optional[datetime] = Field(None, description="Start time of the task")
endTime: Optional[datetime] = Field(None, description="End time of the task")
submitTime: Optional[datetime] = Field(None, description="Time when the task was submitted")
firstSubmitTime: Optional[datetime] = Field(None, description="Time of the first task submission")
executorId: Optional[int] = Field(None, description="Executor ID")
host: Optional[str] = Field(None, description="Host where the task is executed")
logPath: Optional[str] = Field(None, description="Log file path for the task execution")
executePath: Optional[str] = Field(None, description="Execution path of the task")
state: Optional[str] = Field(None, description="Current state of the task")
duration: Optional[str] = Field(None, description="Duration of the task execution")
dryRun: Optional[int] = Field(0, description="Indicates if the task is a dry run")
alertFlag: Optional[str] = Field(None, description="Alert flag for task issues")
blockingTask: Optional[bool] = Field(False, description="Indicates if the task is blocking other tasks")
conditionsTask: Optional[bool] = Field(False, description="Indicates if the task has conditions")
dependTask: Optional[bool] = Field(False, description="Indicates if the task is dependent on another task")
cpuQuota: Optional[int] = Field(-1, description="CPU quota allocated for the task")
memoryMax: Optional[int] = Field(-1, description="Maximum memory allocation for the task")
flag: Optional[str] = Field(None, description="Task execution flag")
maxRetryTimes: Optional[int] = Field(0, description="Maximum retry attempts for the task")
retryInterval: Optional[int] = Field(1, description="Interval between retries")
retryTimes: Optional[int] = Field(0, description="Number of retries attempted")
subProcess: Optional[bool] = Field(False, description="Indicates if the task is a subprocess")
switchTask: Optional[bool] = Field(False, description="Indicates if the task is a switch task")
taskComplete: Optional[bool] = Field(True, description="Indicates if the task has completed execution")
taskExecuteType: Optional[str] = Field(None, description="Execution type of the task")
taskDefinitionVersion: Optional[int] = Field(None, description="Version of the task definition")
taskGroupId: Optional[int] = Field(0, description="Group ID of the task")
taskGroupPriority: Optional[int] = Field(0, description="Priority within the task group")
taskInstancePriority: Optional[str] = Field(None, description="Priority of the task instance")
taskParams: Optional[str] = Field(None, description="Parameters for the task execution")
environmentCode: Optional[int] = Field(None, description="Environment code for the task")
workerGroup: Optional[str] = Field(None, description="Worker group handling the task")
varPool: Optional[str] = Field(None, description="Variable pool for the task execution")
delayTime: Optional[int] = Field(0, description="Delay time for task execution")
dependency: Optional[str] = Field(None, description="Task dependencies")
dependentResult: Optional[str] = Field(None, description="Result of the task dependencies")
switchDependency: Optional[str] = Field(None, description="Switch dependency")

97
vue-fastapi-backend/module_admin/entity/vo/metaprocessconfig_vo.py

@ -0,0 +1,97 @@
from typing import Union, Optional, List
from pydantic import BaseModel, ConfigDict, Field
from module_admin.annotation.pydantic_annotation import as_query
from pydantic.alias_generators import to_camel
class MetaprocessconfigModel(BaseModel):
"""
参数配置表对应pydantic模型
"""
model_config = ConfigDict(alias_generator=to_camel, from_attributes=True)
pdc_id: Optional[int] = Field(default=None, description='流程定义主键')
db_type: Optional[str] = Field(default=None, description='数据库类型')
ac_target: Optional[str] = Field(default=None, max_length=5, description='采集对象(0 字表 1过程)')
taskDefinitionJson: Optional[str] = Field(default=None, description='节点数据')
taskRelationJson: Optional[str] = Field(default=None, max_length=2000, description='节点位置')
locations: Optional[str] = Field(default=None, description='更新者')
name: Optional[str] = Field(default=None, description='流程定义名')
tenantCode: Optional[str] = Field(default=None, description='租户代码')
executionType: Optional[str] = Field(default=None, description='执行类型')
description: Optional[str] = Field(default=None, description='描述')
globalParams: Optional[str] = Field(default=None, description='全局参数')
timeout: Optional[str] = Field(default=None, description='超时设置')
releaseState: Optional[str] = Field(default=None, description='发布状态')
class Metaprocessconfig:
orm_mode = False
class MetaprocessconfigQueryModel(MetaprocessconfigModel):
"""
元数据任务不分页查询模型
"""
begin_time: Optional[str]= Field(default=None, description='开始时间')
end_time: Optional[str]= Field(default=None, description='结束时间')
@as_query
class MetaprocessconfigPageObject(MetaprocessconfigQueryModel):
"""
元数据任务分页查询模型
"""
page_num: int = Field(default=1, description='当前页码')
page_size: int = Field(default=10, description='每页记录数')
class MetaprocessconfigPageObjectResponse(BaseModel):
"""
元数据任务列表分页查询返回模型
"""
rows: List[Union[MetaprocessconfigModel, None]] = []
page_num: int
page_size: int
total: int
has_next: bool
class DeleteMetaprocessconfigModel(BaseModel):
"""
删除参数配置模型
"""
metaprocessconfig_ids: str
class OperaMetaprocessconfigModel(BaseModel):
"""
上下线模型
"""
id: str
type:str
class CrudMetaprocessconfigResponse(BaseModel):
"""
操作参数配置响应模型
"""
is_success: bool
message: str
def to_dict(self):
return {
"metatask_id": self.metatask_id,
"metatask_name": self.metatask_name,
"metatask_type": self.metatask_type,
"create_by": self.create_by,
"create_time": self.create_time.isoformat() if self.create_time else None,
"update_by": self.update_by,
"update_time": self.update_time.isoformat() if self.update_time else None,
"remark": self.remark,
"status": self.status,
"ds_time": self.ds_time.isoformat() if self.ds_time else None,
"dbRName": self.dbRName,
"dbRCode": self.dbRCode,
"dbName": self.dbName,
"dbCode": self.dbCode,
"dbSName": self.dbSName,
"dbSCode": self.dbSCode,
"acquisitionType": self.acquisitionType,
"ac_target": self.ac_target
}

96
vue-fastapi-backend/module_admin/entity/vo/metatask_vo.py

@ -0,0 +1,96 @@
from typing import Union, Optional, List
from pydantic import BaseModel, ConfigDict, Field
from module_admin.annotation.pydantic_annotation import as_query
from datetime import datetime
from pydantic_validation_decorator import NotBlank, Size
from pydantic.alias_generators import to_camel
class MetataskModel(BaseModel):
"""
参数配置表对应pydantic模型
"""
model_config = ConfigDict(alias_generator=to_camel, from_attributes=True)
metatask_id: Optional[int] = Field(default=None, description='任务主键')
metatask_name: Optional[str] = Field(default=None, description='任务名称')
metatask_type: Optional[str] = Field(default=None, description='任务类型(0 采集 1加工')
ds_time: Optional[datetime] = Field(default=None, description='调度时间')
create_by: Optional[str] = Field(default=None, description='创建人')
create_time: Optional[datetime] = Field(default=None, description='创建时间')
update_by: Optional[str] = Field(default=None, description='更新人')
update_time: Optional[datetime] = Field(default=None, description='更新时间')
remark: Optional[str] = Field(default=None, description='备注')
status: Optional[str] = Field(default=None, description='状态')
dbRName: Optional[str] = Field(default=None, description='数据源')
dbRCode: Optional[int] = Field(default=None, description='数据源code')
dbName: Optional[str] = Field(default=None, description='数据库')
dbCode: Optional[str] = Field(default=None, description='数据库code')
dbSName: Optional[str] = Field(default=None, description='模式')
dbSCode: Optional[str] = Field(default=None, description='模式code')
acquisitionType: Optional[str] = Field(default=None, description='采集方式')
ac_target: Optional[str] = Field(default=None, description='采集目标')
ds_ids: Optional[str] = Field(default=None, description='ds采集ids')
ds_types: Optional[str] = Field(default=None, description='ds采集类型')
class Metatask:
orm_mode = True
@NotBlank(field_name='metatask_name', message='任务名称不能为空')
@Size(field_name='metatask_name', min_length=0, max_length=100, message='任务名称长度不能超过50个字符')
def get_metatask_name(self):
return self.metatask_name
def validate_fields(self):
self.get_metatask_name()
class MetataskQueryModel(MetataskModel):
"""
元数据任务不分页查询模型
"""
begin_time: Optional[str]= Field(default=None, description='开始时间')
end_time: Optional[str]= Field(default=None, description='结束时间')
@as_query
class MetataskPageObject(MetataskQueryModel):
"""
元数据任务分页查询模型
"""
page_num: int = Field(default=1, description='当前页码')
page_size: int = Field(default=10, description='每页记录数')
class MetataskPageObjectResponse(BaseModel):
"""
元数据任务列表分页查询返回模型
"""
rows: List[Union[MetataskModel, None]] = []
page_num: int
page_size: int
total: int
has_next: bool
class DeleteMetataskModel(BaseModel):
"""
删除参数配置模型
"""
metatask_ids: str
ds_ids: str
class OperaMetataskModel(BaseModel):
"""
上下线模型
"""
id: str
type:str
class CrudMetataskResponse(BaseModel):
"""
操作参数配置响应模型
"""
is_success: bool
message: str

248
vue-fastapi-backend/module_admin/service/metaprocessconfig_service.py

@ -0,0 +1,248 @@
from sqlalchemy.ext.asyncio import AsyncSession
from module_admin.entity.vo.metaprocessconfig_vo import MetaprocessconfigQueryModel
from module_admin.dao.metaprocessconfig_dao import MetaprocessconfigDao
class MetaprocessconfigService:
"""
参数配置管理模块服务层
"""
@classmethod
async def get_metaprocessconfig_list_services(
cls, query_db: AsyncSession, query_object: MetaprocessconfigQueryModel, is_page: bool = False
):
"""
获取参数配置列表信息service
:param query_db: orm对象
:param query_object: 查询参数对象
:return: 参数配置列表信息对象
"""
metaprocessconfig_list_result = await MetaprocessconfigDao.get_metaprocessconfig_list_all(query_db, query_object, is_page)
return metaprocessconfig_list_result
# @classmethod
# async def get_data_source_tree(cls,request: Request):
# url = 'http://47.121.207.11:12345/dolphinscheduler/datasources?pageNo=1&pageSize=100'
# token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
# headers = {'token': token}
# response = requests.get(url, headers=headers)
# if response.reason == 'OK':
# response_text = response.text
# data = json.loads(response_text)
# total_list = data["data"]["totalList"]
# # 解析 connectionParams 字符串为字典
# for item in total_list:
# item["connectionParams"] = json.loads(item["connectionParams"])
# # 使用 Pydantic 创建 DataSource 对象列表
# data_sources = [DataSource(**item) for item in total_list]
# return data_sources
# else:
# return {'error': f'Request failed with status code {response.status_code}'}
# @classmethod
# async def check_metaprocessconfig_name_unique_services(cls, query_db: AsyncSession, page_object: MetaprocessconfigModel):
# """
# 校验参数键名是否唯一service
# :param query_db: orm对象
# :param page_object: 参数配置对象
# :return: 校验结果
# """
# metaprocessconfig_id = -1 if page_object.metaprocessconfig_id is None else page_object.metaprocessconfig_id
# metaprocessconfig = await MetaprocessconfigDao.get_metaprocessconfig_detail_by_info(
# query_db, MetaprocessconfigModel(metaprocessconfigName=page_object.metaprocessconfig_name)
# )
# if metaprocessconfig and metaprocessconfig.metaprocessconfig_id != metaprocessconfig_id:
# return CommonConstant.NOT_UNIQUE
# return CommonConstant.UNIQUE
# @classmethod
# async def add_metaprocessconfig_services(cls, request: Request, query_db: AsyncSession, page_object: MetaprocessconfigModel):
# """
# 新增参数配置信息service
# :param request: Request对象
# :param query_db: orm对象
# :param page_object: 新增参数配置对象
# :return: 新增参数配置校验结果
# """
# if not await cls.check_metaprocessconfig_name_unique_services(query_db, page_object):
# raise ServiceException(message=f'新增元数据任务{page_object.metaprocessconfig_name}失败,任务名已存在')
# else:
# try:
# await MetaprocessconfigDao.add_metaprocessconfig_dao(query_db, page_object)
# await query_db.commit()
# return CrudResponseModel(is_success=True, message='新增成功')
# except Exception as e:
# await query_db.rollback()
# raise e
# @classmethod
# async def edit_metaprocessconfig_services(cls, request: Request, query_db: AsyncSession, page_object: MetaprocessconfigModel):
# """
# 编辑参数配置信息service
# :param request: Request对象
# :param query_db: orm对象
# :param page_object: 编辑参数配置对象
# :return: 编辑参数配置校验结果
# """
# edit_metaprocessconfig = page_object.model_dump(exclude_unset=True)
# metaprocessconfig_info = await cls.metaprocessconfig_detail_services(query_db, page_object.metaprocessconfig_id)
# if metaprocessconfig_info.metaprocessconfig_id:
# if not await cls.check_metaprocessconfig_name_unique_services(query_db, page_object):
# raise ServiceException(message=f'修改任务{page_object.metaprocessconfig_name}失败,任务名称已存在')
# else:
# try:
# await MetaprocessconfigDao.edit_metaprocessconfig_dao(query_db, edit_metaprocessconfig)
# await query_db.commit()
# return CrudResponseModel(is_success=True, message='更新成功')
# except Exception as e:
# await query_db.rollback()
# raise e
# else:
# raise ServiceException(message='更新失败')
# @classmethod
# async def up_or_down_metaprocessconfig_services(
# cls, request: Request, query_db: AsyncSession, current_user: CurrentUserModel, id: str, type: str
# ):
# metaprocessconfig_info = await cls.metaprocessconfig_detail_services(query_db, id)
# metaprocessconfig_info.update_by = current_user.user.user_name
# metaprocessconfig_info.update_time = datetime.now()
# type_str: str
# if type == 'down':
# # 下线
# type_str = '下线'
# metaprocessconfig_info.status = '1'
# else:
# # 上线
# type_str = '上线'
# metaprocessconfig_info.status = '2'
# edit_metaprocessconfig = metaprocessconfig_info.model_dump(exclude_unset=True)
# try:
# await MetaprocessconfigDao.edit_metaprocessconfig_dao(query_db, edit_metaprocessconfig)
# await query_db.commit()
# return CrudResponseModel(is_success=True, message=metaprocessconfig_info.metaprocessconfig_name + '任务' + type_str + '成功')
# except Exception as e:
# await query_db.rollback()
# raise e
# else:
# raise ServiceException(message='更新失败')
# @classmethod
# async def run_metaprocessconfig_services(
# cls, request: Request, query_db: AsyncSession, current_user: CurrentUserModel, id: str
# ):
# metaprocessconfig_info = await cls.metaprocessconfig_detail_services(query_db, id)
# metaprocessconfig_info.update_by = current_user.user.user_name
# metaprocessconfig_info.update_time = datetime.now()
# # 运行中
# metaprocessconfig_info.status = '3'
# edit_metaprocessconfig = metaprocessconfig_info.model_dump(exclude_unset=True)
# try:
# await MetaprocessconfigDao.edit_metaprocessconfig_dao(query_db, edit_metaprocessconfig)
# await query_db.commit()
# return CrudResponseModel(is_success=True, message=metaprocessconfig_info.metaprocessconfig_name + '任务:' '运行成功')
# except Exception as e:
# await query_db.rollback()
# raise e
# else:
# raise ServiceException(message='更新失败')
# @classmethod
# async def ds_metaprocessconfig_services(
# cls, request: Request, query_db: AsyncSession, current_user: CurrentUserModel, id: str
# ):
# metaprocessconfig_info = await cls.metaprocessconfig_detail_services(query_db, id)
# metaprocessconfig_info.update_by = current_user.user.user_name
# metaprocessconfig_info.update_time = datetime.now()
# # 运行中
# metaprocessconfig_info.status = '4'
# edit_metaprocessconfig = metaprocessconfig_info.model_dump(exclude_unset=True)
# try:
# await MetaprocessconfigDao.edit_metaprocessconfig_dao(query_db, edit_metaprocessconfig)
# await query_db.commit()
# return CrudResponseModel(is_success=True, message=metaprocessconfig_info.metaprocessconfig_name + '任务:' '调度启动成功')
# except Exception as e:
# await query_db.rollback()
# raise e
# else:
# raise ServiceException(message='更新失败')
# @classmethod
# async def get_metaprocessconfig_logs_services(
# cls, request: Request, current_user: CurrentUserModel, query_db: AsyncSession, id: str
# ):
# metaprocessconfig_info = await cls.metaprocessconfig_detail_services(query_db, id)
# metaprocessconfig_info.update_by = current_user.user.user_name
# metaprocessconfig_info.update_time = datetime.now()
# # 运行中
# metaprocessconfig_info = '3'
# edit_metaprocessconfig = metaprocessconfig_info.model_dump(exclude_unset=True)
# try:
# await MetaprocessconfigDao.edit_metaprocessconfig_dao(query_db, edit_metaprocessconfig)
# await query_db.commit()
# return CrudResponseModel(is_success=True, message=metaprocessconfig_info.metaprocessconfig_name + '任务:' '日志获取成功')
# except Exception as e:
# await query_db.rollback()
# raise e
# else:
# raise ServiceException(message='更新失败')
# @classmethod
# async def delete_metaprocessconfig_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteMetaprocessconfigModel):
# """
# 删除参数配置信息service
# :param request: Request对象
# :param query_db: orm对象
# :param page_object: 删除参数配置对象
# :return: 删除参数配置校验结果
# """
# # if page_object.metaprocessconfig_ids.split(','):
# # metaprocessconfig_id_list = page_object.metaprocessconfig_ids.split(',')
# # try:
# # for metaprocessconfig_id in metaprocessconfig_id_list:
# # metaprocessconfig_id_dict = dict(metaprocessconfig_id=metaprocessconfig_id)
# # MetaprocessconfigDao.delete_metaprocessconfig_dao(query_db, MetaprocessconfigModel(**metaprocessconfig_id_dict))
# # query_db.commit()
# # # await cls.init_cache_sys_metaprocessconfig_services(query_db, request.app.state.redis)s
# # result = dict(is_success=True, message='删除成功')
# # except Exception as e:
# # query_db.rollback()
# # result = dict(is_success=False, message=str(e))
# # else:
# # result = dict(is_success=False, message='传入字典数据id为空')
# # return CrudResponseModel(**result)
# if page_object.metaprocessconfig_ids:
# metaprocessconfig_id_list = page_object.metaprocessconfig_ids.split(',')
# try:
# for metaprocessconfig_id in metaprocessconfig_id_list:
# await MetaprocessconfigDao.delete_metaprocessconfig_dao(query_db, MetaprocessconfigModel(metaprocessconfigId=int(metaprocessconfig_id)))
# await query_db.commit()
# return CrudResponseModel(is_success=True, message='删除成功')
# except Exception as e:
# await query_db.rollback()
# raise e
# else:
# raise ServiceException(message='传入参数配置id为空')
# @classmethod
# async def metaprocessconfig_detail_services(cls, query_db: AsyncSession, metaprocessconfig_id: int):
# """
# 获取参数配置详细信息service
# :param query_db: orm对象
# :param metaprocessconfig_id: 参数配置id
# :return: 参数配置id对应的信息
# """
# metaprocessconfig = await MetaprocessconfigDao.get_metaprocessconfig_detail_by_id(query_db, metaprocessconfig_id=metaprocessconfig_id)
# if metaprocessconfig:
# result = MetaprocessconfigModel(**CamelCaseUtil.transform_result(metaprocessconfig))
# else:
# result = MetaprocessconfigModel(**dict())
# return result

626
vue-fastapi-backend/module_admin/service/metatask_service.py

@ -0,0 +1,626 @@
from fastapi import Request,Depends
from sqlalchemy.ext.asyncio import AsyncSession
from module_admin.entity.vo.metatask_vo import MetataskQueryModel, MetataskModel, DeleteMetataskModel
from module_admin.dao.metatask_dao import MetataskDao
from utils.page_util import PageResponseModel
# from module_admin.entity.vo.dataSource_vo import DataSource,Datasouceall,AlertGroups,Environment,WorkerGroup,ProcessDefinition,ParmScheduleVo,ParmSchedule,ProcessInstancePage
from module_admin.entity.vo.dataSource_vo import *
from module_admin.entity.vo.metaprocessconfig_vo import MetaprocessconfigQueryModel,MetaprocessconfigModel
from config.constant import CommonConstant
from exceptions.exception import ServiceException
from module_admin.entity.vo.common_vo import CrudResponseModel
from utils.common_util import CamelCaseUtil
from module_admin.entity.vo.user_vo import CurrentUserModel
from datetime import datetime
import requests
import json
import re
from config.enums import RedisInitKeyConfig
from module_admin.service.metaprocessconfig_service import MetaprocessconfigService
class MetataskService:
"""
参数配置管理模块服务层
"""
@classmethod
async def get_metatask_list_services(
cls, query_db: AsyncSession, query_object: MetataskQueryModel, is_page: bool = False
):
"""
获取参数配置列表信息service
:param query_db: orm对象
:param query_object: 查询参数对象
:return: 参数配置列表信息对象
"""
metatask_list_result = await MetataskDao.get_metatask_list(query_db, query_object, is_page)
return metatask_list_result
@classmethod
async def get_data_source_tree(cls,request: Request):
url = 'http://47.121.207.11:12345/dolphinscheduler/datasources?pageNo=1&pageSize=100'
token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
headers = {'token': token}
response = requests.get(url, headers=headers)
if response.reason == 'OK':
response_text = response.text
data = json.loads(response_text)
total_list = data["data"]["totalList"]
# 解析 connectionParams 字符串为字典
for item in total_list:
item["connectionParams"] = json.loads(item["connectionParams"])
# 使用 Pydantic 创建 DataSource 对象列表
data_sources = [DataSource(**item) for item in total_list]
return data_sources
else:
return {'error': f'Request failed with status code {response.status_code}'}
@classmethod
async def get_data_source_all(cls,request: Request):
# Worker分组
url1 = 'http://47.121.207.11:12345/dolphinscheduler/worker-groups/all'
# 警告组
url2= 'http://47.121.207.11:12345/dolphinscheduler/alert-groups/list'
# 工作环境
url3 = 'http://47.121.207.11:12345/dolphinscheduler/environment/query-environment-list'
dataspurceVo=Datasouceall()
token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
headers = {'token': token}
response1 = requests.get(url1, headers=headers)
response2 = requests.get(url2, headers=headers)
response3 = requests.get(url3, headers=headers)
if response1.reason == 'OK':
response_text = response1.text
data = json.loads(response_text)
total_list = data["data"]
# 使用 Pydantic 创建 DataSource 对象列表
data_sources = [WorkerGroup(name=item) for item in total_list]
dataspurceVo.workerGroup=data_sources
if response2.reason == 'OK':
response_text = response2.text
data = json.loads(response_text)
total_list = data["data"]
# 使用 Pydantic 创建 DataSource 对象列表
alertGroups = [AlertGroups(**item) for item in total_list]
dataspurceVo.alertGroups=alertGroups
if response3.reason == 'OK':
response_text = response3.text
data = json.loads(response_text)
total_list = data["data"]
data_sources = [Environment(**item) for item in total_list]
dataspurceVo.environment=data_sources
return dataspurceVo
@classmethod
async def check_metatask_name_unique_services(cls, query_db: AsyncSession, page_object: MetataskModel):
"""
校验参数键名是否唯一service
:param query_db: orm对象
:param page_object: 参数配置对象
:return: 校验结果
"""
metatask_id = -1 if page_object.metatask_id is None else page_object.metatask_id
metatask = await MetataskDao.get_metatask_detail_by_info(
query_db, MetataskModel(metataskName=page_object.metatask_name)
)
if metatask and metatask.metatask_id != metatask_id:
return CommonConstant.NOT_UNIQUE
return CommonConstant.UNIQUE
@classmethod
async def add_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: MetataskModel):
"""
新增参数配置信息service
:param request: Request对象
:param query_db: orm对象
:param page_object: 新增参数配置对象
:return: 新增参数配置校验结果
"""
if not await cls.check_metatask_name_unique_services(query_db, page_object):
raise ServiceException(message=f'新增元数据任务{page_object.metatask_name}失败,任务名已存在')
else:
try:
# 查询模板
processconfig = MetaprocessconfigQueryModel()
processconfig.ac_target=page_object.ac_target
processconfig.db_type=page_object.dbCode
processConfigList =await MetaprocessconfigService.get_metaprocessconfig_list_services(query_db,processconfig,False)
message= await cls.mysql_process_defind_change_add(request,processConfigList,page_object)
if "成功" not in message:
await query_db.rollback()
raise ServiceException(message=f'新增元数据任务{page_object.metatask_name}失败,dolphinscheduler创建失败')
await MetataskDao.add_metatask_dao(query_db, page_object)
await query_db.commit()
return CrudResponseModel(is_success=True, message=message)
except Exception as e:
await query_db.rollback()
raise e
@classmethod
async def mysql_process_defind_change_add(cls,request: Request, processConfigList: list[MetaprocessconfigModel],page_object:MetataskModel):
"""
mysql类型
"""
token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
url = 'http://47.121.207.11:12345/dolphinscheduler/projects/'+projectCode+'/task-definition/gen-task-codes?genNum=5'
headers = {'token': token}
# 新增接口
url2='http://47.121.207.11:12345/dolphinscheduler/projects/'+projectCode+'/process-definition'
headers2 = {'token': token, 'Content-Type': 'application/x-www-form-urlencoded'}
response = requests.get(url, headers=headers)
if response.reason == 'OK':
intdsids=[]
message=''
dstypes=[]
response_text = response.text
data = json.loads(response_text)
code_list = data["data"]
str_list = list(map(str, code_list))
for config in processConfigList:
# mysql表字段
if config.ac_target=='0':
modified_json_str = config.taskDefinitionJson.replace("16199683466336", str_list[0]).replace("16199683466337", str_list[1]).replace("16199683466338", str_list[2]).replace("'dash_test_w'", f"'{page_object.dbSName}'").replace("'mysql_conn'", f"'{page_object.dbRName}'")
modified_json_str2=config.taskRelationJson.replace("16199683466336", str_list[0]).replace("16199683466337", str_list[1]).replace("16199683466338", str_list[2])
modified_json_str3=config.locations.replace("16199683466336", str_list[0]).replace("16199683466337", str_list[1]).replace("16199683466338", str_list[2])
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description=page_object.remark, # 替换工作流备注
locations=modified_json_str3,# 替换locations
name =page_object.metatask_name+"-表字段采集" ,# 替换工作流名称
timeout=config.timeout,
globalParams =config.globalParams ,
tenantCode =config.tenantCode ,
taskRelationJson =modified_json_str2,# 替换taskRelationJson
executionType =config.executionType ,
releaseState=config.releaseState
).model_dump(exclude_unset=True, by_alias=True)
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response_post0 = requests.post(url2, headers=headers2,data=form_data)
# text= '{"code":0,"msg":"success","data":{"id":null,"code":16244510692320,"name":"测试2250-表字段采集","version":0,"releaseState":null,"projectCode":15094503753824,"description":"111","globalParams":"[]","globalParamList":null,"globalParamMap":{},"createTime":"2025-01-08 12:50:39","updateTime":"2025-01-08 12:50:39","flag":"YES","userId":1,"userName":null,"projectName":null,"locations":"[{\\"taskCode\\":16244510678624,\\"x\\":334,\\"y\\":265},{\\"taskCode\\":16244510678625,\\"x\\":334,\\"y\\":390},{\\"taskCode\\":16244510678626,\\"x\\":687,\\"y\\":335}]","scheduleReleaseState":null,"timeout":0,"tenantId":1,"tenantCode":null,"modifyBy":null,"warningGroupId":0,"executionType":"PARALLEL"},"failed":false,"success":true}'
text= response_post0.text
responsJson = json.loads(text)
if responsJson['msg'] == 'success':
intdsids.append(responsJson['data']['code'])
dstypes.append('0')
if message:
message += ", "
message += page_object.metatask_name + "-表字段采集新增成功"
else:
if message:
message += ", "
message += page_object.metatask_name + "-表字段采集新增失败"
if config.ac_target=='1':
modified_json_str = config.taskDefinitionJson.replace("16286410625888", str_list[0]).replace("16286410625889", str_list[1]).replace("16286410625890", str_list[2]).replace("16286410625891", str_list[3]).replace("'dash_test_w'", f"'{page_object.dbSName}'").replace("'mysql_conn'", f"'{page_object.dbRName}'").replace("mysql_conn dash_test_w", f"{page_object.dbRName} {page_object.dbSName}")
modified_json_str2=config.taskRelationJson.replace("16286410625888", str_list[0]).replace("16286410625889", str_list[1]).replace("16286410625890", str_list[2]).replace("16286410625891", str_list[3])
modified_json_str3=config.locations.replace("16286410625888", str_list[0]).replace("16286410625889", str_list[1]).replace("16286410625890", str_list[2]).replace("16286410625891", str_list[3])
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description=page_object.remark, # 替换工作流备注
locations=modified_json_str3,# 替换locations
name =page_object.metatask_name+"-存储过程采集" ,# 替换工作流名称
timeout=config.timeout,
globalParams =config.globalParams ,
tenantCode =config.tenantCode ,
taskRelationJson =modified_json_str2,# 替换taskRelationJson
executionType =config.executionType ,
releaseState=config.releaseState
).model_dump(exclude_unset=True, by_alias=True)
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response_post0 = requests.post(url2, headers=headers2,data=form_data)
#text= '{"code":0,"msg":"success","data":{"id":null,"code":16244510692320,"name":"测试2250-表字段采集","version":0,"releaseState":null,"projectCode":15094503753824,"description":"111","globalParams":"[]","globalParamList":null,"globalParamMap":{},"createTime":"2025-01-08 12:50:39","updateTime":"2025-01-08 12:50:39","flag":"YES","userId":1,"userName":null,"projectName":null,"locations":"[{\\"taskCode\\":16244510678624,\\"x\\":334,\\"y\\":265},{\\"taskCode\\":16244510678625,\\"x\\":334,\\"y\\":390},{\\"taskCode\\":16244510678626,\\"x\\":687,\\"y\\":335}]","scheduleReleaseState":null,"timeout":0,"tenantId":1,"tenantCode":null,"modifyBy":null,"warningGroupId":0,"executionType":"PARALLEL"},"failed":false,"success":true}'
text= response_post0.text
responsJson = json.loads(text)
if responsJson['msg'] == 'success':
dstypes.append('1')
intdsids.append(responsJson['data']['code'])
if message:
message += ", "
message += page_object.metatask_name + "-存储过程采集新增成功"
else:
if message:
message += ", "
message += page_object.metatask_name + "-存储过程采集新增失败"
if len(intdsids)>0:
page_object.ds_ids=','.join([str(i) for i in intdsids])
page_object.ds_types=','.join([str(i) for i in dstypes])
return message
@classmethod
async def mysql_process_defind_change_update(cls,request: Request,page_object:MetataskModel,metatask_old:MetataskModel):
"""
mysql类型
"""
token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
dstypes=page_object.ds_types.split(",")
dsids=page_object.ds_ids.split(",")
result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dstypes, dsids)]
message=''
# 查询接口
url='http://47.121.207.11:12345/dolphinscheduler/projects/'+projectCode+'/process-definition'
headers = {'token': token, }
headers2 = {'token': token, 'Content-Type': 'application/x-www-form-urlencoded'}
for config in result_list:
# mysql表字段
if config['dstype']=='0' :
response = requests.get(f"{url}/{config['dsid']}", headers=headers)
text= response.text
responsJson = json.loads(text)
if responsJson['msg'] == 'success':
modified_json_str = json.dumps(responsJson['data']['taskDefinitionList'], ensure_ascii=False, indent=0).replace( f"'{metatask_old.dbSName}'", f"'{page_object.dbSName}'")
getTaskRelationList=responsJson['data']['processTaskRelationList']
putTaskRelationList=[]
for item in getTaskRelationList:
new_item = {
"name": item['name'],
"preTaskCode":item['preTaskCode'] ,
"preTaskVersion":item['preTaskVersion'] ,
"postTaskCode":item['postTaskCode'] ,
"conditionType":item['conditionType'] ,
"conditionParams":item['conditionParams']
}
putTaskRelationList.append(new_item)
modified_json_str2= json.dumps(putTaskRelationList, ensure_ascii=False, indent=0)
modified_json_str2=re.sub(r'\s+', '', modified_json_str2)
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description=page_object.remark, # 替换工作流备注
locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations
name =page_object.metatask_name+"-表字段采集" ,# 替换工作流名称
tenantCode =responsJson['data']['processDefinition']['tenantCode'] ,
taskRelationJson =modified_json_str2,# 替换taskRelationJson
).model_dump(exclude_unset=True, by_alias=True)
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data)
putText= response_put0.text
responsPutJson=json.loads(putText)
if responsPutJson['msg'] == 'success':
if message:
message += ", "
message += page_object.metatask_name + "-表字段采集修改成功"
else:
if message:
message += ", "
message += page_object.metatask_name + "-表字段采集修改失败"
if config['dstype']=='1':
response = requests.get(f"{url}/{config['dsid']}", headers=headers)
text= response.text
responsJson = json.loads(text)
if responsJson['msg'] == 'success':
modified_json_str = json.dumps(responsJson['data']['taskDefinitionList'], ensure_ascii=False, indent=4).replace( f"'{metatask_old.dbSName}'", f"'{page_object.dbSName}'")
getTaskRelationList=responsJson['data']['processTaskRelationList']
putTaskRelationList=[]
for item in getTaskRelationList:
new_item = {
"name": item['name'],
"preTaskCode":item['preTaskCode'] ,
"preTaskVersion":item['preTaskVersion'] ,
"postTaskCode":item['postTaskCode'] ,
"conditionType":item['conditionType'] ,
"conditionParams":item['conditionParams']
}
putTaskRelationList.append(new_item)
modified_json_str2= json.dumps(putTaskRelationList, ensure_ascii=False, indent=4)
modified_json_str2=re.sub(r'\s+', '', modified_json_str2)
metaprocessconfig_dict = MetaprocessconfigModel(taskDefinitionJson=modified_json_str,# 替换taskDefinitionJson
description=page_object.remark, # 替换工作流备注
locations=responsJson['data']['processDefinition']['locations'] ,# 替换locations
name =page_object.metatask_name+"-存储过程采集" ,# 替换工作流名称
tenantCode =responsJson['data']['processDefinition']['tenantCode'] ,
taskRelationJson =modified_json_str2,# 替换taskRelationJson
).model_dump(exclude_unset=True, by_alias=True)
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response_put0 = requests.put(f"{url}/{config['dsid']}", headers=headers2,data=form_data)
putText= response_put0.text
responsPutJson=json.loads(putText)
if responsPutJson['msg'] == 'success':
if message:
message += ", "
message += page_object.metatask_name + "-存储过程采集修改成功"
else:
if message:
message += ", "
message += page_object.metatask_name + "-存储过程采集修改失败"
return message
@classmethod
async def edit_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: MetataskModel):
"""
编辑参数配置信息service
:param request: Request对象
:param query_db: orm对象
:param page_object: 编辑参数配置对象
:return: 编辑参数配置校验结果
"""
edit_metatask = page_object.model_dump(exclude_unset=True)
metatask_info = await cls.metatask_detail_services(query_db, page_object.metatask_id)
if metatask_info.metatask_id:
if not await cls.check_metatask_name_unique_services(query_db, page_object):
raise ServiceException(message=f'修改任务{page_object.metatask_name}失败,任务名称已存在')
else:
try:
metatask_old = await cls.metatask_detail_services(query_db, metatask_info.metatask_id)
message= await cls.mysql_process_defind_change_update(request,page_object,metatask_old)
if "成功" not in message:
await query_db.rollback()
raise ServiceException(message=f'修改元数据任务{page_object.metatask_name}失败,dolphinscheduler修改失败')
await MetataskDao.edit_metatask_dao(query_db, edit_metatask)
await query_db.commit()
return CrudResponseModel(is_success=True, message='更新成功')
except Exception as e:
await query_db.rollback()
raise e
else:
raise ServiceException(message='更新失败')
@classmethod
async def up_or_down_metatask_services(
cls, request: Request, query_db: AsyncSession, current_user: CurrentUserModel, id: str, type: str
):
metatask_info = await cls.metatask_detail_services(query_db, id)
metatask_info.update_by = current_user.user.user_name
metatask_info.update_time = datetime.now()
type_str: str
token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
dstypes=metatask_info.ds_types.split(",")
dsids=metatask_info.ds_ids.split(",")
result_list = [{'dstype': t, 'dsid': i} for t, i in zip(dstypes, dsids)]
message=''
# 查询接口
url='http://47.121.207.11:12345/dolphinscheduler/projects/'+projectCode+'/process-definition'
headers = {'token': token, 'Content-Type': 'application/x-www-form-urlencoded'}
for config in result_list:
# mysql表字段
if config['dstype']=='0' :
metaprocessconfig_dict = {
'name': metatask_info.metatask_name+'-表字段采集',
'releaseState': type
}
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response = requests.post(f"{url}/{config['dsid']}/release", headers=headers,data=form_data)
text= response.text
responsJson = json.loads(text)
if responsJson['success'] is True:
message='成功!'
else:
raise ServiceException(message='失败'+responsJson['msg'])
# mysql表字段
if config['dstype']=='1' :
metaprocessconfig_dict = {
'name': metatask_info.metatask_name+'-存储过程采集',
'releaseState': type
}
form_data = {key: str(value) for key, value in metaprocessconfig_dict.items()}
response = requests.post(f"{url}/{config['dsid']}/release", headers=headers,data=form_data)
text= response.text
responsJson = json.loads(text)
if responsJson['success'] is True:
message='成功!'
else:
raise ServiceException(message='失败'+responsJson['msg'])
if type == 'OFFLINE':
# 下线
type_str = '下线'
metatask_info.status = 'OFFLINE'
else:
# 上线
type_str = '上线'
metatask_info.status = 'ONLINE'
edit_metatask = metatask_info.model_dump(exclude_unset=True)
try:
await MetataskDao.edit_metatask_dao(query_db, edit_metatask)
await query_db.commit()
return CrudResponseModel(is_success=True, message=message)
except Exception as e:
await query_db.rollback()
raise e
else:
raise ServiceException(message='更新失败')
@classmethod
async def run_metatask_services(
cls, request: Request, query_db: AsyncSession, process: ProcessDefinition
):
process.failureStrategy='CONTINUE'
process.warningType='NONE'
process.execType='START_PROCESS'
process.taskDependType='TASK_POST'
process.complementDependentMode ='OFF_MODE'
process.runMode='RUN_MODE_SERIAL'
process.processInstancePriority='MEDIUM'
process.dryRun=0
process.scheduleTime="{complementStartDate:'2025-01-12 00:00:00',complementEndDate:'2025-01-12 00:00:00'}"
token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
url='http://47.121.207.11:12345/dolphinscheduler/projects/'+projectCode+'/executors/start-process-instance'
headers = {'token': token, 'Content-Type': 'application/x-www-form-urlencoded'}
# form_data = {key: str(value) for key, value in process.__dict__.items()}
form_data = {key: value for key, value in process.__dict__.items()}
response = requests.post(url, headers=headers,data=form_data)
text= response.text
responsJson = json.loads(text)
if responsJson['success'] is True:
return "运行成功!"
else:
raise ServiceException(message='运行失败!')
@classmethod
async def ds_metatask_services(
cls, request: Request, query_db: AsyncSession, process: ParmScheduleVo
):
parm =ParmSchedule(
)
parm.failureStrategy='CONTINUE'
parm.warningType='NONE'
parm.warningGroupId=process.warningGroupId
parm.workerGroup='TASK_POST'
parm.processDefinitionCode =process.processDefinitionCode
parm.environmentCode=process.environmentCode
parm.processInstancePriority='MEDIUM'
parm.schedule = (
'{"startTime":"' + process.beginTime.strftime('%Y-%m-%d %H:%M:%S') +
'", "endTime":"' + process.endTime.strftime('%Y-%m-%d %H:%M:%S') +
'", "crontab":"' + process.crontab +
'", "timezoneId":"Asia/Shanghai"}')
token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
url='http://47.121.207.11:12345/dolphinscheduler/projects/'+projectCode+'/schedules'
headers = {'token': token, 'Content-Type': 'application/x-www-form-urlencoded'}
# form_data = {key: str(value) for key, value in process.__dict__.items()}
form_data = {key: value for key, value in parm.__dict__.items()}
response = requests.post(url, headers=headers,data=form_data)
text= response.text
responsJson = json.loads(text)
if responsJson['success'] is True:
return "运行成功!"
else:
raise ServiceException(message='运行失败!')
@classmethod
async def get_metatask_logs_services(
cls, request: Request, current_user: CurrentUserModel, query_db: AsyncSession, id: str
):
metatask_info = await cls.metatask_detail_services(query_db, id)
metatask_info.update_by = current_user.user.user_name
metatask_info.update_time = datetime.now()
# 运行中
metatask_info = '3'
edit_metatask = metatask_info.model_dump(exclude_unset=True)
try:
await MetataskDao.edit_metatask_dao(query_db, edit_metatask)
await query_db.commit()
return CrudResponseModel(is_success=True, message=metatask_info.metatask_name + '任务:' '日志获取成功')
except Exception as e:
await query_db.rollback()
raise e
else:
raise ServiceException(message='更新失败')
@classmethod
async def delete_metatask_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteMetataskModel):
"""
删除参数配置信息service
:param request: Request对象
:param query_db: orm对象
:param page_object: 删除参数配置对象
:return: 删除参数配置校验结果
"""
if page_object.metatask_ids and page_object.ds_ids:
metatask_id_list = page_object.metatask_ids.split(',')
try:
token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
# 查询接口
url='http://47.121.207.11:12345/dolphinscheduler/projects/'+projectCode+'/process-definition/batch-delete'
form_data={'codes':page_object.ds_ids}
headers = {'token': token, 'Content-Type': 'application/x-www-form-urlencoded'}
response = requests.post(url, headers=headers,data=form_data)
text= response.text
responsJson = json.loads(text)
if responsJson['success'] is True:
for metatask_id in metatask_id_list:
await MetataskDao.delete_metatask_dao(query_db, MetataskModel(metataskId=int(metatask_id)))
await query_db.commit()
return CrudResponseModel(is_success=True, message='删除成功')
else :
raise ServiceException(message='ds删除失败')
except Exception as e:
await query_db.rollback()
raise e
else:
raise ServiceException(message='传入参数配置id为空')
@classmethod
async def metatask_detail_services(cls, query_db: AsyncSession, metatask_id: int):
"""
获取参数配置详细信息service
:param query_db: orm对象
:param metatask_id: 参数配置id
:return: 参数配置id对应的信息
"""
metatask = await MetataskDao.get_metatask_detail_by_id(query_db, metatask_id=metatask_id)
if metatask:
result = MetataskModel(**CamelCaseUtil.transform_result(metatask))
else:
result = MetataskModel(**dict())
return result
@classmethod
async def get_process_instances_services(
cls, request: Request, query_object: ProcessInstancePage
):
token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
url = f'http://47.121.207.11:12345/dolphinscheduler/projects/{projectCode}/process-instances?pageNo={query_object.page_num}&pageSize={query_object.page_size}&searchVal={query_object.searchVal}'
headers = {'token': token}
response = requests.get(url, headers=headers)
try:
if response.reason == 'OK':
response_text = response.text
data = json.loads(response_text)
total_list = data["data"]["totalList"]
# data_sources = [ProcessInstance(**item) for item in total_list]
pageData = PageResponseModel(rows=total_list,total=data["data"]["total"])
return pageData
else:
return {'error': f'Request failed with status code {response.status_code}'}
except Exception as e:
raise e
@classmethod
async def get_task_nodes_services(
cls, request: Request,id:int
):
token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
projectCode = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.projectcode')
url = f'http://47.121.207.11:12345/dolphinscheduler/projects/{projectCode}/process-instances/{id}/tasks'
headers = {'token': token}
response = requests.get(url, headers=headers)
try:
response = requests.get(url, headers=headers)
if response.reason == 'OK':
response_text = response.text
data = json.loads(response_text)
total_list = data["data"]["taskList"]
data_sources = [TaskNode(**item) for item in total_list]
return data_sources
else:
return {'error': f'Request failed with status code {response.status_code}'}
except Exception as e:
raise e
@classmethod
async def get_log_details_services(
cls, request: Request,id:int
):
token = await request.app.state.redis.get(f'{RedisInitKeyConfig.SYS_CONFIG.key}:sys.ds.token')
url = f'http://47.121.207.11:12345/dolphinscheduler/log/detail?taskInstanceId={id}&limit=1000&skipLineNum=0'
headers = {'token': token}
response = requests.get(url, headers=headers)
try:
response = requests.get(url, headers=headers)
if response.reason == 'OK':
response_text = response.text
data = json.loads(response_text)
logMessage = data["data"]["message"]
return logMessage
else:
return {'error': f'Request failed with status code {response.status_code}'}
except Exception as e:
raise e

2
vue-fastapi-backend/server.py

@ -23,6 +23,7 @@ from module_admin.controller.role_controller import roleController
from module_admin.controller.server_controller import serverController
from module_admin.controller.user_controller import userController
from module_admin.controller.aichat_controller import aichatController
from module_admin.controller.metatask_controller import metataskController
from sub_applications.handle import handle_sub_applications
from utils.common_util import worship
from utils.log_util import logger
@ -78,6 +79,7 @@ controller_list = [
{'router': serverController, 'tags': ['系统监控-菜单管理']},
{'router': cacheController, 'tags': ['系统监控-缓存监控']},
{'router': commonController, 'tags': ['通用模块']},
{'router': metataskController, 'tags': ['元数据管理-元数据任务模块']},
{'router': aichatController, 'tags': ['智能问答模块']},
]

115
vue-fastapi-frontend/src/api/meta/metatask.js

@ -0,0 +1,115 @@
import request from '@/utils/request'
import { parseStrEmpty } from "@/utils/ruoyi";
// 查询参数列表
export function listmetatask(query) {
return request({
url: '/meta/metatask/list',
method: 'get',
params: query
})
}
// 查询日志
export function listInstances(query) {
return request({
url: '/meta/metatask/process_instances',
method: 'get',
params: query
})
}
// 查询节点
export function taskNodes(id) {
return request({
url: '/meta/metatask/task_nodes/'+id,
method: 'get',
})
}
//日志详情
// 查询参数列表
export function logDetails(id) {
return request({
url: '/meta/metatask/log_details/'+id,
method: 'get',
})
}
export function datasourcetree() {
return request({
url: '/meta/metatask/tree',
method: 'get',
})
}
export function datasourceall() {
return request({
url: '/meta/metatask/sourceall',
method: 'get',
})
}
// 查询参数详细
export function getmetatask(metataskId) {
return request({
url: '/meta/metatask/' + parseStrEmpty(metataskId),
method: 'get'
})
}
// 新增元数据任务
export function addmetatask(data) {
return request({
url: '/meta/metatask',
method: 'post',
data: data
})
}
// 修改元数据任务
export function updatemetatask(data) {
return request({
url: '/meta/metatask',
method: 'put',
data: data
})
}
// 上下线
export function downOrUpmetatask(id,type) {
const data = {
id,
type
}
return request({
url: '/meta/metatask/upOrdown',
method: 'put',
data: data
})
}
// 运行
export function runmetatask(data) {
return request({
url: '/meta/metatask/Run',
method: 'put',
data: data
})
}
// 调度
export function dsmetatask(data) {
return request({
url: '/meta/metatask/DS',
method: 'put',
data: data
})
}
// 删除元数据任务
export function delmetatask(metataskId,dsIds) {
const data = {
metataskId:metataskId,
dsIds:dsIds
}
return request({
url: '/meta/metatask/' + metataskId+"/"+dsIds,
method: 'delete',
})
}

168
vue-fastapi-frontend/src/views/meta/metatask/dsDialog.vue

@ -0,0 +1,168 @@
<template>
<el-dialog title="调度" :visible.sync="visible" width="700px" top="5vh" append-to-body>
<el-form ref="form" :model="form" label-width="120px" style="margin: 10px;">
<!-- 日期范围 -->
<el-row :gutter="20">
<el-col :span="24">
<el-form-item label="日期范围" prop="dateRange">
<el-date-picker
v-model="dateRange"
type="datetimerange"
range-separator="至"
start-placeholder="开始日期"
end-placeholder="结束日期"
style="width: 100%;"
></el-date-picker>
</el-form-item>
</el-col>
</el-row>
<!-- Cron表达式 -->
<el-row :gutter="20">
<el-col :span="24">
<el-form-item label="Cron表达式" prop="crontab">
<el-input v-model="form.crontab" placeholder="请输入cron执行表达式">
<template #append>
<el-button type="primary" @click="handleShowCron">
生成表达式
<i class="el-icon-time el-icon--right"></i>
</el-button>
</template>
</el-input>
</el-form-item>
</el-col>
</el-row>
<!-- Worker分组 -->
<el-row :gutter="20">
<el-col :span="24">
<el-form-item label="Worker分组" prop="workerGroup">
<el-select
v-model="form.workerGroup"
placeholder="请选择Worker分组"
style="width: 100%;"
>
<el-option
v-for="dict in workerGroupList"
:key="dict.name"
:label="dict.name"
:value="dict.name"
></el-option>
</el-select>
</el-form-item>
</el-col>
</el-row>
<!-- 环境名称 -->
<el-row :gutter="20">
<el-col :span="24">
<el-form-item label="环境名称" prop="environmentCode">
<el-select
v-model="form.environmentCode"
placeholder="请选择环境名称"
style="width: 100%;"
>
<el-option
v-for="dict in environmentList"
:key="dict.id"
:label="dict.name"
:value="dict.code"
></el-option>
</el-select>
</el-form-item>
</el-col>
</el-row>
<!-- 告警组 -->
<el-row :gutter="20">
<el-col :span="24">
<el-form-item label="告警组" prop="warningGroupId">
<el-select
v-model="form.warningGroupId"
placeholder="请选择告警组"
style="width: 100%;"
>
<el-option
v-for="dict in warningGroupList"
:key="dict.id"
:label="dict.groupName"
:value="dict.id"
></el-option>
</el-select>
</el-form-item>
</el-col>
</el-row>
</el-form>
<!-- Footer 按钮 -->
<div slot="footer" class="dialog-footer" style="text-align: right;">
<el-button @click="visible = false"> </el-button>
<el-button type="primary" @click="handleds"> </el-button>
</div>
<!-- Cron表达式生成器 -->
<el-dialog title="Cron表达式生成器" :visible.sync="openCron" append-to-body destroy-on-close class="scrollbar">
<crontab @hide="openCron=false" @fill="crontabFill" :expression="expression"></crontab>
</el-dialog>
</el-dialog>
</template>
<script>
import { dsmetatask } from "@/api/meta/metatask";
import Crontab from '@/components/Crontab';
export default {
components: { Crontab },
props: [
"processDefinitionCode",
"warningGroupList",
"environmentList",
"workerGroupList"
],
data() {
return {
visible: false, //
expression: "", //
openCron: false, // Cron
dateRange: [], //
form: {
processDefinitionCode: undefined,
warningGroupId: undefined,
environmentCode: undefined,
workerGroup: undefined,
crontab: "",
},
};
},
methods: {
show(ids) {
this.form.processDefinitionCode = Number(this.processDefinitionCode[0]);
this.visible = true;
},
handleShowCron() {
this.expression = this.form.crontab;
this.openCron = true;
},
crontabFill(value) {
this.form.crontab = value;
},
handleds() {
dsmetatask(this.addDateRange(this.form, this.dateRange)).then((response) => {
if (response.success) {
this.visible = false;
this.$modal.msgSuccess("运行成功");
}
});
},
},
};
</script>
<style scoped>
.dialog-footer {
padding: 10px 20px;
}
.el-row {
margin-bottom: 10px;
}
</style>

1011
vue-fastapi-frontend/src/views/meta/metatask/index.vue

File diff suppressed because it is too large

140
vue-fastapi-frontend/src/views/meta/metatask/logsDialog.vue

@ -0,0 +1,140 @@
<template>
<el-dialog :visible.sync="visible" width="80%" top="5vh" append-to-body>
<div style="display: flex; flex-direction: column; gap: 10px;">
<!-- 第一部分实例表格带分页 -->
<el-card shadow="hover">
<div slot="header" class="clearfix">
<span>实例列表</span>
</div>
<el-table
:data="instances"
border
style="width: 100%"
@row-click="handleFirstRowClick"
:height="350"
>
<el-table-column prop="id" label="实例ID" width="100" />
<el-table-column prop="name" label="实例名称" />
<el-table-column prop="state" label="状态" width="100" >
<template slot-scope="scope">
<dict-tag
:options="dict.type.meta_instance_status"
:value="scope.row.state"
/>
</template>
</el-table-column> <el-table-column prop="startTime" label="开始时间" width="200" />
<el-table-column prop="endTime" label="结束时间" width="200" />
<el-table-column prop="duration" label="运行时长" width="80" />
</el-table>
<el-pagination
background
layout="prev, pager, next"
:total="total"
:page-size="pageSize"
:current-page.sync="currentPage"
@current-change="fetchInstances"
style="margin-top: 10px; text-align: right;"
/>
</el-card>
<!-- 第二部分任务节点表格 -->
<el-card shadow="hover">
<div slot="header" class="clearfix">
<span>任务节点</span>
</div>
<el-table
:data="taskNodes"
border
style="width: 100%"
@row-click="handleSecondRowClick"
:height="300"
>
<el-table-column prop="name" label="节点名称"/>
<el-table-column prop="taskType" label="任务类型" />
<el-table-column prop="state" label="状态" width="100" >
<template slot-scope="scope">
<dict-tag
:options="dict.type.meta_instance_status"
:value="scope.row.state"
/>
</template>
</el-table-column>
</el-table>
</el-card>
<!-- 第三部分日志内容 -->
<el-card shadow="hover">
<div slot="header" class="clearfix">
<span>日志详情</span>
</div>
<el-input
type="textarea"
v-model="logDetailsContent"
readonly
:rows="15"
style="width: 100%; height: 320px; overflow: auto; resize: none;"
placeholder="请点击任务节点查看日志详情"
/>
</el-card>
</div>
</el-dialog>
</template>
<script>
import { listInstances, taskNodes, logDetails } from "@/api/meta/metatask";
export default {
props: ["defindName"],
dicts:["meta_instance_status"],
data() {
return {
visible: false, //
instances: [], //
total: 0, //
pageSize: 10, //
currentPage: 1, //
taskNodes: [], //
logDetailsContent: "", //
};
},
methods: {
//
show(defindName) {
this.visible = true;
this.fetchInstances(defindName);
},
//
fetchInstances(defindName) {
listInstances({ page_num: this.currentPage, page_size: this.pageSize, searchVal: defindName }).then((response) => {
console.log(response, "response");
this.instances = response.rows;
this.total = response.total;
});
},
//
handleFirstRowClick(row) {
taskNodes(row.id).then((response) => {
this.taskNodes = response.rows;
});
},
//
handleSecondRowClick(row) {
logDetails(row.id).then((response) => {
this.logDetailsContent = response.data;
});
},
},
};
</script>
<style scoped>
/* 添加全局间距和样式优化 */
.el-card {
padding: 5px;
}
.el-card .clearfix {
font-weight: bold;
font-size: 17px;
}
</style>

103
vue-fastapi-frontend/src/views/meta/metatask/runDialog.vue

@ -0,0 +1,103 @@
<template>
<!-- 运行 -->
<el-dialog title="运行" :visible.sync="visible" width="500px" top="5vh" append-to-body>
<el-form ref="form" :model="form" label-width="120px" class="dialog-form">
<!-- Worker分组 -->
<el-row :gutter="20">
<el-col :span="24">
<el-form-item label="Worker分组" prop="workerGroup">
<el-select v-model="form.workerGroup" placeholder="请选择Worker分组" style="width: 100%;">
<el-option
v-for="dict in workerGroupList"
:key="dict.name"
:label="dict.name"
:value="dict.name"
></el-option>
</el-select>
</el-form-item>
</el-col>
</el-row>
<!-- 环境名称 -->
<el-row :gutter="20">
<el-col :span="24">
<el-form-item label="环境名称" prop="environmentCode">
<el-select v-model="form.environmentCode" placeholder="请选择环境名称" style="width: 100%;">
<el-option
v-for="dict in environmentList"
:key="dict.id"
:label="dict.name"
:value="dict.code"
></el-option>
</el-select>
</el-form-item>
</el-col>
</el-row>
<!-- 告警组 -->
<el-row :gutter="20">
<el-col :span="24">
<el-form-item label="告警组" prop="warningGroupId">
<el-select v-model="form.warningGroupId" placeholder="请选择告警组" style="width: 100%;">
<el-option
v-for="dict in warningGroupList"
:key="dict.id"
:label="dict.groupName"
:value="dict.id"
></el-option>
</el-select>
</el-form-item>
</el-col>
</el-row>
</el-form>
<!-- Footer 按钮 -->
<div slot="footer" class="dialog-footer">
<el-button @click="visible = false"> </el-button>
<el-button type="primary" @click="handleRun"> </el-button>
</div>
</el-dialog>
</template>
<script>
import { runmetatask } from "@/api/meta/metatask";
export default {
props: ["processDefinitionCode", "warningGroupList", "environmentList", "workerGroupList"],
data() {
return {
visible: false,
form: {
processDefinitionCode: undefined,
warningGroupId: undefined,
environmentCode: undefined,
workerGroup: undefined,
},
};
},
methods: {
show(ids) {
this.form.processDefinitionCode = Number(this.processDefinitionCode[0]);
this.visible = true;
},
handleRun() {
runmetatask(this.form).then((response) => {
if (response.success) {
this.visible = false;
this.$modal.msgSuccess("运行成功");
}
});
},
},
};
</script>
<style scoped>
.dialog-form {
margin: 10px 0;
}
.dialog-footer {
text-align: right;
padding: 10px 20px;
}
</style>
Loading…
Cancel
Save