��Sleep��Quartz

[定时器源码]从Sleep到Quartz

写在前面

笔者在实习项目中运用到了定时器,用于定时同步数据。但对于定时器的工作原理一知半解,比如定时器是如何做到到点执行的?又是如何知道每个定时任务的下一执行日期的?CronString在定时器中是如何被解释的,在代码中是以怎样的形式存在?

带着这些问题,笔者调研了几个主流定时器的源码,记录下Timer、Spring Scheduling和Quartz框架在一些实现细节上的差异。

ThreadPool+Sleep

线程加Sleep其实就能实现一个很简单的定时器

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
for (int i = 0; i < 3; i++) {
executor.submit(new Thread(()->{
while (true) {
Thread.sleep(1000);
doSomething();
}
}));
}
}

这种定时器就做到了每秒执行任务,且三个定时任务并发执行。但显然还是很粗陋,先不说单线程只能对应一个任务,任务无法交叉执行,而且一旦定时任务数量过多,核心线程数旧居不下,会占用太多资源。

Timer

Java 1.3后原生支持了定时器Timer,仅以单线程即可安排多个定时任务,且不用担心任务数量过多,待执行的任务会被安排在等待队列中。

基本使用

1
2
3
4
5
6
new Timer("test").schedule(new TimerTask() {
@Override
public void run() {
doSomething();
}
},5000L,1000L);

这种定时器做到了定时器启动5s后,每秒执行一次任务。此外,Timer派生出了三种任务类型:一次性任务、固定速率任务和固定延时任务。

三种任务都属于TimerTask类型,由TimerTask.period区分开来:

一次性任务: period = 0, 定时器执行完该任务后便不再执行

固定速率任务(FixedRateTask): period > 0, 定时器执行完该任务后, 将上一任务的启动时间后移固定时长, 作为下一任务的预期执行时间

固定延时任务(FixedDelayTask): period < 0, 定时器执行完该任务后, 将上一任务的完成时间后移固定时长, 作为下一任务的预期执行时间

按照官方说法,固定速率任务适用于对相邻任务的时间间隔比较敏感的定时任务,比如每小时的准时响铃,或者在十秒内每秒作响一次的计时器。这是因为固定速率任务会保证每个执行都完成,即使某次执行耗时过长(比如GC或者网络IO)以至于结束时间以及超过了后续几次执行的预期执行时间。

以每秒滴答作响的计时器为例,预期在0、1、2、3、4、5秒分表作响一次。在0秒的执行中出现卡顿导致在2.5s的时候才结束。固定速率任务则会把1、2秒本该执行的执行一遍,但固定延时任务会忽略掉“过时”的任务,转而安排的下一任务的预期执行时间为3.5s。

启动流程

new Timer()会立刻启动一个定时器线程TimerThread,该线程不断自循环地访问任务队列queue的队首元素queue[1],如果任务队列为空则进入等待状态queue.wait(),该调用没有设置timeout,所以需要等待其他线程主动唤醒。

Timer.schedule()会将TimerTask添加到任务队列当中,如果是首次添加任务,还会通过queue.notify()主动唤醒TimerThread。至此就成功调度了一份任务给定时器。

随后定时器线程取到队首任务,等到预期执行时间再执行该任务,但在执行之前还会根据该任务是否是一次性任务来决定是否要重新入队。

image-20231115093103486

任务队列

注意到取的队首任务是queue[1]而非queue[0],这与queue的”储存结构“有关。任务队列本质还是Arraylist,但采用了堆思想以实现优先队列的功能。

image-20231115111917032

在任务队列中,0号元素不储存任务,1号元素为队首任务,其预期执行时间最早。队列元素以小顶堆的思想组织起来的,即queue[n].executionTime <= queue[2n].executionTime && queue[n].executionTime <= queue[2n+1].executionTime

添加任务

添加任务时先在队尾塞入任务(必要时会先扩容),然后根据该任务的预期执行时间,将其安排到合适的队列位置。这一流程的核心逻辑处于TaskQueue#fixUp

1
2
3
4
5
6
7
8
9
private void fixUp(int k) {
while (k > 1) {
int j = k >> 1;
if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
break;
TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp;
k = j;
}
}

新加入的第k号任务,不断比较自己的父节点的预期执行时间,如果早于父节点则与父节点交换位置,直到父节点的预期执行时间更早或新任务成为了队首任务。

复用任务

