Gatherer、Collector 自定义 Stream 的中间操作、最终操作

embedded/2024/12/19 16:23:30/

这里写目录标题GathererCollector 自定义 Stream 的中间操作、最终操作

    • collect(Collector) 自定义终止操作
    • gather(Gatherer) 自定义中间操作
      • 泛型
      • Downstream
      • Integrator
      • Gatherer 的实例方法(4 + 1 个)
      • Gatherers.GathererImpl 与工厂方法 of、ofSequential
      • 示例
        • 实现 map(mapper) (无状态)
        • 实现 limit(maxSize) (有状态)
        • 实现滑动窗口 windowSliding(windowSize)

Stream 中存在两种操作:中间操作和终止操作,中间操作会返回另一个 Stream,比如 mapfilter,而终止操作可以返回最终结果(比如 countfindFirst)或其他的副作用(比如 forEach)。

Stream 出现之初就提供了 <R, A> R collect(Collector<? super T, A, R> collector); 方法和 Collector 接口来自定义终止操作。但直到 jdk22 才提供了自定义中间操作的方法 <R> Stream<R> gather(Gatherer<? super T, ?, R> gatherer) 和接口 Gatherer

collect 的参数是 Collector,返回值是 RCollectors 中定义了多个便捷的终止操作。

gather 的参数是 Gatherer,返回值是 Stream<R>Gatherers 中定义了多个便捷的中间操作。

Collector__10">collect(Collector) 自定义终止操作

泛型

<T>:元素的类型

<A>:中间状态的类型,通常作为实现细节隐藏,使用 ? 替代

<R>:最终结果的类型

java">public interface Collector<T, A, R> 

方法

Supplier<A> supplier():提供一个中间结果,类型为 A

BiConsumer<A, T> accumulator():将元素累积到中间结中

BinaryOperator<A> combiner():合并两个中间结果,并返回合并后的中间结果(只有并行流会用到,如果实现的 Collector 只想在串行流中昂使用,可以直接抛出 UnsupportedOperationException

Function<A, R> finisher():将中间结果 A 转换为最终结果 R

Set<Characteristics> characteristics():提供此 Collector 的一些特性,它们提供了一些 hint,用于提升流执行时的性能。Characteristics 是个枚举,有以下 3 种取值:

  • CONCURRENT:表明此 Collector 是并发的,多个线程可以对同一个中间结果调用 accumulator 方法(线程安全的)。如果 CollectorCONCURRENT 的,但不是 UNORDERED 的,则只有在 Stream 本身是 StreamOpFlag.NOT_ORDERED 才并发执行
  • UNORDERED:表明此 Collector 可能不会保留元素的输入顺序。(如果中间结果或和最终结果没有内部顺序,例如 Set,则可能是这样)
  • IDENTITY_FINISH:表明 finisher 函数是恒等函数,可以省略,中间结果可以强制转换为最终结果

示例

实现 toList

内部类实现

java">import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Stream;public class C1 {public static void main(String[] args) {List<Integer> list = Stream.of(1, 2, 3).collect(toList());System.out.println(list);// 输出// [1, 2, 3]}// 元素类型 T// 中间结果类型 ? 由于中间结果属于内部实现细节,可以不对外暴露,所以使用 ?// 最终结果类型 List<T>static <T> Collector<T, ?, List<T>> toList() {return new ToList<>();}static class ToList<T>// 元素类型 T// 中间结果类型 List<T>// 最终结果类型 List<T>implements Collector<T, List<T>, List<T>> {// ArrayList 作为中间结果@Overridepublic Supplier<List<T>> supplier() {return ArrayList::new;}// 将元素 T 累积到中间结果 List<T> 中@Overridepublic BiConsumer<List<T>, T> accumulator() {return List::add;}// 合并两个中间结果@Overridepublic BinaryOperator<List<T>> combiner() {return (left, right) -> {left.addAll(right);return left;};}// 将中间结果转换为最终结果,因为中间结果与最终结果一致,所以直接使用 Function.identity()@Overridepublic Function<List<T>, List<T>> finisher() {return Function.identity();}// 表明 finisher 是恒等函数,中间结果可强转为最终结果@Overridepublic Set<Characteristics> characteristics() {return EnumSet.of(Characteristics.IDENTITY_FINISH);}}
}

CollectorsCollectorImpl__Collectorof_117">Collectors.CollectorImpl 与工厂方法 Collector.of()

