引言

示例代码

Double[] numbers = {2.4,55.6,90.12,26.6};
Set<Double> set = new HashSet<>(Arrays.asList(numbers));
int count = 0;
for (Double num : set)
	if (num > 50)
		count++;
System.out.println("Count is "+count);
System.out.println("Count is "+set.stream().filter(e->e>50).count());

Note

可以利用多个处理器并行地执行流上的集合操作。因此,使用聚合操作编写的代码通常比使用 foreach 循环的代码运行得更快。

流管道

流是惰性的,这意味着仅仅当终止操作开始时才会进行计算。这能允许JVM优化计算。

总体模式

BaseStream<T, S> 接口

BaseStream<T, S>Stream<T> 的基类,定义了一些基本的操作方法。

方法描述
+close(): S关闭此流。
+parallel(): S返回一个等效的并行执行的流。
+sequential(): S返回一个等效的顺序执行的流。
+isParallel(): boolean如果此流是并行的,则返回 true

Stream<T> 接口

Stream<T> 接口继承自 BaseStream<T, S>,定义了多种操作流的方法。

方法分类

中间操作 (Intermediate operations)

这些方法返回一个新的流,对原始流进行某种变换。

方法描述
+distinct(): Stream<T>返回一个包含此流中不同元素的新流。
+filter(p: Predicate<? super T>): Stream<T>返回一个包含满足谓词条件的元素的新流。
+limit(n: long): Stream<T>返回一个包含此流前 n 个元素的新流。
+skip(n: long): Stream<T>返回一个在丢弃前 n 个元素后剩余元素的新流。
+sorted(): Stream<T>返回一个按自然顺序排序的元素的新流。
+sorted(comparator: Comparator<? super T>): Stream<T>返回一个使用指定比较器排序的元素的新流。
+map(mapper: Function<? super T, ? extends R>): Stream<R>返回一个将函数应用于此流中每个元素的结果的新流。
+mapToInt(mapper: ToIntFunction<? super T>): IntStream返回一个将函数应用于此流中每个元素的结果的 IntStream
+mapToLong(mapper: ToLongFunction<? super T>): LongStream返回一个将函数应用于此流中每个元素的结果的 LongStream
+mapToDouble(mapper: ToDoubleFunction<? super T>): DoubleStream返回一个将函数应用于此流中每个元素的结果的 DoubleStream

终止操作 (Terminal operations)

这些方法对流进行最终处理,通常会消耗流中的所有元素。

方法描述
+count(): long返回此流中元素的数量。
+max(c: Comparator<? super T>): Optional<T>返回基于比较器的最大元素。
+min(c: Comparator<? super T>): Optional<T>返回基于比较器的最小元素。
+findFirst(): Optional<T>返回此流中的第一个元素。
+findAny(): Optional<T>返回此流中的任意一个元素。
+allMatch(p: Predicate<? super T>): boolean如果此流中的所有元素都满足谓词,则返回 true
+anyMatch(p: Predicate<? super T>): boolean如果此流中的某个元素满足谓词,则返回 true
+noneMatch(p: Predicate<? super T>): boolean如果此流中没有元素满足谓词,则返回 true
+forEach(action: Consumer<? super T>): void对此流中的每个元素执行操作。
+reduce(accumulator: BinaryOperator<T>): Optional<T>使用标识符和关联累积函数将流中的元素减少为一个值,并返回一个描述该值的 Optional
+reduce(identity: T, accumulator: BinaryOperator<T>): T使用标识符和关联累积函数将流中的元素减少为一个值,并返回该值。
+collect(collector: Collector<? super T, A, R>): R使用 Collector 对此流中的元素执行可变的归约操作。
+toArray(): Object[]返回一个包含此流中元素的数组。

静态方法 (Static methods)

这些方法用于创建流或对流进行静态操作。

方法描述
+empty(): Stream<T>返回一个空的顺序流(静态方法)。
+of(values: T...): Stream<T>返回一个包含指定值的流(静态方法)。
+of(values: T): Stream<T>返回一个包含单个值的流(静态方法)。
+concat(a1: Stream<? extends T>, a2: Stream<? extends T>): Stream<T>返回一个懒加载的流,包含 a1 的元素后跟 a2 的元素(静态方法)。

IntStream、LongStream 和 DoubleStream

除了 Stream 之外,Java 提供了 IntStream、LongStream 和Doublestream 来表示int、1ong 和double 值序列。这些流也是BaseStream的子接口,可以像 Stream一样使用这些流。此外,可以用 sum()、average() 和 summaryStatistics()方法来返回流中元素的和、平均值以及各种统计。可以使用 mapToInt 方法将 Stream转化为IntStream,或使用map 方法把包括 IntStream 在内的任何流变成 Stream。

并行流

所有的流都可以通过调用在BaseStream接口中定义的parallel()方法来转变为并行流。类似地,调用sequential()方法可以把并行流转变为顺序流。

中间方法可以进一步分为无状态方法(stateless method)和有状态方法(stateful method)。如 flter 和map 等无状态方法可以独立于流中其他元素来执行。如 distinct 和sorted 等有状态方法必须考虑整个流来执行。比如,distinct 方法必须考虑流中所有元素才能得到结果。无状态方法内部机制上是可以并行化的,并可以一次并行执行。有状态方法必须并行执行多次。

