java线程池的四种拒绝策略
基础知识部分volunteer
ThreadPoolExecutor类的构造⽅法
为了让读者更好的理解⽂中的⽰例,笔者在讲解拒绝策略之前,列出了Java线程池的基础知识,本部分可跳过。
代码⼀:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler);
代码⼀是ThreadPoolExecutor类的中参数最多的构造⽅法的⽅法签名。该⽅法有7个参数,它们分别是:
1. int corePoolSize 核⼼线程数
2. int maximumPoolSize 最⼤线程数
3. long keepAliveTime 空闲线程等待⼯作的超时时间的数值
4. TimeUnit unit 空闲线程等待⼯作的超时时间的单位
5. BlockingQueue<Runnable> workQueue 阻塞队列
6. ThreadFactory threadFactory 线程⼯⼚
7. RejectedExecutionHandler handler 拒绝策略
线程池处理任务的过程
代码⼆是ThreadPoolExecutor类中execute⽅法的源码。该⽅法是我们向线程池中添加任务时调⽤的⽅法。
当我们调⽤execute(Runnable command)⽅法向线程池中添加任务时,线程池将依次按照下⾯⼏种情况处理:
1. 检查线程池中正在运⾏的线程数是否⼩于corePoolSize,如果⼩于,则来新建⼀个线程执⾏该任务。
2. 若前⼀步中的条件不满⾜,即正在运⾏的线程数不⼩于corePoolSize,则尝试将该任务添加到workQueue中,当线程池中出现空闲的线
程时,workQueue中的任务将会被取出并执⾏。但是,如果workQueue已经满了,将会出现添加不成功的情况,这时会执⾏下⼀步。
3. 若在之前两步中,任务没有执⾏也没有被成功的放⼊workQueue中,线程池将会尝试创建⼀个新的线程来执⾏该任务。若创建失败
(可能的原因有线程池中的线程数已经达到了指定的maximumPoolSize),则会使⽤指定的拒绝策略来处理。则会调
⽤RejectedExecutionHandler中的rejectedExecution⽅法。
代码⼆:
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either becau this
* executor has been shutdown or becau its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
威尼斯恋人主题曲* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command){
if(command ==null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
袁娅维crazy* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents fal alarms that would add
* threads when it shouldn't, by returning fal.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (becau existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ();
if(workerCountOf(c)< corePoolSize){
if(addWorker(command,true))
return;
c = ();
}
if(isRunning(c)&& workQueue.offer(command)){
int recheck = ();
if(!isRunning(recheck)&&remove(command))hba
reject(command);
el if(workerCountOf(recheck)==0)
addWorker(null,fal);
}
el if(!addWorker(command,fal))
reject(command);
}
reject⽅法源码:
/**
* Invokes the rejected execution handler for the given command.
* Package-protected for u by ScheduledThreadPoolExecutor.
*/
final void reject(Runnable command){
}
为了更直观的感受线程池处理任务的过程,我写了下⾯这个例⼦(代码三)。
花体英文在这个例⼦中,我创建了⼀个核⼼线程数为3,最⼤线程数为6,阻塞队列容量为3的线程池。然后向线程池中依次添加9个任务(任务1,任务2……任务9),执⾏每个任务都需要花三秒的时间,这样保证最后⼀个任务提交时,第⼀个任务仍没有执⾏完。
代码三:
public class ThreadPoolDemo4 {
public static void main(String[] args){
ThreadPoolExecutor threadPool =new ThreadPoolExecutor(3,6,3,
TimeUnit.SECONDS,new ArrayBlockingQueue<>(3));
try{
// 先放⼊三个任务这三个任务将直接在常驻线程中运⾏
for(int i =1; i <=3; i++){
int id = i;
try{
necro
System.out.println(Thread.currentThread().getName()+" Task "+ id +" 开始运⾏"); TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" Task "+ id +" 运⾏完毕*");
}catch(InterruptedException e){
e.printStackTrace();
}英文儿歌视频大全
});
try{
TimeUnit.MILLISECONDS.sleep(100);
}catch(InterruptedException e){
e.printStackTrace();
}
}
// 这三个任务将放⼊阻塞队列
for(int i =4; i <=6; i++){
int id = i;
try{butterfly的复数
System.out.println(Thread.currentThread().getName()+" Task "+ id +" 开始运⾏"); TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" Task "+ id +" 运⾏完毕*");
}catch(InterruptedException e){
e.printStackTrace();
}
});
try{
TimeUnit.MILLISECONDS.sleep(100);
}catch(InterruptedException e){
e.printStackTrace();
}
}
// 这三个线程将新创建线程来运⾏
for(int i =7; i <=9; i++){
int id = i;
try{
System.out.println(Thread.currentThread().getName()+" Task "+ id +" 开始运⾏"); TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" Task "+ id +" 运⾏完毕*");
}catch(InterruptedException e){
e.printStackTrace();
}
});
try{
TimeUnit.MILLISECONDS.sleep(100);
}catch(InterruptedException e){
e.printStackTrace();
}
}
}finally{
threadPool.shutdown();
}
}
}
程序输出如下:
pool-1-thread-1 Task 1开始运⾏
pool-1-thread-2 Task 2开始运⾏
pool-1-thread-3 Task 3开始运⾏
pool-1-thread-4 Task 7开始运⾏
pool-1-thread-5 Task 8开始运⾏
pool-1-thread-6 Task 9开始运⾏
pool-1-thread-1 Task 1运⾏完毕*
pool-1-thread-1 Task 4开始运⾏
pool-1-thread-2 Task 2运⾏完毕*
pool-1-thread-2 Task 5开始运⾏
pool-1-thread-3 Task 3运⾏完毕*
pool-1-thread-3 Task 6开始运⾏
pool-1-thread-4 Task 7运⾏完毕*
pool-1-thread-5 Task 8运⾏完毕*
pool-1-thread-6 Task 9运⾏完毕*
pool-1-thread-1 Task 4运⾏完毕*
pool-1-thread-2 Task 5运⾏完毕*
pool-1-thread-3 Task 6运⾏完毕*
通过程序输出可以看出:
1. 当线程池中已有的线程数⼩于核⼼线程数时,线程池将新建线程来执⾏收到的任务(线程池新建了线程1、线程2、线程3来执⾏任务
1、任务
2、任务3)。
2. 对于随后收到的任务4、任务5和任务6,由于线程池中正在运⾏的线程已经达到了核⼼线程数,所以这三个线程被放进了线程池的阻
cruel是什么意思
塞队列(容量为3)中。此时,阻塞队列也已经满了。
3. 对于最后收到的任务7、任务8和任务9,线程池创建了新的线程4、线程5和线程6来运⾏。
4. 随后,最先开始运⾏的任务1、任务2和任务3陆续运⾏完毕,阻塞队列中的任务就被执⾏了。
四种拒绝策略
星巴克咖啡粉当线程池中正在运⾏的线程已经达到了指定的最⼤线程数量maximumPoolSize且线程池的阻塞队列也已经满了时,向线程池提交任务将触发拒绝处理逻辑。⽽juc中提供了四种拒绝策略,它们分别是AbortPolicy,CallerRunsPolicy,DiscardOldestPolicy和DiscardPolicy.
对于这四种拒绝策略,我将使⽤代码四中的代码模板来演⽰它们的运⾏效果。第四⾏代码⽤来指定所使⽤的拒绝策略。在这段程序中,我们先创建了⼀个核⼼线程数,最⼤线程数和阻塞队列⼤⼩均为3的线程池。然后依次向线程池中提交7个任务(任务1,任务2……任务7)。由于单个任务执⾏时间需要3秒,所以当任务7被提交时,之前的任务都没有执⾏完,线程池中运⾏的线程数已经达到了指定的最⼤线程数,且阻塞队列也已经满了,所以将执⾏拒绝策略中的拒绝处理⽅法。
代码四:
public class RejectedPolicyDemo {
public static void main(String[] args){
// 指定所使⽤的拒绝策略
RejectedExecutionHandler handler =new ThreadPoolExecutor.XXXPolicy();
// 新建线程池核⼼线程数3 最⼤线程数3 阻塞队列容量3
ThreadPoolExecutor threadPool =new ThreadPoolExecutor(3,3,3,
TimeUnit.SECONDS,new ArrayBlockingQueue<>(3), handler);
// 依次向线程池中提交7个任务,触发拒绝处理逻辑
try{
for(int i =1; i <=7; i++){
int id = i;
System.out.println(Thread.currentThread().getName()+" 开始执⾏: 任务"+ id);
try{
TimeUnit.SECONDS.sleep(3);
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" 完成: 任务"+ id);
});
try{
TimeUnit.MILLISECONDS.sleep(50);
}catch(InterruptedException e){
e.printStackTrace();
}
}
}finally{
threadPool.shutdown();
}
}
}
AbortPolicy
终⽌策略,这是ThreadPoolExecutor线程池默认的拒绝策略,程序将会抛出RejectedExecutionException异常。AbortPolicy源代码:
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy(){}
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e){
throw new RejectedExecutionException("Task "+ r.toString()+
" rejected from "+
}
}
代码四中的程序输出: