ThreadPoolExecutor线程池原理及其execute方法(详解)

更新时间:2023-06-30 11:08:31 阅读: 评论:0

ThreadPoolExecutor线程池原理及其execute⽅法(详解)
jdk1.7.0_79
virtools对于线程池⼤部分⼈可能会⽤,也知道为什么⽤。⽆⾮就是任务需要异步执⾏,再者就是线程需要统⼀管理起来。对于从线程池中获取线程,⼤部分⼈可能只知道,我现在需要⼀个线程来执⾏⼀个任务,那我就把任务丢到线程池⾥,线程池⾥有空闲的线程就执⾏,没有空闲的线程就等待。实际上对于线程池的执⾏原理远远不⽌这么简单。
在Java并发包中提供了线程池类——ThreadPoolExecutor,实际上更多的我们可能⽤到的是Executors⼯⼚类为我们提供的线程池:newFixedThreadPool、newSingleThreadPool、newCachedThreadPool,这三个线程池并不是ThreadPoolExecutor的⼦类,关于这⼏者之间的关系,我们先来查看ThreadPoolExecutor,查看源码发现其⼀共有4个构造⽅法。public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
⾸先就从这⼏个参数开始来了解线程池ThreadPoolExecutor的执⾏原理。
corePoolSize:核⼼线程池的线程数量
maximumPoolSize:最⼤的线程池线程数量
keepAliveTime:线程活动保持时间,线程池的⼯作线程空闲后,保持存活的时间。
unit:线程活动保持时间的单位。
workQueue:指定任务队列所使⽤的阻塞队列
corePoolSize和maximumPoolSize都在指定线程池中的线程数量,好像平时⽤到线程池的时候最多就只需要传递⼀个线程池⼤⼩的参数就能创建⼀个线程池啊,Java为我们提供了⼀些常⽤的线程池类就是上⾯提到的newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool,当然如果我们想要⾃⼰发挥创建⾃定义的线程池就得⾃⼰来“配置”有关线程池的⼀些参数。
当把⼀个任务交给线程池来处理的时候,线程池的执⾏原理如下图所⽰参考⾃《Java并发编程的艺术》
①⾸先会判断核⼼线程池⾥是否有线程可执⾏,有空闲线程则创建⼀个线程来执⾏任务。
②当核⼼线程池⾥已经没有线程可执⾏的时候,此时将任务丢到任务队列中去。
alonso
③如果任务队列(有界)也已经满了的话,但运⾏的线程数⼩于最⼤线程池的数量的时候,此时将会新建⼀个线程⽤于执⾏任务,但如果运⾏的线程数已经达到最⼤线程池的数量的时候,此时将⽆法创建线程执⾏任务。
所以实际上对于线程池不仅是单纯地将任务丢到线程池,线程池中有线程就执⾏任务,没线程就等待。
为巩固⼀下线程池的原理,现在再来了解上⾯提到的常⽤的3个线程池:
// Executors#newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
可以看到newFixedThreadPool中调⽤的是ThreadPoolExecutor类,传递的参数corePoolSize= maximumPoolSize=nThread。回顾线程池的执⾏原理,当⼀个任务提交到线程池中,⾸先判断核⼼线程池⾥有没有空闲线程,有则创建线程,没有则将任务放到任务队列(这⾥是有界阻塞队列LinkedBlockingQueue)中,如果任务队列已经满了的话,对于newFixedThreadPool来说,它的最⼤线程池数量=核⼼线程池数量,此时任务队列也满了,将不能扩展创建新的线程来执⾏任务。
//Executors# newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegateExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
只有⼀个线程的线程池好像有点奇怪,并且并没有直接将返回ThreadPoolExecutor,甚⾄也没有直接将线程池数量1传递给newFixedThreadPool返回。那就说明这个只含有⼀个线程的线程池,或许并没有只包含⼀个线程那么简单。在其源码注释中这么写到:创建只有⼀个⼯作线程的线程池⽤于操作⼀
个⽆界队列(如果由于前驱节点的执⾏被终⽌结束了,⼀个新的线程将会继续执⾏后继节点线程)任务得以继续执⾏,不同于newFixedThreadPool(1)不会有额外的线程来重新继续执⾏后继节点。也就是说newSingleThreadExecutor⾃始⾄终都只有⼀个线程在执⾏,这和newFixedThreadPool⼀样,但如果线程终⽌结束过后newSingleThreadExecutor则会重新创建⼀个新的线程来继续执⾏任务队列中的线程,⽽newFixedThreaPool则不会。
//Executors#newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
  return new ThreadPooExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