Info

有状态方法(如 distinct 和 sorted):需要收集和比较整个流的数据,因此无法一次性并行完成。它们可能需要多次并行执行或部分串行化处理。 例如:

  • distinct 需要检查所有元素以去重。
  • sorted 需要收集所有元素才能排序。

因此,有状态方法在并行流中的性能提升可能不如无状态方法显著。

Random random = new Random();
int[] list = random.ints(200_000_000).toArray();
System.out.println("处理器核心数: "+ Runtime.getRuntime().availableProcessors());
 
long startTime = System.currentTimeMillis();
int[] list1 = IntStream.of(list).filter(e->e>0).sorted().limit(5).toArray();
System.out.println(Arrays.toString(list1));
long endTime = System.currentTimeMillis();
System.out.println("顺序执行耗时: "+(endTime-startTime)+"ms");
 
startTime = System.currentTimeMillis();
int[] list2 = IntStream.of(list).parallel().filter(e->e>0).sorted().limit(5).toArray();
System.out.println(Arrays.toString(list2));
endTime = System.currentTimeMillis();
System.out.println("并行执行耗时: "+(endTime-startTime)+"ms");
处理器核心数: 10
[21, 25, 79, 81, 90]
顺序执行耗时: 9335ms
[21, 25, 79, 81, 90]
并行执行耗时: 1693ms
  1. 中间方法是惰性的,它在启动终止方法时执行。
  2. 流管道中的中间方法的顺序有影响。比如将 filter 放在 sorted 前可以去掉约一半的排序元素。
  3. 并行流是否总是更快?不一定。并行执行需要同步,这将带来一定的开销。
  4. 当并行执行流方法时,流中的元素可能以任意顺序处理。

reduce

归约(reduction)通过重复应用二元运算,例如加法、乘法,或在两个元素之间找到最大值,读取流中的元素并生成一个单一值。

reduce 方法读人两个参数。第一个是一个标识,即初始值。第二个参数是函数接口 IntBinaryOperator 的一个对象。该接口包含一个应用二元运算后返回int 值的抽象方法 applyAsInt(int e1,int e2)

int sum = s.parallelStream().reduce (0, (e1, e2) -> e1 + e2);
int result = s.parallelStream().reduce(Integer.MIN_VALUE,(e1, e2) -> Math.max(e1, e2));

collect

collect方法把流中的元素收集在一个可变容器中,该方法传入三个函数式参数:一个用来构造结果容器的新实例的供应者函数;一个将来自流的元素并入结果容器的累加器函数;以及一个将结果容器的内容合并到另一个结果容器中的合并函数。

例如,要把字符串结合到 StringBuilder 中,可以编写如下使用 collect 方法的代码:

String[] names = {"aa", "bb", "cc", "dd", "ee" };
//StringBuilder sb = Stream.of(names).collect(() -> new StringBuilder(), (c, e) -> c.append(e), (c1, c2) -> c1.append(c2));
StringBuilder sb = Stream.of(names).collect(StringBuilder::new, StringBuilder::append, StringBuilder::append);
System.out.println(sb);

Wwarning

  1. 使用方法引用简化代码
  2. 在顺序实现中没有使用合并器函数(c1, c2) -> c1.append(c2)。它在流管道并行执行时使用。当并行地执行 collect 方法时,创建了多个 StringBuilder结果,之后再用合并器函数合并。所以,合并器函数的目的是进行并行处理

foreach循环实现如下:

StringBuilder sb = new StringBuilder();
for(String s : names){
	sb.append(s);
}
  • 供应器函数是ArrayList的构造方法。累加器是add方法,它将一个元素添加到ArrayList 中。合并器函数将一个 ArrayList 合并到另一个 ArrayList 中。这三个参数——供应器、累加器、合并器——是紧密耦合的,并使用标准方法进行定义。为了简单起见, Java 提供了另一种 collect 方法,它读入 Collector 类型的参数,称收集器 (collector)。
  • Collector 接口定义了返回供应器、累加器以及合并器的方法。可以使用Collector 类中的静态工厂方法 toList()来创建Collector接口的实例。所以,之前的语句可以用标准收集器做如下简化:
List<String> list = Stream.of(names).collect(Collectors.toList());

待理解

groupingBy

String[] names ={'John”,"Peter","Susan", "Kim", "Jen", "George", "Stacy" "Steve","john"}; 
Map<Character, Long> map = Stream.of (names).collect ( Collectors.groupingBy (e -> e.charAt (0),Collectors.counting()));

groupingBy 方法中的第一个参数指定了分组的标准,称为分类器。第二个参数指定了组中元素的处理方式,称为组处理器。处理器通常是汇总收集器,比如 counting()。使用 groupingBy 收集器、collect方法返回以分类器为键的映射。也可以在groupingBy 方法中指定供应器:

// 使用了树映射来存储映射的条目
Map<Character,Long> map = Stream.of(names).collect ( Collectors.groupingBy(e -> e.charAt (0), TreeMap::new, Collectors.counting ()));