java线程池⽤法_Java线程池的认识和使⽤
多线程编程很难,难点在于多线程代码的执⾏不是按照我们直觉上的执⾏顺序。所以多线程编程必须要建⽴起⼀个宏观的认识。
线程池是多线程编程中的⼀个重要概念。为了能够更好地使⽤多线程,学习好线程池当然是必须的。
为什么要使⽤线程池?
平时我们在使⽤多线程的时候,通常都是架构师配置好了线程池的 Bean,我们需要使⽤的时候,提交⼀个线程即可,不需要过多关注其内部原理。
在学习⼀门新的技术之前,我们还是先了解下为什么要使⽤它,使⽤它能够解决什么问题:
创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很⼤程度上影响处理效率
例如:
记创建线程消耗时间T1,执⾏任务消耗时间T2,销毁线程消耗时间T3
如果T1+T3>T2,那么是不是说开启⼀个线程来执⾏这个任务太不划算了!
正好,线程池缓存线程,可⽤已有的闲置线程来执⾏新任务,避免了T1+T3带来的系统开销
线程并发数量过多,抢占系统资源从⽽导致阻塞
我们知道线程能共享系统资源,如果同时执⾏的线程过多,就有可能导致系统资源不⾜⽽产⽣阻塞的情况
运⽤线程池能有效的控制线程最⼤并发数,避免以上的问题
对线程进⾏⼀些简单的管理
⽐如:延时执⾏、定时循环执⾏的策略等
运⽤线程池都能进⾏很好的实现
创建⼀个线程池
在 Java 中,新建⼀个线程池对象⾮常简单,Java 本⾝提供了⼯具类urrent.Executors,可以使⽤如下代码创建⼀个固定数量线程的线程池:
ExecutorService rvice = wFixedThreadPool(10);
注意:以上代码⽤来测试还可以,实际使⽤中最好能够显⽰地指定相关参数。
我们可以看下其内部源码实现:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
在阿⾥巴巴代码规范中,建议我们⾃⼰指定线程池的相关参数,为的是让开发⼈员能够⾃⾏理解线程池创建中的每个参数,根据实际情况,创建出合理的线程池。接下来,我们来剖析下urrent.ThreadPoolExecutor的构造⽅法参数。
ThreadPoolExecutor 浅析
urrent.ThreadPoolExecutor有多个构造⽅法,我们拿参数最多的构造⽅法来举例,以下
是阿⾥巴巴代码规范中给出的创建线程池的范例:
ThreadPoolExecutor rvice = new ThreadPoolExecutor(5, 200,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
new ThreadFactoryBuilder().tNameFormat("demo-pool-%d").build(),
new ThreadPoolExecutor.AbortPolicy());
贴⼀张IDEA中的图更⽅便看:
⾸先最重要的⼏个参数,可能就是:corePoolSize,maximumPoolSize,workQueue了,先看下这⼏个参数的解释:
corePoolSize
⽤于设定 thread pool 需要时刻保持的最⼩ core threads 的数量,即便这些 core threads 处于空闲状态啥事都不做也不会将它们回收掉,当然前提是你没有设置 allowCoreThreadTimeOut 为 true。⾄于 pool 是如何做到保持这些个 threads 不死的,我们稍后再说。
maximumPoolSize
⽤于限定 pool 中线程数的最⼤值。如果你⾃⼰构造了 pool 且传⼊了⼀个 Unbounded 的 queue 且没有设置它的 capacity,那么不好意思,最⼤线程数会永远 <= corePoolSize,maximumPoolSize 变成了⽆效的。
workQueue
该线程池中的任务队列:维护着等待执⾏的 Runnable 对象。当所有的核⼼线程都在⼲活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建⾮核⼼线程执⾏任务
由于本⽂是初步了解线程池,所以先理解这⼏个参数,上⽂对于这三个参数的解释,基本上跟JDK源码中的注释⼀致
(urrent.ThreadPoolExecutor#execute⾥的代码)。
我们编写个程序来⽅便理解:
// 创建线程池
ThreadPoolExecutor rvice = new ThreadPoolExecutor(5, 200,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
new ThreadFactoryBuilder().tNameFormat("demo-pool-%d").build(),
new ThreadPoolExecutor.AbortPolicy());
// 等待执⾏的runnable
Runnable runnable = () -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
// 启动的任务数量
int counts = 1224;
for (int i = 0; i < counts; i++) {
}
// 监控线程池执⾏情况的代码
ThreadPoolExecutor tpe = ((ThreadPoolExecutor) rvice);
while (true) {
System.out.println();
int queueSize = Queue().size();
System.out.println("当前排队线程数:" + queueSize);
int activeCount = ActiveCount();
System.out.println("当前活动线程数:" + activeCount);
long completedTaskCount = CompletedTaskCount();
System.out.println("执⾏完成线程数:" + completedTaskCount);
long taskCount = TaskCount();
System.out.println("总线程数:" + taskCount);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
线程池的容量与我们启动的任务数量息息相关。
已知:
corePoolSize = 5
maximumPoolSize = 200
workQueue.size() = 1024
我们修改同时 execute 添加到线程池的 Runnable 数量 counts:
counts <= corePoolSize:所有的任务均为核⼼线程执⾏,没有任何 Runnable 被添加到 workQueue中当前排队线程数:0
当前活动线程数:3
执⾏完成线程数:0
总线程数:3
corePoolSize < counts <= corePoolSize + workQueue.size():所有任务均为核⼼线程执⾏,当核⼼线程处于繁忙状态,则将任务添加到 workQueue 中等待
当前排队线程数:15
当前活动线程数:5
执⾏完成线程数:0
总线程数:20
corePoolSize + workQueue.size() < counts <= maximumPoolSize + workQueue.size():corePoolSize 个线程由核⼼线程执⾏,超出队列长度 workQueue.size() 的任务,将另启动⾮核⼼线程执⾏
当前排队线程数:1024
当前活动线程数:105
执⾏完成线程数:0
总线程数:1129
counts > maximumPoolSize + workQueue.size():将会报异常urrent.RejectedExecutionException
urrent.RejectedExecutionException: Task
com.bwjava.util.ExecutorServiceUtilTest$$Lambda$1/314265080@725bef66 rejected from
urrent.ThreadPoolExecutor@2aaf7cc2[Running, pool size = 200, active threads = 200, queued tasks = 1024, completed tasks = 0]
线程池踩坑:线程嵌套导致阻塞
这次的踩坑才是我写这篇⽂章的初衷,借此机会好好了解下线程池的各个概念。本⾝这段时间在研究
爬⾍,为了尽量提⾼爬⾍的效率,⽤到了多线程处理。由于代码写得⽐较随性,所以遇到了⼀个阻塞的问题,研究了⼀下才搞明⽩,模拟的代码如下:
ThreadPoolExecutor rvice = new ThreadPoolExecutor(5, 200,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
new ThreadFactoryBuilder().tNameFormat("demo-pool-%d").build(),
new ThreadPoolExecutor.AbortPolicy());
@Test
public void testBlock() {
Runnable runnableOuter = () -> {
try {
Runnable runnableInner1 = () -> {
try {
TimeUnit.SECONDS.sleep(3); // 模拟⽐较耗时的爬⾍操作
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Future> submit = rvice.submit(runnableInner1);
<(); // 实际业务中,runnableInner2需要⽤到此处返回的参数,所以必须get Runnable runnableInner2 = () -> {
try {
TimeUnit.SECONDS.sleep(5); // 模拟⽐较耗时的爬⾍操作
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Future> submit2 = rvice.submit(runnableInner2);
<();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
};
for (int i = 0; i < 20; i++) {
}
ThreadPoolExecutor tpe = ((ThreadPoolExecutor) rvice);
while (true) {
System.out.println();
int queueSize = Queue().size();
System.out.println("当前排队线程数:" + queueSize);
int activeCount = ActiveCount();
System.out.println("当前活动线程数:" + activeCount);
long completedTaskCount = CompletedTaskCount();
System.out.println("执⾏完成线程数:" + completedTaskCount);
long taskCount = TaskCount();
System.out.println("总线程数:" + taskCount);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}