java线程池中的Worker解析
java线程池中的Worker解析
上⼀篇说到java线程池中添加真实的线程都是在Worker对象中完成的。今天看下Worker中是如何进⾏线程管理的。
上⼀篇说道coresize和maxsize两个池⼦的⼤⼩后,线程池会更具情况添加线程。添加线程主要依赖⽅法
addWorker(Runable command)⽅法,本篇将对addWorker⽅法进⾏详细分析。
urrent.ThreadPoolExecutor#addWorker⽅法
这⾥主要看重要的⼏⾏
w = new Worker(firstTask);
final Thread t = w.thread;
……
if (workerAdded) {
t.start();
workerStarted = true;
}
这⾥线程实际就是Worker.thread对象。下⾯看下Worker
Worker(Runnable firstTask) {
tState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
这⾥的thread来⾃在创建ThreadPoolExecutor中传⼊的ThreadFactory,该⼯⼚⽅法⽤来按照模板创建线程。即⽅法this.thread = getThreadFactory().newThread(this);
看下默认的⼯⼚newThread(this)⽅法
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + AndIncrement(),
0);
调频收音机if (t.isDaemon())
t.tDaemon(fal);
if (t.getPriority() != Thread.NORM_PRIORITY)
凉拌拍黄瓜t.tPriority(Thread.NORM_PRIORITY);
return t;
}
这⾥实际将worker对象作为runnable 对象传⼊进来,最终是new了⼀个Thread(最外层的worker的thi
s实例),因此最终t.start⽅法回调的就是这个传⼊的worker对象的run⽅法。所以直接看Worker类的run⽅法
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be rialized, but we provide a
* rialVersionUID to suppress a javac warning.
*/
private static final long rialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
tState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
run⽅法⼜调⽤了runWorker(this)⽅法
ps直线工具在哪
{
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in cond ca to deal with
女孩儿名字// shutdownNow race while clearing interrupt
if (((), STOP) ||
(Thread.interrupted() &&
(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
纸短情长try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.unlock();
}
}
completedAbruptly = fal;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
1 将传⼊worker对象中的thread成员放到⼀个临时的Runnable task中,然后将引⽤置位null(即代码中的 w.firstTask)
2 w.unlock⽅法,即可以中断
3 最重要的部分while循环
while (task != null || (task = getTask()) != null)
task即worker对象构造中传⼊的runable 即上⼀篇看到的comman,即真实的任务对象
如果task不为空,则进⼊后⾯的task.run⽅法,直接调⽤Runable对象的run⽅法。这⾥解释⼀下原因:
整个动作是由⼯⼚的Thread触发的:即⼯⼚newThread出来的线程start⽅法
3.1 start⽅法会回调Thread(Runnable r)构造器中r的run⽅法
3.2 r 实际是⼀个worker的this引⽤,因此调⽤的是worker对象的run⽅法
3.3 worker的run⽅法调⽤了runworker⽅法,最终到达while循环中的task.run⽅法
如果task为空,还需要判断getTask()是否为空
getTask⽅法是从阻塞队列BlockedQueue中去任务,即上⼀篇中第⼆个if判断中的queue.offer getTask⽅法:
private Runnable getTask() {
boolean timedOut = fal; // Did the last poll() time out?
retry:
for (;;) {
int c = ();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
boolean timed; // Are workers subject to culling?
for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (wc <= maximumPoolSize && ! (timedOut && timed))
新乡市免费的33个景点break;
if (compareAndDecrementWorkerCount(c))
return null;
c = (); // Re-rea
d ctl
if (runStateOf(c) != rs)
continue retry;
// el CAS failed due to workerCount change; retry inner loop
扁食馅做法
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = fal;
}
}
}
注意最后的代码:
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
这⾥是⼀个三⽬运算符,但是结果都是
workQueue.poll或者workQueue.take
都是出队操作。
接着上⾯的while循环,下⾯这块代码主要是判断线程池状态如果再不正常情况下,线程中断
if (((), STOP) ||
(Thread.interrupted() &&
(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try
然后是真正线程执⾏的部分
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
}
这⾥有三个回调⽅法
beforeExecute(wt, task);
task.run();
afterExecute(task, thrown);
其中beforeExecute和afterExecute可以在⾃⼰的任务中重写这两个⽅法。
最后在finally中
finally {
task = null;
w.unlock();
}
更新线程完成的数量
交易性金融负债while结束后,执⾏
finally {
processWorkerExit(w, completedAbruptly);
}
这个processWorkerExit⽅法,这个⽅法主要是⽤来更新线程池中alive的数量