对于非一次性任务,定时器在执行任务前会重新安排一次该任务。具体步骤是先更新任务的预期执行时间(固定速率任务和固定时延任务的更新策略有所不同),然后将任务下放到合适位置,核心逻辑处于TaskQueue#fixDown

1
2
3
4
5
6
7
8
9
10
11
12
private void fixDown(int k) {
int j;
while ((j = k << 1) <= size && j > 0) {
if (j < size &&
queue[j].nextExecutionTime > queue[j+1].nextExecutionTime)
j++; // j indexes smallest kid
if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)
break;
TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp;
k = j;
}
}

对于k号任务,找到子节点中预期执行时间最早的那个,与其比较。如果子节点的预期执行时间更早,则与子节点交换位置,直到该任务的预期执行时间比所有子节点都早或者没有子节点。

剔除任务

对于一次性任务,定时器在执行任务前会用队尾任务和该任务对换位置,然后将该任务踢出任务队列。针对提上来的队尾任务,执行一次fixDown

清理任务

任务自身可以通过TimerTask#cancel将自己设置为任务取消状态。定时器线程在遇到CANCELLED的任务时,会将其剔除。此外,用户可以主动调用Timer#purge来快速清理CANCELLED的任务,但这种清理方式会导致整个任务队列的排序混乱,因而还会对整个队列重排一次Timer#heapify

Spring Schedule

Spring 3.0后开始支持定时任务,且新增对CronTask任务的支持

基本使用

一般在启动类上添加@EnableScheduling注解即可开始对定时任务的支持,带有@Scheduled或@Schedules注解的方法会被识别并被注册为定时任务。

@Scheduled有几个重要属性:

属性 解释 实例
cron 类cron表达式,扩展了UN*X系统中,对包含秒、分、时、月中日、月和周中日的触发器的定义 0 * * * * MON-FRI
zone cron同用,在cron表达式解析时指明时区,默认使用服务器得本地时区 GMT+8:00
fixedDelay 固定延时,用于配置固定延时任务 1000L
fixedRate 固定速率,用于配置固定速率任务 1000L
initialDelay fixedRateTask和fixedDelayTask的首次启动时延 1000L
timeUnit 为fixedDelay、fixedRate和initialDelay指明时间单位,默认为毫秒 TimeUnit.MILLISECONDS

以下代码就注册了一个从当前小时05分开始,每10分钟执行一次的定时任务

1
2
3
4
@Scheduled(cron = "0 5/10 * * * *")
public void cronTask(){
doSomething();
}

启动流程

任务注册

随后在#afterSingletonsInstantiated或#onApplicationEvent这两个“互斥”方法中执行注册后的工作#finishRegistration,总之#finishRegistration不会被重复执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void finishRegistration() {
// 1. 为registrar放入调度器scheduler
if (this.scheduler != null) {
this.registrar.setScheduler(this.scheduler);
}
if (this.beanFactory instanceof ListableBeanFactory) {
// 2. 获取SchedulingConfigurer实现类,在该类中可以手动加入定时任务
Map<String, SchedulingConfigurer> beans =
((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(configurers);
for (SchedulingConfigurer configurer : configurers) {
// 3. 执行自定义方法,比如手动添加CronTask等,这些任务都会留在registrar中
configurer.configureTasks(this.registrar);
}
}
if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
try {
// 4. 手动添加了定时任务,但没有添加调度器。
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
}
...... 总之尽力从工厂中获取调度器, 如果什么都没有提前配置,这一步也依然获取不到调度器
}
this.registrar.afterPropertiesSet();
}

可以看到,Spring Schedule为用户提供了钩子SchedulingConfigurer,用户可以在这里提前配置任务或者线程池等等。

