(2023年3月公司内部培训)
-
Stream 的定义
Stream
将要处理的元素集合看作一种流,在流的过程中,借助Stream API
对流中的元素进行操作,比如:筛选、排序、聚合等。
-
对流的操作
Stream可以由数组或集合创建,对流的操作分为两种:中间操作和结束操作,每种又分别有两种子类:
Stream操作分类 | ||
中间操作(Intermediate operations) | 无状态(Stateless) | unordered() filter() map() mapToInt() mapToLong() mapToDouble() flatMap() flatMapToInt() flatMapToLong() flatMapToDouble() peek() |
有状态(Stateful) | distinct() sorted() limit() skip() | |
结束操作(Terminal operations) | 非短路操作 | forEach() forEachOrdered() toArray() reduce() collect() max() min() count() |
短路操作(short-circuiting) | anyMatch() allMatch() noneMatch() findFirst() findAny() |
做成图如下:
中间操作和结束操作这两种,中间操作只是一种标记,只有结束操作才会触发实际计算。中间操作又可以分为无状态的(Stateless)和有状态的(Stateful),无状态中间操作是指元素的处理不受前面元素的影响,而有状态的中间操作必须等到所有元素处理之后才知道最终结果,比如排序是有状态操作,在读取所有元素之前并不能确定排序结果;结束操作又可以分为短路操作和非短路操作,短路操作是指不用处理全部元素就可以返回结果,比如找到第一个满足条件的元素。之所以要进行如此精细的划分,是因为底层对每一种情况的处理方式不同。 为了更好的理解流的中间操作和终端操作,可以通过下面的两段代码来看他们的执行过程。
java">IntStream.range(1, 10).peek(x -> System.out.print("\nA" + x)).limit(3).peek(x -> System.out.print("B" + x)).forEach(x -> System.out.print("C" + x));输出结果如下:
A1B1C1
A2B2C2
A3B3C3
中间操作是懒惰的,也就是中间操作不会对数据做任何操作,直到遇到了最终操作。而最终操作,都是比较热情的。他们会往前回溯所有的中间操作。也就是当执行到最后的forEach操作的时候,它会回溯到它的上一步中间操作,上一步中间操作,又会回溯到上上一步的中间操作,...,直到最初的第一步。
第一次forEach执行的时候,开始自上向下开始执行,输出:A1B1C1
第二次forEach执行的时候,继续开始自上向下开始执行,输出:A2B2C2
... 当第四次forEach执行的时候,发现limit(3)这个job已经完成,这里就相当于循环里面的break操作,跳出来终止循环。
再来看第二段代码:
java">IntStream.range(1, 10).peek(x -> System.out.print("\nA" + x)).skip(6).peek(x -> System.out.print("B" + x)).forEach(x -> System.out.print("C" + x));输出:
A1
A2
A3
A4
A5
A6
A7B7C7
A8B8C8
A9B9C9
输出为: A1 A2 A3 A4 A5 A6 A7B7C7 A8B8C8 A9B9C9 第一次forEach执行的时候,会回溯peek操作,然后peek会回溯更上一步的skip操作,skip回溯到上一步的peek操作,顶层没有操作了,开始自上向下开始执行,执行到skip的时候,因为执行到skip,这个操作的意思就是跳过,下面的都不要执行了,也就是就相当于循环里面的continue,结束本次循环。输出:A1
第二次forEach执行的时候,继续自上向下开始执行,执行到skip的时候,发现这是第二次skip,结束本次循环。输出:A2
...
第七次forEach执行的时候,继续自上向下开始执行,执行到skip的时候,发现这是第七次skip,已经大于6了,它已经执行完了skip(6)的job了。这次skip就直接跳过,继续执行下面的操作。输出:A7B7C7
...直到循环结束。
另外,Stream
有几个特性:
-
stream不存储数据,而是按照特定的规则对数据进行计算,一般会输出结果。
-
stream不会改变数据源,通常情况下会产生一个新的集合或一个值。
-
stream具有延迟执行特性,只有调用终端操作时,中间操作才会执行。
可以理解为:一次性流。
-
撸一个简易的顺序流
如果直接看源码,非常难以理解,代码中包含很多并行相关的代码,因此我们只有理解的顺序流,才有可能进一步深入了解他的并行流,因此可以试下自己撸一个并行流来理解其基本原理。这里尝试解释什么是sink,以及stream接口的基本原理。
-
sink
假设我们有一个list,想对它的每个元素做以下操作:
-
映射成为另一个元素,
-
过滤,
-
去重并装到另一个集合中。
总共有3步,传统写法应该怎么做呢?
java">public class OldStyle {public static void main(String[] args) {List<Integer> list = new ArrayList<>();// 添加1到10for (int i = 1; i <= 10; i++) {list.add(i);}// 每个元素都加1并添加到另一个集合List<Integer> list2 = new ArrayList<>();for (Integer i : list) {list2.add(i + 1);}// 过滤只保留偶数并添加到新的listList<Integer> list3 = new ArrayList<>();for (Integer i : list2) {if (i % 2 == 0) {list3.add(i);}}// 去重并添加到到新的listList<Integer> list4 = new ArrayList<>();for (Integer i : list3) {if (!list4.contains(i)) {list4.add(i);}}list4.forEach(System.out::println);}
}
但是为了提高可读性,我们将这代码片段提取成为方法,
java">public class OldStyle2 {public static void main(String[] args) {List<Integer> list = new ArrayList<>();// 添加1到10for (int i = 1; i <= 10; i++) {list.add(i);}// 每个元素都加1并添加到另一个集合List<Integer> list2 = map(list);// 过滤只保留偶数并添加到新的listList<Integer> list3 = filter(list2);// 去重并添加到到新的listList<Integer> list4 = distict(list3);list4.forEach(System.out::println);}private static List<Integer> distict(List<Integer> list3) {List<Integer> list4 = new ArrayList<>();for (Integer i : list3) {if (!list4.contains(i)) {list4.add(i);}}return list4;}private static List<Integer> filter(List<Integer> list2) {List<Integer> list3 = new ArrayList<>();for (Integer i : list2) {if (i % 2 == 0) {list3.add(i);}}return list3;}private static List<Integer> map(List<Integer> list) {List<Integer> list2 = new ArrayList<>();for (Integer i : list) {list2.add(i + 1);}return list2;}
}
然后,为了让这些方法其他适用其他场景,比如映射的时候是+2二不是加1,过滤的时候保留的是奇数而不是偶数,于是引入了函数式接口,突然就高大上起来了。
java">public class OldStyle3 {public static void main(String[] args) {List<Integer> list = new ArrayList<>();// 添加1到10for (int i = 1; i <= 10; i++) {list.add(i);}// 每个元素都加1并添加到另一个集合List<Integer> list2 = map(list, i -> i + 1);// 过滤只保留偶数并添加到新的listList<Integer> list3 = filter(list2, i -> i % 2 == 0);// 去重并添加到到新的listList<Integer> list4 = distict(list3);list4.forEach(System.out::println);}private static List<Integer> distict(List<Integer> list3) {List<Integer> list4 = new ArrayList<>();for (Integer i : list3) {if (!list4.contains(i)) {list4.add(i);}}return list4;}private static List<Integer> filter(List<Integer> list2, Predicate<Integer> predicate) {List<Integer> list3 = new ArrayList<>();for (Integer i : list2) {if (predicate.test(i)) {list3.add(i);}}return list3;}private static List<Integer> map(List<Integer> list, Function<Integer, Integer> function) {List<Integer> list2 = new ArrayList<>();for (Integer i : list) {list2.add(function.apply(i));}return list2;}
}
但是此时你仍然不满足,就琢磨怎么将这几个操作做进一步的抽象。
刚好你家是开餐具洗涤公司的,你去观察洗碗流水线,每个洗碗流水线都有不同洗涤池,承担不同的功能,比如先洗掉大块杂质的池子,去除油污的池子,去除洗涤剂的池子等,这时候你来了灵感:
-
集合中的每个元素,就像一个个碗一样
-
需要进行不同处理流程的时候,就像脏碗经过不同的洗涤池
-
没必要一次性将所有的碗全部倒入池子,可以通过遍历依次通过不同的池子,在最后收集即可
于是你将以上的map,filter,distict行为过程,模拟洗碗流水线池子(sink),抽象成为了一个接口sink,其继承了Consumer<T>接口。
java">public interface Consumer<T> {void accept(T t);}public interface Sink<T> extends Consumer<T> {
}
在洗碗流水线里面,每个池子都是连接在一起的,方便从上一个池子来的碗交给下一个池子,为此你也作出了相应的设计:每个池子都有下一个池子的引用。
java">public abstract class ChainSink<T> implements Sink<T> {protected Sink downstream;
}
接下来看实现
java">public class MapSink extends ChainSink<Integer>{private Function<Integer,Integer> mapper;public MapSink(Sink downstream, Function<Integer, Integer> mapper) {this.downstream = downstream;this.mapper = mapper;}@Overridepublic void accept(Integer o) {Integer apply = mapper.apply(o);downstream.accept(apply);}
}public class FilterSink extends ChainSink<Integer>{private Predicate<Integer> predicate;public FilterSink(Sink downstream, Predicate<Integer> predicate) {this.downstream = downstream;this.predicate = predicate;}@Overridepublic void accept(Integer o) {if (predicate.test(o)){downstream.accept(o);}}
}
public class DistinctSink extends ChainSink<Integer>{HashSet<Integer> set = new HashSet<>();public DistinctSink(Sink downstream) {this.downstream = downstream;}@Overridepublic void accept(Integer o) {if (set.contains(o)){} else {set.add(o);downstream.accept(o);}}
}public class EndSink extends ChainSink<Integer>{@Overridepublic void accept(Integer o) {System.out.println(o);}
}
使用示例如下:
java">public class Test {public static void main(String[] args) {List<Integer> objects = new ArrayList<>();objects.add(1);objects.add(1);objects.add(2);objects.add(3);objects.add(4);objects.add(4);EndSink endSink = new EndSink();DistinctSink distinctSink = new DistinctSink(endSink);FilterSink filterSink = new FilterSink(distinctSink, o -> o % 2 ==0);MapSink mapSink = new MapSink(filterSink, o -> o + 2);for (Integer object : objects) {mapSink.accept(object);}}
}
到这里,处理链条已经构建完毕,但是突然来一个需求
需要进行排序
这个时候你懵逼了,因为你的Sink只有一个方法accept(),接受上游源源不断进来的元素,经过处理后往后传,根本不知道什么时候开始,也不知道什么时候结束。而排序必须要等元素到齐了才能够排序,否则排个寂寞。
这时候你灵机一动,加了两个方法,其他照旧,这样就可以拓展出排序功能了。
java">public interface Sink<T> extends Consumer<T> {// 当begin被调用的时候,说明需要做好准备(比如初始化容器)default void begin(long size) {}// 说明已经结束default void end() {}
}public abstract class ChainSink<T> implements Sink<T> {protected Sink downstream;@Overridepublic void begin(long size) {downstream.begin(size);}@Overridepublic void end() {downstream.end();}
}
这里讲解一下排序的实现逻辑:
1、当上游的sink往SortSink传递之前,会调用begin(),这时候 SortSink 初始化了一个 ArrayList 用来收集数据。
2、当上游调用accept处理数据的时候,不做任何处理,将元素收集到ArrayList中。
3、当上游调用end的时候,说明上游已经全部处理完毕,这个时候就可以着手排序了。
4、排序结束后,在end()方法中遍历元素重新走一遍begin,accept,end逻辑。
这种必须先处理完所有元素才知道结果的,称作有状态的(stateful)的操作,否则为无状态(stateless)操作
java">public class SortSink extends ChainSink<Integer> {private Predicate<Integer> predicate;Comparator<Integer> comparator;public SortSink(Sink downstream, Comparator<Integer> comparator) {this.downstream = downstream;this.comparator = comparator;}private ArrayList<Integer> list;@Overridepublic void begin(long size) {list = (size >= 0) ? new ArrayList<Integer>((int) size) : new ArrayList<Integer>();}@Overridepublic void end() {list.sort(comparator);downstream.begin(list.size());for (Integer t : list) {downstream.accept(t);}downstream.end();list = null;}@Overridepublic void accept(Integer t) {list.add(t);}
}
至此,sink基本功能已经介绍完毕,灵活运用begin,accept,end可以实现各种不同的操作功能,Stream的核心逻辑都是在sink中完成的,因此理解sink对理解Stream尤为重要,而Stream的实现类是sink的构建者和储存者。
-
stream
前面sink虽然已经介绍完毕,但是使用起来并不方便,因此我们可以使用一个壳把他们包装起来,对外只暴露简单的方法即可拼装和调用。前面说过,sink处理过程就像一个流水线,因此我们把这个壳叫做stream。这时候你偷偷去看了一眼jdk Stream的源码,终于设计出了自己的Stream。
-
首先是定义操作流程
-
设计一个调用链条
-
生成sink链条
-
使用sink链条
java">// 顶级接口,规定了可以进行的操作
public interface StreamDemo {StreamDemo map (Function<Integer, Integer> map);StreamDemo filter (Predicate<Integer> predicate);StreamDemo distinct();StreamDemo sorted (Comparator<Integer> comparator);void end();// 传入数据,作为源节点static StreamDemo of(List<Integer> sourceData){return new PipelineDemo(sourceData);}
}// 管道,实现链表功能,以及如何去生成sink链条
public abstract class AbstractPipelineDemo implements StreamDemo{private final AbstractPipelineDemo sourceStage;private final AbstractPipelineDemo previousStage;private AbstractPipelineDemo nextStage;private List<Integer> sourceData;/*** 传入数据,说明是源节点* @param sourceData*/public AbstractPipelineDemo(List<Integer> sourceData) {this.sourceData = sourceData;this.previousStage = null;this.sourceStage = this;}/*** 传入上一个节点,说明是中间节点** @param previousStage*/public AbstractPipelineDemo(AbstractPipelineDemo previousStage) {previousStage.nextStage = this;this.previousStage = previousStage;this.sourceData = previousStage.sourceData;this.sourceStage = previousStage.sourceStage;}/*** 包装sink形成一个连链表结构,在最后一个节点调用* @param sink* @return*/final Sink<Integer> wrapSink(Sink<Integer> sink) {for (AbstractPipelineDemo p=AbstractPipelineDemo.this; p.previousStage!=null; p=p.previousStage) {sink = p.opWrapSink( sink);}return sink;}/*** 当前阶段如何去承接前一个sink传来的数据,由具体的操作类去实现* @param sink* @return*/abstract Sink<Integer> opWrapSink(Sink<Integer> sink);}public class PipelineDemo extends AbstractPipelineDemo{public PipelineDemo(List<Integer> sourceData) {super(sourceData);}public PipelineDemo(AbstractPipelineDemo previousStage) {super(previousStage);}@OverrideSink<Integer> opWrapSink(Sink<Integer> sink) {throw new UnsupportedOperationException();}@Overridepublic StreamDemo map(Function<Integer, Integer> map) {return new PipelineDemo(this) {@OverrideSink<Integer> opWrapSink(Sink<Integer> sink) {return new MapSink(sink, map);}};}@Overridepublic StreamDemo filter(Predicate<Integer> predicate) {return new PipelineDemo(this) {@OverrideSink<Integer> opWrapSink(Sink<Integer> sink) {return new FilterSink(sink, predicate);}};}@Overridepublic StreamDemo distinct() {return new PipelineDemo(this) {@OverrideSink<Integer> opWrapSink(Sink<Integer> sink) {return new DistinctSink(sink);}};}@Overridepublic StreamDemo sorted(Comparator<Integer> comparator) {return new PipelineDemo(this) {@OverrideSink<Integer> opWrapSink(Sink<Integer> sink) {return new SortSink(sink, comparator);}};}@Overridepublic void end() {Sink<Integer> sink = wrapSink(new EndSink());super.sourceData.forEach(sink::accept);}
}
至此你的简易版Stream已经构建完成,测试效果如下:
java">public class Test2 {public static void main(String[] args) {List<Integer> objects = new ArrayList<>();objects.add(1);objects.add(1);objects.add(2);objects.add(3);objects.add(4);objects.add(4);StreamDemo.of(objects).map(o -> o + 2).filter(o -> o % 2 == 0).distinct().end();}
}
其难点,在于如何理解Stream的双向链表以及sink的单向链表的构建。Stream链表是边执行边构建,但是sink是只有执行了end方法,才最终构建,并开始处理数据的。
-
Stream接口简介
接下来正式进入主题,首先介绍一些重要接口
-
重要的接口类
-
BaseStream
最顶端的接口类,定义了流的基本接口方法,最主要的方法为 spliterator、isParallel。
-
Stream
最顶端的接口类。定义了流的常用方法,例如 map、filter、sorted、limit、skip、collect 等。
-
ReferencePipeline
ReferencePipeline 是一个结构类,定义内部类组装了各种操作流,定义了Head、StatelessOp、StatefulOp三个内部类,实现了 BaseStream 与 Stream 的接口方法。
-
Sink
Sink 接口定义了 Stream 之间的操作行为,包含 begin()、end()、cancellationRequested()、accpt()四个方法。ReferencePipeline最终会将整个 Stream 流操作组装成一个调用链,而这条调用链上的各个 Stream 操作的上下关系就是通过 Sink 接口协议来定义实现的。
这里重点强调一下结束操作的sink,继承了 supplier 接口,啥意思呢?其实很简单,就是如果实现类有返回值,所有的最终结构都汇聚到了get()的返回值中,通过调用get()返回即可。
java">T get();
interface TerminalSink<T, R> extends Sink<T>, Supplier<R> { }
另外cancellationRequested()这个也很重要,在顺序流中决定是否需要继续遍历的开关作用。
实现主要有三种形式:
默认情况下是:
java">@Override
public boolean cancellationRequested() {return downstream.cancellationRequested();
}
如果当前是一个短路操作,比如findFirst。当拿到值了之后,cancellationRequested返回false,因此不再需要遍历。
java">@Override
public void accept(T value) {if (!hasValue) {hasValue = true;this.value = value;}
}@Override
public boolean cancellationRequested() {return hasValue;
}
如果当前是一个sorted 的sink,那他就截断了downstream.cancellationRequested()的链式调用,直接返回false。让所有的数据先流到他这里,做好排序了以后,再重新调用起来,downstream.cancellationRequested();。
java">@Override
public boolean cancellationRequested() {return false;
}@Override
public void end() {list.sort(comparator);downstream.begin(list.size());if (!cancellationWasRequested) {list.forEach(downstream::accept);} else {for (T t : list) {if (downstream.cancellationRequested()) break;downstream.accept(t);}}downstream.end();list = null;
}
Spliterator
拆分器,使用trySplit()方法可以对数据源进行对半拆分,一般底层数据容器为数组,每次拆分后生成的新的Spliterator对象,通过双指针来划定自己的数据范围。举个例子,比如数组A的长度为10。在进行拆分后,则生成两个Spliterator对象,分别持有0-4,5-9索引的数据。所有的数据源在Stream中都以Spliterator对象的形式进行包装后才能流转。
-
类图
首先看一下类的继承关系图:
源码中,以 Stage 来描述每个阶段,而且是典型的双向链表结构。因此 Stream 流过程可以用下图描述
流程图
实例分析
java">list.stream().filter(integer -> integer>5).sorted().max();
生成 Head 对象
第一步是 stream 方法,最终调用到下面的代码,可以看出,调用完 stream 方法后本质上生成了一个 Head 对象。这里需要注意,Head 是不会生成sink对象的,因此 Head 的职责只是封装了数据源。
java">public final class StreamSupport {public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {Objects.requireNonNull(spliterator);//生成 Headreturn new ReferencePipeline.Head<>(spliterator, StreamOpFlag.fromCharacteristics(spliterator),parallel);}
}
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {AbstractPipeline(Spliterator<?> source,int sourceFlags, boolean parallel) {// 上游 Stage,Head 上游为 nullthis.previousStage = null;// 源分裂器,可以理解成高级版本的迭代器this.sourceSpliterator = source;// Head 指针,指向自己this.sourceStage = this;//是否是中间操作的标志this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;// The following is an optimization of://源以及所有操作的组合源的操作标志,包括此管道对象表示的操作。在评估管道准备时有效。this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;//此管道对象与流源(如果是顺序)之间的中间操作数,Head 的 depth 为 0this.depth = 0;//如果管道是并行的,则为真,否则管道是顺序的;仅对源阶段有效.this.parallel = parallel;}
}static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {// 省略@Overridefinal Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {throw new UnsupportedOperationException();}
}
生成 filter 对应的操作对象
第二步是 filter 方法,最终调用如下,创建了一个无状态的操作对象 StatelessOp,创建对象时会将 Head (this) 对象传入,构建双向链表关系,同时也会记录 sourceStage (Head) 。至此,链式关系开始形成。
这里可以看到,用了两个匿名内部类:
-
StatelessOp:说明是一个无状态的stage;
-
Sink.ChainedReference
java">@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {Objects.requireNonNull(predicate);//这里的 this 是 Head 对象 ,最终会被当前 StatelessOp 对象作为 previousStage 记录,同时也会将 Head 对象的 nextStage 指向 StatelessOpreturn new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SIZED) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {@Overridepublic void begin(long size) {downstream.begin(-1);}@Overridepublic void accept(P_OUT u) {//过滤的实现if (predicate.test(u))downstream.accept(u);}};}};
}
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {if (previousStage.linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);previousStage.linkedOrConsumed = true;//将 Head 的 nextStage 指针指向当前对象,即封装了 filter 的 StatelessOp 对象previousStage.nextStage = this;//将 StatelessOp 对象的 previousStage 指向 Headthis.previousStage = previousStage;this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);//将 StatelessOp 对象的 sourceStage 指向 Headthis.sourceStage = previousStage.sourceStage;if (opIsStateful())sourceStage.sourceAnyStateful = true;this.depth = previousStage.depth + 1;
}
这里出现了 Sink 接口和 Sink 接口的实现类 ChainedReference ,Sink 接口是用来联系当前流和下一个流的协议接口,每个 AbstractPipeline 的具体子类都要实现 opWrapSink 方法返回一个 Sink 实例。
java">interface Sink<T> extends Consumer<T> {//开始方法,可以通知下游做些相关准备default void begin(long size) {}//结束方法,告诉下游已经完成所有元素的迭代操作default void end() {}//是否是短路操作default boolean cancellationRequested() {return false;}//Sink 接口的默认实现类static abstract class ChainedReference<T, E_OUT> implements Sink<T> {//此处的 downstream 意指下游的处理 sinkprotected final Sink<? super E_OUT> downstream;public ChainedReference(Sink<? super E_OUT> downstream) {this.downstream = Objects.requireNonNull(downstream);}@Override//默认直接调用下游的 begin 方法public void begin(long size) {downstream.begin(size);}@Override//默认直接调用下游的 end 方法public void end() {downstream.end();}@Override//返回是否短路public boolean cancellationRequested() {return downstream.cancellationRequested();}}
}
生成 sorted 对应的对象
第三步的 sorted 方法,sorted 返回的是 OfRef 类的一个实例,继承了 ReferencePipeline.StatefulOp,是一个有状态的操作
类似 filter ,同样也实现了 opWrapSink 方法,不过因为 sorted 方法实现的是排序功能,所以这里会确定比较器 comparator,最后由比较器实现排序逻辑。
java">@Override
public final Stream<P_OUT> sorted() {return SortedOps.makeRef(this);
}static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {return new OfRef<>(upstream);
}private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {OfRef(AbstractPipeline<?, T, ?> upstream) {//类似 filter 方法,确定上下游 stage 关系super(upstream, StreamShape.REFERENCE,StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);this.isNaturalSort = true;@SuppressWarnings("unchecked")//默认比较器Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder();this.comparator = comp;}//...@Overridepublic Sink<T> opWrapSink(int flags, Sink<T> sink) {Objects.requireNonNull(sink);if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)return sink;else if (StreamOpFlag.SIZED.isKnown(flags))return new SizedRefSortingSink<>(sink, comparator);else//返回的是 RefSortingSink 实例return new RefSortingSink<>(sink, comparator);}
}private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {private ArrayList<T> list;RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {super(sink, comparator);}@Override//开始操作,初始化一个 list,用于接收上游流过来的元素,进行排序public void begin(long size) {if (size >= Nodes.MAX_ARRAY_SIZE)throw new IllegalArgumentException(Nodes.BAD_SIZE);list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();}@Override//实际的排序操作public void end() {list.sort(comparator);downstream.begin(list.size());//终结操作不是短路操作,遍历if (!cancellationWasRequested) {list.forEach(downstream::accept);}//终结操作是短路操作,找到满足的元素else {for (T t : list) {if (downstream.cancellationRequested()) break;downstream.accept(t);}}//结束downstream.end();list = null;}@Override//添加元素到初始化好的 list 中public void accept(T t) {list.add(t);}
}
拨动齿轮
至此为止,我们已经构建好了双向链表的操作,每个操作都被保存到一个个 StatelessOp 或者 StatefulOp 对象中,但在之前的操作中,我们封装好的 Sink 对象并没有实际调用,这也是为什么 Stream 流如果不进行终结操作之前的中间操作都不会触发的原因,万事俱备,只欠东风。东风就是终结操作。最后为 max(Comparator.naturalOrder()),是终结操作,会生成一个最终的 Stage,通过这个 Stage 触发之前的中间操作,从最后一个Stage开始,递归产生一个Sink链。
1)java.util.stream.ReferencePipeline#max
java">@Override
public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {return reduce(BinaryOperator.maxBy(comparator));
}
2)最终调用到 java.util.stream.AbstractPipeline#wrapSink,这个方法会调用 opWrapSink 生成一个 Sink 链表,对应到本文的例子,就是 filter 和 map 操作。
java">@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {Objects.requireNonNull(sink);for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {sink = p.opWrapSink(p.previousStage.combinedFlags, sink);}return (Sink<P_IN>) sink;
}
3)wrapAndCopyInto 生成 Sink 链表后,会通过 copyInfo 方法执行 Sink 链表的具体操作。
java">@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {Objects.requireNonNull(wrappedSink);if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {wrappedSink.begin(spliterator.getExactSizeIfKnown());spliterator.forEachRemaining(wrappedSink);wrappedSink.end();}else {copyIntoWithCancel(wrappedSink, spliterator);}
}
4)java.util.Spliterators.ArraySpliterator#forEachRemaining
java">@Override
public void forEachRemaining(Consumer<? super T> action) {Object[] a; int i, hi; // hoist accesses and checks from loopif (action == null)throw new NullPointerException();if ((a = array).length >= (hi = fence) &&(i = index) >= 0 && i < (index = hi)) {do { action.accept((T)a[i]); } while (++i < hi);}
}
用面向对象的思维去理解 Stream 流
类比实际生活,我们可以将 Stream 流比作水流,中间操作则是相当于水流过程中的蓄水池,Sink则是每个蓄水池的操作者,终结操作则是下达指令的指挥官。此例中的操作可如下图:
并行流
既可以按照顺序方式执行流,也可以使用并行的方式执行流是Stream的一个大的特色。这一块包含三个内容
-
Spliterator的拆分原理
-
Stream结束操作的原理
-
Stream如何使用ForJoinPool执行并行操作
-
Spliterator
这个举一个IntArraySpliterator作为例子查看原理。这里需要特别注意,老外特别喜欢在你不注意的地方赋值,比如判断条件等,因此要仔细看。这里需要注意的点:
-
每个拆分出来的子Spliterator都持有完整的数组数据,但是双指针保证当前只能操作指定范围的数据
-
只提供了两个操作,一个是遍历所有的数据,二是只获取一条数据,无论哪一个操作,指针都会发生变化,换句话说一个元素只能被获取一次
java">static final class IntArraySpliterator implements Spliterator.OfInt {private final int[] array;private int index; // current index, modified on advance/splitprivate final int fence; // one past last indexprivate final int characteristics;public IntArraySpliterator(int[] array, int additionalCharacteristics) {this(array, 0, array.length, additionalCharacteristics);}public IntArraySpliterator(int[] array, int origin, int fence, int additionalCharacteristics) {this.array = array;this.index = origin;this.fence = fence;this.characteristics = additionalCharacteristics | Spliterator.SIZED | Spliterator.SUBSIZED;}@Overridepublic OfInt trySplit() {int lo = index, mid = (lo + fence) >>> 1;return (lo >= mid)? null: new IntArraySpliterator(array, lo, index = mid, characteristics);}// index++ @Overridepublic boolean tryAdvance(IntConsumer action) {if (action == null)throw new NullPointerException();if (index >= 0 && index < fence) {action.accept(array[index++]);return true;}return false;}// index = hi@Overridepublic void forEachRemaining(IntConsumer action) {int[] a; int i, hi; // hoist accesses and checks from loopif (action == null)throw new NullPointerException();if ((a = array).length >= (hi = fence) &&(i = index) >= 0 && i < (index = hi)) {do { action.accept(a[i]); } while (++i < hi);}}}
Stream结束操作的原理
在前面介绍过,对流的操作包括中间操作和结束操作,无论是哪种,都会生成一个sink。只是结束操作的sink是最后一个,没有下游了,其执行收集或者遍历,都是围绕这个sink执行的。我们再看下这个图。
举一个例子,findFist操作
java">
@Override
public final Optional<P_OUT> findFirst() {return evaluate(FindOps.makeRef(true));
}
// 所有的终止操作都需要执行这个方法,根据条件判断是走顺序流还是并行流。
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {assert getOutputShape() == terminalOp.inputShape();if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);linkedOrConsumed = true;return isParallel()? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())): terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}@Override
public <S> O evaluateSequential(PipelineHelper<T> helper,Spliterator<S> spliterator) {O result = helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get();return result != null ? result : emptyValue;
}
// 最终走到了这个方法
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {Objects.requireNonNull(wrappedSink);// 如果不包含短路操作,不包含短路操作,因此直接一条龙遍历所有即可if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {wrappedSink.begin(spliterator.getExactSizeIfKnown());spliterator.forEachRemaining(wrappedSink);wrappedSink.end();}else {// 如果是有短路的操作,就走这个流程,只要发生了短路(找到了需要的值,立即返回)copyIntoWithCancel(wrappedSink, spliterator);}
}final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {@SuppressWarnings({"rawtypes","unchecked"})AbstractPipeline p = AbstractPipeline.this;// 有短路的操作,必须往前找到头结点或者第一个while (p.depth > 0) {p = p.previousStage;}wrappedSink.begin(spliterator.getExactSizeIfKnown());p.forEachWithCancel(spliterator, wrappedSink);wrappedSink.end();
}@Override
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
}public static <T> TerminalOp<T, Optional<T>> makeRef(boolean mustFindFirst) {return new FindOp<>(mustFindFirst, StreamShape.REFERENCE, Optional.empty(),Optional::isPresent, FindSink.OfRef::new);
}
FindOps.makeRef(true) 返回的是FindOp对象,传入了 FindSink.OfRef::new,是 FindSink 的子类。在accept中接收一个结果即可。然后使用get() 即可返回结果
java">static final class OfRef<T> extends FindSink<T, Optional<T>> {@Overridepublic Optional<T> get() {return hasValue ? Optional.of(value) : null;}
}private static abstract class FindSink<T, O> implements TerminalSink<T, O> {boolean hasValue;T value;FindSink() {} // Avoid creation of special accessor@Overridepublic void accept(T value) {if (!hasValue) {hasValue = true;this.value = value;}}@Overridepublic boolean cancellationRequested() {return hasValue;}
}
所有的终止的 sink 都实现了 TerminalSink。比如:
ForEachOp(同时实现了TerminalOp和 TerminalSink):这个很简单,直接使用Consumer接收每个传过来的元素即可。
java">static final class OfRef<T> extends ForEachOp<T> {final Consumer<? super T> consumer;@Overridepublic void accept(T t) {consumer.accept(t);}
}
Stream如何使用ForkJoinPool执行并行操作
并行操作是在 terminalOp.evaluateParallel()方法中执行的。FindOp,ForEachOp,MatchOp,ReduceOp这四个TerminalOp的实现稍有不同。
我们常用的collect是属于ReduceTask,因此来看这个
java">final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {assert getOutputShape() == terminalOp.inputShape();if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);linkedOrConsumed = true;return isParallel()? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())): terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}@Override
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,Spliterator<P_IN> spliterator) {return new ReduceTask<>(this, helper, spliterator).invoke().get();
}
ReduceTask复用的是父类 AbstractTask 的compute方法,是 CountedCompleter 的子类
java">@Override
public void compute() {Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators// 当前分片的长度long sizeEstimate = rs.estimateSize();// 最小的可以达到的每片的数量,一般为 sizeEstimate/(CUP核心数量-1)*4,最小为1long sizeThreshold = getTargetSize(sizeEstimate);boolean forkRight = false;@SuppressWarnings("unchecked") K task = (K) this;while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {// 如果可以继续拆分,就继续拆,每次拆分都生成新的ls ,同时rs也被进一步缩小范围K leftChild, rightChild, taskToFork;task.leftChild = leftChild = task.makeChild(ls);task.rightChild = rightChild = task.makeChild(rs);task.setPendingCount(1);if (forkRight) {forkRight = false;rs = ls;task = leftChild;taskToFork = rightChild;} else {// 第一个循环,到这个方法,forkRight = true;task = rightChild;taskToFork = leftChild;}taskToFork.fork();sizeEstimate = rs.estimateSize();}task.setLocalResult(task.doLeaf());task.tryComplete();
}@Override
protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {return new ReduceTask<>(this, spliterator);
}
// 走顺序流流程
@Override
protected S doLeaf() {return helper.wrapAndCopyInto(op.makeSink(), spliterator);
}
以上逻辑可以总结如下:
-
每次Spliterator被拆分的时候,都拆成左右两部分。
-
先是左边部分fork,进入到另一个线程执行,然后右边部分直接在当前线程继续拆分。
-
右边部分拆成两部分后,则是右边的右边fork,右边的左边部分继续在当前线程拆分。
-
重复以上两个步骤,直到所有的子任务都不能拆分为止。
交替拆分的目的是为了充分使用当前线程来执行任务,而不是拆分成两部分后提交给线程池,而自己在等待中。
在子任务不能继续拆分的时候,才会执行以下两行代码,其中第一行是执行顺序流,返回当前任务的结果。第二行则是进行任务的汇总。
java"> task.setLocalResult(task.doLeaf());task.tryComplete();
任务的汇总,核心在onCompletion()方法如何实现:
java">// 任务集合汇总
public final void tryComplete() {CountedCompleter<?> a = this, s = a;for (int c;;) {if ((c = a.pending) == 0) {// 如果是父任务,将会去收集子类的结果a.onCompletion(s);if ((a = (s = a).completer) == null) {// 如果没有付任务了,将会s.quietlyComplete();return;}}else if (U.compareAndSwapInt(a, PENDING, c, c - 1))// 当前任务还没有结束,直接返回return;}
}
当前任务结束的时候,如果是叶子任务,不做任务处理,如果是父任务,则去合并的子任务,这样一层层向上返回,直到回到最顶层的父任务,所有的结果都已经汇总到了setLocalResult()中,返回即可。
java">@Override
public void onCompletion(CountedCompleter<?> caller) {if (!isLeaf()) {S leftResult = leftChild.getLocalResult();leftResult.combine(rightChild.getLocalResult());setLocalResult(leftResult);}// GC spliterator, left and right childsuper.onCompletion(caller);
}
// 父类的 onCompletion 将左右与当前的 spliterator 置空。
@Override
public void onCompletion(CountedCompleter<?> caller) {spliterator = null;leftChild = rightChild = null;
}
-
短路又是如何进行操作的呢
短路进行任务拆分有所不同,父类为 AbstractShortCircuitTask,可以看到每次拆分之前,都会查看是否已经拿到需要的值,如果有,则不会继续拆分了。这个值是sharedResult,所有子任务共享的。
java">// 注意这个值,会从父任务一直往下传到子任务,共享的。
protected final AtomicReference<R> sharedResult;protected AbstractShortCircuitTask(K parent,Spliterator<P_IN> spliterator) {super(parent, spliterator);sharedResult = parent.sharedResult;
}@Override
public void compute() {Spliterator<P_IN> rs = spliterator, ls;long sizeEstimate = rs.estimateSize();long sizeThreshold = getTargetSize(sizeEstimate);boolean forkRight = false;@SuppressWarnings("unchecked") K task = (K) this;AtomicReference<R> sr = sharedResult;R result;// 首先要检查是否已经拿到结果了,或者已经被取消while ((result = sr.get()) == null) {if (task.taskCanceled()) {result = task.getEmptyResult();break;}if (sizeEstimate <= sizeThreshold || (ls = rs.trySplit()) == null) {// 拆到足够小了之后,执行doLeaf()方法立马返回result = task.doLeaf();break;}K leftChild, rightChild, taskToFork;task.leftChild = leftChild = task.makeChild(ls);task.rightChild = rightChild = task.makeChild(rs);task.setPendingCount(1);if (forkRight) {forkRight = false;rs = ls;task = leftChild;taskToFork = rightChild;}else {forkRight = true;task = rightChild;taskToFork = leftChild;}taskToFork.fork();sizeEstimate = rs.estimateSize();}task.setLocalResult(result);task.tryComplete();
}@Override
protected O doLeaf() {O result = helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator).get();if (!op.mustFindFirst) {if (result != null)// 如果不是必须要找第一个,则设置 sharedResult ,不要继续拆了。shortCircuit(result);return null;}else {if (result != null) {// 查看是否是最左边的一个节点foundResult(result);return result;}elsereturn null;}
}private void foundResult(O answer) {if (isLeftmostNode())shortCircuit(answer);elsecancelLaterNodes();
}@Override
public void onCompletion(CountedCompleter<?> caller) {if (op.mustFindFirst) {for (FindTask<P_IN, P_OUT, O> child = leftChild, p = null; child != p;p = child, child = rightChild) {O result = child.getLocalResult();if (result != null && op.presentPredicate.test(result)) {setLocalResult(result);foundResult(result);break;}}}super.onCompletion(caller);
}
-
如果中间操作有状态,又如何处理
有状态的中间操作包括: distinct() sorted() limit() skip(),所谓的有状态,针对并行流而言,对顺序流这个状态没有影响。在并行流中,由于有多个线程同步执行,因此需要处理。其核心在sourceSpliterator(int terminalFlags)这个方法。从注释上看
-
如果是顺序流或者不包含有状态操作的并行流,则直接返回源数据的 Spliterator。
-
否则对于状态操作的并行流,则描述了所有的计算结果,并包括了最近的一个有状态的操作。
以distinct为例子
java">public class ParallelDistinctTest {public static void main(String[] args) {Arrays.asList(1, 2, 3, 4, 5, 5, 5, 5).parallelStream().peek(e-> System.out.print("A"+e+" ")).peek(e-> System.out.print("B"+e+" ")).distinct().peek(e-> System.out.print("C"+e+" ")).forEach(e-> System.out.print("D"+e+" "));}
}
输出结果如下:
A4 A5 B5 A5 B5 A5 B5 A1 B1 A3 A5 B5 A2 B2 B3 B4 C3 D3 C5 D5 C4 D4 C2 D2 C1 D1
-
第一部分是无序的,有重复
-
第二部分是也是无序的,无重复
以下为包含distinct状态操作的一个流程。首先,最终操作执行之前,会去拿到 spliterator,使用的是sourceSpliterator这个方法,这个方法的功能如下:
-
假设是顺序流,则直接获取源头的spliterator。
-
如果是并行流,但是中间没有有状态的操作,并行不影响最终结果,因此也是获取源头的spliterator。
-
但是如果是并行流且包括有状态的操作,比如distinct,则先分段执行并行流,在有状态操作处收拢并行结果并进行处理,然后再执行下一段,直到最后一段的结尾为终止操作为止。
图示如下:
代码如下:
java">
@Override
public void forEach(Consumer<? super P_OUT> action) {evaluate(ForEachOps.makeRef(action, false));
}final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {return isParallel()? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())): terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}// 省略一些状态赋值代码
@SuppressWarnings("unchecked")
private Spliterator<?> sourceSpliterator(int terminalFlags) {Spliterator<?> spliterator = null;// 先按照非并行流获取源 Spliteratorif (sourceStage.sourceSpliterator != null) {spliterator = sourceStage.sourceSpliterator;sourceStage.sourceSpliterator = null;}else if (sourceStage.sourceSupplier != null) {spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();sourceStage.sourceSupplier = null;}else {throw new IllegalStateException(MSG_CONSUMED);}if (isParallel() && sourceStage.sourceAnyStateful) {int depth = 1;// 从头开始遍历管道。u为头部结算,p为u的下一个,e为当前,也是就是终止操作for ( AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;u != e;u = p, p = p.nextStage) {int thisOpFlags = p.sourceOrOpFlags;if (p.opIsStateful()) {// 下一个操作是distinct操作,是有状态的。depth = 0;// 调用当前阶段的 opEvaluateParallelLazy spliterator = p.opEvaluateParallelLazy(u, spliterator); }p.depth = depth++;p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);}}return spliterator;
}
distinct终止操作DistinctOp对象,是由 DistinctOps.makeRef产生的。
-
先调用了 opEvaluateParallelLazy。
-
在调用了reduce()
-
在reduce方法中发现,新生成了一个 ReduceOps。使用的是LinkedHashSet去接受数据,这个是可以去重的。
-
而此时的helper是DistinctOp的上一个节点,而新生成的ReduceOps横叉了一脚,当成了一个临时的终止操作。
-
此时以ReduceOps为结束节点的链表,是无状态的,可以先计算结果。生成了新的 spliterator,当成了新的数据源。
java">final class DistinctOps {private DistinctOps() { }static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {@Override<P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {// No-opreturn helper.wrapSpliterator(spliterator);}else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {// Not lazy, barrier required to preserve orderreturn reduce(helper, spliterator).spliterator();}else {// Lazyreturn new StreamSpliterators.DistinctSpliterator<>(helper.wrapSpliterator(spliterator));}}<P_IN> Node<T> reduce(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {// If the stream is SORTED then it should also be ORDERED so the following will also// preserve the sort orderTerminalOp<T, LinkedHashSet<T>> reduceOp= ReduceOps.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add,LinkedHashSet::addAll);return Nodes.node(reduceOp.evaluateParallel(helper, spliterator));}};}
}
sortet又是如何操作的呢
先上一个demo:
java">
public class ParallelSortedTest {public static void main(String[] args) {Arrays.asList(1, 2, 3, 4, 5, 5, 5, 5).parallelStream().peek(e-> System.out.print("A"+e+" ")).peek(e-> System.out.print("B"+e+" ")).sorted().peek(e-> System.out.print("C"+e+" ")).forEach(e-> System.out.print("D"+e+" "));}
}
输出结果如下:
A5 B5 A1 B1 A5 B5 A4 A5 B5 A5 B5 A2 B2 A3 B3 B4 C5 D5 C3 D3 C2 D2 C4 D4 C1 D1 C5 D5 C5 D5 C5 D5
可以看到,输出结果分成两部分。
第一部分,先输出了AB,是无序的
第二部分,是输出了CD,也是无序的
java">@Override
public final Stream<P_OUT> sorted() {return SortedOps.makeRef(this);
}
SortedOps 使用的是AbstractPipeline,最终是走到实现类的 opEvaluateParallel。
java">
<P_IN> Spliterator<E_OUT> opEvaluateParallelLazy(PipelineHelper<E_OUT> helper,Spliterator<P_IN> spliterator) {return opEvaluateParallel(helper, spliterator, i -> (E_OUT[]) new Object[i]).spliterator();
}
java">private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {@Overridepublic <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,Spliterator<P_IN> spliterator,IntFunction<T[]> generator) {// If the input is already naturally sorted and this operation// naturally sorts then collect the outputif (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {return helper.evaluate(spliterator, false, generator);}else {// 稍有不同,先执行前面的操作// 获取所有的数据T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);// 获取结果后,使用 Arrays.parallelSort 进行排序// 将排序后的数据返回Arrays.parallelSort(flattenedData, comparator);return Nodes.node(flattenedData);}}
}
-
最后的总体流程图
参考:
[浅谈 jdk 中的 Stream 流使用及原理]列举了相关的接口以及有流程图
[Java 8 Stream流底层原理]列举了常用操作的sink代码,还有总的执行流程图,深得我心。