Collector 的实现类为 Collectors.CollectorImpl,有两个构造方法

java">// 第一个:5 个参数
record CollectorImpl<T, A, R>(Supplier<A> supplier,BiConsumer<A, T> accumulator,BinaryOperator<A> combiner,Function<A, R> finisher,Set<Characteristics> characteristics) implements Collector<T, A, R> {// 第二个:4 个参数,finisher 默认为 castingIdentity()CollectorImpl(Supplier<A> supplier,BiConsumer<A, T> accumulator,BinaryOperator<A> combiner,Set<Characteristics> characteristics) {this(supplier, accumulator, combiner, castingIdentity(), characteristics);}
}// 默认的 finisher,是个恒等函数
@SuppressWarnings("unchecked")
private static <I, R> Function<I, R> castingIdentity() {return i -> (R) i;
}

Collector 提供了两个 staticof 方法用于创建 Collector,分别调用了上述两个构造方法:

java">static final Set<Collector.Characteristics> CH_ID= Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH));public static<T, R> Collector<T, R, R> of(Supplier<R> supplier,BiConsumer<R, T> accumulator,BinaryOperator<R> combiner,Characteristics... characteristics) {Objects.requireNonNull(supplier);Objects.requireNonNull(accumulator);Objects.requireNonNull(combiner);Objects.requireNonNull(characteristics);// 默认会添加 Collector.Characteristics.IDENTITY_FINISHSet<Characteristics> cs = (characteristics.length == 0)? Collectors.CH_ID: Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH,characteristics));return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, cs);
}public static<T, A, R> Collector<T, A, R> of(Supplier<A> supplier,BiConsumer<A, T> accumulator,BinaryOperator<A> combiner,Function<A, R> finisher,Characteristics... characteristics) {Objects.requireNonNull(supplier);Objects.requireNonNull(accumulator);Objects.requireNonNull(combiner);Objects.requireNonNull(finisher);Objects.requireNonNull(characteristics);Set<Characteristics> cs = Collectors.CH_NOID;if (characteristics.length > 0) {cs = EnumSet.noneOf(Characteristics.class);Collections.addAll(cs, characteristics);cs = Collections.unmodifiableSet(cs);}return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, finisher, cs);
}

实现 toList

java">import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Stream;public class C2 {public static void main(String[] args) {List<Integer> list = Stream.of(1, 2, 3).collect(toList());System.out.println(list);}static <T> Collector<T, ?, List<T>> toList() {return Collector.of(ArrayList::new,List::add,(left, right) -> {left.addAll(right);return left;},Function.identity());}
}// 输出
// [1, 2, 3]

Gatherer__221">gather(Gatherer) 自定义中间操作

Gatherer 可实现有状态(比如 distinct、sorted)或无状态(比如 map、filter)的中间操作,操作可以按顺序执行,也可以并行执行,如果提供了 combiner 函数的话。Gatherer 可以以一对一(比如 map、peek)、一对多(比如 flatMap)、多对一(比如 distinct)或多对多(比如 sorted)的方式转换元素。它们可以跟踪先前看到的元素以影响后续元素的转换(有状态的),它们可以短路以将无限流转换为有限流(比如 limit)。例如,一个 Gatherer 可以将一个输入元素转换为一个输出元素,直到某个条件变为真,此时它开始将一个输入元素转换为两个输出元素。

Stream 接口中声明的每个现有中间操作都可以通过 gather(Gatherer) 来实现。

泛型

<T>:输入元素的类型

<A>:中间状态的类型,通常作为实现细节隐藏,使用 ? 替代

<R>:输出元素的类型,即发送到 Stream 的下一个阶段的元素类型、Downstream 的泛型类型

java">public interface Gatherer<T, A, R> 

Downstream

Downstream 可以将元素发送到 Stream下一个阶段,可在 Integrator 和 finisher 中使用。

这里的泛型 T 指的是 Gatherer 的泛型 R,不要混淆。

java">interface Downstream<T> {// 将元素发送到 Stream 的下一个阶段boolean push(T element);// 检查是否下一个阶段不希望向其发送更多元素default boolean isRejecting() { return false; }
}

Integrator

Integrator 接收元素 T 并对其进行处理,可以有选择的使用中间状态 A,并可以有选择的使用 Downstream 向下一个阶段发送增量结果 R,可通过返回 false 来短路此 Gatherer

java">interface Integrator<A, T, R> {// 1 执行给定的操作:当前状态、下一个元素和下游对象;可能会检查和/或更新 State,可以选择向下游发送任意数量的元素 —— 然后返回是否要使用更多元素。// 2 处理元素 T element// 3 可以检查、更新中间状态 A state// 4 可以选择向 Downstream 发送任意数量(可不发送、可发送 1 个、可发送多个)的元素// 5 然后返回是否要处理更多元素 T element,返回 false 会短路此 Gatherer,丢弃未处理的元素//   比如实现 limit(n) 时,前几次返回 true,当到达 n 个时返回 false 来短路以丢弃未处理的元素boolean integrate(A state, T element, Downstream<? super R> downstream);// Integrator 的子接口,Greedy 会处理所有的元素,换句话说,Greedy 不会短路,该信息可用于优化 Stream 的执行// 需要处理所有元素的中间操作可以interface Greedy<A, T, R> extends Integrator<A, T, R> { }// 工厂方法,将 lambda 转换成 Integratorstatic <A, T, R> Integrator<A, T, R> of(Integrator<A, T, R> integrator) {return integrator;}// 工厂方法,将 lambda 转换成 Greedystatic <A, T, R> Greedy<A, T, R> ofGreedy(Greedy<A, T, R> greedy) {return greedy;}
}

Gatherer_4__1__286">Gatherer 的实例方法(4 + 1 个)

java">// 用于提供中间状态,比如实现滑动窗口时存储当前窗口的元素;实现 limit 时存储已经处理过几个元素了
default Supplier<A> initializer() {return defaultInitializer();
};// 用于提供 Integrator
Integrator<A, T, R> integrator();// 将两个中间状态合并成一个并返回
default BinaryOperator<A> combiner() {return defaultCombiner();
}// 1 处理完所有的元素之后调用,参数为中间状态 A 和 Downstream。
// 2 可以检查、更新中间状态 A state
// 3 可以选择向 Downstream 发送任意数量(可不发送、可发送 1 个、可发送多个)的元素
// 例如在实现 windowFixed(windowSize) 时(将元素分组到大小固定的 List 中),最后一个窗口的元素可能小于 windowSize 个,所以不会在 Integrator#integrate 方法中发送到 Stream 下一个阶段,此时需要在 finisher 中判断当前窗口是否有剩余的元素,如果有则发送到下一个阶段
default BiConsumer<A, Downstream<? super R>> finisher() {return defaultFinisher();
}// 组合两个 Gatherer,将 this 的输出作为 that 输入
default <RR> Gatherer<T, ?, RR> andThen(Gatherer<? super R, ?, ? extends RR> that) {Objects.requireNonNull(that);return Gatherers.Composite.of(this, that);
}

GatherersGathererImpl__ofofSequential_317">Gatherers.GathererImpl 与工厂方法 of、ofSequential

Gatherer 的默认实现为 Gatherers.GathererImpl,有两个构造方法

java">record GathererImpl<T, A, R>(@Override Supplier<A> initializer,@Override Integrator<A, T, R> integrator,@Override BinaryOperator<A> combiner,@Override BiConsumer<A, Downstream<? super R>> finisher) implements Gatherer<T, A, R> {static <T, A, R> GathererImpl<T, A, R> of(Supplier<A> initializer,Integrator<A, T, R> integrator,BinaryOperator<A> combiner,BiConsumer<A, Downstream<? super R>> finisher) {return new GathererImpl<>(Objects.requireNonNull(initializer,"initializer"),Objects.requireNonNull(integrator, "integrator"),Objects.requireNonNull(combiner, "combiner"),Objects.requireNonNull(finisher, "finisher"));}
}

3 个特殊用途的 initializer、combiner、finisher 方法:

java">// 返回默认的 initializer,使用此 initializer 的 Gatherer 被认为是无状态的
static <A> Supplier<A> defaultInitializer() {return Gatherers.Value.DEFAULT.initializer();
}// 返回默认的 combiner,使用此 combiner 的 Gatherer 只能串行执行,不能并行执行
static <A> BinaryOperator<A> defaultCombiner() {return Gatherers.Value.DEFAULT.combiner();
}// 返回默认的 finisher,此 finisher 此空的,不执行任何操作
static <A, R> BiConsumer<A, Downstream<? super R>> defaultFinisher() {return Gatherers.Value.DEFAULT.finisher();
}

工厂方法 of、ofSequential

