参考:Java中的流、并行流
本博文为视频内容的简单梳理。
目录
引入
已知一个列表
1 2 3 4 5 6 7 List<Person> people = List.of( new Person ("Neo" , 1 , "USA" ), new Person ("Blu" , 12 , "China" ), new Person ("Alex" , 29 , "Germany" ), new Person ("Bob" , 25 , "Germany" ), new Person ("Kurumi" , 18 , "Japan" ) );
检查所有人的信息,找出年龄大于18岁的人。
命令式编程 ,使用for循环依次检查每一个人的信息,将年龄超过十八岁的人添加到新的名单中,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Test public void test () { List<Person> people = List.of( new Person ("Neo" , 1 , "USA" ), new Person ("Blu" , 12 , "China" ), new Person ("Alex" , 29 , "Germany" ), new Person ("Bob" , 25 , "Germany" ), new Person ("Kurumi" , 18 , "Japan" ) ); List<Person> adults = new ArrayList <>(); for (Person p : people) { if (p.getAge() > 18 ) { adults.add(p); } } System.out.println(adults); }
使用Stream API 可以实现相同的功能:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Test public void test2 () { List<Person> people = List.of( new Person ("Neo" , 1 , "USA" ), new Person ("Blu" , 12 , "China" ), new Person ("Alex" , 29 , "Germany" ), new Person ("Bob" , 25 , "Germany" ), new Person ("Kurumi" , 18 , "Japan" ) ); List<Person> adults = people.stream() .filter(person -> person.getAge() > 18 ) .toList(); System.out.println(adults); }
代码更加简洁。
Stream本身并不是数据结构,不会处理数据或改变数据源,它仅定义处理方式。不仅能够支持顺序处理,还能进行并行处理。
Java的Stream只能使用一次 ,一旦你调用了终结操作(比如
.forEach()、.collect() 等),这个流就被“消费掉”了,不能再用。
三个步骤 :
创建流 Stream Creation
中间操作 Intermediate Operations
中间操作是惰性执行的,只有遇到终端操作才会实际执行。
终端操作 Terminal Operations
整个流的实际处理部分,他会触发之前所有定义中的中间操作,生成最终结果。
下面从这三个方面进行分析。
创建流
对于任何实现了Collection
接口的集合,可以通过stream()
方法直接创建流:
1 2 3 4 5 6 7 8 9 10 11 String[] array = {"a" ,"b" ,"c" ,"d" ,"e" ,"f" }; Stream<String> stream = Arrays.stream(array); stream.forEach(System.out::println); List<String> list = List.of("a" , "b" , "c" ); Stream<String> myStream = list.stream(); myStream.forEach(System.out::println); Stream<String> stream1 = Stream.of("a" ,"b" ,"c" );
采用上述方法创建的stream对象自诞生之初就是写死的,无法修改。想要动态创建流 ,需要使用Stream.Builder
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Stream.Builder<String> builder = Stream.builder(); builder.add("a" ); builder.add("b" );if (Math.random()<0.5 ){ builder.add("c" ); } Stream<String> stream = builder.build(); stream.forEach(System.out::println);
使用Stream也可从文件创建流 。
1 2 3 4 5 6 7 8 9 @Test public void test3 () { Path path = Paths.get("D:\\develop\\code\\Practice\\chapter0\\hello_copy.txt" ); try (Stream<String> lines = Files.lines(path)){ lines.forEach(System.out::println); } catch (IOException e) { e.printStackTrace(); } }
其中:
Files.lines(path, StandardCharsets.UTF_8)
返回一个惰性读取的
Stream,可以逐行读取文本文件内容。
try-with-resources
保证资源及时关闭
StreamAPI也提供了IntStream
等类用来方便地创建基本数据类型的流 。
1 2 3 4 IntStream intStream = IntStream.range(1 ,6 );IntStream intStream = IntStream.of(1 ,2 ); Stream<Integer> integerStream = intStream.boxed();
Stream.iterate可生成无限流 (通常用limit限制生成的个数):
语法 :
1 public static <T> Stream<T> iterate (final T seed, final UnaryOperator<T> f)
seed
:初始值
f
:作用于初始值的函数
返回值:一个新的Stream类型的序列
增强型:public static<T> Stream<T> iterate(T seed, Predicate<? super T> hasNext, UnaryOperator<T> next)
,其中hasNext
是终止条件
1 2 Stream.iterate(0 , i -> i + 1 ).limit(5 ).forEach(System.out::println);
并行流 :把数据分成多个部分,并“同时”用多个线程来处理,从而加快处理速度。
对于集合,调用list.parallelStraem()
可以直接得到一个并行流。例如List.of("a", "b", "c", "d", "e", "f").parallelStream().forEach(System.out::println);
,输出是乱序 的
对于已有的Stream,可以调用.parallel()
得到并行流
1 2 3 4 5 6 7 8 @Test public void test5 () { Stream<Integer> iterateStream = Stream.iterate(0 , n -> n + 1 ).limit(5 ); iterateStream.parallel(); iterateStream.forEach(System.out::println); }
中间操作
中间操作用于对流中的元素进行处理,如筛选、排序等等。根据操作的性质可以分为以下几个类别:
筛选和切片 (Filtering and
Slicing):过滤或缩减流中的元素的数量
映射 (Mapping):转换流中的元素或提取元素的特定属性
排序 (Sorting)
中间操作图解
筛选和切片
例如开头处提到的
1 2 3 List<Person> adults = people.stream() .filter(myperson -> myperson.getAge() > 18 ) .toList();
过滤器定义 :
Stream<T> filter(Predicate<? super T> predicate);
Predicate即判断条件函数 ,用lambda写出,返回布尔值。意思是:给我一个叫
myperson 的参数,判断这个人年龄是否大于 18,返回一个 true/false
结果 。myperson
的类型由前文自动推断出,所以省略。
这里等价于用更简洁的形式实现了Predicate
接口的抽象方法boolean test(T t);
1 2 3 4 5 6 Predicate<Person> predicate = new Predicate <Person>() { @Override public boolean test (Person person) { return person.getAge() > 18 ; } };
可以用.distinct()
为流中的元素去重 ,底层通过维护一个HashSet实现。
1 2 3 4 5 6 7 @Test public void test3 () { Stream.of("Origami" , "Kurumi" , "Kurumi" , "Kurumi" , "Kurumi" , "Kurumi" , "Kurumi" , "Kurumi" ) .distinct() .forEach(System.out::println); }
如果是自定义的类的对象,需要确保正确地重写了equals()``HashCode
方法,因为HashSet就是通过这两个方法判断元素是否相等。
Stream类中定义的Stream<T> limit(long maxSize);
方法返回由此流的元素组成的截断长度不超过maxSize的流。
同样,Stream<T> skip(long n);
跳过前n个元素。
limit和skip在有序的并行流中使用时可能会使性能变差,此时可以用
.unordered()
提高效率;如果顺序必须保留,那最好使用顺序流
.sequential()
。
1 2 3 4 5 6 7 8 9 @Test public void test4 () { Stream<Integer> s1 = Stream.of(1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 ); s1.limit(5 ).forEach(System.out::print); Stream<Integer> s2 = Stream.of(1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 ); s2.skip(5 ).forEach(System.out::print); }
映射
映射本质上是一个数据转换的过程
定义
1 <R> Stream<R> map (Function<? super T, ? extends R> mapper) ;
T
是Stream中元素的类型,R
新的流的元素类型
map能够通过提供的函数Function
将流中的每个元素转换成新的元素,最后生成一个新元素构成的流。
map接受的是一个函数式接口 。
利用映射层层提取数据的过程
示例:利用映射获得Person列表中所有人的name
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Test public void test6 () { List<Person> people = List.of( new Person ("Neo" , 1 , "USA" ), new Person ("Blu" , 12 , "China" ), new Person ("Alex" , 29 , "Germany" ), new Person ("Bob" , 25 , "Germany" ), new Person ("Kurumi" , 18 , "Japan" ), new Person ("Kurumi" , 18 , "Japan" ) ); Stream<Person> peopleStream = people.stream(); peopleStream.map(Person::getName).forEach(System.out::println); }
打印结果:
1 2 3 4 5 6 Neo Blu Alex Bob Kurumi Kurumi
map
结构适用于单层结构 的流,进行元素一对一 的转换。
对于嵌套的集合,数组等等,适合使用flatMap
。
定义 :Stream.java
中
1 <R> Stream<R> flatMap (Function<? super T, ? extends Stream<? extends R>> mapper) ;
示例:将嵌套List转换成的流扁平化为单层流打印出来
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Test public void test7 () { List<List<Person>> peopleGroups = List.of( List.of( new Person ("neo" , 45 , "USA" ), new Person ("Stan" , 10 , "USA" ) ), List.of( new Person ("Grace" , 16 , "UK" ), new Person ("Alex" , 17 , "UK" ) ), List.of( new Person ("Sebastian" , 40 , "FR" ) ) ); Stream<List<Person>> peopleGroupStream = peopleGroups.stream(); Stream<Person> personStream = peopleGroupStream.flatMap(people -> people.stream()); personStream.forEach(System.out::println); }
流操作返回的是一个新的流,原始流在第一次操作后就会被标记为已操作,不能再次进行操作。实际应用中通常会采用链式操作 :
嵌套流 -> 单层流 -> 提取name属性 -> 打印
mapToInt
可以将对象流转换为基础类型的流IntStream
。
排序
当流中的元素类型实现了Comparable
接口时(自然排序),可直接调用sorted()
。
定制排序:
1 2 3 4 5 6 7 @Test public void test10 () { Stream.of("blueberry" , "greenberry" , "redberry" , "pear" , "apple" , "orange" ) .sorted(Comparator.comparingInt(String::length).reversed()) .forEach(System.out::println); }
关于Comparator.comparingInt(String::length).reversed()
:
comparingInt
是Comparator的一个方法
相当于Comparator.comparingInt(p -> p.getAge())
,将Age作为排序的标准
comparingInt
和reversed()
并列定义,但是前者返回的仍然是一个Comparator<T>
对象,后者是一个Comparator的默认方法,只要是Comparator<T>
的对象就可以调用它。
中间操作只是定义了操作的规则,并不会立即执行,常常用变量保存和传递。
综合练习:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Test public void test1 () { List<List<Person>> peopleGroups = List.of( List.of( new Person ("neo" , 45 , "USA" ), new Person ("Stan" , 10 , "USA" ) ), List.of( new Person ("Grace" , 16 , "UK" ), new Person ("Alex" , 19 , "UK" ) ), List.of( new Person ("Sebastian" , 40 , "FR" ), new Person ("Sebastian" , 40 , "FR" ) ) ); Stream<String> s= peopleGroups.stream() .flatMap(List::stream) .distinct() .filter(person -> person.getAge() > 18 ) .map(Person::getName) .sorted(Comparator.comparing(String::length)); s.forEach(System.out::println); }
终端操作
终端操作是流处理的最终步骤,实弹发生,流中的元素被消费,流不能再被使用。
终端操作包括查找与匹配、聚合操作、归约操作、收集操作、迭代操作。
查找与匹配
属于短路操作(Short-circuiting
Operations),也就是说这些操作在找到所需的元素后会立即返回结果,不会遍历整个流.
anyMatch
:如果流中任意元素满足给定的条件则返回true
noneMatch
:与前者相反
allMatch
:所有的都满足才会返回true
Optional<T> findFirst();
找到流中的第一个元素,类型为Optional
.因为返回的元素可能为空,这样做的话更加安全.
1 2 3 4 5 6 7 8 9 10 11 12 13 @Test public void test2 () { List<Person> people = List.of( new Person ("Neo" , 1 , "USA" ), new Person ("Blu" , 12 , "China" ), new Person ("Alex" , 29 , "Germany" ), new Person ("Bob" , 25 , "Germany" ), new Person ("Kurumi" , 18 , "Japan" ) ); Optional<Person> optionalPerson = people.stream().findFirst(); optionalPerson.ifPresent(System.out::println); }
打印结果:
1 Person{name='Neo', age=1, country='USA'}
聚合操作
long count();
:计算元素的数量
Optional<T> max(Comparator<? super T> comparator);
:返回流中的最大元素,需要提供一个比较器.
同理还有min
sum()
用于求和,只能处理基本数据类型的流,所以使用之前要进行流类型的转换.
average
同上
本质上聚合操作是归约操作的一种特殊形式,适合快速简单的统计任务.归约操作reduce更加通用.
归约操作
整型的reduce定义如下:
1 int reduce (int identity, IntBinaryOperator op) ;
Java提供的注解:
对该流的元素执行归约操作,使用提供的标识值(identity)和一个关联的累积函数(accumulation
function),并返回归约后的结果。其等价于以下代码: 1 2 3 4 int result = identity;for (int element : this stream) result = accumulator.applyAsInt(result, element)return result;
即,先把 identity
的值赋给结果,然后再使用累计函数对结果和流中的元素进行运算.
这种归约不一定按顺序执行(即它可以并行处理)。
identity
必须是一个恒等元 ,也就是说累计函数作用在它身上之后的结果仍然等于它本身.例如:
加法的恒等元是0,因为 0 + x = x
乘法的恒等元是1,因为1 * x = x
此外,累积函数必须是可结合的 (associative)函数,也就是符合结合律,计算顺序不影响结果.
形参 :
identity
:累积函数的恒等元
op
:一个可结合的,无副作用(函数在处理值时不应该修改外部状态)的,无状态的函数
返回值 :归约的结果
对一个流中的所有元素求和,可以写为int sum = integers.reduce(0, (a, b) -> a+b);
或者更简洁一点int sum = integers.reduce(0, Integer::sum);
虽然相比于在循环中直接修改一个运行中的总和变量,这种方式看起来有些绕远路,但归约(reduction)操作在并行化时表现得更加优雅,
无需额外的同步(synchronization),并且大大降低了数据竞争(data
races)的风险。
例:对流的所有元素的年龄求和,然后输出所有人名字的串接字符串
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Test public void test3 () { List<Person> people = List.of( new Person ("Neo" , 1 , "USA" ), new Person ("Blu" , 13 , "China" ), new Person ("Alex" , 29 , "Germany" ), new Person ("Bob" , 25 , "Germany" ), new Person ("Kurumi" , 18 , "Japan" ) ); IntStream doubleStream = people.stream().mapToInt(Person::getAge); int sum = doubleStream.reduce(0 , (a,b) -> (a + b)); System.out.println(sum); String joinedName = people.stream() .map(Person::getName) .reduce("" , (a, b) -> a + b); System.out.println(joinedName); }
打印结果
收集操作
把流处理后的元素汇集到新的数据结构中, 比如列表, map, 集合等等.
定义
1 <R, A> R collect (Collector<? super T, A, R> collector) ;
Collectors.java
中提供了丰富的静态方法 用作collector参数.
如toList
,toMap
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Test public void test4 () { List<Person> people = List.of( new Person ("Neo" , 1 , "USA" ), new Person ("Blu" , 13 , "China" ), new Person ("Alex" , 29 , "Germany" ), new Person ("Bob" , 25 , "Germany" ), new Person ("Kurumi" , 18 , "Japan" ) ); List<Person> adults = people.stream() .filter(person -> person.getAge() > 18 ) .collect(Collectors.toList()); System.out.println(adults); }
分组 :Collectors
类提供的静态方法
1 2 3 4 public static <T, K> Collector<T, ?, Map<K, List<T>>> groupingBy(Function<? super T, ? extends K > classifier) { return groupingBy(classifier, toList()); }
T
:流中元素的类型,例如Person
K
:用于分组的键的类型
形参 :classifier
,
将输入元素映射到键的分类器函数
返回值 :实现分组操作的收集器,
将Stream<T>
收集成一个Map<K, List<T>>
.
key:由classifier生成
value:分到该组的T
元素组成的列表
示例:根据国家分组
1 2 3 4 5 6 7 8 9 10 11 12 13 @Test public void test6 () { List<Person> people = List.of( new Person ("Neo" , 1 , "USA" ), new Person ("Blu" , 13 , "China" ), new Person ("Alex" , 29 , "Germany" ), new Person ("Bob" , 25 , "Germany" ), new Person ("Kurumi" , 18 , "Japan" ) ); Map<String, List<Person>> collect = people.stream() .collect(Collectors.groupingBy(Person::getCountry)); collect.forEach((k, v) -> System.out.println(k + " = " + v)); }
T
在这里即Person
这里的分类器是(Person::getCountry)
,所以键的类型是String
最终得到的Map泛型为Map<String, List<Person>>
打印结果
1 2 3 4 USA = [Person{name='Neo', age=1, country='USA'}] Japan = [Person{name='Kurumi', age=18, country='Japan'}] China = [Person{name='Blu', age=13, country='China'}] Germany = [Person{name='Alex', age=29, country='Germany'}, Person{name='Bob', age=25, country='Germany'}]
分区 功能同理,
调用Collectors.partitioningBy
即可,
不过这时传入的是一个条件判断式, map的第一个参数也变成了boolean变量.
1 2 3 Map<Boolean, List<Person>> agePartition = people.stream() .collect(Collectors.partitioningBy(person -> person.getAge() > 18 )); agePartition.forEach((k, v) -> System.out.println(k + " = " + v));
Collectors
也提供了拼接字符串的方法joining
1 2 3 4 String joinedName = people.stream() .map(Person::getName) .collect(Collectors.joining("," )); System.out.println(joinedName);
打印结果
也可以用Collectors.joining
连接字符串.
使用Collectors.summarizingInt
可以汇总 指定的数据,
返回类型为IntSummaryStatistics
,
使用IntSummaryStatistics
的get
方法可以快速获取最大值,
最小值, 平均值等等.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Test public void test7 () { List<Person> people = List.of( new Person ("Neo" , 1 , "USA" ), new Person ("Blu" , 13 , "China" ), new Person ("Alex" , 29 , "Germany" ), new Person ("Bob" , 25 , "Germany" ), new Person ("Kurumi" , 18 , "Japan" ) ); String joinedName = people.stream() .map(Person::getName) .collect(Collectors.joining("," )); System.out.println(joinedName); IntSummaryStatistics collect = people.stream() .collect(Collectors.summarizingInt(Person::getAge)); System.out.println(collect); System.out.println(collect.getMax()); System.out.println(collect.getMin()); }
自定义收集器
Collector.of(...)
的五个参数结构
1 2 3 4 5 6 7 Collector.of( Supplier<R> supplier, BiConsumer<R, T> accumulator, BinaryOperator<R> combiner, Function<R, R> finisher, Collector.Characteristics... characteristics )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Test public void customTest () { List<Person> people = List.of( new Person ("Neo" , 1 , "USA" ), new Person ("Blu" , 13 , "China" ), new Person ("Alex" , 29 , "Germany" ), new Person ("Bob" , 25 , "Germany" ), new Person ("Kurumi" , 18 , "Japan" ) ); ArrayList<Person> collect = people.stream() .collect(Collector.of( () -> new ArrayList <>(), (list, person) -> { System.out.println("Accumulator: " + person); list.add(person); }, (left, right) -> { System.out.println("Combiner: " + left); left.addAll(right); return left; }, Collector.Characteristics.IDENTITY_FINISH )); System.out.println(collect); }
打印结果
1 2 3 4 5 6 7 Accumulator: Person{name='Neo', age=1, country='USA'} Accumulator: Person{name='Blu', age=13, country='China'} Accumulator: Person{name='Alex', age=29, country='Germany'} Accumulator: Person{name='Bob', age=25, country='Germany'} Accumulator: Person{name='Kurumi', age=18, country='Japan'} [Person{name='Neo', age=1, country='USA'}, Person{name='Blu', age=13, country='China'}, Person{name='Alex', age=29, country='Germany'}, Person{name='Bob', age=25, country='Germany'}, Person{name='Kurumi', age=18, country='Japan'}]
(这一块没看太懂,先放到这里以后再来看)
并行流
Parallel Streams
能够借助多核处理器 的并行计算能力 加速数据处理,
特别适合大型数据集或计算密集型任务.
工作原理
并行流在开始时, Spliterator分割迭代器将数据分割成多个片段,
分割过程通常采用递归的方式动态进行, 以此平衡子任务的工作负载,
提高资源利用率.
然后Fork/Join框架将这些数据片段分配到多个线程和处理器核心上进行并行处理.
处理完成后, 结果将会被汇总合并,
其核心是[任务的分解Fork]和[结果的合并Join]
无论是并行流还是顺序流.二者都提供相同的中间操作和终端操作,
也就是说我们可以采用几乎相同的方式进行数据处理和结果收集
并行流的工作原理
forEach & forEachOrdered
demo
1 2 3 4 5 6 7 @Test public void test1 () { List<String> Example = List.of("A" , "B" , "C" , "D" , "E" , "F" , "G" ); Example.parallelStream() .map(String::toLowerCase) .forEach(System.out::println); }
打印出的字母是乱序的, 而且每次都不一样.
不像顺序流的执行是单线程的, 并行流采用多线程并发处理,
不保证元素的处理顺序.用forEachOrdered
可以保证元素的出现顺序,
这归功于Spliterator和Fork/Join框架的协作:
在处理并行流时, 对于有序的数据源,
Spliterator会对数据源进行递归分割, 通过划分数据源的索引范围来实现.
每次分割都会产生一个新的Spliterator实例,
其内部维护了指向原数据的索引范围,
这种分割机制可以让数据的出现顺序得以保持.
然后, Fork/Join框架接手, 将分配后的数据块分配给不同的子任务执行.
对于forEachOrdered操作,
框架依据Spliterator维护的顺序信息来调度方法的执行顺序. 所以,
就算某个子任务提前完成了, 如果跟它关联的顺序还没到来, 系统将缓存该顺序,
并暂停执行该方法, 直到所有前序的任务都已经完成.
上述机制确保了即使是并行处理也能保证原始的出现顺序,
代价是牺牲了一些并行执行的效率.
对于forEach
,
Fork/Join会忽略顺序的信息 , 能够提高执行效率.
forEach会在不同的线程上独立进行, 所以如果操作的是共享资源,
必须确保这些操作是线程安全 的(同步).
所以forEach
更适合执行无状态操作或资源独立的场景.
一个关于多线程的测试:
1 2 3 4 5 6 7 8 9 10 11 @Test public void test2 () { List<String> Example = List.of("A" , "B" , "C" , "D" , "E" , "F" , "G" ); Example.parallelStream().forEach(item -> { System.out.println("Item: " + item + "->" + "Thread: " + Thread.currentThread().getName()); }); }
打印结果
1 2 3 4 5 6 7 Item: E->Thread: main Item: D->Thread: main Item: G->Thread: main Item: A->Thread: main Item: F->Thread: ForkJoinPool.commonPool-worker-2 Item: B->Thread: ForkJoinPool.commonPool-worker-1 Item: C->Thread: ForkJoinPool.commonPool-worker-2
main
:主线程
ForkJoinPool.commonPool-worker-x
:后台线程池中的工作线程
collect收集
List使用collect
收集 输出结果,
最终合并得到的列表仍为有序 (与第一个demo不同)
1 2 3 4 5 6 7 8 9 10 11 @Test public void test3 () { List<String> collect = List.of("A" , "B" , "C" , "D" , "E" , "F" , "G" ).parallelStream() .map(String::toLowerCase) .collect(Collectors.toList()); System.out.println(); System.out.println(collect); }
使用自定义收集器 演示有序列表List
在并行流的情况下合并后仍有序输出的过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Test public void test4 () { List<String> collect = List.of("A" , "B" , "C" , "D" , "E" ).parallelStream() .map(String::toLowerCase) .collect(Collector.of( () -> { System.out.println("Supplier: new ArrayList" + "Thread: " + Thread.currentThread().getName()); return new ArrayList <>(); }, (list,item) -> { System.out.println("Accumulator: " + item + " Thread: " + Thread.currentThread().getName()); list.add(item); }, (left, right) -> { System.out.println("Combiner: " + left + " " + right + " Thread: " + Thread.currentThread().getName()); left.addAll(right); return left; }, Collector.Characteristics.IDENTITY_FINISH )); System.out.println(collect); }
打印内容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 Supplier: new ArrayListThread: main Accumulator: c Thread: main Supplier: new ArrayListThread: ForkJoinPool.commonPool-worker-1 Accumulator: b Thread: ForkJoinPool.commonPool-worker-1 Supplier: new ArrayListThread: ForkJoinPool.commonPool-worker-2 Supplier: new ArrayListThread: ForkJoinPool.commonPool-worker-1 Accumulator: e Thread: ForkJoinPool.commonPool-worker-2 Accumulator: d Thread: ForkJoinPool.commonPool-worker-1 Supplier: new ArrayListThread: main Accumulator: a Thread: main Combiner: [d] [e] Thread: ForkJoinPool.commonPool-worker-1 Combiner: [a] [b] Thread: main Combiner: [c] [d, e] Thread: ForkJoinPool.commonPool-worker-1 Combiner: [a, b] [c, d, e] Thread: ForkJoinPool.commonPool-worker-1 [a, b, c, d, e]
如果使用Set
, 则结果仍然是无序的,
这是由数据结构本身 的特点决定的,
Spliterator和Fork/Join框架的分割合并策略并没有什么不同.
UNORDERED & CONCURRENT
UNORDERED
即便Colector被标记为UNORDERED
,
如果数据源或流操作本身是有序的,
系统的执行策略通常仍会保持这些元素的出现顺序.
只有在特定场景下系统才会针对那些被标记为UNORDERED
的流进行优化从而打破顺序的约束.
CONCURRENT
在标准的并行流处理中, 每个线程处理数据的一个子集,
维护自己的局部结果容器. 在所有的数据处理完成后,
这些局部结果会通过Combiner函数合并成一个最终的结果.
使用CONCURRENT特性后, 所有线程将共享同一个结果容器 ,
而不是维护独立的局部结果, 从而减少了合并的需要.
这通常会带来性能上的提升, 特别是合并操作较为复杂时.
这时只有一个结果容器,
这个容器必须是线程安全的(例如ConcurrentHashMap).