随后,进入到#afterPropertiesSet,其中核心步骤为#scheduleTasks

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
protected void scheduleTasks() {
// 1. 用户没有配置调度器, 默认设置调度器为Concurrent调度器, 执行器为单线程线程池
if (this.taskScheduler == null) {
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
if (this.triggerTasks != null) {
for (TriggerTask task : this.triggerTasks) {
// 2. 将各种任务添加到registrar.scheduledTasks中备用
addScheduledTask(scheduleTriggerTask(task));
}
}
if (this.cronTasks != null) {
for (CronTask task : this.cronTasks) {
addScheduledTask(scheduleCronTask(task));
}
}
if (this.fixedRateTasks != null) {
for (IntervalTask task : this.fixedRateTasks) {
addScheduledTask(scheduleFixedRateTask(task));
}
}
if (this.fixedDelayTasks != null) {
for (IntervalTask task : this.fixedDelayTasks) {
addScheduledTask(scheduleFixedDelayTask(task));
}
}
}

可以发现, #finishRegistration主要完成一些前置工作。后续对@Scheduled的解析还在ScheduledAnnotationBeanPostProcessor#postProcessAfterInitialization中。

该方法的前期会经过几层筛选,合格的方法会被整理到annotatedMethods(Map<Method,Set<Scheduled>>类型)中。随后在#processScheduled中被解析成各种定时任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
try {
Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
// 1. 将方法包装为Runnable
Runnable runnable = createRunnable(bean, method);
......
// 2. 解析initialDelay
long initialDelay = convertToMillis(scheduled.initialDelay(), scheduled.timeUnit()
......
// 3. 解析cron表达式和时区
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
String zone = scheduled.zone();
......
// 3.5. timeZone = zone有效 ? zone : defaultZone
// 4. 添加CronTask
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
}
// 6. 解析fixedDelay, 准备添加fixedDelayTask
long fixedDelay = convertToMillis(scheduled.fixedDelay(), scheduled.timeUnit());
if (fixedDelay >= 0) {
......
// 7. 添加fixedDelayTask
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
// 8. 除了fixedDelay,fixedDelayString也能用于注册fixedDelayTask, 这里不做解释
String fixedDelayString = scheduled.fixedDelayString();
......
// 9. 解析fixedRateTask, 准备添加fixedRateTask
long fixedRate = convertToMillis(scheduled.fixedRate(), scheduled.timeUnit());
if (fixedRate >= 0) {
.....
// 10. 添加fixedRateTask, 添加流程与fixedDelayTask类似
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
// 11. 除了fixedRate,fixedRateString也能用于注册fixedRateTask, 这里不做解释
String fixedRateString = scheduled.fixedRateString();
......
// 12. 以类对象为依据, 整理其下的所有定时任务。
synchronized (this.scheduledTasks) {
Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
regTasks.addAll(tasks);
}
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException(
"Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
}
}

以上是任务注册流程,注册结果同一储存在ScheduledAnnotationBeanPostProcessor.scheduledTasks中,这是一个容量为16的Map<Object, Set<ScheduledTask>>,其中ScheduledTask就是各种定时任务。

调度任务

Spring Schedule中将定时任务扔入执行器(本质是个线程池)的步骤称为任务调度,该过程必须由调度器执行。具体方法为ScheduledTaskRegistrar#scheduleCronTaskScheduledTaskRegistrar#scheduleFixedRateTaskScheduledTaskRegistrar#scheduleFixedDelayTask

细心的可以发现,在任务注册步骤,只要调度器存在,每次注册一个定时任务的同时也会将该任务调度到执行器中,并不是待scheduledTasks收集完所有定时任务后再统一调度。

调度任务过程,笔者将重点讲CronTask调度和DelayTask调度的区别。

DelayTask调度

DelayTask有几个重要属性:

属性 类型 含义
interval long 固定时延时长,默认以毫秒为单位
initialDelay long 首次任务的启动延时,默认以毫秒为单位
runnable Runnable 实际类型为ScheduledMethodRunnable,由原始方法经过一层包装得到的

ScheduledTaskRegistrar#scheduleFixedDelayTask是DelayTask调度的入口,主要介绍其中的核心流程TaskScheduler#scheduleWithFixedDelay

1
2
3
4
5
6
7
8
9
10
11
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay) {
// 1. 计算首次任务的启动延时
long initialDelay = startTime.getTime() - this.clock.millis();
try {
// 2. 在这里把任务扔入执行器中,上文说过执行器就是一个线程池,这里的task也就是包装后的ScheduledMethodRunnable
return this.scheduledExecutor.scheduleWithFixedDelay(decorateTask(task, true), initialDelay, delay, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
}
}

接下来看下任务是怎么进入线程池的,以及怎么做到“周期性执行”的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
......
// 1. 任务被包装成了ScheduledFutureTask
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay)); // 设置ScheduledFutureTask.period; 这和Timer里的思想一样, period为负代表固定延时任务
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t); // 开始扔入线程池, 可以直接跳到ScheduledFutureTask#run查看任务是怎么被执行的
return t;
}

深入到ScheduledFutureTask#run,可以知道为什么交给线程池之前还要再包装一次:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void run() {
// 1. 判断是否为周期性任务,如果period != 0会被执行器认为是一次性任务
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
// 2. 执行一次性任务
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
// 3. 执行周期性任务后,计算并设置下一任务的预期执行时间
setNextRunTime();
// 4. outerTask是本任务的一个备份,在这里把备份重新放入线程池的队列中,这样就实现了“周期性执行”
reExecutePeriodic(outerTask);
}
}

综上,DelayTask调度其实与Timer的调度有异曲同工之妙。只不过把手动启动任务换为把任务提交给线程池,把优先队列换为线程池的延时队列DelayedWorkQueue

CronTask调度

CronTask有几个重要属性:

属性 类型 含义
expression String cron表达式,如 0 5/10 * * * *
trigger Trigger 实际类型为CronTrigger,方法nextExecutionTime可以返回下一轮任务的预期执行时间
runnable Runnable 实际类型为ScheduledMethodRunnable,由原始方法经过一层包装得到的

ScheduledTaskRegistrar#scheduleCronTask是CronTask调度的入口,主要介绍其中的核心流程TaskScheduler#schedule

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
try {
// 为了兼容jdk1.7, jdk1.8不考虑
if (this.enterpriseConcurrentScheduler) {
return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
}
else {
ErrorHandler errorHandler =
(this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
// 没有直接将任务扔到执行器中, 而是继续包装成ReschedulingRunnable
return new ReschedulingRunnable(task, trigger, this.clock, this.scheduledExecutor, errorHandler).schedule();
}
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
}
}

比起DelayTask调度, 调度器没有直接将任务扔入执行器中。这是为实现“周期性任务”做出的让步:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public ScheduledFuture<?> schedule() {
synchronized (this.triggerContextMonitor) {
// 1. 通过cronTrigger计算预期执行时间
this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
if (this.scheduledExecutionTime == null) {
return null;
}
// 2. 计算当前时间到下一CronTask执行的时差
long initialDelay = this.scheduledExecutionTime.getTime() - this.triggerContext.getClock().millis();
// 3. 将任务扔入执行器,executor.schedule可以回顾DelayTask的调度, 这里就是将自己(ReschedulingRunnable)交给线程池
this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
return this;
}
}

// 这是ReschedulingRunnable.run方法, 在这里看下CronTask会做些什么
@Override
public void run() {
Date actualExecutionTime = new Date(this.triggerContext.getClock().millis());
// 1. 执行CronTask
super.run();
Date completionTime = new Date(this.triggerContext.getClock().millis());
synchronized (this.triggerContextMonitor) {
Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
// 2. 主要更新下一次任务的预期执行时间
this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
if (!obtainCurrentFuture().isCancelled()) {
// 3. 上一任务异步返回了currentFuture, 如果发现上一任务没有被取消,本次任务执行后就重复将自己调度一次,从而实现CronTask的周期执行
schedule();
}
}
}

在这里就可以发现,CronTask因为无法像DelayTask一样设置period。因此在执行器看来,CronTask就是一个一次性任务,但这个一次性任务会不断向执行器塞入任务。所以在调度器看来,CronTask是一个周期性任务。

Cron解析

上文提到了CronTask特有的CronTrigger,其nextExecutionTime方法能计算出下一任务的预期执行时间。以“0 5/10 * * * *”为例,如果触发时间为17:33:00,CronTrigger是如何推算出下一时间为17:35:00的呢?

CronExpression

CronTrigger一个重要属性就是CronExpression,它是CronString,即”0 5/10 * * * *”这一抽象字符串在程序中的具体表现形式。

image-20231115173102968

可以看到,CronExpression储存了7份BitsCronField,除了BitsCronField-NanoOfSecond外,剩余六份都能和”0 5/10 * * * *”的六个位置对应的上。仔细观察,以BitsCronField-HourOfDay这一份为例,它包含了0-23小时的所有值,这与 \* 的含义一致,表示任意小时。接下来我们重点关注BitsCronField-MinuteOfHour,为什么只挑出了这么几分钟。

image-20231115174312604

上图中的BitsCronField.type储存了0-59分钟的完整数据,这部分为固定值,不随CronString改变而改变。BitsCronField.bits为核心属性,其类型为long,将其转为二进制后,如果第i位(从0开始计算)为1,就表示选取了第i分钟。以图中的值36064015784378400为例,其转换为二进制后正好第5、15、25、35、45和55位为1,也就说明选取了第5、15、25、35、45和55分钟。这与CronString的5/10含义一致。

