"""APScheduler setup and helpers for the application's scheduled jobs.

Defines a cron trigger that accepts 6/7-field (seconds-first) cron
expressions, builds the module-level ``scheduler`` instance, and exposes
``SchedulerUtil`` for loading, adding, running and removing jobs and for
recording scheduler events in the job log.
"""
import json
from datetime import datetime, timedelta
from typing import Optional

from apscheduler.events import EVENT_ALL
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger

from config.database import engine, SQLALCHEMY_DATABASE_URL, SessionLocal
from config.env import RedisConfig
from module_admin.dao.job_dao import Session, JobDao
from module_admin.service.job_log_service import JobLogService, JobLogModel
from utils.log_util import logger
# Imported so that job targets stored as strings can be resolved by the
# eval() call in SchedulerUtil; do not remove even though it looks unused.
import module_task


class MyCronTrigger(CronTrigger):
    """CronTrigger variant whose ``from_crontab`` takes 6 or 7 fields."""

    @classmethod
    def from_crontab(cls, expr, timezone=None):
        """
        Build a trigger from a seconds-first cron expression.

        Field order: second minute hour day month day-of-week [year].

        :param expr: cron expression with 6 or 7 whitespace-separated fields
        :param timezone: time zone passed through to ``CronTrigger``
        :return: a new trigger instance
        :raises ValueError: if ``expr`` does not have 6 or 7 fields
        """
        values = expr.split()
        if len(values) not in (6, 7):
            raise ValueError('Wrong number of fields; got {}, expected 6 or 7'.format(len(values)))

        second, minute, hour = values[:3]
        day_field, month, dow_field = values[3:6]

        # Day of month: '?' means unspecified, 'L' in the day-of-week field
        # means "last <weekday> of the month", and 'nW' means a workday
        # close to day n.
        if '?' in day_field:
            day = None
        elif 'L' in dow_field:
            # NOTE(review): APScheduler's "last <x>" form appears to expect a
            # weekday name (e.g. "last fri"); confirm numeric values work.
            day = f"last {dow_field.replace('L', '')}"
        elif 'W' in day_field:
            # NOTE(review): resolved once, against the current month, when
            # the trigger is created - it is not re-evaluated each month.
            day = cls.__find_recent_workday(int(day_field.split('W')[0]))
        else:
            day = day_field.replace('L', 'last')

        # NOTE(review): the day-of-week field is forwarded as ``week`` (and
        # the '#' suffix as the week number); kept exactly as before.
        if '?' in dow_field or 'L' in dow_field:
            week = None
        elif '#' in dow_field:
            week = int(dow_field.split('#')[1])
        else:
            week = dow_field

        if '#' in dow_field:
            day_of_week = int(dow_field.split('#')[0]) - 1
        else:
            day_of_week = None

        year = values[6] if len(values) == 7 else None
        return cls(second=second, minute=minute, hour=hour, day=day, month=month, week=week,
                   day_of_week=day_of_week, year=year, timezone=timezone)

    @classmethod
    def __find_recent_workday(cls, day):
        """
        Return ``day`` of the current month, moved back to Friday if it
        falls on a weekend.

        :param day: day of month in the current month
        :return: day-of-month number of the resulting date
        :raises ValueError: if ``day`` is not valid for the current month
        """
        now = datetime.now()
        date = datetime(now.year, now.month, day)
        # weekday(): Monday == 0 ... Sunday == 6, so anything above 4 is a
        # weekend; step back that many days to reach Friday.
        # NOTE(review): this can land in the previous month (e.g. day 1 on a
        # Saturday), in which case the returned number is that month's day.
        days_past_friday = date.weekday() - 4
        if days_past_friday > 0:
            date -= timedelta(days=days_past_friday)
        return date.day


job_stores = {
    'default': MemoryJobStore(),
    'sqlalchemy': SQLAlchemyJobStore(url=SQLALCHEMY_DATABASE_URL, engine=engine),
    'redis': RedisJobStore(
        **dict(
            host=RedisConfig.redis_host,
            port=RedisConfig.redis_port,
            username=RedisConfig.redis_username,
            password=RedisConfig.redis_password,
            db=RedisConfig.redis_database
        )
    )
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    # APScheduler's key is 'max_instances'; the former 'max_instance'
    # spelling was silently ignored.
    'max_instances': 1
}
scheduler = BackgroundScheduler()
scheduler.configure(jobstores=job_stores, executors=executors, job_defaults=job_defaults)


