【⾼并发】什么是ForkJoin?看这⼀篇就够了!
写在前⾯
在JDK中,提供了这样⼀种功能:它能够将复杂的逻辑拆分成⼀个个简单的逻辑来并⾏执⾏,待每个并⾏执⾏的逻辑执⾏完成
后,再将各个结果进⾏汇总,得出最终的结果数据。有点像Hadoop中的MapReduce。
ForkJoin是由JDK1.7之后提供的多线程并发处理框架。ForkJoin框架的基本思想是分⽽治之。什么是分⽽治之?分⽽治之就是将⼀个复杂的计算,按照设定的阈值分解成多个计算,然后将各个计算结果进⾏汇总。相应的,ForkJoin将复杂的计算当做⼀个任务,⽽分解的多个计算则是当做⼀个个⼦任务来并⾏执⾏。
Java并发编程的发展
对于Java语⾔来说,⽣来就⽀持多线程并发编程,在并发编程领域也是在不断发展的。Java在其发展过程中对并发编程的⽀持越来越完善也正好印证了这⼀点。
Java 1 ⽀持thread,synchronized。
Java 5 引⼊了 thread pools, blocking queues, concurrent collections,locks, condition queues。
Java 7 加⼊了fork-join库。
Java 8 加⼊了 parallel streams。
并发与并⾏
并发和并⾏在本质上还是有所区别的。
并发
并发指的是在同⼀时刻,只有⼀个线程能够获取到CPU执⾏任务,⽽多个线程被快速的轮换执⾏,这就使得在宏观上具有多个线程同时执⾏的效果,并发不是真正的同时执⾏,并发可以使⽤下图表⽰。
并⾏
并⾏指的是⽆论何时,多个线程都是在多个CPU核⼼上同时执⾏的,是真正的同时执⾏。
分治法
基本思想
把⼀个规模⼤的问题划分为规模较⼩的⼦问题,然后分⽽治之,最后合并⼦问题的解得到原问题的解。
步骤
①分割原问题;
②求解⼦问题;
③合并⼦问题的解为原问题的解。
我们可以使⽤如下伪代码来表⽰这个步骤。
if(任务很⼩){
直接计算得到结果
}el{
分拆成N个⼦任务
调⽤⼦任务的fork()进⾏计算
调⽤⼦任务的join()合并计算结果
}
在分治法中,⼦问题⼀般是相互独⽴的,因此,经常通过递归调⽤算法来求解⼦问题。
典型应⽤
⼆分搜索
⼤整数乘法
Strasn矩阵乘法
棋盘覆盖
合并排序
快速排序
线性时间选择
汉诺塔
ForkJoin并⾏处理框架
ForkJoin框架概述
Java 1.7 引⼊了⼀种新的并发框架—— Fork/Join Framework,主要⽤于实现“分⽽治之”的算法,特别是分治之后递归调⽤的函数。ForkJoin框架的本质是⼀个⽤于并⾏执⾏任务的框架,能够把⼀个⼤任务分割成若⼲个⼩任务,最终汇总每个⼩任务结果后得到⼤任务的计算结果。在Java中,ForkJoin框架与ThreadPool共存,并不是要替换ThreadPool
其实,在Java 8中引⼊的并⾏流计算,内部就是采⽤的ForkJoinPool来实现的。例如,下⾯使⽤并⾏流实现打印数组元组的程序。
public class SumArray {
public static void main(String[] args){
List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9);
numberList.parallelStream().forEach(System.out::println);
}
}
这段代码的背后就使⽤到了ForkJoinPool。
说到这⾥,可能有读者会问:可以使⽤线程池的ThreadPoolExecutor来实现啊?为什么要使⽤ForkJoinPool啊?ForkJoinPool是个什么⿁啊?!接下来,我们就来回答这个问题。
ForkJoin框架原理
ForkJoin框架是从jdk1.7中引⼊的新特性,它同ThreadPoolExecutor⼀样,也实现了Executor和ExecutorService接⼝。它使⽤了⼀个⽆限队列来保存需要执⾏的任务,⽽线程的数量则是通过构造函数传⼊,如果没有向构造函数中传⼊指定的线程数量,那么当前计算机可⽤的CPU数量会被设置为线程数量作为默认值。
ForkJoinPool主要使⽤分治法(Divide-and-Conquer Algorithm)来解决问题。典型的应⽤⽐如快速排序算法。这⾥的要点在于,ForkJoinPool 能够使⽤相对较少的线程来处理⼤量的任务。⽐如要对1000万个数据进⾏排序,那么会将这个任务分割成两个500万的排序任务和⼀个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置⼀个阈值来规定当数据规模到多少时,停⽌这样的分割处理。⽐如,当元素的数量⼩于10时,会停⽌分割,转⽽使⽤插⼊排序对它们进⾏排序。那么到最后,所有的任务加起来会有⼤概200万+个。问题的关键在于,对于⼀个任务⽽⾔,只有当它所有的⼦任务完成之后,它才能够被执⾏。
所以当使⽤ThreadPoolExecutor时,使⽤分治法会存在问题,因为ThreadPoolExecutor中的线程⽆法向任务队列中再添加⼀个任务并在等待该任务完成之后再继续执⾏。⽽使⽤ForkJoinPool就能够解决这个问题,它就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择⼦任务执⾏。
那么使⽤ThreadPoolExecutor或者ForkJoinPool,性能上会有什么差异呢?
⾸先,使⽤ForkJoinPool能够使⽤数量有限的线程来完成⾮常多的具有⽗⼦关系的任务,⽐如使⽤4个线程来完成超过200万个任务。但是,使⽤ThreadPoolExecutor时,是不可能完成的,因为ThreadPoolExecutor中的Thread⽆法选择优先执⾏⼦任务,需要完成200万个具有⽗⼦关系的任务时,也需要200万个线程,很显然这是不可⾏的,也是很不合理的!!
⼯作窃取算法
假如我们需要做⼀个⽐较⼤的任务,我们可以把这个任务分割为若⼲互不依赖的⼦任务,为了减少线程间的竞争,于是把这些⼦任务分别放到不同的队列⾥,并为每个队列创建⼀个单独的线程来执⾏队列⾥的任务,线程和队列⼀⼀对应,⽐如A线程负责处理A队列⾥的任务。但是有的线程会先把⾃⼰队列⾥的任务⼲完,⽽其他线程对应的队列⾥还有任务等待处理。⼲完活的线程与其等着,不如去帮其他线程⼲活,于是它就去其他线程的队列⾥窃取⼀个任务来执⾏。⽽在这时它们会访问同⼀个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使⽤双端队列,被窃取任务线程永远从双端队列的头部拿任务执⾏,⽽窃取任务的线程永远从双端队列的尾部拿任务执⾏。
⼯作窃取算法的优点:
充分利⽤线程进⾏并⾏计算,并减少了线程间的竞争。
⼯作窃取算法的缺点:
在某些情况下还是存在竞争,⽐如双端队列⾥只有⼀个任务时。并且该算法会消耗更多的系统资源,⽐如创建多个线程和多个双端队列。
Fork/Join框架局限性:
对于Fork/Join框架⽽⾔,当⼀个任务正在等待它使⽤Join操作创建的⼦任务结束时,执⾏这个任务的⼯作线程查找其他未被执⾏的任务,并开始执⾏这些未被执⾏的任务,通过这种⽅式,线程充分利⽤它们的运⾏时间来提⾼应⽤程序的性能。为了实现这个⽬标,Fork/Join框架执⾏的任务有⼀些局限性。
(1)任务只能使⽤Fork和Join操作来进⾏同步机制,如果使⽤了其他同步机制,则在同步操作时,⼯作线程就不能执⾏其他任务了。⽐
如,在Fork/Join框架中,使任务进⾏了睡眠,那么,在睡眠期间内,正在执⾏这个任务的⼯作线程将不会执⾏其他任务了。
(2)在Fork/Join框架中,所拆分的任务不应该去执⾏IO操作,⽐如:读写数据⽂件。
(3)任务不能抛出检查异常,必须通过必要的代码来出来这些异常。
ForkJoin框架的实现
ForkJoin框架中⼀些重要的类如下所⽰。
ForkJoinPool 框架中涉及的主要类如下所⽰。
1.ForkJoinPool类
实现了ForkJoin框架中的线程池,由类图可以看出,ForkJoinPool类实现了线程池的Executor接⼝。
我们也可以从下图中看出ForkJoinPool的类图关系。
其中,可以使⽤wWorkStealPool()⽅法创建ForkJoinPool。
ForkJoinPool中提供了如下提交任务的⽅法。
public void execute(ForkJoinTask<?> task)
public void execute(Runnable task)
public <T> T invoke(ForkJoinTask<T> task)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
public <T> ForkJoinTask<T> submit(Callable<T> task)
public <T> ForkJoinTask<T> submit(Runnable task, T result)
public ForkJoinTask<?> submit(Runnable task)
2.ForkJoinWorkerThread类
实现ForkJoin框架中的线程。
3.ForkJoinTask类
ForkJoinTask封装了数据及其相应的计算,并且⽀持细粒度的数据并⾏。ForkJoinTask⽐线程要轻量,ForkJoinPool中少量⼯作线程能够运⾏⼤量的ForkJoinTask。
ForkJoinTask类中主要包括两个⽅法fork()和join(),分别实现任务的分拆与合并。
fork()⽅法类似于Thread.start(),但是它并不⽴即执⾏任务,⽽是将任务放⼊⼯作队列中。跟Thread.join()⽅法不同,ForkJoinTask的join()⽅法并不简单的阻塞线程,⽽是利⽤⼯作线程运⾏其他
任务,当⼀个⼯作线程中调⽤join(),它将处理其他任务,直到注意到⽬标⼦任务已经完成。
我们可以使⽤下图来表⽰这个过程。
ForkJoinTask有3个⼦类:
RecursiveAction:⽆返回值的任务。
RecursiveTask:有返回值的任务。
CountedCompleter:完成任务后将触发其他任务。
4.RecursiveTask 类
有返回结果的ForkJoinTask实现Callable。
5.RecursiveAction类
⽆返回结果的ForkJoinTask实现Runnable。
6.CountedCompleter 类
在任务完成执⾏后会触发执⾏⼀个⾃定义的钩⼦函数。
ForkJoin⽰例程序
package ample.aqs;
slf4j.Slf4j;
import urrent.ForkJoinPool;
import urrent.Future;
import urrent.RecursiveTask;
@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
public static final int threshold = 2;
private int start;
private int end;
public ForkJoinTaskExample(int start, int end) {
this.start = start;
}
@Override
protected Integer compute() {
int sum = 0;
//如果任务⾜够⼩就计算任务
boolean canCompute = (end - start) <= threshold;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} el {
// 如果任务⼤于阈值,就分裂成两个⼦任务计算
int middle = (start + end) / 2;
ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
// 执⾏⼦任务
leftTask.fork();
rightTask.fork();
// 等待任务执⾏结束合并其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// 合并⼦任务
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkjoinPool = new ForkJoinPool();
//⽣成⼀个计算任务,计算1+2+3+4
ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
//执⾏⼀个任务
Future<Integer> result = forkjoinPool.submit(task);
try {
log.info("result:{}", ());
} catch (Exception e) {
<("exception", e);
}
}
}
写在最后
如果觉得⽂章对你有点帮助,请微信搜索并关注「冰河技术」微信公众号,跟冰河学习⾼并发编程技术。最后,附上并发编程需要掌握的核⼼技能知识图,祝⼤家在学习并发编程时,少⾛弯路。