至于BitsCronField.bits是如何计算出来的,可以参考源码CronExpression#parse

如何计算nextExecutionTime

计算CronTrigger#nextExecutionTime的核心逻辑在CronExpression#next,最后的计算逻辑在BitsCronfield#nextOrSame

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public <T extends Temporal & Comparable<? super T>> T nextOrSame(T temporal) {
// 1. temporal为当前时间, 比如17:33:10。以分钟为例,这里type().get()返回33
int current = type().get(temporal);
// 2. 我们知道33分钟后要选择35分钟, 而这里正好返回的就是35
int next = nextSetBit(current);
if (next == -1) {
// 3. 应对超时的情况, 如果temporal为17:56:00, 当前小时也就没有合适的时间点了. 于是temproal更新到了18:00:00
temporal = type().rollForward(temporal);
next = nextSetBit(0);
}
if (next == current) {
// 4. 这是对应temporal为17:35:00的情况
return temporal;
}
else {
int count = 0;
// 5. temporal有可能更新过,这里重新取下分钟数
current = type().get(temporal);
while (current != next && count++ < CronExpression.MAX_ATTEMPTS) {
// 6. 例子中next为35,type().elapseUntil会将temporal一直推移到分钟数为35,得到17:35:10。注意只会影响分钟数
temporal = type().elapseUntil(temporal, next);
current = type().get(temporal);
next = nextSetBit(current);
if (next == -1) {
temporal = type().rollForward(temporal);
next = nextSetBit(0);
}
}
if (count >= CronExpression.MAX_ATTEMPTS) {
return null;
}
// 7. 在第6步其实就已经完成了分钟部分
return type().reset(temporal);
}
}

// 以17:33:10为例,输入33,期望输出35
private int nextSetBit(int fromIndex) {
// 1. -1L左移33位,与bits作与运算,就只剩下第35、45和55位(从0开始计数)为1。
long result = this.bits & (-1L << fromIndex);
if (result != 0) {
// 2. 返回result中末尾的连续的0的个数,正好35个
return Long.numberOfTrailingZeros(result);
}
else {
// 3. 如果以17:56:10为例, result就为0
return -1;
}
}

以上代码只是以BitsCronfield-MinuteOfHour为例,CronExpression持有的7份BitsCronfield都要执行一遍,也就是秒、分、时等7个部分都要更换一边,如此才能真正找到下一预期执行时间。

钩子函数

Spring Schedule还为用户提供了钩子函数,用户只要实现了SchedulingConfigurer接口,就能自行配置任务或者线程池等等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class MyConfigurer implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.addCronTask(runnable, "0/5 * * * * *");
// 如果在这里就设置好了调度器,就不会用默认执行器和默认调度器了
taskRegistrar.setScheduler(threadPoolTaskScheduler());
}

public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
executor.setPoolSize(20);
executor.setThreadNamePrefix("taskExecutor-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
}

这些钩子真正生效的地方在#finishRegistration,Spring Schedule会取出所有SchedulingConfigurer的实现类,并一一执行#configureTasks方法。在钩子方法中添加的任务会被暂存到ScheduledAnnotationBeanPostProcessor.registrar中,在ScheduledTaskRegistrar#scheduleTasks内被一一调度到执行器中。

Quartz

Spring Schedule其实已经基本能够满足常规的定时任务工作。但其可自定义程度并不高。Quartz框架除了有着更为灵活的自定义功能,在调度器实现,cron解析等方面也有着些许不同。

基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 获取任务调度的实例
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
// 定义任务调度实例, 并与TestJob绑定
JobDetail job = JobBuilder.newJob(MyJob.class)
.withIdentity("testJob", "testJobGroup")
.build();
// 定义触发器, 会马上执行一次, 接着每30秒执行一次
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("testTrigger", "testTriggerGroup")
.withSchedule(CronScheduleBuilder.cronSchedule("*/30 * * * * ?"))
.build();
// 使用触发器调度任务的执行
scheduler.scheduleJob(job, trigger);
// 开启任务
scheduler.start();

quartz的使用较为直白,相比于Spring Schedule,调度器、任务和触发器需要手动实例化并手动触发调度。对于Trigger,常用类型为CronTrigger和SimpleTrigger,分别可用于CronTask和FixedDelayTask、FixedRateTask

