Spring响应式高并发编程

ops/2024/10/20 19:05:49/

这里写目录标题

  • JDK基础
    • Lambda表达式
      • 双冒号语法
      • 函数式接口
    • StreamApi流式处理
        • 创建流对象
        • 中间操作
          • filter
        • 并行流
        • 终结操作
          • 收集操作
  • 阻塞式编程和响应式编程
  • JUC
    • flow
  • 响应式流
    • Flux流
    • Mono流
    • 事件回调
  • 缓冲区
  • 限流操作
  • 手动产生流
  • 自定义处理器
  • 多线程的协作与调度

JDK基础

响应式编程将大量使用JDK中有关Stream流,Lambda表达式等知识

Lambda表达式

只能代替有一个未实现的方法的接口(函数式接口)
可以直接代替函数式接口的实现类
具有上下文推导功能,可以根据返回值,参数个数,推断有实现的接口方法
简化了函数时接口的匿名实现类

在这里插入图片描述
语法:

  1. 参数列表不需要指定类型
  2. Lambda的类型是所实现的函数式接口的类型,所以只能用该类型的对象引用
java">		QueryUser queryUser = new QueryUser() {@Overridepublic String queryAll(String username, Integer age) 				   {return username + ":" + age;}};QueryUser queryUser1 = (username, age) -> {return username + ":" + age;};System.out.println(queryUser.queryAll("a", 2));System.out.println(queryUser1.queryAll("a", 1));

在这里插入图片描述

双冒号语法

可以通过全类名::方法名,去引用一个方法
至于传入方法的参数,将由jvm根据上下文的线索推断

函数式接口

JDK内置的函数式接口都位于java.util.function包下
JDK内置了一些函数式接口,我们只需要通过lambda表达式实现其抽象方法即可
@FunctionalInterface是检查接口,编译器将在编码阶段就能检查该接口是否为函数式接口

java">@FunctionalInterface
public interface Function<T, R> {
//使用入参,得到结果并返回R apply(T t);
}
java">@FunctionalInterface
public interface Consumer<T> {//接收入参,但不会返回结果void accept(T t);
}
java">@FunctionalInterface
public interface Supplier<T> {
//不用传入参数,会返回结果T get();
}
java">@FunctionalInterface
public interface Predicate<T> {boolean test(T t);
}

StreamApi流式处理

基于事件驱动模型,当事件发生时,将执行相应的回调函数
可以理解为流式操作就是回调操作,或者是懒操作

创建流对象

JDK提供了多个api均可以得到一个流对象
流对象:支持顺序和并行聚合操作的元素序列。
Stream接口中定义了一系列针对元素的操作
包括中间操作以及终结操作
元素序列都可以转成流对象

串行流:
所有的中间操作都在同一个线程内完成

java">ArrayList<Integer> views = new ArrayList<>();views.add(100);
views.add(10);
views.add(102);Stream<Integer> stream = views.stream();
java">int[] viewArr = {1, 2, 3, 4, 5};IntStream arrStream = Arrays.stream(viewArr);
java">Stream<Integer> stream1 = Stream.of(100, 2, 50, 4, 56);
Stream<Object> stream2 = Stream.builder().add(1).add(20).add(3).build();
Stream<Object> stream3 = Stream.concat(stream1, stream2);

并行流:
将流对象由分成多个流对象,并将这些流对象以及中间操作,分配到不同的线程上,由CPU并发执行

java">Stream<Integer> parallelStream = List.of(200, 1, 10, 300).parallelStream();
Stream<Integer> parallel = Stream.of(1, 2, 3, 4, 5).parallel();
java">public interface Stream<T> extends BaseStream<T, Stream<T>> {Stream<T> filter(Predicate<? super T> predicate);<R> Stream<R> map(Function<? super T, ? extends R> mapper);IntStream mapToInt(ToIntFunction<? super T> mapper);LongStream mapToLong(ToLongFunction<? super T> mapper);DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper);void forEach(Consumer<? super T> action);T reduce(T identity, BinaryOperator<T> accumulator);Optional<T> findAny();Stream<T> build();}
}
中间操作

流是惰性的,只有指定了终止操作时,所有的中间操作才会别执行

filter

入参:传入一个断言对象
predicate函数式接口提供了一个,根据入参,运行逻辑表达式的方法
当入参,符合逻辑表达式时将返回真,否则,返回假
返回真的元素将被放入新流中,否则,将被丢弃

java">@FunctionalInterface
public interface Predicate<T> {/**根据入参执行逻辑表达式。参数: t – 输入参数 如果输入参数与断言表达式匹配,则返回: true,否则为 false*/boolean test(T t);
}
并行流
java">public static void main(String[] args) {Stream<Integer> parallelStream = List.of(200, 1, 10, 300).parallelStream();Stream<Integer> stream1 = parallelStream.filter((item) -> {System.out.println("stream1:"+Thread.currentThread().getName()+":"+item);return item % 2 == 0;});Stream<Integer> stream2 = stream1.map((item) -> {System.out.println("stream2:"+Thread.currentThread().getName()+":"+item);item *= 2;return item;});stream2.forEach((item)-> {System.out.println("stream3:"+Thread.currentThread().getName()+":"+item);});}

结果:

java">stream1:ForkJoinPool.commonPool-worker-3:300
stream1:ForkJoinPool.commonPool-worker-1:1
stream1:main:10
stream2:ForkJoinPool.commonPool-worker-3:300
stream1:ForkJoinPool.commonPool-worker-2:200
stream2:ForkJoinPool.commonPool-worker-2:200
stream2:main:10
stream3:ForkJoinPool.commonPool-worker-3:600
stream3:main:20
stream3:ForkJoinPool.commonPool-worker-2:400

从线程池中取出一个线程,去完成一个元素的所有中间操作
比如,我这里,最初有四个元素,所以可以看到,整个过程用了四个线程
单独看各个线程,可以看出,仍是按顺序执行各个元素的中间操作

在这里插入图片描述

如果是串行流的话,就是由同一线程去依次逐个完成每个元素的所有中间操作

在这里插入图片描述

java">stream1:main:200
stream2:main:200
stream3:main:400stream1:main:1stream1:main:10
stream2:main:10
stream3:main:20stream1:main:300
stream2:main:300
stream3:main:600
终结操作
收集操作

