【多线程】线程池拒绝策略详解与⾃定义拒绝策略
线程池的拒绝策略
ThreadPoolExecutor内部有实现4个拒绝策略,默认为AbortPolicy策略
CallerRunsPolicy:由调⽤execute⽅法提交任务的线程来执⾏这个任务
AbortPolicy:抛出异常RejectedExecutionException拒绝提交任务
DiscardPolicy:直接抛弃任务,不做任何处理
听雨声DiscardOldestPolicy:去除任务队列中的第⼀个任务,重新提交
线程池中,有三个重要的参数,决定影响了拒绝策略:corePoolSize - 核⼼线程数,也即最⼩的线程数。workQueue - 阻塞队列 。maximumPoolSize - 最⼤线程数
当提交任务数⼤于 corePoolSize 的时候,会优先将任务放到 workQueue 阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到 maximumPoolSize 最⼤线程数配置。此时,再多余的任务,则会触发线程池的拒绝策略了。
总结起来,也就是⼀句话,当提交的任务数⼤于(workQueue.size() + maximumPoolSize ),就会触发线程池的拒绝策略。
拒绝策略的源码
CallerRunsPolicy
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which ca the task
* is discarded.
* ⽤于拒绝任务的处理程序,
* 可以直接在{@code execute}⽅法的调⽤线程中运⾏被拒绝的任务
* 除⾮执⾏器已被关闭,否则将丢弃该任务。
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
蒙在鼓里的意思
概念学习* Creates a {@code CallerRunsPolicy}.
* 创建⼀个{@code CallerRunsPolicy}。
*/
public CallerRunsPolicy(){}
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which ca the task is discarded.
* 除⾮执⾏器已关闭,否则在调⽤者线程中执⾏任务,
* r 在这种情况下,该任务将被丢弃。
*
* @param r the runnable task requested to be executed
* r 请求执⾏的可运⾏任务
* @param e the executor attempting to execute this task
* e 尝试执⾏此任务的执⾏者
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e){
if(!e.isShutdown()){
r.run();
}
}
}
分析:
CallerRunsPolicy:线程调⽤运⾏该任务的 execute 本⾝。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
这个策略显然不想放弃执⾏任务。但是由于池中已经没有任何资源了,那么就直接使⽤调⽤该execute的线程本⾝来执⾏。(开始我总不想丢弃任务的执⾏,但是对某些应⽤场景来讲,很有可能造成当前线程也被阻塞。如果所有线程都是不能执⾏的,很可能导致程序没法继续跑了。需要视业务情景⽽定吧。)
这样⽣产者虽然没有被阻塞,但提交任务也会被暂停。
但这种策略也有隐患,当⽣产者较少时,⽣产者消费任务的时间⾥,消费者可能已经把任务都消费完了,队列处于空状态,当⽣产者执⾏完任务后才能再继续⽣产任务,这个过程中可能导致消费者线程的饥饿。
AbortPolicy
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
* 抛出{@code RejectedExecutionException}的拒绝任务处理程序。
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy(){}
/**
* Always throws RejectedExecutionException.
* 总是抛出RejectedExecutionException
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e){
throw new RejectedExecutionException("Task "+ r.toString()+
" rejected from "+
}
}
分析:
该策略是默认饱和策略。
使⽤该策略时在饱和时会抛出RejectedExecutionException(继承⾃RuntimeException),调⽤者可捕获该异常⾃⾏处理。
DiscardPolicy
/**
* A handler for rejected tasks that silently discards the
* rejected task.
* 拒绝任务的处理程序,默认丢弃拒绝任务。
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy(){}
/**
* Does nothing, which has the effect of discarding task r.
* 不执⾏任何操作,这具有丢弃任务 r 的作⽤。
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e){
}
}
分析:
如代码所⽰,不做任何处理直接抛弃任务婴儿连体衣
DiscardOldestPolicy
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
写日记的方法* is shut down, in which ca the task is discarded.
* 处理被拒绝任务的处理程序,它丢弃最旧的未处理请求,
* 然后重试{@code execute},
* 除⾮执⾏器*被关闭,在这种情况下,该任务将被丢弃。有你同在
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy(){}
/**
* Obtains and ignores the next task that the executor
* would otherwi execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which ca task r is instead discarded.
* 获取并忽略执⾏者*会⽴即执⾏的下⼀个任务(如果⼀个任务⽴即可⽤),
* 然后重试任务r的执⾏,除⾮执⾏者*被关闭,在这种情况下,任务r会被丢弃。
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e){
if(!e.isShutdown()){
}
}
}
分析:
如代码,先将阻塞队列中的头元素出队抛弃,再尝试提交任务。如果此时阻塞队列使⽤PriorityBlockingQueue优先级队列,将会导致优先级最⾼的任务被抛弃,因此不建议将该种策略配合优先级队列使⽤。
⾃定义策略
看完发现默认的⼏个拒绝策略并不是特别的友好,那么可不可以咱们⾃⼰搞个呢?
可以发现,所有的拒绝策略都是实现了 RejectedExecutionHandler 接⼝
public interface RejectedExecutionHandler {
/**
* Method that may be invoked by a {@link ThreadPoolExecutor} when
* {@link ThreadPoolExecutor#execute execute} cannot accept a
* task. This may occur when no more threads or queue slots are渡江战役简介
* available becau their bounds would be exceeded, or upon
* shutdown of the Executor.
*
* <p>In the abnce of other alternatives, the method may throw
* an unchecked {@link RejectedExecutionException}, which will be
* propagated to the caller of {@code execute}.
*
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
* @throws RejectedExecutionException if there is no remedy
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
这个接⼝只有⼀个 rejectedExecution ⽅法。
r 为待执⾏任务;executor 为线程池;⽅法可能会抛出拒绝异常。
那么咱们就可以通过实现 RejectedExecutionHandler 接⼝扩展
两个栗⼦:⼀
netty⾃⼰实现的线程池⾥⾯私有的⼀个拒绝策略。单独启动⼀个新的临时线程来执⾏任务。
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor){
try{
final Thread t =new Thread(r,"Temporary task executor");
t.start();
}catch(Throwable e){
throw new RejectedExecutionException(
"Failed to start a new thread", e);
}
经典老歌400首
}
}
两个栗⼦:⼆
dubbo的⼀个例⼦,它直接继承的 AbortPolicy ,加强了⽇志输出,并且输出dump⽂件
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
logger.warn(msg);
dumpJStack();
throw new RejectedExecutionException(msg);
}
}
⾃⼰玩
参考类似的思路,最简单的做法,我们可以直接定义⼀个RejectedExecutionHandler,当队列满时改为调⽤BlockingQueue.put来实现⽣产者的阻塞:
new RejectedExecutionHandler(){
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor){
if(!executor.isShutdown()){
try{
}catch(InterruptedException e){
// should not be interrupted
}
}
}
};
这样,我们就⽆需再关⼼Queue和Consumer的逻辑,只要把精⼒集中在⽣产者和消费者线程的实现逻辑上,只管往线程池提交任务就⾏了。
相⽐最初的设计,这种⽅式的代码量能减少不少,⽽且能避免并发环境的很多问题。当然,你也可以采⽤另外的⼿段,例如在提交时采⽤信号量做⼊⼝限制等,但是如果仅仅是要让⽣产者阻塞,那就显得复杂了。
总结
四种线程池拒绝策略,具体使⽤哪种策略,还得根据实际业务场景才能做出抉择。