1
2
3
4
5
6
7
TriggerBuilder.newTrigger()
.withSchedule(
SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(30)
.repeatForever())
.startNow()
.build();

以上就定义了一个自启动开始,每隔30秒执行一次的触发器。

启动流程

调度器初始化

quartz的启动入口在于SchedulerFactory#getScheduler,即当我们获取到调度器实例的时候,背后的执行器等相关也就一同准备好了。后续代码以StdSchedulerFactory#getScheduler为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public Scheduler getScheduler() throws SchedulerException {
if (cfg == null) {
// 1. 读取配置文件,存到cfg中
initialize();
}
// 2. 获取默认调度器,一般情况下都为空
SchedulerRepository schedRep = SchedulerRepository.getInstance();
Scheduler sched = schedRep.lookup(getSchedulerName());
if (sched != null) {
if (sched.isShutdown()) {
schedRep.remove(getSchedulerName());
} else {
return sched;
}
}
// 3. 初始化一个调度器
sched = instantiate();
return sched;
}

在读取配置文件的步骤中,读取的配置文件名为org.quartz.properties或quartz.properties,quartz自带的一份配置文件就设置了一部分的默认值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Default Properties file for use by StdSchedulerFactory
# to create a Quartz Scheduler Instance, if a different
# properties file is not explicitly specified.
#

org.quartz.scheduler.instanceName: DefaultQuartzScheduler
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false

org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 10
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true

org.quartz.jobStore.misfireThreshold: 60000

org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore

接下来深入到StdSchedulerFactory#instantiate,重点关注执行器,即”线程池”是如何分配和设置的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Get ThreadPool Properties
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// 1. 根据org.quartz.threadPool.class获取线程池类名
String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());
if (tpClass == null) {......}
try {
// 2. 获取该类的一个实例,这里是SimpleThreadPool类
tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
} catch (Exception e) {......}
// 3. 获取org.quartz.threadPool.属性
tProps = cfg.getPropertyGroup(PROP_THREAD_POOL_PREFIX, true);
try {
// 4. 将属性值赋给实例tp
setBeanProps(tp, tProps);
} catch (Exception e) {......}

第4步setBeanProps本质是通过SimpleThreadPool的setter方法为实例赋值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void setBeanProps(Object obj, Properties props) throws xxxException {
props.remove("class");
props.remove(PoolingConnectionProvider.POOLING_PROVIDER);
// 1. props还剩下threadCount、threadPriority、threadsInheritContextClassLoaderOfInitializingThread属性
BeanInfo bi = Introspector.getBeanInfo(obj.getClass());
// 2. PropertyDescriptor是属性描述符, 能为我们描述一个类的所有类属性,包括属性名、属性类型、属性setter/getter
PropertyDescriptor[] propDescs = bi.getPropertyDescriptors();
PropertiesParser pp = new PropertiesParser(props);
java.util.Enumeration<Object> keys = props.keys();
while (keys.hasMoreElements()) {
// 3. 生成setter方法名,threadCount就对应setThreadCount。这里可以看出为什么setter/getter会要求属性名首字母大写
String name = (String) keys.nextElement();
String c = name.substring(0, 1).toUpperCase(Locale.US);
String methName = "set" + c + name.substring(1);
// 4. 获得setter方法句柄
java.lang.reflect.Method setMeth = getSetMethod(methName, propDescs);
try {
...... 接下来就是通过 setMetth.invoke(obj,属性值)来为实例赋值
} catch (NumberFormatException nfe) {......}
}
}

除了这里用到的属性描述符PropertyDescriptor,还有方法描述符等等,这些描述符都继承FeatureDescriptor,持有这些描述符基本能像实例本身一样调用各种方法和属性。

以上仅介绍了调度器的线程池配置,关于调度器的其他属性细节都大差不差。总之,通过SchedulerFactory#getScheduler,我们获得了一个完整的调度器。

任务与触发器

自定义定时任务需要实现Job接口,实现的接口方法execute才是任务执行的入口。Quartz提供了SimpleScheduleBuilderCronScheduleBuilder等模板供用户定义触发器。

与Spring Schedule不同的是,Quartz的任务和触发器还能附带各种自定义数据JobDataMap,这些数据还能在Job#execute中被访问到。

