[定时器源码]从Sleep到Quartz 写在前面 笔者在实习项目中运用到了定时器,用于定时同步数据。但对于定时器的工作原理一知半解,比如定时器是如何做到到点执行的?又是如何知道每个定时任务的下一执行日期的?CronString在定时器中是如何被解释的,在代码中是以怎样的形式存在?
带着这些问题,笔者调研了几个主流定时器的源码,记录下Timer、Spring Scheduling和Quartz框架在一些实现细节上的差异。
ThreadPool+Sleep 线程加Sleep其实就能实现一个很简单的定时器
1 2 3 4 5 6 7 8 9 10 11 public static void main (String[] args) throws InterruptedException { ThreadPoolExecutor executor = new ThreadPoolExecutor(3 , 10 , 10 , TimeUnit.SECONDS, new ArrayBlockingQueue<>(10 )); for (int i = 0 ; i < 3 ; i++) { executor.submit(new Thread(()->{ while (true ) { Thread.sleep(1000 ); doSomething(); } })); } }
这种定时器就做到了每秒执行任务,且三个定时任务并发执行。但显然还是很粗陋,先不说单线程只能对应一个任务,任务无法交叉执行,而且一旦定时任务数量过多,核心线程数旧居不下,会占用太多资源。
Timer Java 1.3后原生支持了定时器Timer,仅以单线程即可安排多个定时任务,且不用担心任务数量过多,待执行的任务会被安排在等待队列中。
基本使用 1 2 3 4 5 6 new Timer("test" ).schedule(new TimerTask() { @Override public void run () { doSomething(); } },5000L ,1000L );
这种定时器做到了定时器启动5s后,每秒执行一次任务。此外,Timer派生出了三种任务类型:一次性任务、固定速率任务和固定延时任务。
三种任务都属于TimerTask类型,由TimerTask.period区分开来:
一次性任务: period = 0, 定时器执行完该任务后便不再执行
固定速率任务(FixedRateTask): period > 0, 定时器执行完该任务后, 将上一任务的启动时间后移固定时长, 作为下一任务的预期执行时间
固定延时任务(FixedDelayTask): period < 0, 定时器执行完该任务后, 将上一任务的完成时间后移固定时长, 作为下一任务的预期执行时间
按照官方说法,固定速率任务适用于对相邻任务的时间间隔比较敏感的定时任务,比如每小时的准时响铃,或者在十秒内每秒作响一次的计时器。这是因为固定速率任务会保证每个执行都完成,即使某次执行耗时过长(比如GC或者网络IO)以至于结束时间以及超过了后续几次执行的预期执行时间。
以每秒滴答作响的计时器为例,预期在0、1、2、3、4、5秒分表作响一次。在0秒的执行中出现卡顿导致在2.5s的时候才结束。固定速率任务则会把1、2秒本该执行的执行一遍,但固定延时任务会忽略掉“过时”的任务,转而安排的下一任务的预期执行时间为3.5s。
启动流程 new Timer()
会立刻启动一个定时器线程TimerThread
,该线程不断自循环地访问任务队列queue
的队首元素queue[1]
,如果任务队列为空则进入等待状态queue.wait()
,该调用没有设置timeout,所以需要等待其他线程主动唤醒。
Timer.schedule()
会将TimerTask
添加到任务队列当中,如果是首次添加任务,还会通过queue.notify()
主动唤醒TimerThread
。至此就成功调度了一份任务给定时器。
随后定时器线程取到队首任务,等到预期执行时间再执行该任务,但在执行之前还会根据该任务是否是一次性任务来决定是否要重新入队。
任务队列 注意到取的队首任务是queue[1]而非queue[0],这与queue的”储存结构“有关。任务队列本质还是Arraylist,但采用了堆思想以实现优先队列的功能。
在任务队列中,0号元素不储存任务,1号元素为队首任务,其预期执行时间最早。队列元素以小顶堆的思想组织起来的,即queue[n].executionTime <= queue[2n].executionTime && queue[n].executionTime <= queue[2n+1].executionTime
。
添加任务 添加任务时先在队尾塞入任务(必要时会先扩容),然后根据该任务的预期执行时间,将其安排到合适的队列位置。这一流程的核心逻辑处于TaskQueue#fixUp
中
1 2 3 4 5 6 7 8 9 private void fixUp (int k) { while (k > 1 ) { int j = k >> 1 ; if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime) break ; TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp; k = j; } }
新加入的第k号任务,不断比较自己的父节点的预期执行时间,如果早于父节点则与父节点交换位置,直到父节点的预期执行时间更早或新任务成为了队首任务。
复用任务 对于非一次性任务,定时器在执行任务前会重新安排一次该任务。具体步骤是先更新任务的预期执行时间(固定速率任务和固定时延任务的更新策略有所不同),然后将任务下放到合适位置,核心逻辑处于TaskQueue#fixDown
中
1 2 3 4 5 6 7 8 9 10 11 12 private void fixDown (int k) { int j; while ((j = k << 1 ) <= size && j > 0 ) { if (j < size && queue[j].nextExecutionTime > queue[j+1 ].nextExecutionTime) j++; if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime) break ; TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp; k = j; } }
对于k号任务,找到子节点中预期执行时间最早的那个,与其比较。如果子节点的预期执行时间更早,则与子节点交换位置,直到该任务的预期执行时间比所有子节点都早或者没有子节点。
剔除任务 对于一次性任务,定时器在执行任务前会用队尾任务和该任务对换位置,然后将该任务踢出任务队列。针对提上来的队尾任务,执行一次fixDown
。
清理任务 任务自身可以通过TimerTask#cancel
将自己设置为任务取消状态。定时器线程在遇到CANCELLED的任务时,会将其剔除。此外,用户可以主动调用Timer#purge
来快速清理CANCELLED的任务,但这种清理方式会导致整个任务队列的排序混乱,因而还会对整个队列重排一次Timer#heapify
Spring Schedule Spring 3.0后开始支持定时任务,且新增对CronTask任务的支持
基本使用 一般在启动类上添加@EnableScheduling
注解即可开始对定时任务的支持,带有@Scheduled或@Schedules
注解的方法会被识别并被注册为定时任务。
@Scheduled
有几个重要属性:
属性
解释
实例
cron
类cron表达式,扩展了UN*X系统中,对包含秒、分、时、月中日、月和周中日的触发器的定义
0 * * * * MON-FRI
zone
与cron
同用,在cron表达式解析时指明时区,默认使用服务器得本地时区
GMT+8:00
fixedDelay
固定延时,用于配置固定延时任务
1000L
fixedRate
固定速率,用于配置固定速率任务
1000L
initialDelay
fixedRateTask和fixedDelayTask的首次启动时延
1000L
timeUnit
为fixedDelay、fixedRate和initialDelay指明时间单位,默认为毫秒
TimeUnit.MILLISECONDS
以下代码就注册了一个从当前小时05分开始,每10分钟执行一次的定时任务
1 2 3 4 @Scheduled(cron = "0 5/10 * * * *") public void cronTask () { doSomething(); }
启动流程 任务注册 随后在#afterSingletonsInstantiated或#onApplicationEvent
这两个“互斥”方法中执行注册后的工作#finishRegistration
,总之#finishRegistration
不会被重复执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private void finishRegistration () { if (this .scheduler != null ) { this .registrar.setScheduler(this .scheduler); } if (this .beanFactory instanceof ListableBeanFactory) { Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory) this .beanFactory).getBeansOfType(SchedulingConfigurer.class); List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values()); AnnotationAwareOrderComparator.sort(configurers); for (SchedulingConfigurer configurer : configurers) { configurer.configureTasks(this .registrar); } } if (this .registrar.hasTasks() && this .registrar.getScheduler() == null ) { try { this .registrar.setTaskScheduler(resolveSchedulerBean(this .beanFactory, TaskScheduler.class, false )); } ...... 总之尽力从工厂中获取调度器, 如果什么都没有提前配置,这一步也依然获取不到调度器 } this .registrar.afterPropertiesSet(); }
可以看到,Spring Schedule为用户提供了钩子SchedulingConfigurer
,用户可以在这里提前配置任务或者线程池等等。
随后,进入到#afterPropertiesSet
,其中核心步骤为#scheduleTasks
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 protected void scheduleTasks () { if (this .taskScheduler == null ) { this .localExecutor = Executors.newSingleThreadScheduledExecutor(); this .taskScheduler = new ConcurrentTaskScheduler(this .localExecutor); } if (this .triggerTasks != null ) { for (TriggerTask task : this .triggerTasks) { addScheduledTask(scheduleTriggerTask(task)); } } if (this .cronTasks != null ) { for (CronTask task : this .cronTasks) { addScheduledTask(scheduleCronTask(task)); } } if (this .fixedRateTasks != null ) { for (IntervalTask task : this .fixedRateTasks) { addScheduledTask(scheduleFixedRateTask(task)); } } if (this .fixedDelayTasks != null ) { for (IntervalTask task : this .fixedDelayTasks) { addScheduledTask(scheduleFixedDelayTask(task)); } } }
可以发现, #finishRegistration
主要完成一些前置工作。后续对@Scheduled的解析还在ScheduledAnnotationBeanPostProcessor#postProcessAfterInitialization
中。
该方法的前期会经过几层筛选,合格的方法会被整理到annotatedMethods(Map<Method,Set<Scheduled>>类型
)中。随后在#processScheduled
中被解析成各种定时任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 protected void processScheduled (Scheduled scheduled, Method method, Object bean) { try { Set<ScheduledTask> tasks = new LinkedHashSet<>(4 ); Runnable runnable = createRunnable(bean, method); ...... long initialDelay = convertToMillis(scheduled.initialDelay(), scheduled.timeUnit() ...... String cron = scheduled.cron(); if (StringUtils.hasText(cron)) { String zone = scheduled.zone(); ...... tasks.add(this .registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone)))); } long fixedDelay = convertToMillis(scheduled.fixedDelay(), scheduled.timeUnit()); if (fixedDelay >= 0 ) { ...... tasks.add(this .registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } String fixedDelayString = scheduled.fixedDelayString(); ...... long fixedRate = convertToMillis(scheduled.fixedRate(), scheduled.timeUnit()); if (fixedRate >= 0 ) { ..... tasks.add(this .registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } String fixedRateString = scheduled.fixedRateString(); ...... synchronized (this .scheduledTasks) { Set<ScheduledTask> regTasks = this .scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4 )); regTasks.addAll(tasks); } } catch (IllegalArgumentException ex) { throw new IllegalStateException( "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage()); } }
以上是任务注册流程,注册结果同一储存在ScheduledAnnotationBeanPostProcessor.scheduledTasks
中,这是一个容量为16的Map<Object, Set<ScheduledTask>>
,其中ScheduledTask
就是各种定时任务。
调度任务 Spring Schedule中将定时任务扔入执行器(本质是个线程池)的步骤称为任务调度,该过程必须由调度器执行。具体方法为ScheduledTaskRegistrar#scheduleCronTask
、ScheduledTaskRegistrar#scheduleFixedRateTask
或ScheduledTaskRegistrar#scheduleFixedDelayTask
。
细心的可以发现,在任务注册步骤,只要调度器存在,每次注册一个定时任务的同时也会将该任务调度到执行器中,并不是待scheduledTasks
收集完所有定时任务后再统一调度。
调度任务过程,笔者将重点讲CronTask调度和DelayTask调度的区别。
DelayTask调度 DelayTask有几个重要属性:
属性
类型
含义
interval
long
固定时延时长,默认以毫秒为单位
initialDelay
long
首次任务的启动延时,默认以毫秒为单位
runnable
Runnable
实际类型为ScheduledMethodRunnable
,由原始方法经过一层包装得到的
ScheduledTaskRegistrar#scheduleFixedDelayTask
是DelayTask调度的入口,主要介绍其中的核心流程TaskScheduler#scheduleWithFixedDelay
1 2 3 4 5 6 7 8 9 10 11 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay) { long initialDelay = startTime.getTime() - this .clock.millis(); try { return this .scheduledExecutor.scheduleWithFixedDelay(decorateTask(task, true ), initialDelay, delay, TimeUnit.MILLISECONDS); } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + this .scheduledExecutor + "] did not accept task: " + task, ex); } }
接下来看下任务是怎么进入线程池的,以及怎么做到“周期性执行”的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { ...... ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null , triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
深入到ScheduledFutureTask#run
,可以知道为什么交给线程池之前还要再包装一次:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public void run () { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false ); else if (!periodic) ScheduledFutureTask.super .run(); else if (ScheduledFutureTask.super .runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } }
综上,DelayTask调度其实与Timer的调度有异曲同工之妙。只不过把手动启动任务换为把任务提交给线程池,把优先队列换为线程池的延时队列DelayedWorkQueue
。
CronTask调度 CronTask有几个重要属性:
属性
类型
含义
expression
String
cron表达式,如 0 5/10 * * * *
trigger
Trigger
实际类型为CronTrigger
,方法nextExecutionTime
可以返回下一轮任务的预期执行时间
runnable
Runnable
实际类型为ScheduledMethodRunnable
,由原始方法经过一层包装得到的
ScheduledTaskRegistrar#scheduleCronTask
是CronTask调度的入口,主要介绍其中的核心流程TaskScheduler#schedule
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) { try { if (this .enterpriseConcurrentScheduler) { return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true ), trigger); } else { ErrorHandler errorHandler = (this .errorHandler != null ? this .errorHandler : TaskUtils.getDefaultErrorHandler(true )); return new ReschedulingRunnable(task, trigger, this .clock, this .scheduledExecutor, errorHandler).schedule(); } } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + this .scheduledExecutor + "] did not accept task: " + task, ex); } }
比起DelayTask调度, 调度器没有直接将任务扔入执行器中。这是为实现“周期性任务”做出的让步:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public ScheduledFuture<?> schedule() { synchronized (this .triggerContextMonitor) { this .scheduledExecutionTime = this .trigger.nextExecutionTime(this .triggerContext); if (this .scheduledExecutionTime == null ) { return null ; } long initialDelay = this .scheduledExecutionTime.getTime() - this .triggerContext.getClock().millis(); this .currentFuture = this .executor.schedule(this , initialDelay, TimeUnit.MILLISECONDS); return this ; } }@Override public void run () { Date actualExecutionTime = new Date(this .triggerContext.getClock().millis()); super .run(); Date completionTime = new Date(this .triggerContext.getClock().millis()); synchronized (this .triggerContextMonitor) { Assert.state(this .scheduledExecutionTime != null , "No scheduled execution" ); this .triggerContext.update(this .scheduledExecutionTime, actualExecutionTime, completionTime); if (!obtainCurrentFuture().isCancelled()) { schedule(); } } }
在这里就可以发现,CronTask因为无法像DelayTask一样设置period。因此在执行器看来,CronTask就是一个一次性任务,但这个一次性任务会不断向执行器塞入任务。所以在调度器看来,CronTask是一个周期性任务。
Cron解析 上文提到了CronTask特有的CronTrigger,其nextExecutionTime方法能计算出下一任务的预期执行时间。以“0 5/10 * * * *”为例,如果触发时间为17:33:00,CronTrigger是如何推算出下一时间为17:35:00的呢?
CronExpression CronTrigger一个重要属性就是CronExpression,它是CronString,即”0 5/10 * * * *”这一抽象字符串在程序中的具体表现形式。
可以看到,CronExpression储存了7份BitsCronField
,除了BitsCronField-NanoOfSecond
外,剩余六份都能和”0 5/10 * * * *”的六个位置对应的上。仔细观察,以BitsCronField-HourOfDay
这一份为例,它包含了0-23小时的所有值,这与 \*
的含义一致,表示任意小时。接下来我们重点关注BitsCronField-MinuteOfHour
,为什么只挑出了这么几分钟。
上图中的BitsCronField.type
储存了0-59分钟的完整数据,这部分为固定值,不随CronString改变而改变。BitsCronField.bits
为核心属性,其类型为long,将其转为二进制后,如果第i位(从0开始计算)为1,就表示选取了第i分钟 。以图中的值36064015784378400为例,其转换为二进制后正好第5、15、25、35、45和55位为1,也就说明选取了第5、15、25、35、45和55分钟。这与CronString的5/10
含义一致。
至于BitsCronField.bits
是如何计算出来的,可以参考源码CronExpression#parse
如何计算nextExecutionTime 计算CronTrigger#nextExecutionTime
的核心逻辑在CronExpression#next
,最后的计算逻辑在BitsCronfield#nextOrSame
中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public <T extends Temporal & Comparable<? super T>> T nextOrSame (T temporal) { int current = type().get(temporal); int next = nextSetBit(current); if (next == -1 ) { temporal = type().rollForward(temporal); next = nextSetBit(0 ); } if (next == current) { return temporal; } else { int count = 0 ; current = type().get(temporal); while (current != next && count++ < CronExpression.MAX_ATTEMPTS) { temporal = type().elapseUntil(temporal, next); current = type().get(temporal); next = nextSetBit(current); if (next == -1 ) { temporal = type().rollForward(temporal); next = nextSetBit(0 ); } } if (count >= CronExpression.MAX_ATTEMPTS) { return null ; } return type().reset(temporal); } }private int nextSetBit (int fromIndex) { long result = this .bits & (-1L << fromIndex); if (result != 0 ) { return Long.numberOfTrailingZeros(result); } else { return -1 ; } }
以上代码只是以BitsCronfield-MinuteOfHour
为例,CronExpression持有的7份BitsCronfield都要执行一遍,也就是秒、分、时等7个部分都要更换一边,如此才能真正找到下一预期执行时间。
钩子函数 Spring Schedule还为用户提供了钩子函数,用户只要实现了SchedulingConfigurer
接口,就能自行配置任务或者线程池等等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class MyConfigurer implements SchedulingConfigurer { @Override public void configureTasks (ScheduledTaskRegistrar taskRegistrar) { taskRegistrar.addCronTask(runnable, "0/5 * * * * *" ); taskRegistrar.setScheduler(threadPoolTaskScheduler()); } public ThreadPoolTaskScheduler threadPoolTaskScheduler () { ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler(); executor.setPoolSize(20 ); executor.setThreadNamePrefix("taskExecutor-" ); executor.setWaitForTasksToCompleteOnShutdown(true ); executor.setAwaitTerminationSeconds(60 ); executor.initialize(); return executor; } }
这些钩子真正生效的地方在#finishRegistration
,Spring Schedule会取出所有SchedulingConfigurer的实现类,并一一执行#configureTasks
方法。在钩子方法中添加的任务会被暂存到ScheduledAnnotationBeanPostProcessor.registrar
中,在ScheduledTaskRegistrar#scheduleTasks
内被一一调度到执行器中。
Quartz Spring Schedule其实已经基本能够满足常规的定时任务工作。但其可自定义程度并不高。Quartz框架除了有着更为灵活的自定义功能,在调度器实现,cron解析等方面也有着些许不同。
基本使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); JobDetail job = JobBuilder.newJob(MyJob.class) .withIdentity("testJob" , "testJobGroup" ) .build(); Trigger trigger = TriggerBuilder.newTrigger() .withIdentity("testTrigger" , "testTriggerGroup" ) .withSchedule(CronScheduleBuilder.cronSchedule("*/30 * * * * ?" )) .build(); scheduler.scheduleJob(job, trigger); scheduler.start();
quartz的使用较为直白,相比于Spring Schedule,调度器、任务和触发器需要手动实例化并手动触发调度。对于Trigger,常用类型为CronTrigger和SimpleTrigger,分别可用于CronTask和FixedDelayTask、FixedRateTask
1 2 3 4 5 6 7 TriggerBuilder.newTrigger() .withSchedule( SimpleScheduleBuilder.simpleSchedule() .withIntervalInSeconds(30 ) .repeatForever()) .startNow() .build();
以上就定义了一个自启动开始,每隔30秒执行一次的触发器。
启动流程 调度器初始化 quartz的启动入口在于SchedulerFactory#getScheduler
,即当我们获取到调度器实例的时候,背后的执行器等相关也就一同准备好了。后续代码以StdSchedulerFactory#getScheduler
为例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public Scheduler getScheduler () throws SchedulerException { if (cfg == null ) { initialize(); } SchedulerRepository schedRep = SchedulerRepository.getInstance(); Scheduler sched = schedRep.lookup(getSchedulerName()); if (sched != null ) { if (sched.isShutdown()) { schedRep.remove(getSchedulerName()); } else { return sched; } } sched = instantiate(); return sched; }
在读取配置文件的步骤中,读取的配置文件名为org.quartz.properties或quartz.properties
,quartz自带的一份配置文件就设置了一部分的默认值:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 org.quartz.scheduler.instanceName : DefaultQuartzScheduler org.quartz.scheduler.rmi.export : false org.quartz.scheduler.rmi.proxy : false org.quartz.scheduler.wrapJobExecutionInUserTransaction : false org.quartz.threadPool.class : org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount : 10 org.quartz.threadPool.threadPriority : 5 org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread : true org.quartz.jobStore.misfireThreshold : 60000 org.quartz.jobStore.class : org.quartz.simpl.RAMJobStore
接下来深入到StdSchedulerFactory#instantiate
,重点关注执行器,即”线程池”是如何分配和设置的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());if (tpClass == null ) {......}try { tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance(); } catch (Exception e) {......} tProps = cfg.getPropertyGroup(PROP_THREAD_POOL_PREFIX, true );try { setBeanProps(tp, tProps); } catch (Exception e) {......}
第4步setBeanProps
本质是通过SimpleThreadPool的setter方法为实例赋值:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private void setBeanProps (Object obj, Properties props) throws xxxException { props.remove("class" ); props.remove(PoolingConnectionProvider.POOLING_PROVIDER); BeanInfo bi = Introspector.getBeanInfo(obj.getClass()); PropertyDescriptor[] propDescs = bi.getPropertyDescriptors(); PropertiesParser pp = new PropertiesParser(props); java.util.Enumeration<Object> keys = props.keys(); while (keys.hasMoreElements()) { String name = (String) keys.nextElement(); String c = name.substring(0 , 1 ).toUpperCase(Locale.US); String methName = "set" + c + name.substring(1 ); java.lang.reflect.Method setMeth = getSetMethod(methName, propDescs); try { ...... 接下来就是通过 setMetth.invoke(obj,属性值)来为实例赋值 } catch (NumberFormatException nfe) {......} } }
除了这里用到的属性描述符PropertyDescriptor
,还有方法描述符等等,这些描述符都继承FeatureDescriptor
,持有这些描述符基本能像实例本身一样调用各种方法和属性。
以上仅介绍了调度器的线程池配置,关于调度器的其他属性细节都大差不差。总之,通过SchedulerFactory#getScheduler
,我们获得了一个完整的调度器。
任务与触发器 自定义定时任务需要实现Job
接口,实现的接口方法execute
才是任务执行的入口。Quartz提供了SimpleScheduleBuilder
和CronScheduleBuilder
等模板供用户定义触发器。
与Spring Schedule不同的是,Quartz的任务和触发器还能附带各种自定义数据JobDataMap
,这些数据还能在Job#execute
中被访问到。
JobBuilder#setJobData
或JobBuilder#usingJobData
均可以为任务设置k-v属性,此外TriggerBuilder#setJobData
或TriggerBuilder#usingJobData
也可以为任务设置k-v属性。但因为Trigger可以绑定多个Job,如果key冲突,value以Trigger设置的值为准。
Quartz引入了JobDetail
的概念,准确来说,调度器管理的是JobDetail
而不是Job
。JobDetail可以视为Job的模板,每当需要将任务交给执行器时,调度器会根据JobDetail生成一份任务再交给执行器。这与Timer或Spring Schedule的方式是完全不同的,前两者为了实现”周期性任务”,是需要直接复制或移动原任务的,而Quartz只需要根据JobDetail再生成一份任务即可。
前文说过,用户可以自定义地向JobDataMap
存入k-v数据。但也正因为新任务是由模板生成的,新任务的JobDataMap都只会是构建JobDetail时设置的初始值,那么正常情况下上一任务是无法通过JobDataMap向下一任务传递数据的。为此,Quartz提供了@PersistJobDataAfterExecution
注解,将其放在Job类上,就能开启JobDataMap的持久化,保存上一任务的执行结果了。
调度任务 我们回到调度器初始化,即StdSchedulerFacotry#instantiate
。在完成了一些资源初始化后,执行器开始启动一个特殊线程QuartzSchedulerThread
,其任务就是不断循环地**”获取触发器-执行触发器-执行任务”**,对应的关键流程为 **RAMJobStore#acquireNextTriggers
- RAMJobStore#triggersFired
- SimpleThreadPool#runInThread
**。
另外,在循环开始之前,QuartzSchedulerThread
会因为QuartzSchedulerThread.pause
变量而阻塞,需要调度器调用start
来解除阻塞。
获取触发器#acquireNextTriggers 企图获取未来一小段时间内(默认30s)所有带触发的触发器,依据为Trigger.nextFireTime
。这里的nextFireTime与Spring Schedule的nextExecutionTime含义相同,只是前者附在触发器上,后者附在了任务上。
执行触发器#triggersFireds 在获取了可执行的触发器后,每个触发器都将完成几个重要任务:计算下一触发时间Trigger#getFireTimeAfter
;根据JobDetail生成任务并返回一个捆绑包TriggerFiredBundle
执行任务#runInThread 在执行任务前,将前一步得到的捆绑包包装成可执行类JobRunShell(Runnable类)
,随后扔入”线程池”SimpleThreadPool
。但注意,这不是通常意义上的线程池,因为SimpleThreadPool既没有核心线程数,也没有任务队列,而是SimpleThreadPool自行管理了10个WorkerThread
(默认10个,可由org.quartz.threadPool.threadCount
设置),以及链表availWorkers和链表busyWorkers,分别表示空闲的WorkerThread和运行的WorkerThread。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public boolean runInThread (Runnable runnable) { if (runnable == null ) { return false ; } synchronized (nextRunnableLock) { handoffPending = true ; while ((availWorkers.size() < 1 ) && !isShutdown) { try { nextRunnableLock.wait(500 ); } catch (InterruptedException ignore) { } } if (!isShutdown) { WorkerThread wt = (WorkerThread)availWorkers.removeFirst(); busyWorkers.add(wt); wt.run(runnable); } else { WorkerThread wt = new WorkerThread(this , threadGroup, "WorkerThread-LastJob" , prio, isMakeThreadsDaemons(), runnable); busyWorkers.add(wt); workers.add(wt); wt.start(); } nextRunnableLock.notifyAll(); handoffPending = false ; } return true ; }
注意我在第三步提到的一次性任务,只要”线程池“不关闭,Quartz里的定时任务都是周期性任务。但当然,即使”线程池“关闭,而不意味着定时任务就不再执行,只要还有存留的worker没被销毁且定时任务被分配到了一个worker,这个任务就可以再续命一次。直到worker全部销毁,Quartz也就停止,定时任务自然也就不再执行了。
此外,第2步的wt.run(runnable)
中也有一些细节。空闲worker并不意味着这个线程是停止的,无论空闲与否,worker都处于运行状态 。只是空闲worker会一直处于空转状态,直到通过WorkerThread#run(Runnable)
方法把具体任务给设置上,worker就会感知到并开始执行这个任务,随后删除该任务,继续处于空转状态。这部分内容可参考源码WorkerThread#run
Cron解析 Quartz的Cron解析采取了与Spring Schedule完全不一样的方式,回顾Spring Schedule,它通过bits的1的位置来表明触发时间。以”0 5/10 * * * *”为例,bits转换为二进制后,第5、15、25、35、45和55位都为1,也就代表第5、15、25、35、45和55分钟是定时任务的触发时机。
回到Quartz的Cron解析,其采用了TreeSet(红黑树实现的有序集合)来储存触发时间。带着问题来看具体实现:以“0 5/10 * * * *”为例,如果触发时间为17:33:00,CronTrigger是如何推算出下一时间为17:35:00的呢?
CronExpression CronTrigger有个重要属性就是CronExpression
,它是“0 5/10 * * * *”这一抽象字符串在程序中的具体表现形式。
可以看到,CronExpression
储存了7分TreeSet,除了TreeSet-years外,其余6份TreeSet均能和CronString一一对应。本文先不聊Quartz是如何把CronString转换为这7份TreeSet的(这一步骤的源码在CronExpression#buildExpression
)。重点关注TreeSet-minutes,看下如何通过17:33:00推算出下一执行时间为17:35:00
如何计算nextFireTime 计算CronTrigger.nextFireTime
的核心逻辑在CronExpression#getTimeAfter
中,我们只抽取分钟部分讲解:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 cl.setTime(afterTime); min = cl.get(Calendar.MINUTE);int hr = cl.get(Calendar.HOUR_OF_DAY); t = -1 ; st = minutes.tailSet(min);if (st != null && st.size() != 0 ) { t = min; min = st.first(); } else { min = minutes.first(); hr++; }if (min != t) { cl.set(Calendar.SECOND, 0 ); cl.set(Calendar.MINUTE, min); setCalendarHour(cl, hr); continue ; } cl.set(Calendar.MINUTE, min); hr = cl.get(Calendar.HOUR_OF_DAY);int day = cl.get(Calendar.DAY_OF_MONTH); t = -1 ;
可以看到,得力于TreeSet本身的有序性,获取下一执行时间的过程十分直白,与Spring Schedule的bits相比,属于以空间换取时间。
以上代码只是以分钟部分为例,完整流程需要将CronExpression持有的7份TreeSet都过一遍,也就是秒、分、时等7个部分都要更换一遍,如此才能真正找到下一预期执行时间。
监听器 Quartz还提供了监听器供用户探测Quartz在各个阶段的执行状态,与Spring Schedule提供的SchedulingConfigurer
功能类似但钩子的埋点数量更多。
Quartz提供了JobListener
、SchedulerListener
和TriggerListener
,它们分别提供了各种接口方法,只要实现了这些方法,就能在Job、Scheduler、Trigger执行到指定阶段时获取上下文信息。
但注意,需要将监听器实现类放入Scheduler.ListenerManager
中才能生效
最后 本文较为详细的介绍了几个主流定时器框架的实现原理,从Timer到Spring Schedule再到Quartz以及文中还未提及的xxl-job和elastic-job,不难发现一条清晰的发展路线:Spring Schedule支持了Timer缺少的Cron,Quartz又比Spring Schedule的扩展性更强,以及后者的xxl-job和elastic-job在分布式定时任务领域也做得很优秀。希望本文能在解惑Cron任务原理,定时器原理的同时,也能为读者在定时器技术选型上提供一点帮助。
此外,于笔者而言,本次源码解析的学习过程也有额外收获。比如
堆排序实现优先队列
定时任务中时常需要等待/空转,又加深了wait、notify的理解,了解到了spurious wakeup
Spring Schedule中bits的巧妙设计,以及Quartz中红黑树的又一应用
了解到属性描述符PropertyDescriptor
与配置文件读取的结合应用
Quartz中”伪线程池”SimpleThreadPool
的巧妙设计