⾼并发之——深度解析ScheduledThreadPoolExecutor类的源
代码
在【⾼并发专题】的专栏中,我们深度分析了ThreadPoolExecutor类的源代码,⽽ScheduledThreadPoolExecutor类是ThreadPoolExecutor类的⼦类。今天我们就来⼀起⼿撕ScheduledThreadPoolExecutor类的源代码。
构造⽅法
我们先来看下ScheduledThreadPoolExecutor的构造⽅法,源代码如下所⽰。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
从代码结构上来看,ScheduledThreadPoolExecutor类是ThreadPoolExecutor类的⼦类,ScheduledThreadPoolExecutor类的构造⽅法实际上调⽤的是ThreadPoolExecutor类的构造⽅法。
schedule⽅法
接下来,我们看⼀下ScheduledThreadPoolExecutor类的schedule⽅法,源代码如下所⽰。
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
//如果传递的Runnable对象和TimeUnit时间单位为空
//抛出空指针异常
if (command == null || unit == null)
throw new NullPointerException();
//封装任务对象,在decorateTask⽅法中直接返回ScheduledFutureTask对象
RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
//执⾏延时任务
delayedExecute(t);
//返回任务
return t;
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
//如果传递的Callable对象和TimeUnit时间单位为空
//抛出空指针异常
if (callable == null || unit == null)
throw new NullPointerException();
//封装任务对象,在decorateTask⽅法中直接返回ScheduledFutureTask对象
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));
//执⾏延时任务
delayedExecute(t);
//返回任务
return t;
}
从源代码可以看出,ScheduledThreadPoolExecutor类提供了两个重载的schedule⽅法,两个schedule⽅法的第⼀个参数不同。可以传递Runnable接⼝对象,也可以传递Callable接⼝对象。在⽅法内部,会将Runnable接⼝对象和Callable接⼝对象封装成RunnableScheduledFuture对象,本质上就是封装成ScheduledFutureTask对象。并通过delayedExecute⽅法来执⾏延时任务。
在源代码中,我们看到两个schedule都调⽤了decorateTask⽅法,接下来,我们就看看decorateTask⽅法。
decorateTask⽅法
decorateTask⽅法源代码如下所⽰。
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
return task;
}
通过源码可以看出decorateTask⽅法的实现⽐较简单,接收⼀个Runnable接⼝对象或者Callable接⼝对象和封装的RunnableScheduledFuture任务,两个⽅法都是将RunnableScheduledFuture任务直接返回。在ScheduledThreadPoolExecutor类的⼦类中可以重写这两个⽅法。
接下来,我们继续看下scheduleAtFixedRate⽅法。
scheduleAtFixedRate⽅法
scheduleAtFixedRate⽅法源代码如下所⽰。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
//传⼊的Runnable对象和TimeUnit为空,则抛出空指针异常
if (command == null || unit == null)
throw new NullPointerException();
//如果执⾏周期period传⼊的数值⼩于或者等于0
//抛出⾮法参数异常
if (period <= 0)
throw new IllegalArgumentException();
//将Runnable对象封装成ScheduledFutureTask任务,
//并设置执⾏周期
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), Nanos(period));
//调⽤decorateTask⽅法,本质上还是直接返回ScheduledFutureTask对象
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
//设置执⾏的任务
sft.outerTask = t;
//执⾏延时任务
delayedExecute(t);
//返回执⾏的任务
return t;
}
通过源码可以看出,scheduleAtFixedRate⽅法将传递的Runnable对象封装成ScheduledFutureTask任务对象,并设置了执⾏周期,下⼀次的执⾏时间相对于上⼀次的执⾏时间来说,加上了period时长,时长的具体单位由TimeUnit决定。采⽤固定的频率来执⾏定时任务。
ScheduledThreadPoolExecutor类中另⼀个定时调度任务的⽅法是scheduleWithFixedDelay⽅法,接下来,我们就⼀起看看scheduleWithFixedDelay⽅法。
scheduleWithFixedDelay⽅法
scheduleWithFixedDelay⽅法的源代码如下所⽰。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
//传⼊的Runnable对象和TimeUnit为空,则抛出空指针异常
if (command == null || unit == null)
throw new NullPointerException();
//任务延时时长⼩于或者等于0,则抛出⾮法参数异常
if (delay <= 0)
throw new IllegalArgumentException();
//将Runnable对象封装成ScheduledFutureTask任务
//并设置固定的执⾏周期来执⾏任务
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command, null,triggerTime(initialDelay, unit), Nanos(-delay));
//调⽤decorateTask⽅法,本质上直接返回ScheduledFutureTask任务
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
//设置执⾏的任务
sft.outerTask = t;
//执⾏延时任务
delayedExecute(t);
//返回任务
return t;
}
从scheduleWithFixedDelay⽅法的源代码,我们可以看出在将Runnable对象封装成ScheduledFutureTask时,设置了执⾏周期,但是此时设置的执⾏周期与scheduleAtFixedRate⽅法设置的执⾏周期不同。此时设置的执⾏周期规则为:下⼀次任务执⾏的时间是上⼀次任务完成的时间加上delay时长,时长单位由TimeUnit决定。也就是说,具体的执⾏时间不是固定的,但是执⾏的周期是固定的,整体采⽤的是相对固定的延迟来执⾏定时任务。
如果⼤家细⼼的话,会发现在scheduleWithFixedDelay⽅法中设置执⾏周期时,传递的delay值为负数,如下所⽰。
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), Nanos(-delay));
这⾥的负数表⽰的是相对固定的延迟。
在ScheduledFutureTask类中,存在⼀个tNextRunTime⽅法,这个⽅法会在run⽅法执⾏完任务后调⽤,这个⽅法更能体现scheduleAtFixedRate⽅法和scheduleWithFixedDelay⽅法的不同,tNextRunTime⽅法的源码如下所⽰。
private void tNextRunTime() {
//距离下次执⾏任务的时长
long p = period;
//固定频率执⾏,
//上次执⾏任务的时间
/
/加上任务的执⾏周期
if (p > 0)
time += p;
//相对固定的延迟
//使⽤的是系统当前时间
//加上任务的执⾏周期
el
time = triggerTime(-p);
}
在tNextRunTime⽅法中通过对下次执⾏任务的时长进⾏判断来确定是固定频率执⾏还是相对固定的延迟。
triggerTime⽅法
在ScheduledThreadPoolExecutor类中提供了两个triggerTime⽅法,⽤于获取下⼀次执⾏任务的具体时间。triggerTime⽅法的源码如下所⽰。
private long triggerTime(long delay, TimeUnit unit) {
return Nanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
这两个triggerTime⽅法的代码⽐较简单,就是获取下⼀次执⾏任务的具体时间。有⼀点需要注意的是:delay < (Long.MAX_VALUE >> 1判断delay的值是否⼩于Long.MAX_VALUE的⼀半,如果⼩于Long.MAX_VALUE值的⼀半,则直接返回delay,否则需要处理溢出的情况。
我们看到在triggerTime⽅法中处理防⽌溢出的逻辑使⽤了overflowFree⽅法,接下来,我们就看看overflowFree⽅法的实现。overflowFree⽅法
overflowFree⽅法的源代码如下所⽰。
private long overflowFree(long delay) {
//获取队列中的节点
Delayed head = (Delayed) Queue().peek();
//获取的节点不为空,则进⾏后续处理
if (head != null) {
//从队列节点中获取延迟时间
long headDelay = Delay(NANOSECONDS);
//如果从队列中获取的延迟时间⼩于0,并且传递的delay
//值减去从队列节点中获取延迟时间⼩于0
if (headDelay < 0 && (delay - headDelay < 0))
//将delay的值设置为Long.MAX_VALUE + headDelay
delay = Long.MAX_VALUE + headDelay;
}
//返回延迟时间
return delay;
}
通过对overflowFree⽅法的源码分析,可以看出overflowFree⽅法本质上就是为了限制队列中的所有节点的延迟时间在Long.MAX_VALUE值之内,防⽌在ScheduledFutureTask类中的compareTo⽅法中溢出。
ScheduledFutureTask类中的compareTo⽅法的源码如下所⽰。
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
el if (diff > 0)
return 1;
el if (quenceNumber < x.quenceNumber)
return -1;
el
return 1;
}
long diff = getDelay(NANOSECONDS) - Delay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
compareTo⽅法的主要作⽤就是对各延迟任务进⾏排序,距离下次执⾏时间靠前的任务就排在前⾯。delayedExecute⽅法
delayedExecute⽅法是ScheduledThreadPoolExecutor类中延迟执⾏任务的⽅法,源代码如下所⽰。