ThreadPoolExecutor最佳实践--如何选择队列
系列⽂章:
前⼀篇⽂章《》讲了如何决定线程池中线程个数,这篇⽂章讨论“如何选择⼯作队列”。
再次强调⼀下,ThreadPoolExecutor最核⼼的四点:
1、当有任务提交的时候,会创建核⼼线程去执⾏任务(即使有核⼼线程空闲);
2、当核⼼线程数达到corePoolSize时,后续提交的都会进BlockingQueue中排队;
3、当BlockingQueue满了(offer失败),就会创建临时线程(临时线程空闲超过⼀定时间后,会被销毁);
4、当线程总数达到maximumPoolSize时,后续提交的任务都会被RejectedExecutionHandler拒绝。
1、BlockingQueue
线程池中⼯作队列由BlockingQueue实现类提供功能,BlockingQueue定义了这么⼏组⽅法:
Summary of BlockingQueue methods
Throws exception Special value Blocks Times out Inrt add(e)offer(e)put(e)offer(e, time, unit) Remove remove()poll()take()poll(time, unit) Examine element()peek()not applicable not applicable 阻塞队列是最典型的模型:
⽣产者调⽤put()⽅法将⽣产的元素⼊队,消费者调⽤take()⽅法;
屠呦呦获奖当队列满了,⽣产者调⽤的put()⽅法会阻塞,直到队列有空间可⼊队;
当队列为空,消费者调⽤的get()⽅法会阻塞,直到队列有元素可消费;
但是需要⼗分注意的是:ThreadPoolExecutor提交任务时使⽤offer⽅法(不阻塞),⼯作线程从队列取任务使⽤take⽅法(阻塞)。正是因为ThreadPoolExecutor使⽤了不阻塞的offer⽅法,所以当队列容量
已满,线程池会去创建新的临时线程;同样因为⼯作线程使⽤take()⽅法取任务,所以当没有任务可取的时候线程池的线程将会空闲阻塞。
事实上,⼯作线程的超时销毁是调⽤offer(e, time, unit)实现的。
2、JDK提供的阻塞队列实现
JDK中提供了以下⼏个BlockingQueue实现类:
[外链图⽚转存失败,源站可能有防盗链机制,建议将图⽚保存下来直接上传(img-peKjQQHG-1579267322131)
(/plantuml/svg/SoWkIImgAStDuUBAp2j9BKfBJ4vLSCh9JyxEp4iFB4qjJUNYGk4gsEZgAZWM5ILM eWXZKHHScPUSKPIVbrzQZ4k9JsPUTceA8ODSKdCIAt591XHbvXTbbbIYkTaXDIy5w2i0)]
2.1、
这是⼀个由数组实现的容量固定的有界阻塞队列。这个队列的实现⾮常简单:
private void enqueue(E x){
final Object[] items =this.items;
items[putIndex]= x;// ⼊队
if(++putIndex == items.length)// 如果指针到了末尾9画属木的字
putIndex =0;// 下⼀个⼊队的位置变为0
count++;
notEmpty.signal();// 提醒消费者线程消费
}
private E dequeue(){
final Object[] items =this.items;
@SuppressWarnings("unchecked")
E x =(E) items[takeIndex];
items[takeIndex]=null;// 出队置空
if(++takeIndex == items.length)// 如果指针到了末尾
takeIndex =0;// 下⼀个出队的位置变为0
count--;
if(itrs !=null)
itrs.elementDequeued();
notFull.signal();// 提醒⽣产者线程⽣产
怎样学好初中语文
return x;
}
通过简单的指针循环实现了⼀个环形队列:
下⾯有⼀张维基百科关于环形缓冲区的的动画,虽然动画描述内容与ArrayBlockingQueue实现有所差异,但贵在⽣动形象(着实找不到更好的动画了)。
ArrayBlockingQueue主要复杂在迭代,允许迭代中修改队列(删除元素时会更新迭代器),并不会抛出
ConcurrentModificationException;好在⼤多数场景中我们不会迭代阻塞队列。
2.2、
这是⼀个⾮常有意思的集合,更准确的说它并不是⼀个集合容器,因为它没有容量。你可以“偷偷地”把它看作new ArrayBlockingQueue(0),之所以⽤"偷偷地"这么龌龊的词,⾸先是因为ArrayBlockingQueue在capacity<1时会抛异常,其
次ArrayBlockingQueue(0)并不能实现SynchronousQueue这么强⼤的功能。
正如SynchronousQueue的名字所描述⼀样——“同步队列”,它专门⽤于⽣产者线程与消费者线程之间的同步:
因为它任何时候都是空的,所以消费者线程调⽤take()⽅法的时候就会发⽣阻塞,直到有⼀个⽣产者线程⽣产了⼀个元素,消费者线程就可以拿到这个元素并返回。
同样的,你也可以认为任何时候都是满的,所以⽣产者线程调⽤put()⽅法的时候就会发⽣阻塞,直到有⼀个消费者线程消费了⼀个元素,⽣产者才会返回。
另外还有⼏点需要注意:
SynchronousQueue不能遍历,因为它没有元素可以遍历;
所有的阻塞队列都不允许插⼊null元素,因为当⽣产者⽣产了⼀个null的时候,消费者调⽤poll()返回null,⽆法判断是⽣产者⽣产了⼀个null元素,还是队列本⾝就是空。
CachedThreadPool使⽤的就是同步队列:
public static ExecutorService newCachedThreadPool(){
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,女性名人名字大全
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
因为SynchronousQueue⽆容量的特性,所以CachedThreadPool不会对任务进⾏排队,如果线程池中没有空闲线
程,CachedThreadPool会⽴即创建⼀个新线程来接收这个任务。
所以使⽤CachedThreadPool要注意避免提交长时间阻塞的任务,可能会由于线程数过多⽽导致内存溢出(OutOfOutOfMemoryError)。
2.3、
这是⼀个由单链表实现的默认⽆界的阻塞队列。LinkedBlockingQueue提供了⼀个可选有界的构造函数,⽽在未指明容量时,容量默认为Integer.MAX_VALUE。
按照官⽅⽂档的说法LinkedBlockingQueue是⼀种可选有界(optionally-bounded)阻塞队列。
SingleThreadPool和FixedThreadPool使⽤的就是LinkedBlockingQueue
public static ExecutorService newFixedThreadPool(int nThreads){
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
百泉风景区public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory){创维电视好吗
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newSingleThreadExecutor(){
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1,1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory){
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1,1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
中国十大轮胎品牌排行榜}
因为FixedThreadPool使⽤⽆界的LinkedBlockingQueue,所以当没有线程空闲时,新提交的任务都会提交到阻塞队列中,由于队列永远也不会满,FixedThreadPool永远也不会创建新的临时线程。
但是需要注意的是,不要往FixedThreadPool提交过多的任务,因为所有未处理的任务都会到LinkedBlockingQueue中排队,队列中任务过多也可能会导致内存溢出。虽然这个过程会⽐较缓慢,因为队列中的请求所占⽤的资源⽐线程占⽤的资源要少得多。梅花摄影
2.4、其他队列
DelayQueue和PriorityBlockingQueue底层都是使⽤⼆叉堆实现的优先级阻塞队列。
区别在于:
前者要求队列中的元素实现Delayed接⼝,通过执⾏时延从队列中提取任务,时间没到任务取不出来;
后者对元素没有要求,可以实现Comparable接⼝也可以提供Comparator来对队列中的元素进⾏⽐较,跟时间没有任何关系,仅仅是按照优先级取任务。
当我们提交的任务有优先顺序时可以考虑选⽤这两种队列
事实上。
除了这两个,BlockingQueue还有两个⼦接⼝BlockingDeque(双端阻塞队列),TransferQueue(传输队列)
并且两个接⼝都有⾃⼰唯⼀的实现类:
LinkedBlockingDeque:使⽤双向队列实现的双端阻塞队列,双端意味着可以像普通队列⼀样FIFO(先进先出),可以以像栈⼀样FILO(先进后出)
LinkedTransferQueue:,但是把它⽤在ThreadPoolExecutor中,和⽆限制的LinkedBlockingQueue⾏为⼀致。
3、让⽣产者阻塞的线程池
前⾯说到CachedThreadPool和FixedThreadPool都有可能导致内存溢出,前者是由于线程数过多,后者是由于队列任务过多。⽽究其根本就是因为任务⽣产速度远⼤于线程池处理任务的速度。
所以有⼀个想法就是让⽣产任务的线程在任务处理不过来的时候休息⼀会⼉——也就是阻塞住任务⽣产者。
但是前⾯提到过ThreadPoolExecutor内部将任务提交到队列时,使⽤的是不阻塞的offer⽅法。
我提供的第⼀种⽅式是:重写offer⽅法把它变成阻塞式。
3.1、重写BlockingQueue的offer
这种处理⽅式是将原来⾮阻塞的offer覆盖,使⽤阻塞的put⽅法实现。