collect()方法
入参:collector对象,该对象包含五个函数式接口方法
通过这些函数式接口方法,可以把各个元素存入一个容器中

java">public interface Collector<T, A, R> {/**创建存放元素的容器*/Supplier<A> supplier();/**将元素放入容器*/BiConsumer<A, T> accumulator();/**将两个容器合并成一个大容器*/BinaryOperator<A> combiner();/**Perform the final transformation from the intermediate accumulation type*/Function<A, R> finisher();/**Returns a {@code Set} of {@code Collector.Characteristics} indicating*/Set<Characteristics> characteristics();}

collectors.toList():
相当于创建了一个collector对象

java">new CollectorImpl<>(ArrayList::new, List::add,(left, right) -> { left.addAll(right); return left; },CH_ID);

阻塞式编程和响应式编程

大家一开始学的JDK的一些操作,多数是属于阻塞式编程,也就是各条编程语句之间存在高耦合,只有靠前的语句被执行并获得结果,后面的语句才能被执行到
而响应式编程基于事件驱动,通过多线程共同完成任务,就能避免等待
经典的模型包括生产者,消费者,以及缓冲队列,最大的特点是异步处理消息,生产者和消费者进行解耦,彼此都不需要等对方回应
实现的基础是多线程协作
提高性能的思路是,减少CPU的空闲时间,减少线程切换次数,最大程度增大CPU有效工作时间
当CPU遇到某个线程阻塞式,马上切到其他线程工作
所有异步线程是消息互通的,彼此可以通过触发事件,执行回调

JUC

JDK的Java.util.concurrent包体现了JDK的高并发思想的实现,是基于事件驱动模型

flow

在JUC的flow包下,专门用于构建无阻塞的异步数据流处理
核心组件:
发布者(生产者),订阅者(消费者),订阅关系,处理器(生产者+消费者)
订阅者将监控生产者全生命周期事件

java">public final class Flow {/**生产者*/@FunctionalInterfacepublic static interface Publisher<T> {/**订阅*/public void subscribe(Subscriber<? super T> subscriber);}/**消费者*/public static interface Subscriber<T> {/**在为给定订阅调用任何其他订阅服务器方法之前调用的方法。*/public void onSubscribe(Subscription subscription);/**使用订阅的下一项调用的方法*/public void onNext(T item);/***/public void onError(Throwable throwable);/***/public void onComplete();}/**链接 生产者 和 消费者 的消息控件*/public static interface Subscription {/***/public void request(long n);/***/public void cancel();}/**同时充当订阅服务器和发布服务器的组件。*/public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {}//默认缓冲区的大小static final int DEFAULT_BUFFER_SIZE = 256;public static int defaultBufferSize() {return DEFAULT_BUFFER_SIZE;}}

响应式流

和Stream流类似,reactor框架中也有元素流的概念,其中,0或1个元素称为Mono流,0到N个元素,称为Flux流
响应式流基于惰性调用,当不执行订阅操作时,流是不会自己执行的

Flux流

一个带有rx操作符的响应式流发布者,它会发出0到N个元素,然后发出成功或错误的信号。
简单的说就是,消费者能拿到什么数据,只由生产者生产什么数据决定,并且,当没有消费者需求时,生产者将不会生产数据,并且,所有消费者拿到的数据都是相同的

它旨在用于实现和返回类型。尽可能保持使用原始的Publisher作为输入参数。

如果已知底层Publisher将发出0或1个元素,则应使用Mono。

请注意,在Flux操作符内部使用的java.util.function /
lambdas中避免使用状态,因为这些状态可能在多个订阅者之间共享。

subscribe(CoreSubscriber)是用于上下文传递的内部扩展,用于内部使用。用户提供的Subscriber可以传递给此“subscribe”扩展,但将失去可用的每个订阅的Hooks.onLastOperator。

泛型 -Flux流所传递的元素类型

java">public abstract class Flux<T> implements CorePublisher<T> {//创建一个Flux,它会发出提供的元素,然后完成。参数:
data - 要发出的元素,作为可变参数
返回:
一个新的Fluxpublic static <T> Flux<T> just(T... data) {return fromArray(data);}//订阅此Flux的Consumer,它将分别消耗序列中的所有元素,处理错误并对完成做出反应。此外,一个上下文与订阅相关联。在订阅时,隐式地进行无界请求。对于一个被动版本,观察并转发传入数据,请参阅doOnNext(Consumer)doOnError(Consumer)doOnComplete(Runnable)doOnSubscribe(Consumer)。对于一个给予您更多对背压和请求控制的版本,请参阅subscribe(Subscriber),其中包含一个BaseSubscriber。请记住,由于序列可能是异步的,因此这将立即将控制返回给调用线程。例如,在主线程或单元测试中执行时,这可能会给人一种消费者未被调用的印象。参数:
consumer - 在每个值上调用的消费者
errorConsumer - 在错误信号上调用的消费者
completeConsumer - 在完成信号上调用的消费者
initialContext - 与订阅相关联的基础上下文,将可见于上游操作符
返回:
一个新的Disposable,可用于取消底层订阅public final Disposable subscribe(@Nullable Consumer<? super T> consumer,@Nullable Consumer<? super Throwable> errorConsumer,@Nullable Runnable completeConsumer,@Nullable Context initialContext) {return subscribeWith(new LambdaSubscriber<>(consumer, errorConsumer,completeConsumer,null,initialContext)
);}
}
java">public static void main(String[] args) {Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);Disposable disposable = flux.subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer element) {System.out.println(element);}});}

Mono流

一个具有基本rx操作符的响应式流发布者,通过onNext信号发出最多一个项,然后以onComplete信号终止(成功的Mono,有值或无值),或者仅发出单个onError信号(失败的Mono)。
大多数Mono实现在调用Subscriber.onNext(T)之后都应立即调用Subscriber.onComplete()。Mono.never()是一个特例:它不发出任何信号,虽然在测试之外没有什么用处,但从技术上讲并不是被禁止的。另一方面,明确禁止使用onNext和onError的组合。
学习Mono
API并发现新操作符的推荐方法是通过参考文档,而不是通过此Javadoc(与学习单个操作符相反)。请参阅“我需要哪个操作符?”附录。