可以看到newCachedThread返回的是ThreadPoolExecutor,其参数核⼼线程池corePoolSize = 0, maximumPoolSize = Integer.MAX_VALUE,这也就是说当任务被提交到newCachedThread线程池时,将会直接把任务放到SynchronousQueue任务队列中,maximumPool从任务队列中获取任务。注
意SynchronousQueue是⼀个没有容量的队列,也就是说每个⼊队操作必须等待另⼀个线程的对应出队操作,如果主线程提交任务的速度⾼于maximumPool中线程处理任务的速度时,newCachedThreadPool会不断创建线程,线程多并不是⼀件好事,严重会耗尽CPU和内存资源。
题外话:newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool,这三者都直接或间接调⽤了ThreadPoolExecutor,为什么它们三者没有直接是其⼦类,⽽是通过Executors来实例化呢?这是所采⽤的静态⼯⼚⽅法,在java.util.Connections接⼝中同样也是采⽤的静态⼯⼚⽅法来创建相关的类。这样有很多好处,静态⼯⼚⽅法是⽤来产⽣对象的,产⽣什么对象没关系,只要返回原返回类型或原返回类型的⼦类型都可以,降低API数⽬和使⽤难度,在《Effective Java》中的第1条就是静态⼯⼚⽅法。
回到ThreadPoolExecutor,⾸先来看它的继承关系:
搭档的英文ThreadPoolExecutor它的顶级⽗类是Executor接⼝,只包含了⼀个⽅法——execute,这个⽅法也就是线程池的“执⾏”。
//Executor#execute
public interface Executor {
void execute(Runnable command);
}
Executor#execute的实现则是在ThreadPoolExecutor中实现的:
//ThreadPoolExecutor#execute
public void execute(Runnable command) {
  if (command == null)惊喜的英文单词
throw new NullPointerException();
  int c = ();
}
⼀来就碰到个不知所云的ctl变量它的定义:
private final AtomicInteger ctl = new AtlmicInteger(ctlOf(RUNNING, 0));sibs
这个变量使⽤来⼲嘛的呢?它的作⽤有点类似我们在《》中提到的读写锁有读、写两个同步状态,⽽AQS则只提供了state⼀个int型变量,此时将state⾼16位表⽰为读状态,低16位表⽰为写状态。这⾥的clt同样也是,它表⽰了两个概念:
workerCount:当前有效的线程数
runState:当前线程池的五种状态,Running、Shutdown、Stop、Tidying、Terminate。
int型变量⼀共有32位,线程池的五种状态runState⾄少需要3位来表⽰,故workCount只能有29位,所以代码中规定线程池的有效线程数最多为229-1。
//ThreadPoolExecutor
private static final int COUNT_BITS = Integer.SIZE – 3;  //32-3=29,线程数量所占位数
private static final int CAPACITY = (1 << COUNT_BITS) – 1; //低29位表⽰最⼤线程数,229-1
coast guard//五种线程池状态
private static final int RUNNING = -1 << COUNT_BITS; /int型变量⾼3位(含符号位)101表RUNING
private static final int SHUTDOWN = 0 << COUNT_BITS; //⾼3位000
private static final int STOP = 1 << COUNT_BITS; //⾼3位001
private static final int TIDYING = 2 << COUNT_BITS; //⾼3位010
private static final int TERMINATED = 3 << COUNT_BITS; //⾼3位011
再次回到ThreadPoolExecutor#execute⽅法:
//ThreadPoolExecutor#execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
  int c = (); //由它可以获取到当前有效的线程数和线程池的状态
/*1.获取当前正在运⾏线程数是否⼩于核⼼线程池,是则新创建⼀个线程执⾏任务,否则将任务放到任务队列中*/
if (workerCountOf(c) < corePoolSize){
if (addWorker(command, tre))  //在addWorker中创建⼯作线程执⾏任务
return ;
c = ();
}
/*2.当前核⼼线程池中全部线程都在运⾏workerCountOf(c) >= corePoolSize,所以此时将线程放到任务队列中*/
烛台背后
if (isRunning(c) && workQueue.offer(command)) { //线程池是否处于运⾏状态,且是否任务插⼊任务队列成功
int recheck = ();
    if (!isRunning(recheck) && remove(command))  //线程池是否处于运⾏状态,如果不是则使刚刚的任务出队
      reject(command); //抛出RejectedExceptionException异常
    el if (workerCountOf(recheck) == 0)
      addWorker(null, fal);
  }
