From a1937448d6d3e76d4daa9f5e428349a805afaa73 Mon Sep 17 00:00:00 2001 From: insistence <3055204202@qq.com> Date: Thu, 7 Nov 2024 10:01:49 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E5=AE=9A=E6=97=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E6=94=B9=E7=94=A8AsyncIOScheduler=E5=92=8CAsyncIOExec?= =?UTF-8?q?utor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ruoyi-fastapi-backend/config/get_scheduler.py | 46 +++++++------------ 1 file changed, 17 insertions(+), 29 deletions(-) diff --git a/ruoyi-fastapi-backend/config/get_scheduler.py b/ruoyi-fastapi-backend/config/get_scheduler.py index a92a169..2c9457b 100644 --- a/ruoyi-fastapi-backend/config/get_scheduler.py +++ b/ruoyi-fastapi-backend/config/get_scheduler.py @@ -1,12 +1,11 @@ import json from apscheduler.events import EVENT_ALL from apscheduler.executors.asyncio import AsyncIOExecutor -from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor +from apscheduler.executors.pool import ProcessPoolExecutor from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.jobstores.redis import RedisJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.schedulers.asyncio import AsyncIOScheduler -from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger from asyncio import iscoroutinefunction from datetime import datetime, timedelta @@ -112,12 +111,9 @@ job_stores = { ) ), } -async_executors = {'default': AsyncIOExecutor()} -executors = {'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(5)} +executors = {'default': AsyncIOExecutor(), 'processpool': ProcessPoolExecutor(5)} job_defaults = {'coalesce': False, 'max_instance': 1} -async_scheduler = AsyncIOScheduler() -scheduler = BackgroundScheduler() -async_scheduler.configure(jobstores=job_stores, executors=async_executors, job_defaults=job_defaults) +scheduler = AsyncIOScheduler() scheduler.configure(jobstores=job_stores, executors=executors, job_defaults=job_defaults) @@ -135,14 +131,12 @@ class SchedulerUtil: """ logger.info('开始启动定时任务...') scheduler.start() - async_scheduler.start() async with AsyncSessionLocal() as session: job_list = await JobDao.get_job_list_for_scheduler(session) for item in job_list: cls.remove_scheduler_job(job_id=str(item.job_id)) cls.add_scheduler_job(item) scheduler.add_listener(cls.scheduler_event_listener, EVENT_ALL) - async_scheduler.add_listener(cls.scheduler_event_listener, EVENT_ALL) logger.info('系统初始定时任务加载成功') @classmethod @@ -153,7 +147,6 @@ class SchedulerUtil: :return: """ scheduler.shutdown() - async_scheduler.shutdown() logger.info('关闭定时任务成功') @classmethod @@ -164,7 +157,7 @@ class SchedulerUtil: :param job_id: 任务id :return: 任务对象 """ - query_job = scheduler.get_job(job_id=str(job_id)) or async_scheduler.get_job(job_id=str(job_id)) + query_job = scheduler.get_job(job_id=str(job_id)) return query_job @@ -177,8 +170,11 @@ class SchedulerUtil: :return: """ job_func = eval(job_info.invoke_target) - job_param = dict( - func=job_func, + job_executor = job_info.job_executor + if iscoroutinefunction(job_func): + job_executor = 'default' + scheduler.add_job( + func=eval(job_info.invoke_target), trigger=MyCronTrigger.from_crontab(job_info.cron_expression), args=job_info.job_args.split(',') if job_info.job_args else None, kwargs=json.loads(job_info.job_kwargs) if job_info.job_kwargs else None, @@ -188,11 +184,8 @@ class SchedulerUtil: coalesce=True if job_info.misfire_policy == '2' else False, max_instances=3 if job_info.concurrent == '0' else 1, jobstore=job_info.job_group, + executor=job_executor, ) - if iscoroutinefunction(job_func): - async_scheduler.add_job(**job_param) - else: - scheduler.add_job(executor=job_info.job_executor, **job_param) @classmethod def execute_scheduler_job_once(cls, job_info: JobModel): @@ -203,8 +196,11 @@ class SchedulerUtil: :return: """ job_func = eval(job_info.invoke_target) - job_param = dict( - func=job_func, + job_executor = job_info.job_executor + if iscoroutinefunction(job_func): + job_executor = 'default' + scheduler.add_job( + func=eval(job_info.invoke_target), trigger='date', run_date=datetime.now() + timedelta(seconds=1), args=job_info.job_args.split(',') if job_info.job_args else None, @@ -215,11 +211,8 @@ class SchedulerUtil: coalesce=True if job_info.misfire_policy == '2' else False, max_instances=3 if job_info.concurrent == '0' else 1, jobstore=job_info.job_group, + executor=job_executor, ) - if iscoroutinefunction(job_func): - async_scheduler.add_job(**job_param) - else: - scheduler.add_job(executor=job_info.job_executor, **job_param) @classmethod def remove_scheduler_job(cls, job_id: Union[str, int]): @@ -231,12 +224,7 @@ class SchedulerUtil: """ query_job = cls.get_scheduler_job(job_id=job_id) if query_job: - query_job_info = query_job.__getstate__() - job_func = eval(query_job_info.get('func').replace(':', '.')) - if iscoroutinefunction(job_func): - async_scheduler.remove_job(job_id=str(job_id)) - else: - scheduler.remove_job(job_id=str(job_id)) + scheduler.remove_job(job_id=str(job_id)) @classmethod def scheduler_event_listener(cls, event):