Browse Source

feat: 定时任务新增支持调用异步函数

master
insistence 3 months ago
parent
commit
1ddb1bd5b5
  1. 42
      ruoyi-fastapi-backend/config/get_scheduler.py
  2. 6
      ruoyi-fastapi-backend/module_admin/service/job_service.py
  3. 14
      ruoyi-fastapi-backend/module_task/scheduler_test.py
  4. 19
      ruoyi-fastapi-frontend/src/views/monitor/job/index.vue

42
ruoyi-fastapi-backend/config/get_scheduler.py

@ -1,11 +1,14 @@
import json
from apscheduler.events import EVENT_ALL
from apscheduler.executors.asyncio import AsyncIOExecutor
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from asyncio import iscoroutinefunction
from datetime import datetime, timedelta
from sqlalchemy.engine import create_engine
from sqlalchemy.orm import sessionmaker
@ -109,9 +112,12 @@ job_stores = {
)
),
}
async_executors = {'default': AsyncIOExecutor()}
executors = {'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(5)}
job_defaults = {'coalesce': False, 'max_instance': 1}
async_scheduler = AsyncIOScheduler()
scheduler = BackgroundScheduler()
async_scheduler.configure(jobstores=job_stores, executors=async_executors, job_defaults=job_defaults)
scheduler.configure(jobstores=job_stores, executors=executors, job_defaults=job_defaults)
@ -129,14 +135,14 @@ class SchedulerUtil:
"""
logger.info('开始启动定时任务...')
scheduler.start()
async_scheduler.start()
async with AsyncSessionLocal() as session:
job_list = await JobDao.get_job_list_for_scheduler(session)
for item in job_list:
query_job = cls.get_scheduler_job(job_id=str(item.job_id))
if query_job:
cls.remove_scheduler_job(job_id=str(item.job_id))
cls.add_scheduler_job(item)
scheduler.add_listener(cls.scheduler_event_listener, EVENT_ALL)
async_scheduler.add_listener(cls.scheduler_event_listener, EVENT_ALL)
logger.info('系统初始定时任务加载成功')
@classmethod
@ -147,6 +153,7 @@ class SchedulerUtil:
:return:
"""
scheduler.shutdown()
async_scheduler.shutdown()
logger.info('关闭定时任务成功')
@classmethod
@ -157,7 +164,7 @@ class SchedulerUtil:
:param job_id: 任务id
:return: 任务对象
"""
query_job = scheduler.get_job(job_id=str(job_id))
query_job = scheduler.get_job(job_id=str(job_id)) or async_scheduler.get_job(job_id=str(job_id))
return query_job
@ -169,8 +176,9 @@ class SchedulerUtil:
:param job_info: 任务对象信息
:return:
"""
scheduler.add_job(
func=eval(job_info.invoke_target),
job_func = eval(job_info.invoke_target)
job_param = dict(
func=job_func,
trigger=MyCronTrigger.from_crontab(job_info.cron_expression),
args=job_info.job_args.split(',') if job_info.job_args else None,
kwargs=json.loads(job_info.job_kwargs) if job_info.job_kwargs else None,
@ -180,8 +188,11 @@ class SchedulerUtil:
coalesce=True if job_info.misfire_policy == '2' else False,
max_instances=3 if job_info.concurrent == '0' else 1,
jobstore=job_info.job_group,
executor=job_info.job_executor,
)
if iscoroutinefunction(job_func):
async_scheduler.add_job(**job_param)
else:
scheduler.add_job(executor=job_info.job_executor, **job_param)
@classmethod
def execute_scheduler_job_once(cls, job_info: JobModel):
@ -191,8 +202,9 @@ class SchedulerUtil:
:param job_info: 任务对象信息
:return:
"""
scheduler.add_job(
func=eval(job_info.invoke_target),
job_func = eval(job_info.invoke_target)
job_param = dict(
func=job_func,
trigger='date',
run_date=datetime.now() + timedelta(seconds=1),
args=job_info.job_args.split(',') if job_info.job_args else None,
@ -203,8 +215,11 @@ class SchedulerUtil:
coalesce=True if job_info.misfire_policy == '2' else False,
max_instances=3 if job_info.concurrent == '0' else 1,
jobstore=job_info.job_group,
executor=job_info.job_executor,
)
if iscoroutinefunction(job_func):
async_scheduler.add_job(**job_param)
else:
scheduler.add_job(executor=job_info.job_executor, **job_param)
@classmethod
def remove_scheduler_job(cls, job_id: Union[str, int]):
@ -214,6 +229,13 @@ class SchedulerUtil:
:param job_id: 任务id
:return:
"""
query_job = cls.get_scheduler_job(job_id=job_id)
if query_job:
query_job_info = query_job.__getstate__()
job_func = eval(query_job_info.get('func').replace(':', '.'))
if iscoroutinefunction(job_func):
async_scheduler.remove_job(job_id=str(job_id))
else:
scheduler.remove_job(job_id=str(job_id))
@classmethod

6
ruoyi-fastapi-backend/module_admin/service/job_service.py

@ -129,8 +129,6 @@ class JobService:
raise ServiceException(message=f'修改定时任务{page_object.job_name}失败,定时任务已存在')
try:
await JobDao.edit_job_dao(query_db, edit_job)
query_job = SchedulerUtil.get_scheduler_job(job_id=edit_job.get('job_id'))
if query_job:
SchedulerUtil.remove_scheduler_job(job_id=edit_job.get('job_id'))
if edit_job.get('status') == '0':
job_info = await cls.job_detail_services(query_db, edit_job.get('job_id'))
@ -152,8 +150,6 @@ class JobService:
:param page_object: 定时任务对象
:return: 执行一次定时任务结果
"""
query_job = SchedulerUtil.get_scheduler_job(job_id=page_object.job_id)
if query_job:
SchedulerUtil.remove_scheduler_job(job_id=page_object.job_id)
job_info = await cls.job_detail_services(query_db, page_object.job_id)
if job_info:
@ -176,8 +172,6 @@ class JobService:
try:
for job_id in job_id_list:
await JobDao.delete_job_dao(query_db, JobModel(jobId=job_id))
query_job = SchedulerUtil.get_scheduler_job(job_id=job_id)
if query_job:
SchedulerUtil.remove_scheduler_job(job_id=job_id)
await query_db.commit()
return CrudResponseModel(is_success=True, message='删除成功')

14
ruoyi-fastapi-backend/module_task/scheduler_test.py

@ -2,6 +2,18 @@ from datetime import datetime
def job(*args, **kwargs):
"""
定时任务执行同步函数示例
"""
print(args)
print(kwargs)
print(f'{datetime.now()}执行了')
print(f'{datetime.now()}同步函数执行了')
async def async_job(*args, **kwargs):
"""
定时任务执行异步函数示例
"""
print(args)
print(kwargs)
print(f'{datetime.now()}异步函数执行了')

19
ruoyi-fastapi-frontend/src/views/monitor/job/index.vue

@ -159,7 +159,20 @@
</el-form-item>
</el-col>
<el-col :span="12">
<el-form-item label="任务执行器" prop="jobGroup">
<el-form-item prop="jobGroup">
<template #label>
<span>
任务执行器
<el-tooltip placement="top">
<template #content>
<div>
调用方法为异步函数时此选项无效
</div>
</template>
<el-icon><question-filled /></el-icon>
</el-tooltip>
</span>
</template>
<el-select v-model="form.jobExecutor" placeholder="请选择任务执行器">
<el-option
v-for="dict in sys_job_executor"
@ -178,9 +191,7 @@
<el-tooltip placement="top">
<template #content>
<div>
Bean调用示例ryTask.ryParams('ry')
<br />Class类调用示例com.ruoyi.quartz.task.RyTask.ryParams('ry')
<br />参数说明支持字符串布尔类型长整型浮点型整型
调用示例module_task.scheduler_test.job
</div>
</template>
<el-icon><question-filled /></el-icon>

Loading…
Cancel
Save