Browse Source

refactor: 定时任务改用AsyncIOScheduler和AsyncIOExecutor

master
insistence 3 months ago
parent
commit
a1937448d6
  1. 44
      ruoyi-fastapi-backend/config/get_scheduler.py

44
ruoyi-fastapi-backend/config/get_scheduler.py

@ -1,12 +1,11 @@
import json import json
from apscheduler.events import EVENT_ALL from apscheduler.events import EVENT_ALL
from apscheduler.executors.asyncio import AsyncIOExecutor from apscheduler.executors.asyncio import AsyncIOExecutor
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.jobstores.redis import RedisJobStore from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.cron import CronTrigger
from asyncio import iscoroutinefunction from asyncio import iscoroutinefunction
from datetime import datetime, timedelta from datetime import datetime, timedelta
@ -112,12 +111,9 @@ job_stores = {
) )
), ),
} }
async_executors = {'default': AsyncIOExecutor()} executors = {'default': AsyncIOExecutor(), 'processpool': ProcessPoolExecutor(5)}
executors = {'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(5)}
job_defaults = {'coalesce': False, 'max_instance': 1} job_defaults = {'coalesce': False, 'max_instance': 1}
async_scheduler = AsyncIOScheduler() scheduler = AsyncIOScheduler()
scheduler = BackgroundScheduler()
async_scheduler.configure(jobstores=job_stores, executors=async_executors, job_defaults=job_defaults)
scheduler.configure(jobstores=job_stores, executors=executors, job_defaults=job_defaults) scheduler.configure(jobstores=job_stores, executors=executors, job_defaults=job_defaults)
@ -135,14 +131,12 @@ class SchedulerUtil:
""" """
logger.info('开始启动定时任务...') logger.info('开始启动定时任务...')
scheduler.start() scheduler.start()
async_scheduler.start()
async with AsyncSessionLocal() as session: async with AsyncSessionLocal() as session:
job_list = await JobDao.get_job_list_for_scheduler(session) job_list = await JobDao.get_job_list_for_scheduler(session)
for item in job_list: for item in job_list:
cls.remove_scheduler_job(job_id=str(item.job_id)) cls.remove_scheduler_job(job_id=str(item.job_id))
cls.add_scheduler_job(item) cls.add_scheduler_job(item)
scheduler.add_listener(cls.scheduler_event_listener, EVENT_ALL) scheduler.add_listener(cls.scheduler_event_listener, EVENT_ALL)
async_scheduler.add_listener(cls.scheduler_event_listener, EVENT_ALL)
logger.info('系统初始定时任务加载成功') logger.info('系统初始定时任务加载成功')
@classmethod @classmethod
@ -153,7 +147,6 @@ class SchedulerUtil:
:return: :return:
""" """
scheduler.shutdown() scheduler.shutdown()
async_scheduler.shutdown()
logger.info('关闭定时任务成功') logger.info('关闭定时任务成功')
@classmethod @classmethod
@ -164,7 +157,7 @@ class SchedulerUtil:
:param job_id: 任务id :param job_id: 任务id
:return: 任务对象 :return: 任务对象
""" """
query_job = scheduler.get_job(job_id=str(job_id)) or async_scheduler.get_job(job_id=str(job_id)) query_job = scheduler.get_job(job_id=str(job_id))
return query_job return query_job
@ -177,8 +170,11 @@ class SchedulerUtil:
:return: :return:
""" """
job_func = eval(job_info.invoke_target) job_func = eval(job_info.invoke_target)
job_param = dict( job_executor = job_info.job_executor
func=job_func, if iscoroutinefunction(job_func):
job_executor = 'default'
scheduler.add_job(
func=eval(job_info.invoke_target),
trigger=MyCronTrigger.from_crontab(job_info.cron_expression), trigger=MyCronTrigger.from_crontab(job_info.cron_expression),
args=job_info.job_args.split(',') if job_info.job_args else None, args=job_info.job_args.split(',') if job_info.job_args else None,
kwargs=json.loads(job_info.job_kwargs) if job_info.job_kwargs else None, kwargs=json.loads(job_info.job_kwargs) if job_info.job_kwargs else None,
@ -188,11 +184,8 @@ class SchedulerUtil:
coalesce=True if job_info.misfire_policy == '2' else False, coalesce=True if job_info.misfire_policy == '2' else False,
max_instances=3 if job_info.concurrent == '0' else 1, max_instances=3 if job_info.concurrent == '0' else 1,
jobstore=job_info.job_group, jobstore=job_info.job_group,
executor=job_executor,
) )
if iscoroutinefunction(job_func):
async_scheduler.add_job(**job_param)
else:
scheduler.add_job(executor=job_info.job_executor, **job_param)
@classmethod @classmethod
def execute_scheduler_job_once(cls, job_info: JobModel): def execute_scheduler_job_once(cls, job_info: JobModel):
@ -203,8 +196,11 @@ class SchedulerUtil:
:return: :return:
""" """
job_func = eval(job_info.invoke_target) job_func = eval(job_info.invoke_target)
job_param = dict( job_executor = job_info.job_executor
func=job_func, if iscoroutinefunction(job_func):
job_executor = 'default'
scheduler.add_job(
func=eval(job_info.invoke_target),
trigger='date', trigger='date',
run_date=datetime.now() + timedelta(seconds=1), run_date=datetime.now() + timedelta(seconds=1),
args=job_info.job_args.split(',') if job_info.job_args else None, args=job_info.job_args.split(',') if job_info.job_args else None,
@ -215,11 +211,8 @@ class SchedulerUtil:
coalesce=True if job_info.misfire_policy == '2' else False, coalesce=True if job_info.misfire_policy == '2' else False,
max_instances=3 if job_info.concurrent == '0' else 1, max_instances=3 if job_info.concurrent == '0' else 1,
jobstore=job_info.job_group, jobstore=job_info.job_group,
executor=job_executor,
) )
if iscoroutinefunction(job_func):
async_scheduler.add_job(**job_param)
else:
scheduler.add_job(executor=job_info.job_executor, **job_param)
@classmethod @classmethod
def remove_scheduler_job(cls, job_id: Union[str, int]): def remove_scheduler_job(cls, job_id: Union[str, int]):
@ -231,11 +224,6 @@ class SchedulerUtil:
""" """
query_job = cls.get_scheduler_job(job_id=job_id) query_job = cls.get_scheduler_job(job_id=job_id)
if query_job: if query_job:
query_job_info = query_job.__getstate__()
job_func = eval(query_job_info.get('func').replace(':', '.'))
if iscoroutinefunction(job_func):
async_scheduler.remove_job(job_id=str(job_id))
else:
scheduler.remove_job(job_id=str(job_id)) scheduler.remove_job(job_id=str(job_id))
@classmethod @classmethod

Loading…
Cancel
Save