Java多线程——线程池的ThreadPoolExecutor参数、阻塞队列、拒绝策略以及
处理流程
常见的四种线程池:
可以看⼀下其源码:
public static ExecutorService newSingleThreadExecutor(){
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1,1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
面痣图解男
}
public static ExecutorService newFixedThreadPool(int nThreads){
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize){
return new ScheduledThreadPoolExecutor(corePoolSize);
}
}
//这⾥的super还是会调⽤ThreadPoolExecutor的⽅法。
public ScheduledThreadPoolExecutor(int corePoolSize){
super(corePoolSize, Integer.MAX_VALUE,0, NANOSECONDS,
new DelayedWorkQueue());
}
public static ExecutorService newCachedThreadPool(){
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
也就是说,这四种创建线程池的⽅式最终都会调⽤ThreadPoolExecutor构造
在《阿⾥巴巴java开发⼿册》中指出了线程资源必须通过线程池提供,不允许在应⽤中⾃⾏显⽰的创建线程,这样⼀⽅⾯是线程的创建更加规范,可以合理控制开辟线程的数量;另⼀⽅⾯线程的细节管理交给线程池处理,优化了资源的开销。⽽线程池不允许使⽤Executors去创建,⽽要通过ThreadPoolExecutor⽅式,这⼀⽅⾯是由于jdk中Executor框架虽然提供了如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等创建线程池的⽅法,但都有其局限性,不够灵活;另外由于前⾯⼏种⽅法内部也是通过ThreadPoolExecutor⽅式实现,使⽤ThreadPoolExecutor有助于⼤家明确线程池的运⾏规则,创建符合⾃⼰的业务场景需要的线程池,避免资源耗尽的风险。
ThreadPoolExecutor的四个构造
// 五个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,//核⼼线程数
int maximumPoolSize,//最⼤线程数
long keepAliveTime,//⾮核⼼线程最⼤存活时长
TimeUnit unit,//时长单位
BlockingQueue<Runnable> workQueue)//阻塞队列,存放着等待执⾏的线程任务
// 六个参数的构造函数-1
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,三国演义推荐理由
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)//创建线程的⼯⼚
// 六个参数的构造函数-2
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)//拒绝策略,当线程池⽆法再接受任务时调⽤
// 七个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
//参数约束
{
if(corePoolSize <0||
maximumPoolSize <=0||
maximumPoolSize < corePoolSize ||
keepAliveTime <0)
throw new IllegalArgumentException();
if(workQueue ==null|| threadFactory ==null|| handler ==null)
throw new NullPointerException();
this.acc = SecurityManager()==null?
null:
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = Nanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor的参数介绍
最多的构造函数有七个⼊参,最少的5个⼊参。这⾥为5个必传,和2个⾮必传(有默认值)。
corePoolSize 核⼼线程数
指定了线程池中的核⼼线程数,即不会因线程空闲⽽被销毁的线程。线程池初始化时默认是没有线程的,当任务来临时才开始创建线程去执⾏任务。它的数量决定了添加的任务是开辟新的线程去执⾏,还是放到workQueue任务队列中去。
核⼼线程:线程池中有两类线程,核⼼线程和⾮核⼼线程。核⼼线程默认情况下会⼀直存在于线程池中,即使这个核⼼线程什么都不⼲(铁饭碗),⽽⾮核⼼线程如果长时间的闲置,就会被销毁(临时⼯)。
maximumPoolSize 最⼤线程数
指定了线程池中的最⼤线程数量,这个参数会根据你使⽤的workQueue任务队列的类型,决定线程池会开辟的最⼤线程数量。
该值等于核⼼线程数量 + ⾮核⼼线程数量。
最⼤线程数,在核⼼线程数的基础上可能会额外增加⼀些⾮核⼼线程,需要注意的是只有当workQueue队列填满时才会创建多于corePoolSize的线程(线程池总线程数不超过maxPoolSize)。
后颈有痣
keepAliveTime ⾮核⼼线程最⼤存活时长
⾮核⼼线程如果处于闲置状态超过该值,就会被销毁。如果设置allowCoreThreadTimeOut(true),则会也作⽤于核⼼线程。
⾮核⼼线程的空闲时间超过keepAliveTime就会被⾃动终⽌回收掉,注意当corePoolSize=maxPoolSize时,keepAliveTime参数也就不起作⽤了(因为不存在⾮核⼼线程);
⾮核⼼线程:当线程池中空闲线程数量超过corePoolSize时,多余的线程就叫⾮核⼼线程。
unit :keepAliveTime的单位
TimeUnit是⼀个枚举类型 ,包括以下属性:
NANOSECONDS : 1微毫秒 = 1微秒 / 1000 MICROSECONDS : 1微秒 = 1毫秒 / 1000 MILLISECONDS : 1毫秒 = 1秒/1000 SECONDS : 秒 MINUTES : 分 HOURS : ⼩时 DAYS : 天
workQueue 阻塞队列
存放着等待执⾏的Runnable任务对象。它⼀般分为直接提交队列、有界任务队列、⽆界任务队列、优先任务队列、延迟队列⼏种。
SynchronousQueue:同步队列,内部容量为0,每个put操作必须等待⼀个take操作,反之亦然。
LinkedBlockingQueue:链式阻塞队列,底层数据结构是链表,默认⼤⼩是Integer.MAX_VALUE,也可以指定⼤⼩。
ArrayBlockingQueue:数组阻塞队列,底层数据结构是数组,需要指定队列的⼤⼩。
PriorityBlockingQueue:基于优先级的⽆界阻塞队列(优先级的判断通过构造函数传⼊的Compator对象来决定),内部控制线程同步的锁采⽤的是⾮公平锁。
DelayQueue:延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素
当任务队列已满时,则会以maximumPoolSize为最⼤线程数上限。
threadFactory 创建线程的⼯⼚
⼀般⽤默认即可。可以⾃定义线程的名字,设置⼀些参数等等,如果不想⾃定义,可以使⽤默认的**Executors.defaultThreadFactory()**创建⼯⼚。
handle 拒绝策略
当线程池数量⼤于maximumPoolSize,则执⾏拒绝策略。
ThreadPoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常.
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,然后重新尝试执⾏程序(如果再次失败,重复此过程)。
ThreadPoolExecutor.CallerRunsPolicy:由调⽤线程处理该任务。
腰缠万贯ThreadPoolExecutor的策略
线程池本⾝有⼀个调度线程,这个线程就是⽤于管理布控整个线程池⾥的各种任务和事务,例如创建线程、销毁线程、任务队列管理、线程队列管理等等。
ThreadPoolExecutor总共有三钟线程:
核⼼线程
⾮核⼼线程
调度线程
故线程池也有⾃⼰的状态。ThreadPoolExecutor类中定义了⼀个volatile int变量runState来表⽰线程池的状态 ,分别为RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED。
线程池创建后处于RUNNING状态。
调⽤shutdown()⽅法后处于SHUTDOWN状态,线程池不能接受新的任务,清除⼀些空闲worker,会
等待阻塞队列的任务完成。
调⽤shutdownNow()⽅法后处于STOP状态,线程池不能接受新的任务,中断所有线程,阻塞队列中没有被执⾏的任务全部丢弃。此时,poolsize=0,阻塞队列的size也为0。
当所有的任务已终⽌,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。接着会执⾏terminated()函数。
ThreadPoolExecutor中有⼀个控制状态的属性叫ctl,它是⼀个AtomicInteger类型的变量。
线程池处在TIDYING状态时,执⾏完terminated()⽅法之后,就会由 TIDYING -> TERMINATED, 线程池被设置为TERMINATED状态
阻塞队列 BlockingQueue workQueue
上⾯我们已经介绍过了,它⼀般分为直接提交队列、有界任务队列、⽆界任务队列、优先任务队列、延迟队列;
BlockingQueue 的操作⽅法
阻塞队列提供了四组不同的⽅法⽤于插⼊、移除、检查元素:
⽅法\处理⽅式抛出异常返回特殊值⼀直阻塞超时退出插⼊⽅法add(e)offer(e)put(e)offer(e,time,unit)
移除⽅法remove()poll()take()poll(time,unit)
检查⽅法element()peek()--生于忧患死于安乐教案
抛出异常:如果试图的操作⽆法⽴即执⾏,抛异常。当阻塞队列满时候,再往队列⾥插⼊元素,会抛出
**IllegalStateException(“Queue full”)**异常。当队列为空时,从队列⾥获取元素时会抛出NoSuchElementException异常 。
返回特殊值:如果试图的操作⽆法⽴即执⾏,返回⼀个特殊值,通常是true / fal。
⼀直阻塞:如果试图的操作⽆法⽴即执⾏,则⼀直阻塞或者响应中断。
超时退出:如果试图的操作⽆法⽴即执⾏,该⽅法调⽤将会发⽣阻塞,直到能够执⾏,但等待时间不会超过给定值。返回⼀个特定值以告知该操作是否成功,通常是true / fal。
注意之处 :
不能往阻塞队列中插⼊null,会抛出空指针异常。
可以访问阻塞队列中的任意元素,调⽤remove(o)可以将队列之中的特定对象移除,但并不⾼效,尽量避免使⽤。SynchronousQueue 直接提交队列
1、直接提交队列:设置为SynchronousQueue队列,SynchronousQueue是⼀个特殊的BlockingQueue,它没有容量,m每执⾏⼀个插⼊操作就会阻塞,需要再执⾏⼀个删除操作才会被唤醒,反之每⼀个删除操作也都要等待对应的插⼊操作。生态建筑
public class ThreadPool {
鸡八
private static ExecutorService pool;
public static void main( String[] args )
{
//maximumPoolSize设置为2 ,⼩于线程数3,拒绝策略为AbortPolic策略,直接抛出异常
pool =new ThreadPoolExecutor(1,2,1000, TimeUnit.MILLISECONDS,new SynchronousQueue<Run
nable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
for(int i=0;i<3;i++){
}
}
}
public class ThreadTask implements Runnable{
public ThreadTask(){
}
@Override
public void run(){
System.out.println(Thread.currentThread().getName());
}
}
输出结果为:
pool-1-thread-1
pool-1-thread-2
4月4日Exception in thread “main” urrent.RejectedExecutionException: Task
st.ThreadTask@55f96302 rejected from urrent.ThreadPoolExecutor@3d4eac69[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
at urrent.jectedExecution(Unknown Source)
at ject(Unknown Source)
at ute(Unknown Source)
at st.ThreadPool.main(ThreadPool.java:17)
可以看到,当任务队列为SynchronousQueue,创建的线程数⼤于maximumPoolSize时,直接执⾏了拒绝策略抛出异常。