rx操作符将为输入Mono类型提供别名,以保留结果Mono的“最多一个”属性。例如,flatMap返回一个Mono,而有可能会有多个发射的flatMapMany别名。
应该使用Mono来表示只完成而没有任何值的发布者。 它旨在用于实现和返回类型,输入参数应尽可能使用原始发布者。
请注意,在Mono操作符内部使用java.util.function /
lambdas中的状态应该避免,因为这些状态可能在多个订阅者之间共享。
泛型-此类的单个值的类型

java">public abstract class Mono<T> implements CorePublisher<T> {public static <T> Mono<T> just(T data) {return onAssembly(new MonoJust<>(data));}public final Disposable subscribe(Consumer<? super T> consumer) {Objects.requireNonNull(consumer, "consumer");return subscribe(consumer, null, null);}
}
java">public static void main(String[] args) {Mono<Integer> mono = Mono.just(1);mono.subscribe(e -> {e += 1;System.out.println(Thread.currentThread().getName()+":"+e);});mono.subscribe(e -> {System.out.println(Thread.currentThread().getName()+":"+e);});mono.subscribe(e -> {System.out.println(Thread.currentThread().getName()+":"+e);});mono.subscribe(e -> {System.out.println(Thread.currentThread().getName()+":"+e);});}

结果:

java">main:2
main:1
main:1
main:1

事件回调

信号,专门用于表示流中的元素的状态:
`public enum SignalType {

/*** A signal when the subscription is triggered*/
SUBSCRIBE,
/*** A signal when a request is made through the subscription*/
REQUEST,
/*** A signal when the subscription is cancelled*/
CANCEL,
/*** A signal when an operator receives a subscription*/
ON_SUBSCRIBE,
/*** A signal when an operator receives an emitted value*/
ON_NEXT,
/*** A signal when an operator receives an error*/
ON_ERROR,
/*** A signal when an operator completes*/
ON_COMPLETE,
/*** A signal when an operator completes*/
AFTER_TERMINATE,
/*** A context read signal*/
CURRENT_CONTEXT,
/*** A context update signal*/
ON_CONTEXT;@Override
public String toString() {switch (this) {case ON_SUBSCRIBE:return "onSubscribe";case ON_NEXT:return "onNext";case ON_ERROR:return "onError";case ON_COMPLETE:return "onComplete";case REQUEST:return "request";case CANCEL:return "cancel";case CURRENT_CONTEXT:return "currentContext";case ON_CONTEXT:return "onContextUpdate";case AFTER_TERMINATE:return "afterTerminate";default:return "subscribe";

`

当信号发出时,将会执行回调函数
doOnxxx方法专门用来传递信号
onXXX是一个回调函数,是在接收到信号后执行的操作
map,filter,distinct等方法,是中间操作,可以得到一个新流
事件回调:
doOnComplete,doOnCancel,doOnError,doOnEach,doOnNext

在这里插入图片描述

java">public static void main(String[] args) {Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5).doOnComplete(() -> System.out.println(Thread.currentThread().getName() + ":"+"Flux流完成了"));;flux.subscribe(e -> {System.out.println(Thread.currentThread().getName() + ":"+e);});}
java">main:1
main:2
main:3
main:4
main:5
main:Flux流完成了
java">    public static void main(String[] args) {Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5).doOnComplete(() -> System.out.println(Thread.currentThread().getName() + ":"+"Flux流完成了"));;flux.subscribe(e -> {System.out.println(Thread.currentThread().getName() + ":"+e);});flux.subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription subscription) {System.out.println("生产者,消费者订阅关系建立:" + subscription);subscription.request(6);}@Overridepublic void onNext(Integer integer) {System.out.println("下一个元素到达:"+integer);}@Overridepublic void onError(Throwable throwable) {System.out.println("接收信号的时候出错了:" + throwable.getMessage());}@Overridepublic void onComplete() {System.out.println("接收元素完成了:");}});}
java">main:1
main:2
main:3
main:4
main:5
main:Flux流完成了
main生产者,消费者订阅关系建立:reactor.core.publisher.StrictSubscriber@17c68925
main下一个元素到达:1
main下一个元素到达:2
main下一个元素到达:3
main下一个元素到达:4
main下一个元素到达:5
main:Flux流完成了
main接收元素完成了:

由结果可以得出结论:Subscriber接口定义一系列由生产者信号所触发的回调函数
消费者可以直接通过信号感知到生产者所生产的元素的状态

java">Flux<Integer> flux = Flux.range(1, 50);flux.subscribe(new BaseSubscriber<Integer>() {@Overrideprotected Subscription upstream() {return super.upstream();}@Overridepublic boolean isDisposed() {return super.isDisposed();}@Overridepublic void dispose() {super.dispose();}@Overrideprotected void hookOnSubscribe(Subscription subscription) {super.hookOnSubscribe(subscription);}@Overrideprotected void hookOnNext(Integer value) {super.hookOnNext(value);}@Overrideprotected void hookOnComplete() {super.hookOnComplete();}@Overrideprotected void hookOnError(Throwable throwable) {super.hookOnError(throwable);}@Overrideprotected void hookOnCancel() {super.hookOnCancel();}@Overrideprotected void hookFinally(SignalType type) {super.hookFinally(type);}@Overridepublic String toString() {return super.toString();}});
java">        Flux<Integer> flux = Flux.range(1, 50);flux.subscribe(new BaseSubscriber<Integer>() {@Overrideprotected void hookOnSubscribe(Subscription subscription) {System.out.println(Thread.currentThread().getName()+"消费者和生产者绑定订阅关系" + subscription);request(1);}@Overrideprotected void hookOnNext(Integer value) {System.out.println(Thread.currentThread().getName()+"元素到达" + value);if (value==20){this.cancel();System.out.println("流元素20被取消了");}request(1);}@Overrideprotected void hookOnComplete() {System.out.println(Thread.currentThread().getName()+"流正常结束");}@Overrideprotected void hookOnError(Throwable throwable) {System.out.println(Thread.currentThread().getName()+"流异常结束" + throwable.getMessage());}@Overrideprotected void hookOnCancel() {System.out.println(Thread.currentThread().getName()+"流被取消了");}@Overrideprotected void hookFinally(SignalType type) {System.out.println(Thread.currentThread().getName()+"最终执行" + type);}});