JobBuilder#setJobDataJobBuilder#usingJobData均可以为任务设置k-v属性,此外TriggerBuilder#setJobDataTriggerBuilder#usingJobData也可以为任务设置k-v属性。但因为Trigger可以绑定多个Job,如果key冲突,value以Trigger设置的值为准。

Quartz引入了JobDetail的概念,准确来说,调度器管理的是JobDetail而不是Job。JobDetail可以视为Job的模板,每当需要将任务交给执行器时,调度器会根据JobDetail生成一份任务再交给执行器。这与Timer或Spring Schedule的方式是完全不同的,前两者为了实现”周期性任务”,是需要直接复制或移动原任务的,而Quartz只需要根据JobDetail再生成一份任务即可。

image-20231116141316837

前文说过,用户可以自定义地向JobDataMap存入k-v数据。但也正因为新任务是由模板生成的,新任务的JobDataMap都只会是构建JobDetail时设置的初始值,那么正常情况下上一任务是无法通过JobDataMap向下一任务传递数据的。为此,Quartz提供了@PersistJobDataAfterExecution注解,将其放在Job类上,就能开启JobDataMap的持久化,保存上一任务的执行结果了。

调度任务

我们回到调度器初始化,即StdSchedulerFacotry#instantiate。在完成了一些资源初始化后,执行器开始启动一个特殊线程QuartzSchedulerThread,其任务就是不断循环地**”获取触发器-执行触发器-执行任务”**,对应的关键流程为 **RAMJobStore#acquireNextTriggers - RAMJobStore#triggersFired - SimpleThreadPool#runInThread**。

另外,在循环开始之前,QuartzSchedulerThread会因为QuartzSchedulerThread.pause变量而阻塞,需要调度器调用start来解除阻塞。

获取触发器#acquireNextTriggers

企图获取未来一小段时间内(默认30s)所有带触发的触发器,依据为Trigger.nextFireTime。这里的nextFireTime与Spring Schedule的nextExecutionTime含义相同,只是前者附在触发器上,后者附在了任务上。

执行触发器#triggersFireds

在获取了可执行的触发器后,每个触发器都将完成几个重要任务:计算下一触发时间Trigger#getFireTimeAfter;根据JobDetail生成任务并返回一个捆绑包TriggerFiredBundle

执行任务#runInThread