java">// 创建串行的、无状态的,且 中间状态为 Void
static <T, R> Gatherer<T, Void, R> ofSequential(Integrator<Void, T, R> integrator);// 创建串行的、无状态的,且 中间状态为 Void
static <T, R> Gatherer<T, Void, R> ofSequential(Integrator<Void, T, R> integrator,BiConsumer<Void, Downstream<? super R>> finisher);// 创建串行的
static <T, A, R> Gatherer<T, A, R> ofSequential(Supplier<A> initializer,Integrator<A, T, R> integrator);// 创建串行的
static <T, A, R> Gatherer<T, A, R> ofSequential(Supplier<A> initializer,Integrator<A, T, R> integrator,BiConsumer<A, Downstream<? super R>> finisher);// 4 个参数
static <T, A, R> Gatherer<T, A, R> of(Supplier<A> initializer,Integrator<A, T, R> integrator,BinaryOperator<A> combiner,BiConsumer<A, Downstream<? super R>> finisher) {return new Gatherers.GathererImpl<>(Objects.requireNonNull(initializer),Objects.requireNonNull(integrator),Objects.requireNonNull(combiner),Objects.requireNonNull(finisher));
}// 创建无状态的,且 中间状态为 Void
static <T, R> Gatherer<T, Void, R> of(Integrator<Void, T, R> integrator);// 创建无状态的,且 中间状态为 Void
static <T, R> Gatherer<T, Void, R> of(Integrator<Void, T, R> integrator,BiConsumer<Void, Downstream<? super R>> finisher);

示例

实现 map(mapper) (无状态)
java">import java.util.function.Function;
import java.util.stream.Gatherer;
import java.util.stream.Stream;@SuppressWarnings("preview")
public class G1 {public static void main(String[] args) {Stream.of(1, 2, 3).gather(map(x -> x * x)).forEach(System.out::println);}// <R> Stream<R> map(Function<? super T, ? extends R> mapper);static <T, R> Gatherer<T, Void, R> map(Function<? super T, ? extends R> mapper) {// 单参数的 ofSequential 创建串行的、无状态的return Gatherer.ofSequential(// map 会处理所有元素,不会短路,所以用 ofGreedy 包装成 Greedy 以优化 Stream 执行Gatherer.Integrator.ofGreedy(// map 是无状态的,state 用 _ 代替(_, element, downstream) -> {R r = mapper.apply(element);return downstream.push(r);}));}
}// 输出
1
4
9
实现 limit(maxSize) (有状态)
java">import java.util.stream.Gatherer;
import java.util.stream.Stream;@SuppressWarnings("preview")
public class G2 {public static void main(String[] args) {Stream.of(1, 2, 3).gather(limit(1)).forEach(System.out::println);System.out.println();Stream.of(1, 2, 3).gather(limit(2)).forEach(System.out::println);System.out.println();Stream.of(1, 2, 3).gather(limit(5)).forEach(System.out::println);System.out.println();}// Stream<T> limit(long maxSize);static <T> Gatherer<T, ?, T> limit(long maxSize) {// State 存储已发送到下一个阶段的元素数量class State {long count;}// 串行的、有状态的return Gatherer.ofSequential(State::new,(state, element, downstream) -> {// 数量未达到 maxSize,便发送元素到下一个阶段if (state.count < maxSize && downstream.push(element)) {state.count++;return true;}return false;});}
}// 输出
11
21
2
3
实现滑动窗口 windowSliding(windowSize)

输入:1,2,3,4,5,6,7,8

windowSize = 2,结果 [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8]]

windowSize = 6,结果 [[1, 2, 3, 4, 5, 6], [2, 3, 4, 5, 6, 7], [3, 4, 5, 6, 7, 8]]

