Java中的函数式编程(⼋)流Stream并⾏编程
写在前⾯
在本系列⽂章的第⼀篇,我们提到了函数式编程的优点之⼀是“易于并发编程”。
Java作为⼀个多线程的语⾔,它通过 Stream 来提供了并发编程的便利性。
题外话:
严格来说,并发和并⾏是两个不同的概念。
“并发(Concurrency)”强调的是在同⼀时间开始执⾏多个任务,通常会涉及多线程之间的上下⽂切换;
“并⾏(Parallelism)”强调的是将⼀个⼤任务分解为多个⼩任务后,再同时执⾏这些⼩任务,得到多个中间结果后再汇总为⼀个最终结果。
但在多CPU和分布式的时代,并发和并⾏的概念联系越来越紧密。⾄少在Java的Stream中,我们可以将并发和并⾏理解为同⼀个意思:基于多线程技术,对⼀个⼤任务分拆为多个⼩任务,分配到不同的线程中执⾏,得到多个中间结果后再汇总为⼀个最终结果。
Stream的并⾏编程
并⾏编程是Stream的⼀个重要功能和特性。它的⼀个优点是:不管数据源是否线程安全,通过并⾏流(parallel stream)都可以轻松的实现并⾏编程。
Stream的并⾏编程,底层是基于 ForkJoinPool 技术来实现的。ForkJoinPool是Java 7引⼊的⽤于并⾏执⾏的任务框架,核⼼思想是将⼀个⼤任务拆分成多个⼩任务(即fork),然后再将多个⼩任务的处理结果汇总到⼀个结果上(即join)。此外,它也提供基本的线程池功能,譬如设置最⼤并发线程数,关闭线程池等。
财务统计
在本系列之前的⽂章中,也零零散散的提到了⼀些关于并⾏编程的知识点。本⽂再做⼀个更系统的总结。洋菜
并⾏流(parallel stream)
Stream的并⾏操作都是基于并⾏流(parallel stream)。
⽣成⼀个并⾏流也⾮常简单:
1. 通过 Collection.parallelStream ⽅法可以得到⼀个并⾏流
2. ⽣成⼀个串⾏的Stream后,可以通过⽅法 BaStream.parallel() 将⼀个串⾏流(rial stream)转换成并⾏流。当然,我们也可以通过⽅法 BaStream.quential() 将⼀个并⾏流转换成串⾏流。
通过⽅法 BaStream.isParallel() 可以判断⼀个 stream 是否是并⾏流。
不管数据源是否线程安全(譬如ArrayList、HashSet,它们都不⽀持多线程),我们都可以使⽤parallelStream 轻松实现并⾏编程,不需要额外的线程同步操作,这是parallelStream 最⼤的优点。
顺序性
encounter order,指的是Stream中元素的出现顺序。如果觉得encounter order过于抽象,可以将它简单理解为数据源(data source)的元素顺序。本⼩节涉及到的有序或⽆序都特指encounter order。
⼀个Stream是否具备encounter order的有序性,取决于它的数据源(data source)和中间操作(intermediate operations)。例如,List或者数组的Steam是有序的,但HashSet的Steam则是⽆序的。⽽中间操作Stream.sorted,可以将⼀个⽆序的Stream转换成有序的;中间操作Stream.unordered 则将⼀个有序的Stream转换成⽆序的。
有趣的是,有些终⽌操作(terminal operations)是⽆视encounter order的。什么意思呢?以最常见的Stream.forEach 为例,在并⾏执⾏的时候,即使数据源是List,forEach⽅法处理元素的顺序也是⽆
序的。要保证处理顺序,需要使⽤⽅法 Stream.forEachOrdered 。
⽰例代码:
public static void forEachExample() {
ArrayList<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));
System.out.println("===forEach====");
// 在并⾏流中, forEach ⽅法是⽆视 Stream 的 encounter order 的
list.parallelStream().forEach(i -> {
System.out.println(i + ":thread-" + Thread.currentThread().getName());
});
System.out.println("===forEachOrdered====");
// 在并⾏流中, forEachOrdered ⽅法可以保持 encounter order
list.parallelStream().forEachOrdered(i -> {
invalidSystem.out.println(i + ":thread-" + Thread.currentThread().getName());
});
}
上述代码的输出类似:
===forEach====
3:thread-main
5:Pool-worker-2
1:thread-main
4:Pool-worker-3
2:Pool-worker-1
===forEachOrdered====
1:Pool-worker-4
2:Pool-worker-1
3:Pool-worker-1
4:Pool-worker-1
5:Pool-worker-1
可以看出,在并⾏执⾏时,forEach 是⽆视Stream的encounter order的,⽽ forEachOrdered 虽然也是在多线程环境下执⾏,但仍然可以保证Stream的encounter order。
在Stream并⾏编程中,理解encounter order很重要。因为对于⼤多数的Stream操作,即使是并⾏执⾏,如果Stream是有序的,那么操作后得到的Stream也保持有序。例如,对⼀个数据源为List [1,2,3] 的有序Stream,执⾏ map(x -> x * x) 操作后,结果⼀定是 [1, 4, 9]。
对encounter order的有序性和⽆序性,⽰例代码如下:
public static void unorderedExample() {
// 我们⽤ TreeMap 来做实验,因为 ArrayList 的特殊性,很难展⽰ unordered 的特性
// TreeSet 中的元素是按从⼩到⼤排序的,即 [-7, -3, 1, 5, 12]
TreeSet<Integer> t = new TreeSet<>(Arrays.asList(1, 12, 5, -7, -3));
// 按 encounter order 打印 t,输出为:-7, -3, 1, 5, 12
System.out.println("The encounter order of t: ");
t.stream().forEachOrdered(s -> System.out.print(s + " "));
System.out.println();
// TreeSet 是有序的,所以来⾃ TreeSet 的 Stream 也是有序的
// 当 Stream 是有序时,执⾏操作 limit(2) ,不管是串⾏还是并⾏,也不管执⾏多少次,结果都是前两位数字 [-7, -3]
System.out.println("Limit ordered Stream: ");
四级英语词汇
t.stream().parallel().limit(2).forEachOrdered(s -> System.out.print(s + " "));
System.out.println();
// 我们使⽤ unordered ⽅法将 Stream 转换为⽆序的。
// 当 Stream 是⽆序时,并⾏执⾏操作 limit(2) ,会发现执⾏多次时,输出的数字是不⼀样的(不确定性)
上海海关学院分数线System.out.println("Limit unordered Stream: ");
System.out.print("first time: ");
t.stream().unordered().parallel().limit(2).forEachOrdered(s -> System.out.print(s + " "));
System.out.println();
System.out.print("cond time: ");
t.stream().unordered().parallel().limit(2).forEachOrdered(s -> System.out.print(s + " "));
System.out.println();
}
上述⽰例代码的输出类似:
The encounter order of t:
-7 -3 1 5 12
Limit ordered Stream:
-7 -3
Limit unordered Stream:
first time: -3 5
cond time: 5 12
⼤家可以仔细体会。欢迎加群讨论
纯函数操作
回顾本系列⽂章的第⼀篇,纯函数(purely function)指的是它不会改变函数以外的其它状态,换⽽⾔之,即不会改变在该函数之外定义的变量值。纯函数不会导致“副作⽤(side-effects)。
在Stream的并⾏编程中,纯函数操作⾮常关键,否则我们依然需要考虑线程安全的问题。
举例说明:
public static void unsafeParallelOperation() {
List<String> provinces = Arrays.asList("Guangdong", "Jiangsu", "Guangxi", "Jiangxi", "Shandong");
// "副作⽤" 导致的线程不安全问题
ArrayList<String> results = new ArrayList<>();
provinces.parallelStream()
// 过滤掉以 G 开头的省份
.filter(s -> !s.startsWith("G"))
// 在 lambda表达式中修改了 results 的值,
// 说明了 "s -> results.add(s)" 并⾮⼀个纯函数,
// 带来了不必要的 "副作⽤",
// 在并⾏执⾏时,会导致线程不安全的问题。
.forEach(s -> results.add(s));
System.out.println(results);
}
上述⽰例代码存在线程不安全的问题 —— 多个线程会同时修改 ArrayList 类型的 results ,我们需要对 results 变量加锁。
正确的做法是:
public static void safeParallelOperation() {
List<String> provinces = Arrays.asList("Guangdong", "Jiangsu", "Guangxi", "Jiangxi", "Shandong");
List<String> results = provinces.parallelStream()
七年级上册英语试卷
// 过滤掉以 G 开头的省份
.filter(s -> !s.startsWith("G"))
// 没有 "副作⽤"
.List());
System.out.println(results);
}
通过内置的 List() ⽅法,就不存在“副作⽤”,从⽽也⽆需考虑线程安全问题。
Collectors与ConcurrentMap
回顾⼀下,在介绍Stream的规约⽅法 llect(Collector) 时,我们提到了⼀个需求场景:将员⼯按照部门分组。
并⾏执⾏的实现代码类似:
public static void groupEmployeesToMap() {
List<Employee> employees = Utils.makeEmployees();
Map<String, List<Employee>> map = employees.parallelStream()blackbirds
.upingBy(Employee::getDepartment));
System.out.println(map);
}
虽然上述代码可以实现功能,但性能可能并不尽如⼈意,因为在并⾏执⾏时,需要将多个中间结果汇总为最终的结果,但合并两个Map,性能损耗可能⾮常⼤(例如HashMap,底层是数组+红⿊树实现的,合并时复杂度不低)。
⾃然⽽然,聪明的Java程序员会想到:如果并⾏执⾏得到的中间结果和最终结果都是使⽤同⼀个Map实例,那就不需要合并两个Map了,当然,因为并⾏执⾏涉及到多线程,因此,这个Map实例要求是线程安全的。典型的线程安全的Map,当然⾸选ConcurrentHashMap 啦。
这就是Collectors⼯具类中与ConcurrentMap相关的⽅法的实现原理,主要包括:
1. toConcurrentMap 系列⽅法
2. groupingByConcurrent 系列⽅法
但使⽤ ConcurrentHashMap 有个缺点:它不能保证 Stream 的 encounter order,所以只有当你确定元素的顺序不影响最终结果时,才使⽤与ConcurrentMap相关的⽅法。
最后,还要注意,只有在并⾏编程时,我们才要考虑使⽤ toConcurrentMap 或者 groupingByConcurrent ⽅法,否则会因为不必要的线程同步操作,反⽽影响了性能。
规约操作的注意事项
在本系列介绍规约操作的⽂章中,已经提到了很多关于并⾏编程的注意事项,本⼩节将它们汇总起来,供⼤家参考。
reduce(T, BinaryOperator)
reduce(T, BinaryOperator)的⽅法签名是:
T reduce(T identity, BinaryOperator<T> accumulator);
其中 T 是 Stream 的泛型类型。
参数 identity 是规约操作的初始值。
参数accumulator 要求满⾜结合律(associative)。
参数 accumulator 定义的函数必须满⾜结合律(associative),否则在⼀些顺序不确定的或并⾏的场景中会导致不正确的结果。
此外,如果是并⾏执⾏的话,对参数 identity 还有⼀个要求:对任意值 t,要满⾜ accumulator.apply(identity, t) == t 。否则,会导致错误的结果。
public static void reduceStream2() {
List<Integer> list = Arrays.asList(1, 3, 5, 7, 9);
// 这是正确的范例:因为数字 0 是累加操作的 identity 。
sum = list.parallelStream().reduce(0, (x, y) -> x + y);
mssql// 输出为 0+1+3+5+7+9 = 25
System.out.println(sum);
// 这是错误的范例:因为数字 5 并不是累加操作的 identity 。
sum = list.parallelStream().reduce(5, (x, y) -> x + y);
笔试英文
// 本意是输出为 5+1+3+5+7+9 = 30,但实际上会输出⼀个⽐30⼤的数字。
System.out.println(sum);
}
reduce(U, BiFunction, BinaryOperator)
具体的⽅法签名是:
<U> U reduce(U identity,
辣妹组合成员BiFunction<U, ? super T, U> accumulator,
BinaryOperator<U> combiner);
其中 U 是返回值的类型,T 是 Stream 的泛型类型。
参数 identity 是规约操作的初始值。
参数accumulator 是与Stream中单个元素的合并操作,等同于函数 U apply(U u, T t)。
参数 combiner 是将并⾏执⾏得到的多个中间结果进⾏合并的操作,等同于函数 U apply(U u1, U u2)。
在并⾏编程中,对3个参数都有⼀些特殊要求:
1. 参数 combiner 必须满⾜结合律
2. 参数 identity,对于任意值 u,必须满⾜ combiner.apply(identity, u) == u
3. 参数 accumulator 和 combiner 两者必须兼容,即对于任意值 u 和 t,必须满⾜:
combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)
collect(Supplier, BiConsumer, BiConsumer)
ollect⽅法的签名是:
<R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super T> accumulator,
BiConsumer<R, R> combiner);
其中 R 是返回值的类型,通常是⼀个容器类(例如 Collection 或 Map)。T 是Stream中的元素类型。
参数 supplier 是⽤来创建⼀个容器实例的函数。
参数 accumulator 是将Stream中的⼀个元素合并到容器中的函数。
参数 combiner 是将两个容器归并为⼀个容器的函数,只在并⾏执⾏的时候⽤到。
在并⾏执⾏的场景下,我们有⼀些额外的要求:
1. combiner函数满⾜结合律
2. 要求combiner 和 accumulator 是兼容的(compatible),即对于任意的r和t, combiner.accept(r, accumulator.(), t)) == accumulator.accept(r, t)
结语
Stream 提供了⾮常⽅便的并⾏编程API,但它还是存在很多问题,⾮常容易踩坑。
其中,最为⼈诟病的是它的不可控性。因为 Parallel Stream 的底层是基于 ForkJoinPool ,⽽ ForkJoinPool 的⼯作线程数是在虚拟机启动时指定的,如果 Stream 并⾏执⾏的任务数量过多或耗时过多,甚⾄会影响应⽤程序中其它使⽤ ForkJoinPool 的功能。
总的来说,除⾮你⾮常了解你正在做的事情,否则不要使⽤ Stream 的并⾏编程API 。取⽽代之,我们可以直接使⽤Java中多线程技术(例如线程池)来处理。