class SchedulerUtil:
    """
    Helper methods for managing scheduled jobs.
    """

    @classmethod
    async def init_system_scheduler(cls, result_db: Optional[Session] = None):
        """
        Start the scheduler and load the stored jobs at application startup.

        :param result_db: database session used to read the job list; a new
            session is created when omitted. The session is always closed
            before this method returns.
        :return:
        """
        logger.info("开始启动定时任务...")
        scheduler.start()
        # Create the session per call; a default evaluated in the signature
        # would be opened once at import time and shared by every call.
        if result_db is None:
            result_db = SessionLocal()
        try:
            job_list = JobDao.get_job_list_for_scheduler(result_db)
            for item in job_list:
                # Replace any job already registered under the same id.
                query_job = cls.get_scheduler_job(job_id=str(item.job_id))
                if query_job:
                    cls.remove_scheduler_job(job_id=str(item.job_id))
                cls.add_scheduler_job(item)
        finally:
            result_db.close()
        scheduler.add_listener(cls.scheduler_event_listener, EVENT_ALL)
        logger.info("系统初始定时任务加载成功")

    @classmethod
    async def close_system_scheduler(cls):
        """
        Shut the scheduler down at application shutdown.

        :return:
        """
        scheduler.shutdown()
        logger.info("关闭定时任务成功")

    @classmethod
    def get_scheduler_job(cls, job_id):
        """
        Look up a job by id.

        :param job_id: job id
        :return: the job object, or None if the scheduler has no such job
        """
        query_job = scheduler.get_job(job_id=str(job_id))

        return query_job

    @classmethod
    def _build_job_options(cls, job_info):
        """
        Build the ``scheduler.add_job`` keyword arguments shared by the
        recurring and the run-once variants.

        :param job_info: job record
        :return: dict of keyword arguments (everything except the trigger)
        """
        # SECURITY: invoke_target is resolved with eval(); it must only ever
        # come from trusted, validated configuration.
        return dict(
            func=eval(job_info.invoke_target),
            args=job_info.job_args.split(',') if job_info.job_args else None,
            kwargs=json.loads(job_info.job_kwargs) if job_info.job_kwargs else None,
            id=str(job_info.job_id),
            name=job_info.job_name,
            # Policy '3' uses a very large grace time; otherwise None is passed.
            misfire_grace_time=1000000000000 if job_info.misfire_policy == '3' else None,
            coalesce=job_info.misfire_policy == '2',
            max_instances=3 if job_info.concurrent == '0' else 1,
            jobstore=job_info.job_group,
            executor=job_info.job_executor
        )

    @classmethod
    def add_scheduler_job(cls, job_info):
        """
        Register a recurring job from the given job record.

        :param job_info: job record
        :return:
        """
        options = cls._build_job_options(job_info)
        trigger = MyCronTrigger.from_crontab(job_info.cron_expression)
        scheduler.add_job(trigger=trigger, **options)

    @classmethod
    def execute_scheduler_job_once(cls, job_info):
        """
        Run the given job once, one second from now.

        :param job_info: job record
        :return:
        """
        options = cls._build_job_options(job_info)
        scheduler.add_job(
            trigger='date',
            run_date=datetime.now() + timedelta(seconds=1),
            **options
        )

    @classmethod
    def remove_scheduler_job(cls, job_id):
        """
        Remove a job by id.

        :param job_id: job id
        :return:
        """
        scheduler.remove_job(job_id=str(job_id))

    @classmethod
    def scheduler_event_listener(cls, event):
        """
        Write a job-log entry for a scheduler event that refers to a job
        still known to the scheduler.

        :param event: event object passed in by the scheduler
        :return:
        """
        # Event type is taken from the event's class name.
        event_type = event.__class__.__name__
        # Status '1' marks an execution that raised; '0' otherwise.
        status = '0'
        exception_info = ''
        if event_type == 'JobExecutionEvent' and event.exception:
            exception_info = str(event.exception)
            status = '1'

        # The listener is registered for all events; those without a job_id
        # have nothing to log.
        job_id = getattr(event, 'job_id', None)
        if job_id is None:
            return
        query_job = cls.get_scheduler_job(job_id=job_id)
        if not query_job:
            return

        query_job_info = query_job.__getstate__()
        job_name = query_job_info.get('name')
        job_group = query_job._jobstore_alias
        job_executor = query_job_info.get('executor')
        invoke_target = query_job_info.get('func')
        # Positional args may hold non-string values; stringify before joining.
        job_args = ','.join(map(str, query_job_info.get('args') or ()))
        job_kwargs = json.dumps(query_job_info.get('kwargs'))
        job_trigger = str(query_job_info.get('trigger'))
        job_message = f"事件类型: {event_type}, 任务ID: {job_id}, 任务名称: {job_name}, 执行于{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        job_log = dict(
            job_name=job_name,
            job_group=job_group,
            job_executor=job_executor,
            invoke_target=invoke_target,
            job_args=job_args,
            job_kwargs=job_kwargs,
            job_trigger=job_trigger,
            job_message=job_message,
            status=status,
            exception_info=exception_info
        )
        session = SessionLocal()
        try:
            JobLogService.add_job_log_services(session, JobLogModel(**job_log))
        finally:
            # Close the session even if writing the log entry fails.
            session.close()