缓冲区

生产者可以将多个元素放进缓存中,在一起发给消费者

java">public static void main(String[] args) {Flux<List<Integer>> flux = Flux.range(1, 183).buffer(10);flux.subscribe(list -> {System.out.println(list.size()+"接收到缓冲区中的元素:" + list);});}
java">10接收到缓冲区中的元素:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
10接收到缓冲区中的元素:[11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
10接收到缓冲区中的元素:[21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
10接收到缓冲区中的元素:[31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
10接收到缓冲区中的元素:[41, 42, 43, 44, 45, 46, 47, 48, 49, 50]
10接收到缓冲区中的元素:[51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
10接收到缓冲区中的元素:[61, 62, 63, 64, 65, 66, 67, 68, 69, 70]
10接收到缓冲区中的元素:[71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
10接收到缓冲区中的元素:[81, 82, 83, 84, 85, 86, 87, 88, 89, 90]
10接收到缓冲区中的元素:[91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
10接收到缓冲区中的元素:[101, 102, 103, 104, 105, 106, 107, 108, 109, 110]
10接收到缓冲区中的元素:[111, 112, 113, 114, 115, 116, 117, 118, 119, 120]
10接收到缓冲区中的元素:[121, 122, 123, 124, 125, 126, 127, 128, 129, 130]
10接收到缓冲区中的元素:[131, 132, 133, 134, 135, 136, 137, 138, 139, 140]
10接收到缓冲区中的元素:[141, 142, 143, 144, 145, 146, 147, 148, 149, 150]
10接收到缓冲区中的元素:[151, 152, 153, 154, 155, 156, 157, 158, 159, 160]
10接收到缓冲区中的元素:[161, 162, 163, 164, 165, 166, 167, 168, 169, 170]
10接收到缓冲区中的元素:[171, 172, 173, 174, 175, 176, 177, 178, 179, 180]
3接收到缓冲区中的元素:[181, 182, 183]

限流操作

limitRate

java">public static void main(String[] args) {Flux<Integer> flux = Flux.range(1, 183).log().limitRate(50);flux.subscribe(new BaseSubscriber<Integer>() {@Overrideprotected void hookOnSubscribe(Subscription subscription) {System.out.println("订阅关系建立" + subscription);request(1);}@Overrideprotected void hookOnNext(Integer value) {System.out.println("下一个元素到达" + value);request(1);}});}
java">[main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[main] INFO reactor.Flux.Range.1 - | request(50)
[main] INFO reactor.Flux.Range.1 - | onNext(1)
[main] INFO reactor.Flux.Range.1 - | onNext(2)
[main] INFO reactor.Flux.Range.1 - | onNext(3)
[main] INFO reactor.Flux.Range.1 - | onNext(4)
[main] INFO reactor.Flux.Range.1 - | onNext(5)
[main] INFO reactor.Flux.Range.1 - | onNext(6)
[main] INFO reactor.Flux.Range.1 - | onNext(7)
[main] INFO reactor.Flux.Range.1 - | onNext(8)
[main] INFO reactor.Flux.Range.1 - | onNext(9)
[main] INFO reactor.Flux.Range.1 - | onNext(10)
[main] INFO reactor.Flux.Range.1 - | onNext(11)
[main] INFO reactor.Flux.Range.1 - | onNext(12)
[main] INFO reactor.Flux.Range.1 - | onNext(13)
[main] INFO reactor.Flux.Range.1 - | onNext(14)
[main] INFO reactor.Flux.Range.1 - | onNext(15)
[main] INFO reactor.Flux.Range.1 - | onNext(16)
[main] INFO reactor.Flux.Range.1 - | onNext(17)
[main] INFO reactor.Flux.Range.1 - | onNext(18)
[main] INFO reactor.Flux.Range.1 - | onNext(19)
[main] INFO reactor.Flux.Range.1 - | onNext(20)
[main] INFO reactor.Flux.Range.1 - | onNext(21)
[main] INFO reactor.Flux.Range.1 - | onNext(22)
[main] INFO reactor.Flux.Range.1 - | onNext(23)
[main] INFO reactor.Flux.Range.1 - | onNext(24)
[main] INFO reactor.Flux.Range.1 - | onNext(25)
[main] INFO reactor.Flux.Range.1 - | onNext(26)
[main] INFO reactor.Flux.Range.1 - | onNext(27)
[main] INFO reactor.Flux.Range.1 - | onNext(28)
[main] INFO reactor.Flux.Range.1 - | onNext(29)
[main] INFO reactor.Flux.Range.1 - | onNext(30)
[main] INFO reactor.Flux.Range.1 - | onNext(31)
[main] INFO reactor.Flux.Range.1 - | onNext(32)
[main] INFO reactor.Flux.Range.1 - | onNext(33)
订阅关系建立reactor.core.publisher.FluxPublishOn$PublishOnSubscriber@4157f54e
下一个元素到达1
下一个元素到达2
下一个元素到达3
下一个元素到达4
下一个元素到达5
下一个元素到达6
下一个元素到达7
下一个元素到达8
下一个元素到达9
下一个元素到达10
下一个元素到达11
下一个元素到达12
下一个元素到达13
下一个元素到达14
下一个元素到达15
下一个元素到达16
下一个元素到达17
下一个元素到达18
下一个元素到达19
下一个元素到达20
下一个元素到达21
下一个元素到达22
下一个元素到达23
下一个元素到达24
下一个元素到达25
下一个元素到达26
下一个元素到达27
下一个元素到达28
下一个元素到达29
下一个元素到达30
下一个元素到达31
下一个元素到达32
下一个元素到达33
下一个元素到达34
下一个元素到达35
下一个元素到达36
下一个元素到达37
下一个元素到达38
下一个元素到达39
下一个元素到达40
下一个元素到达41
下一个元素到达42
下一个元素到达43
下一个元素到达44
下一个元素到达45
下一个元素到达46
下一个元素到达47
下一个元素到达48
下一个元素到达49
下一个元素到达50
下一个元素到达51
下一个元素到达52
下一个元素到达53
下一个元素到达54
下一个元素到达55
下一个元素到达56
下一个元素到达57
下一个元素到达58
下一个元素到达59
下一个元素到达60
下一个元素到达61
下一个元素到达62
下一个元素到达63
下一个元素到达64
下一个元素到达65
下一个元素到达66
下一个元素到达67
下一个元素到达68
下一个元素到达69
下一个元素到达70
下一个元素到达71
下一个元素到达72
下一个元素到达73
下一个元素到达74
下一个元素到达75
下一个元素到达76
下一个元素到达77
下一个元素到达78
下一个元素到达79
下一个元素到达80
下一个元素到达81
下一个元素到达82
下一个元素到达83
下一个元素到达84
下一个元素到达85
下一个元素到达86
下一个元素到达87
下一个元素到达88
下一个元素到达89
下一个元素到达90
下一个元素到达91
下一个元素到达92
下一个元素到达93
下一个元素到达94
下一个元素到达95
下一个元素到达96
下一个元素到达97
下一个元素到达98
下一个元素到达99
下一个元素到达100
下一个元素到达101
下一个元素到达102
下一个元素到达103
下一个元素到达104
下一个元素到达105
下一个元素到达106
下一个元素到达107
下一个元素到达108
下一个元素到达109
下一个元素到达110
下一个元素到达111
下一个元素到达112
下一个元素到达113
下一个元素到达114
下一个元素到达115
下一个元素到达116
下一个元素到达117
下一个元素到达118
下一个元素到达119
下一个元素到达120
下一个元素到达121
下一个元素到达122
下一个元素到达123
下一个元素到达124
下一个元素到达125
下一个元素到达126
下一个元素到达127
下一个元素到达128
下一个元素到达129
下一个元素到达130
下一个元素到达131
下一个元素到达132
下一个元素到达133
下一个元素到达134
下一个元素到达135
下一个元素到达136
下一个元素到达137
下一个元素到达138
下一个元素到达139
下一个元素到达140
下一个元素到达141
下一个元素到达142
下一个元素到达143
下一个元素到达144
下一个元素到达145
下一个元素到达146
下一个元素到达147
下一个元素到达148
下一个元素到达149
下一个元素到达150
下一个元素到达151
下一个元素到达152
下一个元素到达153
下一个元素到达154
下一个元素到达155
下一个元素到达156
下一个元素到达157
下一个元素到达158
下一个元素到达159
下一个元素到达160
下一个元素到达161
下一个元素到达162
下一个元素到达163
下一个元素到达164
下一个元素到达165
下一个元素到达166
下一个元素到达167
下一个元素到达168
下一个元素到达169
下一个元素到达170
下一个元素到达171
下一个元素到达172
[main] INFO reactor.Flux.Range.1 - | onNext(34)
[main] INFO reactor.Flux.Range.1 - | onNext(35)
[main] INFO reactor.Flux.Range.1 - | onNext(36)
[main] INFO reactor.Flux.Range.1 - | onNext(37)
[main] INFO reactor.Flux.Range.1 - | onNext(38)
[main] INFO reactor.Flux.Range.1 - | request(38)
[main] INFO reactor.Flux.Range.1 - | onNext(39)
[main] INFO reactor.Flux.Range.1 - | onNext(40)
[main] INFO reactor.Flux.Range.1 - | onNext(41)
[main] INFO reactor.Flux.Range.1 - | onNext(42)
[main] INFO reactor.Flux.Range.1 - | onNext(43)
[main] INFO reactor.Flux.Range.1 - | onNext(44)
[main] INFO reactor.Flux.Range.1 - | onNext(45)
[main] INFO reactor.Flux.Range.1 - | onNext(46)
[main] INFO reactor.Flux.Range.1 - | onNext(47)
[main] INFO reactor.Flux.Range.1 - | onNext(48)
[main] INFO reactor.Flux.Range.1 - | onNext(49)
[main] INFO reactor.Flux.Range.1 - | onNext(50)
[main] INFO reactor.Flux.Range.1 - | onNext(51)
[main] INFO reactor.Flux.Range.1 - | onNext(52)
[main] INFO reactor.Flux.Range.1 - | onNext(53)
[main] INFO reactor.Flux.Range.1 - | onNext(54)
[main] INFO reactor.Flux.Range.1 - | onNext(55)
[main] INFO reactor.Flux.Range.1 - | onNext(56)
[main] INFO reactor.Flux.Range.1 - | onNext(57)
[main] INFO reactor.Flux.Range.1 - | onNext(58)
[main] INFO reactor.Flux.Range.1 - | onNext(59)
[main] INFO reactor.Flux.Range.1 - | onNext(60)
[main] INFO reactor.Flux.Range.1 - | onNext(61)
[main] INFO reactor.Flux.Range.1 - | onNext(62)
[main] INFO reactor.Flux.Range.1 - | onNext(63)
[main] INFO reactor.Flux.Range.1 - | onNext(64)
[main] INFO reactor.Flux.Range.1 - | onNext(65)
[main] INFO reactor.Flux.Range.1 - | onNext(66)
[main] INFO reactor.Flux.Range.1 - | onNext(67)
[main] INFO reactor.Flux.Range.1 - | onNext(68)
[main] INFO reactor.Flux.Range.1 - | onNext(69)
[main] INFO reactor.Flux.Range.1 - | onNext(70)
[main] INFO reactor.Flux.Range.1 - | onNext(71)
[main] INFO reactor.Flux.Range.1 - | onNext(72)
[main] INFO reactor.Flux.Range.1 - | onNext(73)
[main] INFO reactor.Flux.Range.1 - | onNext(74)
[main] INFO reactor.Flux.Range.1 - | onNext(75)
[main] INFO reactor.Flux.Range.1 - | onNext(76)
[main] INFO reactor.Flux.Range.1 - | request(38)
[main] INFO reactor.Flux.Range.1 - | onNext(77)
[main] INFO reactor.Flux.Range.1 - | onNext(78)
[main] INFO reactor.Flux.Range.1 - | onNext(79)
[main] INFO reactor.Flux.Range.1 - | onNext(80)
[main] INFO reactor.Flux.Range.1 - | onNext(81)
[main] INFO reactor.Flux.Range.1 - | onNext(82)
[main] INFO reactor.Flux.Range.1 - | onNext(83)
[main] INFO reactor.Flux.Range.1 - | onNext(84)
[main] INFO reactor.Flux.Range.1 - | onNext(85)
[main] INFO reactor.Flux.Range.1 - | onNext(86)
[main] INFO reactor.Flux.Range.1 - | onNext(87)
[main] INFO reactor.Flux.Range.1 - | onNext(88)
[main] INFO reactor.Flux.Range.1 - | onNext(89)
[main] INFO reactor.Flux.Range.1 - | onNext(90)
[main] INFO reactor.Flux.Range.1 - | onNext(91)
[main] INFO reactor.Flux.Range.1 - | onNext(92)
[main] INFO reactor.Flux.Range.1 - | onNext(93)
[main] INFO reactor.Flux.Range.1 - | onNext(94)
[main] INFO reactor.Flux.Range.1 - | onNext(95)
[main] INFO reactor.Flux.Range.1 - | onNext(96)
[main] INFO reactor.Flux.Range.1 - | onNext(97)
[main] INFO reactor.Flux.Range.1 - | onNext(98)
[main] INFO reactor.Flux.Range.1 - | onNext(99)
[main] INFO reactor.Flux.Range.1 - | onNext(100)
[main] INFO reactor.Flux.Range.1 - | onNext(101)
[main] INFO reactor.Flux.Range.1 - | onNext(102)
[main] INFO reactor.Flux.Range.1 - | onNext(103)
[main] INFO reactor.Flux.Range.1 - | onNext(104)
[main] INFO reactor.Flux.Range.1 - | onNext(105)
[main] INFO reactor.Flux.Range.1 - | onNext(106)
[main] INFO reactor.Flux.Range.1 - | onNext(107)
[main] INFO reactor.Flux.Range.1 - | onNext(108)
[main] INFO reactor.Flux.Range.1 - | onNext(109)
[main] INFO reactor.Flux.Range.1 - | onNext(110)
[main] INFO reactor.Flux.Range.1 - | onNext(111)
[main] INFO reactor.Flux.Range.1 - | onNext(112)
[main] INFO reactor.Flux.Range.1 - | onNext(113)
[main] INFO reactor.Flux.Range.1 - | onNext(114)
[main] INFO reactor.Flux.Range.1 - | request(38)
[main] INFO reactor.Flux.Range.1 - | onNext(115)
[main] INFO reactor.Flux.Range.1 - | onNext(116)
[main] INFO reactor.Flux.Range.1 - | onNext(117)
[main] INFO reactor.Flux.Range.1 - | onNext(118)
[main] INFO reactor.Flux.Range.1 - | onNext(119)
[main] INFO reactor.Flux.Range.1 - | onNext(120)
[main] INFO reactor.Flux.Range.1 - | onNext(121)
[main] INFO reactor.Flux.Range.1 - | onNext(122)
[main] INFO reactor.Flux.Range.1 - | onNext(123)
[main] INFO reactor.Flux.Range.1 - | onNext(124)
[main] INFO reactor.Flux.Range.1 - | onNext(125)
[main] INFO reactor.Flux.Range.1 - | onNext(126)
[main] INFO reactor.Flux.Range.1 - | onNext(127)
[main] INFO reactor.Flux.Range.1 - | onNext(128)
[main] INFO reactor.Flux.Range.1 - | onNext(129)
[main] INFO reactor.Flux.Range.1 - | onNext(130)
[main] INFO reactor.Flux.Range.1 - | onNext(131)
[main] INFO reactor.Flux.Range.1 - | onNext(132)
[main] INFO reactor.Flux.Range.1 - | onNext(133)
[main] INFO reactor.Flux.Range.1 - | onNext(134)
[main] INFO reactor.Flux.Range.1 - | onNext(135)
[main] INFO reactor.Flux.Range.1 - | onNext(136)
[main] INFO reactor.Flux.Range.1 - | onNext(137)
[main] INFO reactor.Flux.Range.1 - | onNext(138)
[main] INFO reactor.Flux.Range.1 - | onNext(139)
[main] INFO reactor.Flux.Range.1 - | onNext(140)
[main] INFO reactor.Flux.Range.1 - | onNext(141)
[main] INFO reactor.Flux.Range.1 - | onNext(142)
[main] INFO reactor.Flux.Range.1 - | onNext(143)
[main] INFO reactor.Flux.Range.1 - | onNext(144)
[main] INFO reactor.Flux.Range.1 - | onNext(145)
[main] INFO reactor.Flux.Range.1 - | onNext(146)
[main] INFO reactor.Flux.Range.1 - | onNext(147)
[main] INFO reactor.Flux.Range.1 - | onNext(148)
[main] INFO reactor.Flux.Range.1 - | onNext(149)
[main] INFO reactor.Flux.Range.1 - | onNext(150)
[main] INFO reactor.Flux.Range.1 - | onNext(151)
[main] INFO reactor.Flux.Range.1 - | onNext(152)
[main] INFO reactor.Flux.Range.1 - | request(38)
[main] INFO reactor.Flux.Range.1 - | onNext(153)
[main] INFO reactor.Flux.Range.1 - | onNext(154)
[main] INFO reactor.Flux.Range.1 - | onNext(155)
[main] INFO reactor.Flux.Range.1 - | onNext(156)
[main] INFO reactor.Flux.Range.1 - | onNext(157)
[main] INFO reactor.Flux.Range.1 - | onNext(158)
[main] INFO reactor.Flux.Range.1 - | onNext(159)
[main] INFO reactor.Flux.Range.1 - | onNext(160)
[main] INFO reactor.Flux.Range.1 - | onNext(161)
[main] INFO reactor.Flux.Range.1 - | onNext(162)
[main] INFO reactor.Flux.Range.1 - | onNext(163)
[main] INFO reactor.Flux.Range.1 - | onNext(164)
[main] INFO reactor.Flux.Range.1 - | onNext(165)
[main] INFO reactor.Flux.Range.1 - | onNext(166)
[main] INFO reactor.Flux.Range.1 - | onNext(167)
[main] INFO reactor.Flux.Range.1 - | onNext(168)
[main] INFO reactor.Flux.Range.1 - | onNext(169)
[main] INFO reactor.Flux.Range.1 - | onNext(170)
[main] INFO reactor.Flux.Range.1 - | onNext(171)
[main] INFO reactor.Flux.Range.1 - | onNext(172)
[main] INFO reactor.Flux.Range.1 - | onNext(173)
[main] INFO reactor.Flux.Range.1 - | onNext(174)
[main] INFO reactor.Flux.Range.1 - | onNext(175)
[main] INFO reactor.Flux.Range.1 - | onNext(176)
[main] INFO reactor.Flux.Range.1 - | onNext(177)
[main] INFO reactor.Flux.Range.1 - | onNext(178)
[main] INFO reactor.Flux.Range.1 - | onNext(179)
[main] INFO reactor.Flux.Range.1 - | onNext(180)
[main] INFO reactor.Flux.Range.1 - | onNext(181)
[main] INFO reactor.Flux.Range.1 - | onNext(182)
[main] INFO reactor.Flux.Range.1 - | onNext(183)
[main] INFO reactor.Flux.Range.1 - | onComplete()
下一个元素到达173
下一个元素到达174
下一个元素到达175
下一个元素到达176
下一个元素到达177
下一个元素到达178
下一个元素到达179
下一个元素到达180
下一个元素到达181
下一个元素到达182
下一个元素到达183

手动产生流

通过消费者回调和某些状态逐个生成信号,以编程方式创建 Flux。stateSupplier 可能返回 null。
参数:
stateSupplier – 要求每个传入用户为生成器双功能生成器提供初始状态 使用 Reactor 为每个用户提供的 SynchronousSink 以及当前状态,以在每次传递时生成单个信号并返回(新)状态。

单线程:generate()

java">public static void main(String[] args) {Flux<Object> flux = Flux.generate(() -> 0, (state, sink) -> {if (state < 20) {sink.next(state);} else {sink.complete();}return state + 1;}).log();flux.subscribe(System.out::println);}
java">[main] INFO reactor.Flux.Generate.1 - | onSubscribe([Fuseable] FluxGenerate.GenerateSubscription)
[main] INFO reactor.Flux.Generate.1 - | request(unbounded)
[main] INFO reactor.Flux.Generate.1 - | onNext(0)
[main] INFO reactor.Flux.Generate.1 - | onNext(1)
[main] INFO reactor.Flux.Generate.1 - | onNext(2)
[main] INFO reactor.Flux.Generate.1 - | onNext(3)
[main] INFO reactor.Flux.Generate.1 - | onNext(4)
[main] INFO reactor.Flux.Generate.1 - | onNext(5)
[main] INFO reactor.Flux.Generate.1 - | onNext(6)
[main] INFO reactor.Flux.Generate.1 - | onNext(7)
[main] INFO reactor.Flux.Generate.1 - | onNext(8)
[main] INFO reactor.Flux.Generate.1 - | onNext(9)
[main] INFO reactor.Flux.Generate.1 - | onNext(10)
[main] INFO reactor.Flux.Generate.1 - | onNext(11)
[main] INFO reactor.Flux.Generate.1 - | onNext(12)
[main] INFO reactor.Flux.Generate.1 - | onNext(13)
[main] INFO reactor.Flux.Generate.1 - | onNext(14)
[main] INFO reactor.Flux.Generate.1 - | onNext(15)
[main] INFO reactor.Flux.Generate.1 - | onNext(16)
[main] INFO reactor.Flux.Generate.1 - | onNext(17)
[main] INFO reactor.Flux.Generate.1 - | onNext(18)
[main] INFO reactor.Flux.Generate.1 - | onNext(19)
[main] INFO reactor.Flux.Generate.1 - | onComplete()
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

异步线程:create

java">public static void main(String[] args) {Flux<Object> flux = Flux.create(fluxSink -> {UserServiceImpl userService = new UserServiceImpl(fluxSink);userService.online();}).log();flux.subscribe(System.out::println);}
java">[main] INFO reactor.Flux.Create.1 - onSubscribe(FluxCreate.BufferAsyncSink)
[main] INFO reactor.Flux.Create.1 - request(unbounded)
main:用户上线
main
[main] INFO reactor.Flux.Create.1 - onNext(main)

自定义处理器

处理器将直接返回新流,处理器既是消费者又是生产者

通过调用具有每个 onNext 的输出接收器的双使用者来处理此 Flux 发出的项目。最多必须执行一个 SynchronousSink.next(Object) 调用,或者执行 0 或 1 个 SynchronousSink.error(Throwable) 或 SynchronousSink.complete()。错误模式支持:当 BiConsumer 抛出异常或通过 SynchronousSink.error(Throwable) 显式发出错误信号时,此运算符支持在错误时恢复(包括启用融合时)。
Params: handler – 处理 BiConsumer
Returns:转换后的 Flux

java">Flux<Object> flux = Flux.range(100, 150).handle(((integer, sink) -> {integer *= 2;sink.next(integer);}));flux.subscribe(System.out::println);
java">200
202
204
206
208
210
212
214
216
218
220
222
224
226
228
230
232
234
236
238
240
242
244
246
248
250
252
254
256
258
260
262
264
266
268
270
272
274
276
278
280
282
284
286
288
290
292
294
296
298
300
302
304
306
308
310
312
314
316
318
320
322
324
326
328
330
332
334
336
338
340
342
344
346
348
350
352
354
356
358
360
362
364
366
368
370
372
374
376
378
380
382
384
386
388
390
392
394
396
398
400
402
404
406
408
410
412
414
416
418
420
422
424
426
428
430
432
434
436
438
440
442
444
446
448
450
452
454
456
458
460
462
464
466
468
470
472
474
476
478
480
482
484
486
488
490
492
494
496
498

多线程的协作与调度

在提供的 Scheduler Scheduler.Worker 上运行 onNext、onComplete 和
onError。此运算符会影响线程上下文,在该上下文中,它下面的链中的其余运算符将执行该上下文,直到 publishOn
出现新出现。通常用于快速发布者、慢速使用者方案。flux.publishOn(Schedulers.single()).subscribe()
放弃支持:此运算符在数据信号取消或触发错误时丢弃其内部排队等待背压的元素。
Params:
scheduler – 一个Scheduler,提供 Scheduler.Worker 在哪里发布 delayError – 是否应该在转发任何错误之前消耗缓冲区 预取
– 异步边界容量
返回:异步生成的 Flux

注意:当主线程关闭的时候,所有子线程都将被关闭(粗暴关闭)

java">public static void main(String[] args) {Flux.range(1, 20).publishOn(Schedulers.boundedElastic()).log().subscribe(integer -> {System.out.println(Thread.currentThread().getName());});try {System.in.read();} catch (IOException e) {throw new RuntimeException(e);}}

常见的 boundedElastic 实例,一个调度程序,它动态创建有限数量的基于 ExecutorService 的 Worker,并在 Worker 关闭后重用它们。如果空闲超过 60 秒,则可以逐出底层守护程序线程。创建的最大线程数受上限限制(默认情况下,是可用 CPU 内核数的 10 倍,请参见DEFAULT_BOUNDED_ELASTIC_SIZE)。可以在每个支持线程上排队和延迟的最大任务提交数是有限制的(默认情况下,有 100K 个附加任务,请参阅DEFAULT_BOUNDED_ELASTIC_QUEUESIZE)。超过该点,将引发 RejectedExecutionException。根据首选项的顺序,支持新 Scheduler.Worker 的线程将从空闲池中选取、重新创建或从繁忙的池中重用。在后一种情况下,尽最大努力选择支持最少工作线程的线程。请注意,如果一个线程支持少量的工作线程,但这些工作线程提交了大量待处理任务,则第二个工作线程最终可能会由同一线程支持,并看到任务被拒绝。在创建工作线程时,支持线程的拾取也是一劳永逸地完成的,因此,尽管另一个支持线程在此期间处于空闲状态,但由于两个工作线程共享同一个支持线程并提交长时间运行的任务,任务可能会延迟。在第一次调用时,只会创建此通用调度程序的一个实例,并对其进行缓存。在后续调用中返回相同的实例,直到它被释放。不能直接释放公共实例,因为它们在调用方之间缓存和共享。但是,它们可以一起关闭,或者由工厂中的更改替换。返回:通用的 boundedElastic 实例,一个 Scheduler,它动态创建工作线程,其上限为后备线程数,之后是排队任务数,可重用线程并逐出空闲线程

java">public static void main(String[] args) {Flux.range(1, 20).publishOn(Schedulers.fromExecutor(new ThreadPoolExecutor(4,20,60,TimeUnit.SECONDS,new LinkedBlockingDeque<>()))).log().subscribe(integer -> {System.out.println(Thread.currentThread().getName());});try {System.in.read();} catch (IOException e) {throw new RuntimeException(e);}}

http://www.ppmy.cn/ops/22149.html

相关文章

微软开源了 MS-DOS 4.00

DOS的历史源远流长&#xff0c;很多现在的年轻人不知道DOS了。其实早期的windows可以看做是基于DOS的窗口界面的模拟器&#xff0c;系统的本质其实是DOS。后来DOS的漏洞还是太多了&#xff0c;微软重新写了windows的底层内核。DOS只是一个辅助终端的形式予以保留了。 微软是在…

常用excel操作笔记

一、表达式 1. 查找excel中一列值出现在另外一列中 IF(SUMPRODUCT(--(ISNUMBER(SEARCH($A$1:$A$101,C2)))),1,0) 2.计算某个字符出现的次数 LEN(G2)-LEN(SUBSTITUTE(G2,">",))1 3.拼接字符串 C2&"|"&A2 4.根据当前日期生成19位id TEXT(TODAY…

分布式与一致性协议之Raft算法(一)

Raft算法 概述 Raft算法属于Multi-Paxos算法&#xff0c;它在兰伯特Multi-Paxos思想的基础上做了一些简化和限制&#xff0c;比如日志必须是连续的&#xff0c;只支持领导者(Leader)、跟随者(Follwer)和候选人(Candidate)3种状态。在理解和算法实现上&#xff0c;Raft算法相对…

开发总结-Dao层(Mapper层)

Mybatis-plus新用法 VehicleBO one vehicleService.getOne(Wrappers.<VehicleBO>lambdaQuery().eq(VehicleBO::getVin, reqVo.getVin()));boolean b bizAccountApplyService.remove(Wrappers.<BizAccountApplyBO>lambdaQuery().eq(BizAccountApplyBO::getId, 14…

Vitis HLS 学习笔记--S_AXILITE 寄存器及驱动

目录 1. 简介 2. S_AXILITE Registers 寄存器详解 2.1 “隐式”优势 2.2 驱动程序文件 2.3 硬件头文件 2.4 硬件头文件中 SC/COR/TOW/COH 的解释 2.5 驱动控制过程 3. 总结 1. 简介 回顾此博文《Vitis HLS 学习笔记--Syn Report解读&#xff08;1&#xff09;-CSDN博…

《C++学习笔记---入门篇3》---内联函数,auto关键字,范围for,指针空值nullptr

1.内联函数 1.1 内联函数概念 1.2 特性 1.3 接下来说一道面试题&#xff1a; 2.auto关键字(C11) 2.1auto简介 2.2 auto的使用细则 3.3 auto不能推导的场景 3.基于范围的for循环(C11) 3.1范围for的语法 3.2 范围for的使用条件 4.指针空值---nullptr(C11) 4.1 C98中的…

618科技嘉年华!五款极致科技产品,开启智能生活新篇章!

准备好迎接一年一度的618了吗&#xff1f;这不仅仅是一场购物的狂欢&#xff0c;更是一次科技的盛宴&#xff0c;一次智能生活的全新启航。今年&#xff0c;我们将带来五款令人瞩目的极致科技产品&#xff0c;它们将彻底颠覆你对智能生活的认知。从娱乐到工作&#xff0c;这些产…

自动化密码填充:使用Python提高日常工作效率

密码是我们日常生活中难以逃脱的一部分。从解锁电脑到登录各种服务&#xff0c;我们需要记住无数的密码。幸运的是&#xff0c;通过Python和一些有用的库&#xff0c;我们可以简化填入密码的过程&#xff0c;使日常任务自动化变得简单。在本文中&#xff0c;我们将探讨如何使用…