/*3.插⼊队列不成功,且当前线程数数量⼩于最⼤线程池数量,此时则创建新线程执⾏任务,创建失败抛出异常*/
  el if (!addWorker(command, fal)){
    reject(command); //抛出RejectedExceptionException异常
  }
}
上⾯代码注释第7⾏的即判断当前核⼼线程池⾥是否有空闲线程,有则通过addWorker⽅法创建⼯作线程执⾏任务。addWorker⽅法较长,筛选出重要的代码来解析。
//ThreadPoolExecutor#addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
/*⾸先会再次检查线程池是否处于运⾏状态,核⼼线程池中是否还有空闲线程,都满⾜条件过后则会调⽤compareAndIncrementWorkerCount先将正在运⾏的线程数+1,数量⾃增成功则跳出循环,⾃增失败则继续从头继续循环*/  ...
  if (compareAndIncrementWorkerCount(c))
    break retry;
excellent是什么意思
  ...
/*正在运⾏的线程数⾃增成功后则将线程封装成⼯作线程Worker*/
  boolean workerStarted = fal;
  boolean workerAdded = fal;
  Worker w = null;
  try {
    final ReentrantLock mainLock = this.mainLock;  //全局锁
    w = new Woker(firstTask);  //将线程封装为Worker⼯作线程
    final Thread t = w.thread;
socks怎么读    if (t != null) {
      mainLock.lock(); //获取全局锁
/
*当持有了全局锁的时候,还需要再次检查线程池的运⾏状态等*/
      try {
        int c = ();
        int rs = runStateOf(c);  //线程池运⾏状态
        if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)){  //线程池处于运⾏状态,或者线程池关闭且任务线程为空
          if (t.isAlive()) //线程处于活跃状态,即线程已经开始执⾏或者还未死亡,正确的应线程在这⾥应该是还未开始执⾏的爱迪教育集团
            throw new IllegalThreadStateException();
          workers.add(w); //private final HashSet<Worker> wokers = new HashSet<Worker>();包含线程池中所有的⼯作线程,只有在获取了全局的时候才能访问它。将新构造的⼯作线程加⼊到⼯作线程集合中
          int s = worker.size(); //⼯作线程数量
          if (s > largestPoolSize)
            largestPoolSize = s;
          workerAdded = true; //新构造的⼯作线程加⼊成功
        }
      } finally {
        mainLock.unlock();
      }
     if (workerAdded) {
        t.start(); //在被构造为Worker⼯作线程,且被加⼊到⼯作线程集合中后,执⾏线程任务,注意这⾥的start实际上执⾏Worker中run⽅法,所以接下来分析Worker的run⽅法
        workerStarted = true;
      }
    }
  } finally {
    if (!workerStarted) //未能成功创建执⾏⼯作线程
      addWorkerFailed(w); //在启动⼯作线程失败后,将⼯作线程从集合中移除
  }
  return workerStarted;
}
在上⾯第35代码中,⼯作线程被成功添加到⼯作线程集合中后,则开始start执⾏,这⾥start执⾏的是Worker⼯作线程中的run⽅法。
//ThreadPoolExecutor$Worker,它继承了AQS,同时实现了Runnable,所以它具备了这两者的所有特性
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  final Thread thread;
  Runnable firstTask;
  public Worker(Runnable firstTask) {
    tState(-1); //设置AQS的同步状态为-1,禁⽌中断,直到调⽤runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this); //通过线程⼯⼚来创建⼀个线程,将⾃⾝作为Runnable传递传递
  }
  public void run() {
    runWorker(this); //运⾏⼯作线程
  }
}
ThreadPoolExecutor#runWorker,在此⽅法中,Worker在执⾏完任务后,还会循环获取任务队列⾥的任务执⾏(其中的getTask⽅法),也就是说Worker不仅仅是在执⾏完给它的任务就释放或者结束,它不会闲着,⽽是继续从任务队列中获取任务,直到任务队列中没有任务可执⾏时,它才退出循环完成任务。理解了以上的源码过后,往后线程池执⾏原理的第⼆步、第三步的理解实则⽔到渠成。
以上这篇ThreadPoolExecutor线程池原理及其execute⽅法(详解)就是⼩编分享给⼤家的全部内容了,希望能给⼤家⼀个参考,也希望⼤家多多⽀持。

本文发布于:2023-06-30 11:08:31,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/78/1070076.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:线程   任务   队列   创建   数量   返回   状态   继续
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图