parallelstream启动的线程数_线程池源码详细解读(下)
前⽂回顾
AQS源码详细解读
ReentrantLock源码详细解读
LinkedBlockingQueue源码详细解读
线程池源码详细解读(上)
接着上⼀篇⽂章,知道线程池的⼀些相关概念后,⼀起来看看实现原理吧。
本⽂讲述ThreadPoolExecutor源码,⼒求理清执⾏顺序,尽量保持思路清晰,请耐⼼看完~
⽂章导读
内部类-Worker(基本属性,构造⽅法,AQS相关钩⼦⽅法,线程中断⽅法)
提交任务(execute,addWorker)
执⾏任务(runWorker)
关闭⽅法(tryTerminated,shutdown,shutdownNow)
⼀、内部类-Worker
Worker表⽰线程池中的每⼀个任务,与线程⼀⼀对应。是AQS的⼦类,实现其独占模式,封装⼀些了对于资源操作的⽅法。
1.1 基本属性
重要的是thread(当前worker线程),firstTask(初始任务),completedTasks(任务计数器)。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
目光如炬的意思/**
* 这个类永远不会被序列化,设置rialVersionUID
* 是为了停⽌javac编译器的警告
*/
private static final long rialVersionUID = 6138294804551838833L;
//表⽰⼀个⼯作线程,null则说明线程⼯⼚创建出错了
final Thread thread;
//需要运⾏的初始任务,可能为空
Runnable firstTask;
//每个线程的任务计数器,表⽰完成的任务数量
volatile long completedTasks;
......
}
1.2 构造⽅法
Worker在第⼀次接收任务的时候被线程⼯⼚创建,其中成员变量thread就是基于Worker的线程。
Worker(Runnable firstTask) {
//设置AQS.state为-1表⽰在运⾏之前禁⽌被中断
tState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
1.3 对AQS相关⽅法的实现
Worker既然继承了AbstractQueuedSynchronizer,就⼀定会有相关钩⼦⽅法的实现。钩⼦⽅法是isHeldExclusively(),tryAcquire(int unud),tryRelea(int unud)。⽽lock(),tryLock(),unlock(),isLocked()都是对他们的进⼀步封装,⾮常的简练。如果有兴趣可以回忆回忆ReentrantLoc
k都是怎么实现的,⽐较⼀下区别。
state表⽰当前线程的运⾏状态,总共有3种情况:
-1,表⽰在运⾏之前禁⽌被中断。
0,表⽰锁没有被任何线程获取。
1,表⽰锁已经被占有。
//是否持有独占锁,根据state判断
//state 0:表⽰锁没有被任何线程获取
//state 1:表⽰锁已经被占有
protected boolean isHeldExclusively() {
return getState() != 0;
}
/
夏天是什么样的
/尝试获取锁,成功后设置当前线程为锁的持有者
明信片设计
protected boolean tryAcquire(int unud) {
if (compareAndSetState(0, 1)) {
tExclusiveOwnerThread(Thread.currentThread());
return true;
}
return fal;
}
//释放锁
protected boolean tryRelea(int unud) {
tExclusiveOwnerThread(null);
tState(0);
return true;
}
//会先调⽤tryAcquire(1),若拿不到锁则阻塞获取锁资源
public void lock() { acquire(1); }
//尝试获取锁,不会阻塞
public boolean tryLock() { return tryAcquire(1); }
//会先调⽤tryRelea(1),若释放成功则去等待队列从队尾向前找下⼀个需要唤醒的节点
public void unlock() { relea(1); }
public boolean isLocked() { return isHeldExclusively(); }
1.4 对线程的中断⽅法
已经运⾏了。需要注意的是,Worker的interruptIfStarted():⽤于中断⼯作线程,保证要中断的thread必须是已经初始化完成
初始化完成的,⽽且已经运⾏了
构造⽅法中将state设置为-1。
void interruptIfStarted() {
Thread t;
//确保线程已经运⾏并且中断标志为还是fal时,就执⾏中断操作。
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
⼆、提交任务
execute(Runnable command):总共分为3个步骤。
1)如果⼯作线程数<;核⼼线程数,则每次有新任务来,都创建⼀个新的线程来处理。调⽤addWorker()⾃动检查线程池状态和⼯作线程数,可以防⽌⼀些错误,⽐如不该创建线程的时候创建线程。
2)当任务成功⼊队后,我们仍然需要双重检查机制(double-check),检查是否真的需要 添加⼀个线程。因为某个线程挂了就需要检查,或者进⼊这个⽅法后线程池已经关闭了。所以需要再次检查 线程的状态,如果线程池关闭了就有必要回滚进⼊阻塞队列,或没有线程时启动⼀个新的线程。
3)如果⽆法将任务⼊队,就尝试创建⼀个线程。如果创建失败了,就表⽰线程池已经关闭或饱和了 ,就执⾏拒绝策略。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ();
//如果⼯作线程数<;核⼼线程数,则创建新线程执⾏任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ();
}
//此时⼯作线程数已经>=核⼼线程数了,如果线程池运⾏则将任务加⼊阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ();
//1.如果线程池不是RUNNING状态,且成功从阻塞队列中删除任务,则该任务由当前 RejectedExecutionHandler 处理。
if (! isRunning(recheck) && remove(command))
reject(command);
// 2.否则如果线程池中运⾏的线程数量为0,则通过addWorker(null, fal)尝试新建⼀个线程,
//新建线程对应的任务为null。
el if (workerCountOf(recheck) == 0)
addWorker(null, fal);
}
el if (!addWorker(command, fal))
//添加任务失败,执⾏拒绝策略
reject(command);
}
addWorker(Runnable firstTask, boolean core):创建⼯作线程,成功则返回true,并启动线程;失败则返回fal,并从⼯作线程集合中删除。⽅法⽐较长,我把主要过程先理⼀下:
1)⾸先判断线程池的状态,如果已经处于⾮运⾏状态,就看是否满⾜关闭状态或任务为空或阻塞队列为空。即创建失败。
2)将当前⼯作线程与核⼼线程数或最⼤线程数⽐较,如果当前线程数⽐较⼤,就创建失败。
3)加互斥锁,再次判断异常情况,若出现线程池状态异常的情况,则添加失败,从⼯作线程集合中移除 。
4)确定⽆误后创建新的⼯作线程,添加成功后就启动线程。
再次回顾⼀下线程池的状态:
RUNNING,-1
SHUTDOWN,0
STOP,1
TIDYING,2
TERMINATED,3
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ();
int rs = runStateOf(c);
// 当线程状态为⾮运⾏状态,并且满⾜关闭状态或任务为空或阻塞队列为空时
/
/⼯作线程创建失败,返回fal
if (rs >= SHUTDOWN &&!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return fal;
for (;;) {
int wc = workerCountOf(c);
//当core==true,会将⼯作线程数与核⼼线程数⽐较;
//当core==fal,将⼯作线程数与最⼤线程数⽐较
//当前线程数⼤于等于对应的⼯作线程数或核⼼线程数,直接返回fal
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return fal;
/
/如果CAS操作新增⼯作线程数成功了,就跳出外⾯的循环
//失败了则继续循环,⾃旋尝试
if (compareAndIncrementWorkerCount(c))
break retry;
//刷新ctl
c = ();
//当前线程池状态与之前获取的不⼀样则说明状态改变了
//跳回去刷新线程池状态,和之前的⼀系列关于创建的判断
if (runStateOf(c) != rs)
continue retry;
}
陈皮的药用价值及用途}
boolean workerStarted = fal;
boolean workerAdded = fal;
Worker w = null;
try {
//创建worker内部类
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
周瑜人物形象分析
try {
//重新检查,检查是否线程⼯⼚创建失败或线程池在获取锁之前关闭了
int rs = ());
//当线程池处于运⾏状态或者处于关闭状态且任务为空
//就将当前worker添加进去
if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {爬山虎的脚说课稿
//再次检查线程是否已启动
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
/
/记录线程池出现过的最⼤线程数
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//添加成功后就启动线程,具体怎么执⾏的留到下⼀节讨论
t.start();
workerStarted = true;
}
}
} finally {
//添加失败,从⼯作线程集合中移除
if (! workerStarted)黼国黻家
addWorkerFailed(w);
}上数学课的英文
return workerStarted;
}
这两个⽅法整体概括⼀下,可以得到线程池对于⼯作线程数量的控制策略的⼀个策略
⼯作线程数量的控制策略的⼀个策略:如果⼯作线程数 < 核⼼线程数,则创建新的线程处理请求。