java">import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;@SuppressWarnings("preview")
public class G3 {public static void main(String[] args) {List<List<Integer>> windows2 = Stream.of(1, 2, 3, 4, 5, 6, 7, 8).gather(windowSliding(2)).toList();System.out.println(windows2);List<List<Integer>> windows6 =Stream.of(1, 2, 3, 4, 5, 6, 7, 8).gather(Gatherers.windowSliding(6)).toList();System.out.println(windows6);}static <T> Gatherer<T, ?, List<T>> windowSliding(int windowSize) {if (windowSize <= 0) {throw new IllegalArgumentException("'windowSize' must be greater than zero");}// state 维护当前窗口和是否是第一个窗口class State {final List<T> window = new ArrayList<>(windowSize);// 当 Stream 的元素数量小于 windowSize 时,// finisher 中判断是否是第一个窗口,如果是,则将元素发送到下一个阶段,否则不发送boolean firstWindow = true;}// 串行的、有状态的return Gatherer.ofSequential(State::new,// 不短路Gatherer.Integrator.ofGreedy((state, element, downstream) -> {// 添加元素到当前窗口state.window.add(element);// 若当前窗口满足大小就发送到下一个阶段if (state.window.size() == windowSize) {boolean result = downstream.push(new ArrayList<>(state.window));// 删除窗口最左边的元素state.window.removeFirst();state.firstWindow = false;return result;}return true;}),(state, downstream) -> {// firstWindow 为 true 说明 Stream 中的元素数量小于 windowSize 且从未向下一个阶段发送过元素// 需要将当前窗口的元素发送到下一个阶段if (state.firstWindow && !state.window.isEmpty() && !downstream.isRejecting()) {downstream.push(new ArrayList<>(state.window));state.window.clear();}});}
}// 输出
[[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8]]
[[1, 2, 3, 4, 5, 6], [2, 3, 4, 5, 6, 7], [3, 4, 5, 6, 7, 8]]

http://www.ppmy.cn/embedded/147062.html

相关文章

vscode中同时运行两个python文件(不用安装插件)

如何在vscode中同时运行两个python文件呢&#xff1f;今天在工作中遇到了这个问题。 查了网上的方法是安装coder runner插件&#xff0c;后来发现自身就有这个功能。所以记录一下,方便后续查找: 这是我的第一个文件&#xff0c;点击右上角的运行旁边的小箭头&#xff0c;有一…

文件上传—阿里云OSS对象存储

目录 一、OSS简介 二、OSS基本使用 1. 注册账号 2. 基本配置 (1) 开通OSS (2) 创建存储空间 (3) 修改权限 (4) 配置完成&#xff0c;上传一张图片&#xff0c;检验是否成功。 (5) 创建AccessKey 三、Java项目集成OSS 1. 导入依赖 2. Result.java代码&#xff1a; …

如何将多张图片合并为一个pdf?多张图片合并成一个PDF文件的方法

如何将多张图片合并为一个pdf&#xff1f;当我们需要将多张图片合并为一个PDF文件时&#xff0c;通常是因为我们希望将这些图片整理成一个统一的文档&#xff0c;方便查看、分享或打印。无论是工作中需要提交的报告、学生们需要整理的作业&#xff0c;还是个人收藏的照片、旅行…

【Excel】单元格分列

目录 分列&#xff08;新手友好&#xff09; 1. 选中需要分列的单元格后&#xff0c;选择 【数据】选项卡下的【分列】功能。 2. 按照分列向导提示选择适合的分列方式。 3. 分好就是这个样子 智能分列&#xff08;进阶&#xff09; 高级分列 Tips&#xff1a; 新手推荐基…

java: 无效的目标发行版: 9或警告: 源发行版 9 需要目标发行版 9

idea启动Java项目报错: java: 无效的目标发行版: 9 警告: 源发行版 9 需要目标发行版 9 ReformAlertRulesController has been compiled by a more recent version of the Java Runtime (class file version 53.0), this version of the Java Runtime only recognizes clas…

C/S软件授权注册系统-轻量级WebApi服务器介绍

CS软件授权注册系统-WebApi服务器介绍 目录 WebApi服务器框架 WebApi服务器技术实现技术栈VS解决方案 Project项目说明 依赖包依赖程序集WebApi接口清单 管理员工具api接口&#xff08;Swagger OpenApi&#xff09;授权服务器api接口WebApplication 介绍 WebApi服务器框架 …

qt 类中的run线程

在Qt中&#xff0c;QThread类的run()方法是线程的执行入口&#xff0c;它是由QThread内部自动调用的&#xff0c;而不是用户直接调用。 详细解释&#xff1a; QThread类&#xff1a; QThread是Qt的线程类&#xff0c;提供了用于多线程操作的接口。我们可以创建QThread对象并将…

常耀斌:深度学习和大模型原理与实战(深度好文)

目录 机器学习 深度学习 Transformer大模型架构 人工神经元网络 卷积神经网络 深度学习是革命性的技术成果&#xff0c;有利推动了计算机视觉、自然语言处理、语音识别、强化学习和统计建模的快速发展。 深度学习在计算机视觉领域上&#xff0c;发展突飞猛进&#xff0c;…