在执行任务前,将前一步得到的捆绑包包装成可执行类JobRunShell(Runnable类),随后扔入”线程池”SimpleThreadPool。但注意,这不是通常意义上的线程池,因为SimpleThreadPool既没有核心线程数,也没有任务队列,而是SimpleThreadPool自行管理了10个WorkerThread(默认10个,可由org.quartz.threadPool.threadCount设置),以及链表availWorkers和链表busyWorkers,分别表示空闲的WorkerThread和运行的WorkerThread。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public boolean runInThread(Runnable runnable) {
if (runnable == null) {
return false;
}
synchronized (nextRunnableLock) {
handoffPending = true;
// 1. 等待一个空闲的worker
while ((availWorkers.size() < 1) && !isShutdown) {
try {
nextRunnableLock.wait(500);
} catch (InterruptedException ignore) {
}
}
if (!isShutdown) {
WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
busyWorkers.add(wt);
// 2. 将空闲worker移动到忙碌worker里,将任务设置到worker中,worker感知到后就会执行该任务
wt.run(runnable);
} else {
// 3. 如果"线程池"关闭了,就为该worker安排一个一次性任务,执行完后worker也就被销毁了
WorkerThread wt = new WorkerThread(this, threadGroup,
"WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
busyWorkers.add(wt);
workers.add(wt);
wt.start();
}
nextRunnableLock.notifyAll();
handoffPending = false;
}
return true;
}

注意我在第三步提到的一次性任务,只要”线程池“不关闭,Quartz里的定时任务都是周期性任务。但当然,即使”线程池“关闭,而不意味着定时任务就不再执行,只要还有存留的worker没被销毁且定时任务被分配到了一个worker,这个任务就可以再续命一次。直到worker全部销毁,Quartz也就停止,定时任务自然也就不再执行了。

此外,第2步的wt.run(runnable)中也有一些细节。空闲worker并不意味着这个线程是停止的,无论空闲与否,worker都处于运行状态。只是空闲worker会一直处于空转状态,直到通过WorkerThread#run(Runnable)方法把具体任务给设置上,worker就会感知到并开始执行这个任务,随后删除该任务,继续处于空转状态。这部分内容可参考源码WorkerThread#run

Cron解析

Quartz的Cron解析采取了与Spring Schedule完全不一样的方式,回顾Spring Schedule,它通过bits的1的位置来表明触发时间。以”0 5/10 * * * *”为例,bits转换为二进制后,第5、15、25、35、45和55位都为1,也就代表第5、15、25、35、45和55分钟是定时任务的触发时机。

回到Quartz的Cron解析,其采用了TreeSet(红黑树实现的有序集合)来储存触发时间。带着问题来看具体实现:以“0 5/10 * * * *”为例,如果触发时间为17:33:00,CronTrigger是如何推算出下一时间为17:35:00的呢?

CronExpression

CronTrigger有个重要属性就是CronExpression,它是“0 5/10 * * * *”这一抽象字符串在程序中的具体表现形式。

image-20231116164718843

可以看到,CronExpression储存了7分TreeSet,除了TreeSet-years外,其余6份TreeSet均能和CronString一一对应。本文先不聊Quartz是如何把CronString转换为这7份TreeSet的(这一步骤的源码在CronExpression#buildExpression)。重点关注TreeSet-minutes,看下如何通过17:33:00推算出下一执行时间为17:35:00

如何计算nextFireTime

计算CronTrigger.nextFireTime的核心逻辑在CronExpression#getTimeAfter中,我们只抽取分钟部分讲解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// afterTime = 17:33:00
cl.setTime(afterTime);
// 1. 获取当前分钟数 33
min = cl.get(Calendar.MINUTE);
// 2. 获取当前小时数 17
int hr = cl.get(Calendar.HOUR_OF_DAY);
t = -1;
// get minute.................................................
// 3. 利用红黑树有序的特性,直接获取minutes中比33大的值。st = 35,45,55
st = minutes.tailSet(min);
if (st != null && st.size() != 0) {
t = min;
// 4. 正常情况下,获取st的首元素35就是结果
min = st.first();
} else {
// 5. 如果当前分钟数为56,st就为空,说明需要延期到下一小时
min = minutes.first();
hr++;
}
if (min != t) {
// 6. 从第5步过来的,将当前时间更新到下一小时,即18:00:00
cl.set(Calendar.SECOND, 0);
cl.set(Calendar.MINUTE, min);
setCalendarHour(cl, hr);
continue;
}
cl.set(Calendar.MINUTE, min);
hr = cl.get(Calendar.HOUR_OF_DAY);
int day = cl.get(Calendar.DAY_OF_MONTH);
t = -1;

可以看到,得力于TreeSet本身的有序性,获取下一执行时间的过程十分直白,与Spring Schedule的bits相比,属于以空间换取时间。

以上代码只是以分钟部分为例,完整流程需要将CronExpression持有的7份TreeSet都过一遍,也就是秒、分、时等7个部分都要更换一遍,如此才能真正找到下一预期执行时间。

监听器

Quartz还提供了监听器供用户探测Quartz在各个阶段的执行状态,与Spring Schedule提供的SchedulingConfigurer功能类似但钩子的埋点数量更多。

Quartz提供了JobListenerSchedulerListenerTriggerListener,它们分别提供了各种接口方法,只要实现了这些方法,就能在Job、Scheduler、Trigger执行到指定阶段时获取上下文信息。

但注意,需要将监听器实现类放入Scheduler.ListenerManager中才能生效

最后

本文较为详细的介绍了几个主流定时器框架的实现原理,从Timer到Spring Schedule再到Quartz以及文中还未提及的xxl-job和elastic-job,不难发现一条清晰的发展路线:Spring Schedule支持了Timer缺少的Cron,Quartz又比Spring Schedule的扩展性更强,以及后者的xxl-job和elastic-job在分布式定时任务领域也做得很优秀。希望本文能在解惑Cron任务原理,定时器原理的同时,也能为读者在定时器技术选型上提供一点帮助。

此外,于笔者而言,本次源码解析的学习过程也有额外收获。比如

  1. 堆排序实现优先队列
  2. 定时任务中时常需要等待/空转,又加深了wait、notify的理解,了解到了spurious wakeup
  3. Spring Schedule中bits的巧妙设计,以及Quartz中红黑树的又一应用
  4. 了解到属性描述符PropertyDescriptor与配置文件读取的结合应用
  5. Quartz中”伪线程池”SimpleThreadPool的巧妙设计

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!