线程池详解---线程复⽤、线程销毁
1.定义
本⽂是基于jdk1.6对线程池(ThreadPoolExecutor)进⾏线程池执⾏Runnable任务主过程的剖析,jdk1.8及更⾼版本基本原理与1.6类似,但1.6主流程简单,没有进⾏太多优化,易于学习。
在这⾥我们需要知道⼀些基本的常识,这是我们进⾏后续剖析源码的基础。
1.1.多态
public class People {
public void say(){
System.out.println("");
}
public static void main(String[] args){
People p =new Student();
imply
p.say();
}
}
class Student extends People {
@Override
public void say(){
System.out.println("");
}
}
// 输出结果
//
/
/ ⼦类继承⽗类,实现⽗类的⽅法, People p = new Student();此处虽然是People指针p,但是实际创建的对象
// 是Student。p.say();其实程序⾛的是Student的say⽅法
1.2.实现线程任务的⽅法
// 1.实现Runnable接⼝
Runnable r =new MyRunnable();
Thread t =new Thread(r);
t.start();
nirvana什么意思// 这⾥是新创建了⼀个线程,然后⽤新创建的线程执⾏MyRunnable中的run⽅法,这是java底层⾃⼰实现的
// 这⾥还有⼀个执⾏MyRunnable任务的⽅法,那就是直接r.run(),但是这么执⾏的话不是新创建的线程执⾏的
/
/ runnable中的run(),⽽是系统当前线程执⾏的run(),这个点在ThreadPoolExecutor执⾏runnable的时候多
// 次使⽤,需要重点注意
// 2.实现callable接⼝
Callable caller =new MyCallable();
ThreadPoolExecutor executor =new ThreadPoolExecutor();
executor.submit(caller);
// 实现Callable需要实现⼀个call()⽅法,然⽽执⾏Callable这个任务的时候,需要当前线程⼿动去调Callable的
// call()⽅法,线程池多次使⽤使⽤这种⽅式,去实现线程执⾏
1.3.阻塞队列
// 当然这⾥BlockingQueue有7个实现类
股份有限公司英文BlockingQueue queue =new BlockingQueue();
//这⾥,如果queue中没有数据,当前线程会⼀直阻塞在这⾥,但是当前线程可以被别的线程interrupt(),之后会抛出
// InterruptedException
queue.take();
// 这⾥,如果queue中没有数据,当前线程会阻塞在这⾥10s后返回null,如果这在10s中有别的线程往queue放东西
// 了,那么queue就会解除阻塞,返回queue中的数据.
甜美的英文歌曲
queue.poll(10, TimeUnit.SECONDS);
2.线程池提交任务的⽅式
ute⽅式
该⽅式执⾏完runnable是没有返回值的,也不能尝试取消线程执⾏的任务(对于当前正在执⾏的线程并
不能真正的实现线程停⽌)。
2.2.submit⽅式
该线程执⾏完runnable是有返回值的,也能尝试取消线程执⾏的任务,其实只能停⽌那些正在阻塞获取任务的线程,⽽且应保证如果有while(true)的话try{}catch(InterruptedException e){}在外⾯.
public class TestRunnable implements Runnable{
BlockingQueue<String> queue;
@Override
public void run(){
// 这样使⽤执⾏该runnable的线程t打断的时候才能终⽌该任务的执⾏
try{
while(true){
queue.take();
// 业务代码
}
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
⼤家看下ThreadPoolExecutor和AbstractExecutorService的关系,ThreadPoolExecutor是AbstractExecutorService的⼦类,我们通常创建线程池是new ThreadPoolExecutor()。ThreadPoolExecutor.submit()的时候是调⽤的⽗类的
AbstractExecutorService.submit()⽅法,执⾏execute的时候是调⽤的⾃⼰的⽅法
// 这三个⽅法不管是传进来的是Runnble还是Callable,线程池都要把任务转换成RunnableFuture,其实是其实现
// 类FutureTask,后续线程任务取消的时候就是通过这个⽅法来取消的.
// 线程池任务执⾏分两部分
// 1.线程池执⾏任务
// 2.任务监测(FutureTask),线程任务取消的详细分解见我另外的博客
public Future<?>submit(Runnable task){
if(task == null)
throw new NullPointerException();
RunnableFuture<Object> ftask =newTaskFor(task, null);
execute(ftask);
return ftask;
}
public<T> Future<T>submit(Runnable task, T result){
if(task == null)
throw new NullPointerException();
RunnableFuture<T> ftask =newTaskFor(task, result);
execute(ftask);
return ftask;
}
public<T> Future<T>submit(Callable<T> task){
if(task == null)
throw new NullPointerException();
RunnableFuture<T> ftask =newTaskFor(task);
execute(ftask);
return ftask;
英语四级分值分布}
3.线程池执⾏线程任务
古奇英文线程池执⾏线程任务的流程如下:
线程池的当前线程数⼩于核⼼线程数的时候,线程池创建线程并执⾏线程池提交的任务
线程池的当前线程数等于核⼼线程数的时候,线程池会将线程任务提交到线程池中的任务队列⾥,核⼼线程执⾏完当前的任务时,会从任务队列继续取任务去执⾏
线程池的当前线程数等于核⼼线程数的时候,并且核⼼线程都在执⾏线程任务的时候,⽽且线程池中的任务队列满了的时候,这时会创建新的线程(⼩于线程池最⼤线程数),执⾏刚刚被线程池提交的任务,执⾏完该任务之后,才会获取积压在任务队列的中的任务线程池的当前线程数等于最⼤线程数,⽽且线程池的任务队列已经满了,这时需要给⼀个拒绝策略,⼀般这个策略需要⾃定义,并记录下那些任务被拒绝了。
3.1.线程池基本参数
现在我们看下线程池的核⼼参数
public ThreadPoolExecutor(
// 核⼼线程数
int corePoolSize,
// 最⼤线程数
int maximumPoolSize,
// ⾮核⼼线程数允许保留的时间
long keepAliveTime,
// 和keepAliveTime配对,决定单位是秒、分钟等
TimeUnit unit,
// 任务队列
BlockingQueue<Runnable> workQueue,
// 线程⼯程,创建新线程⽤的,不必过于深究
ThreadFactory threadFactory,
// 我们⾃定义的拒绝策略,或者ThreadPoolExecutor提供的默认拒绝策略,默认策略⾃⼰下去看下源码
RejectedExecutionHandler handler){
}
3.2.线程执⾏详解
public void execute(Runnable command){
if(command == null)
throw new NullPointerException();
// 这⾥有两个条件判断
// 1.如果当前线程池的线程数量 >= 核⼼线程数量,当线程池线程数量为空的时候,很明显这个表达式为fal,
// 会进⼊2号表达式;如果当线程数量 >= 核⼼线程数量为true时,明显不会再⾛2号表达式。这是断路或表达式语义。// 2.当1号表达式为fal时,会进⼊到addIfUnderCorePoolSize(command)。
// 1)当线程数量 < 核⼼线程数量,这个⽅法会创建⼀个线程并执⾏cammand(Runnable),并返回true;如4 6级
//果没有后续任务提交进来,那么该线程会阻塞在阻塞队列的poll()⽅法上,之后在永久阻塞在take()⽅法上。
// 2)当线程数量 = 核⼼线程数量,这个⽅法会返回fal,然后在下⼀个if⾥⾯该任务会被
// workQueue.offer(command),放到⼯作队列中。或许有⼈会问:1号表达式不是已经有判断poolSize >=
// corePoolSize,当poolSize = corePoolSize时,不是不会⾛2号表达式吗?好的,带着悬念我们来看下
// addIfUnderCorePoolSize(command)做了些什么。
if(poolSize >= corePoolSize ||!addIfUnderCorePoolSize(command)){
//如果状态为RUNNING,⽽且当前的runnable还能放进workQueue⾥
if(runState == RUNNING && workQueue.offer(command)){
// 如果submit或者execute任务的时候,线程池被shutdown了,需要把刚才提交的任务从⼯作队列中
// 清除出去。
if(runState != RUNNING || poolSize ==0)
ensureQueuedTaskHandled(command);
//如果当前线程数(poolSize) > 最⼤线程数(maximumPoolSize) 或者状态不是RUNNING
}el if(!addIfUnderMaximumPoolSize(command))
//根据策略选择怎么拒绝该RUNNABLE command
reject(command);// is shutdown or saturated
英语语音学习}
}
3.2.1.addIfUnderCorePoolSize解析
private boolean addIfUnderCorePoolSize(Runnable firstTask){
Thread t = null;
final ReentrantLock mainLock =this.mainLock;
// 这⾥获取锁,⼀个ThreadPoolExecutor只有这⼀个锁,多个线程同时⾛这⼀个⽅法的时候只能顺序进⼊,等// 第⼀个线程unlock()了,第⼆个线程才能进来
mainLock.lock();
try{
//如果当前线程的数量 < 核⼼线程数量。之前的悬念在这,理论上说当poolSize = corePoolSize,应该
// 不会⾛到这⾥来啊?这⾥还要过滤下poolSize = corePoolSize的情况,并返回fal呢?细⼼的同学
// 应该发现了,没错那就是锁。在execute⽅法中判断poolSize >= corePoolSize的时候,有可能第⼀
// 个线程还没有将⾃⼰新创建出来的线程累加到线程池的当前线程数上(poolSize),这时execute⽅法中判// 断poolSize有可能会⽐实际的少。那么问题来了,为什么这⾥能判断呢?因为这⾥有锁的存在,第⼀个线// 程当mainLock.unlock();未执⾏的时候,第⼆个线程只能在mainLock.lock(); 位置等待。⽽且本⽅
// 法结束了,会将poolSize+1的,请继续往下看
if(poolSize < corePoolSize && runState == RUNNING)
// 在这⾥,将firstTask封装到了worker中,其实worker是⼀个实现了runnable接⼝的类,然后把汽车空滤
// worker放到⼀个HashSet<Worker>中去持有,这个HashSet<Worker>是ThreadPoolExecutor
// ⼀个成员变量。装来装去作⽤有两个
// 1)线程防⽌被GC回收
// 2)超过核⼼线程数的线程可以从HashSet<Worker>移除,以实现线程的销毁
// 接下来看看addThread(firstTask)做了些什么
t =addThread(firstTask);
}finally{
mainLock.unlock();
}
if(t == null)
return fal;
/
/ 这⾥就是新线程启动,并执⾏worker的run⽅法
t.start();
return true;
}
3.2.1.1.addThread解析
private Thread addThread(Runnable firstTask){
// 如果是通过ute执⾏的我们runnable,那么是将封装到runnable了Worker
// 内,如果是threadPoolExecutor.submit执⾏的我们runnable,那么是将我们FutureTask封装到了Worker内。 Worker w =new Worker(firstTask);
// 这⾥是根据Worker创建线程,其实Worker也是Runnable
// 这⾥可以理解为Thread t = new Thread(new MyRunnable())
/
/ 可想⽽知,当t.start()的时候,会不会去启动⼀个新的线程去MyRunnable的run⽅法吗?这⾥也类似,请看// Worker的数据结构,见3.2.1.2:
Thread t = wThread(w);
if(t != null){
马达加斯加3 下载
w.thread = t;
workers.add(w);
int nt =++poolSize;
if(nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
}
3.2.1.2.Worker解析