线程池实现的思路
提前创建⼀系列的线程,保存在这个线程池中。有任务要执⾏的时候,从线程池中取出线程来执⾏。没有任务的时候,把线程放回到线程池中去。
核⼼源码实现
线程池的本质就是使⽤了⼀个线程安全的⼯作队列连接⼯作 者线程和客户端线程,客户端线程将任务放⼊⼯作队列后便返回,⽽⼯作者线程则不断地从⼯作队列上取出⼯作并执⾏。
当⼯作队列为空时,所有的⼯作者线程均等待在⼯作队列上,当有客户端提交了⼀个任务之后会通知任意⼀个⼯作者线程,随着⼤量的任务被提交,更多的⼯作者线程会被唤醒。
注意的是,核⼼线程在完成任务后不会被销毁,⽽是在循环getTask()时被阻塞队列阻塞住。只有当线程数⼤于了核⼼线程数的那些普通线程会被销毁。
构造器参数:
corePoolSize:线程池中的核⼼线程数,当提交⼀个任务时,线程池创建⼀个新线程执⾏任务,直到当前线程数等于corePoolSize,即使有其他空闲线程能够执⾏新来的任务,也会继续创建线程;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执⾏;
maximumPoolSize:线程池中允许的最⼤线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执⾏任务,前提是当前线程数⼩于maximumPool Size;当阻塞队列是⽆界队列,则maximumPoolSize则不起作⽤,因为⽆法提交⾄核⼼线程池的线程会⼀直持续地放⼊workQueue.
keepAliveTime:线程存活时间(当线程池允许线程超时且运⾏中的线程数量超过corePoolSize时,会按照此变量设置时间关闭线程)
TimeUnit:keepAliveTime的单位
百度在线翻译句子BlockingQueue<Runnable> workQueue:缓冲队列,来不及执⾏的任务存放的阻塞队列
RejectedExecutionHandler handler:拒绝处理任务类(默认:AbortPolicy 会抛异常)
AbortPolicy:直接抛出异常,默认策略;
CallerRunsPolicy:⽤调⽤者所在的线程来执⾏任务;
他们用英语改变了人生DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执⾏当前任务;
DiscardPolicy:直接丢弃任务;
当然也可以根据应⽤场景实现RejectedExecutionHandler接⼝,⾃定义饱和策略,如记录⽇志或持久化存储不能处理的任务。
threadFactory:创建线程的⼯⼚,通过⾃定义的线程⼯⼚可以给每个新建的线程设置⼀个具有识别度的线程名。默认为DefaultThreadFactory ———————————————————————————————————————————————————————————————————————————————
//构造器
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){
if(corePoolSize <0||
reynolds
maximumPoolSize <=0||
maximumPoolSize < corePoolSize ||
keepAliveTime <0)
throw new IllegalArgumentException();
if(workQueue ==null|| threadFactory ==null|| handler ==null)
throw new NullPointerException();
this.acc = SecurityManager()==null?
null:
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = Nanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor.java
private final BlockingQueue<Runnable> workQueue;//缓冲队列
private final AtomicInteger ctl =new AtomicInteger(ctlOf(RUNNING,0));//原⼦类⽤来计数
public void execute(Runnable command){
if(command ==null)
throw new NullPointerException();
//1 当前运⾏的线程数量⼩于核⼼线程数量,直接将任务加⼊worker启动运⾏。
int c = ();
if(workerCountOf(c)< corePoolSize){
if(addWorker(command,true))
return;
//如果失败,则获取最新的线程池数据
c = ();
}
connie
/*2 运⾏线程数量⼤于核⼼线程数量时,上⾯的if分⽀针对⼤于corePoolSize,并且缓存队列加⼊任务操作成功的情况。
运⾏中并且将任务加⼊缓冲队列成功,正常来说这样已经完成了处理逻辑。
但是为了保险起见,增加了状态出现异常的确认判断,如果状态出现异常会继续remove操作,如果执⾏true,则按照拒绝处理策略驳回任务;*/ //运⾏线程数量⼤于核⼼线程数量时,如果线程池仍在运⾏,则把任务放到阻塞队列中等待执⾏。
if(isRunning(c)&& workQueue.offer(command)){
int recheck = ();
//当任务成功放⼊队列时,如果recheck发现线程池已经不再运⾏了则从队列中把任务删除
if(!isRunning(recheck)&&remove(command))
//删除成功以后,会调⽤构造参数传⼊的拒绝策略。
reject(command);
el if(workerCountOf(recheck)==0)
addWorker(null,fal);
}
/*3 这⾥针对运⾏线程数量超过了corePoolSize,并且缓存队列也已经放满的情况。
estimates
注意第⼆个参数是fal,可以在下⾯addWorker⽅法看到,就是针对线程池最⼤线程数量maximumPoolSize的判断。*/
el if(!addWorker(command,fal))
//如果基于maximumPoolSize新建woker失败,此时是线程池中线程数已达到上限,队列已满,则调⽤构造参数中传⼊的拒绝策略
reject(command);
}
addWorker⽅法
private boolean addWorker(Runnable firstTask,boolean core){
// CAS+死循环实现的关于线程池状态,线程数量的校验与更新逻辑
retry:
for(;;){
int c = ();注册国际人力资源管理师
int rs =runStateOf(c);
// Check if queue empty only if necessary.
if(rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask ==null&&
! workQueue.isEmpty()))
return fal;
for(;;){
int wc =workerCountOf(c);
if(wc >= CAPACITY ||
wc >=(core ? corePoolSize : maximumPoolSize))
return fal;
if(compareAndIncrementWorkerCount(c))
break retry;
c = ();// Re-rea
d ctl
if(runStateOf(c)!= rs)
continue retry;
// el CAS failed due to workerCount change; retry inner loop
}
}
}
boolean workerStarted =fal;
boolean workerAdded =fal;
Worker w =null;
英文文章翻译try{
//把指定任务作为参数新建⼀个worker线程
w =new Worker(firstTask);
//变量t就是代表woker线程
final Thread t = w.thread;
if(t !=null){
// 线程池重⼊锁
final ReentrantLock mainLock =this.mainLock;
mainLock.lock();
try{
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs =());
if(rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask ==null)){
if(t.isAlive())// precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if(s > largestPoolSize)
largestPoolSize = s;
workerAdded =true;
}
}finally{
mainLock.unlock();
}
/
/ 线程启动,执⾏任务(Worker.thread(firstTask).start())
// 找到Worker的实现的run⽅法
if(workerAdded){
t.start();
workerStarted =true;
}
}
}finally{
if(! workerStarted)
//如果woker启动失败,则进⾏⼀些善后⼯作,⽐如说修改当前woker数量等等addWorkerFailed(w);
}
return workerStarted;
}
Worker类
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long rialVersionUID =6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask){
tState(-1);// inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread =getThreadFactory().newThread(this);
}元旦ppt模板
/** Delegates main run loop to outer runWorker */
//Worker类run⽅法中调⽤了runWorker⽅法
public void run(){
runWorker(this);
}
bya是什么意思protected boolean isHeldExclusively(){
return getState()!=0;
}
protected boolean tryAcquire(int unud){
if(compareAndSetState(0,1)){
tExclusiveOwnerThread(Thread.currentThread());
return true;
}
return fal;
}
protected boolean tryRelea(int unud){
tExclusiveOwnerThread(null);
tState(0);
return true;
}
public void lock(){acquire(1);}
public boolean tryLock(){return tryAcquire(1);}
public void unlock(){relea(1);}
public boolean isLocked(){return isHeldExclusively();}
void interruptIfStarted(){
英文论坛Thread t;
if(getState()>=0&&(t = thread)!=null&&!t.isInterrupted()){
try{
t.interrupt();
}catch(SecurityException ignore){
}
}
}
}
runWorker⽅法