java的反应式流

news/2024/12/23 4:54:31/

Java的反应式流是一种新的编程模型,它在异步和事件驱动的环境下工作。反应式流的目的是为了解决传统的单线程或者多线程编程模型在高并发和大流量情况下的性能瓶颈。

反应式流的核心是Observable和Observer,Observable表示一个数据流,而Observer则表示这个数据流的消费者。Observable在数据流上产生事件,而Observer则对这些事件进行响应。反应式流的数据流是一种推式的流,Observable发布事件时不需要等待Observer接收,Observable会把事件推送给Observer,而不是Observer去轮询Observable。

Java的反应式流通常基于Reactor或RxJava等库,这些库提供了丰富的函数式编程API和运算符,可以非常方便地处理异步事件。这些库都提供了类似于Observable和Observer的抽象概念,可以用来描述和处理异步数据流。同时还提供了常用的运算符,包括map、filter、reduce等,这些运算符可以方便地对数据流进行变换和过滤。

反应式流还有一个重要的概念是背压(backpressure),它是指在高并发和大流量情况下,消费者无法处理生产者产生的数据流,导致数据积压的情况。为了解决这个问题,反应式流引入了背压机制,生产者会在发送数据前先询问消费者的处理能力,如果消费者没有处理能力,生产者会等待一段时间或者缓存数据,等待消费者处理完数据后再继续发送。

反应式流已经被广泛应用于大规模的互联网应用中,包括机器学习、数据分析、网络爬虫等领域。它的优点在于处理高并发和大流量的数据流时,能够更加高效地利用系统资源,提高系统的性能和可扩展性。

总之,反应式流是Java编程中的一个重要概念,它可以帮助我们更好地处理异步和事件驱动的数据流,提高系统的性能和可扩展性。

不涉及任何库,就单纯用java的反应式流,完成发布订阅者模式:

package com.example.jdk9.react;import java.util.concurrent.Flow.*;public class PublisherSubscriberDemo {public static void main(String[] args) {SimplePublisher<String> publisher = new SimplePublisher<>();SimpleSubscriber<String> subscriber1 = new SimpleSubscriber<>();SimpleSubscriber<String> subscriber2 = new SimpleSubscriber<>();publisher.subscribe(subscriber1);publisher.subscribe(subscriber2);publisher.submit("hello");publisher.submit("world");publisher.close();}
}class SimplePublisher<T> implements Publisher<T> {private Subscription subscription;@Overridepublic void subscribe(Subscriber<? super T> subscriber) {subscriber.onSubscribe(new Subscription() {@Overridepublic void request(long n) {}@Overridepublic void cancel() {// nothing to do}});this.subscription = new Subscription() {private boolean cancelled = false;@Overridepublic void request(long n) {// nothing to do}@Overridepublic void cancel() {this.cancelled = true;}public boolean isCancelled() {return this.cancelled;}};subscriber.onSubscribe(this.subscription);}public void submit(T item) {subscriptionLimitedQueue.offer(item);subscription.request(1);}public void close() {while (!subscriptionLimitedQueue.isEmpty()) {subscriptionLimitedQueue.poll();}subscription.cancel();}private SubscriptionLimitedQueue<T> subscriptionLimitedQueue = new SubscriptionLimitedQueue<>(2);static class SubscriptionLimitedQueue<T> {private final int limit;private int size = 0;private Node<T> head;private Node<T> tail;public SubscriptionLimitedQueue(int limit) {this.limit = limit;}private static class Node<T> {final T item;Node<T> next;Node(T item, Node<T> next) {this.item = item;this.next = next;}}public void offer(T item) {Node<T> node = new Node<>(item, null);if (head == null) {head = node;tail = head;} else {tail.next = node;tail = tail.next;}size++;if (size > limit) {Node<T> newHead = head.next;head.next = null;head = newHead;size--;}}public boolean isEmpty() {return size == 0;}public T poll() {if (isEmpty()) {return null;}T item = head.item;Node<T> newHead = head.next;head.next = null;head = newHead;size--;return item;}}
}class SimpleSubscriber<T> implements Subscriber<T> {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {this.subscription = subscription;System.out.println("订阅成功");subscription.request(1);}@Overridepublic void onNext(T item) {System.out.println("Received item: " + item);subscription.request(1);}@Overridepublic void onError(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void onComplete() {System.out.println("Done");}
}

这段代码演示了使用Flow API来发布和订阅消息的过程,它包含以下类和接口:

  1. Publisher<T>:发布者接口,表示能够发布指定类型的消息给订阅者。
  2. Subscriber<T>:订阅者接口,表示能够接收指定类型的消息。
  3. Subscription:订阅接口,表示订阅关系,能够请求一定数量的消息和取消订阅。
  4. SubmissionPublisher<T>:继承自Publisher<T>接口,实现了异步发布消息的能力。
  5. Flow API:一组用于处理数据流和异步操作的接口和类。

具体解释:

  1. SimplePublisher类是一个实现了Publisher接口的简单发布者类,它能够发布指定类型的消息给订阅者。它内部维护了一个SubscriptionLimitedQueue类的对象,用于限制消息队列的长度。
  2. SubscriptionLimitedQueue类是一个维护队列长度的类,用于实现限制消息队列长度的功能。
  3. SimpleSubscriber类是一个实现了Subscriber接口的简单订阅者类,它能够接收指定类型的消息,并将其输出到控制台中。
  4. main方法创建了一个SimplePublisher类的实例和一个SimpleSubscriber类的实例,然后将它们关联起来,最后向SimplePublisher类的实例中发布了两个消息,随后关闭了发布者。

运行结果:

例子:

第一步,引入依赖:

<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId><version>3.5.11</version></dependency>

第二步,编写代码:

package com.example.jdk9.react;import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;public class ReactiveStreamExample {public static void main(String[] args) {Flux<Integer> stream = Flux.range(1, 10);stream.map(i -> i * 2).filter(i -> i % 3 == 0).flatMap(i -> Mono.just(i).zipWith(Mono.just(i * 3))).subscribe(System.out::println);}
}

上面的代码首先创建了一个从1到10的数字列表,然后通过map操作符将每个元素乘以2,再使用filter操作符过滤掉不能被3整除的元素。接下来,使用flatMap操作符来创建一个新的流,该流将原始元素和该元素乘以3的结果合并在一起。最后,使用subscribe方法来订阅这个流并打印出每个元素的值。

这个例子展示了Reactor库中的一些常见操作符,包括mapfilterflatMap。通过这些操作符的链式调用,我们可以轻松地对数据流进行复杂的操作。在实际的应用中,我们可以根据具体的需求选择不同的操作符来实现所需的数据处理逻辑。

使用Reactor 库实现发布订阅者模式:

package com.example.jdk9.react;import reactor.core.publisher.Flux;public class PublisherSubscriberExample {public static void main(String[] args) {// 创建发布者Flux<Integer> publisher = Flux.just(1, 2, 3, 4, 5);// 订阅者1:打印每个元素publisher.subscribe(System.out::println);// 订阅者2:计算元素的总和并打印publisher.reduce(0, Integer::sum).subscribe(total -> System.out.println("Sum = " + total));}
}


http://www.ppmy.cn/news/1204800.html

相关文章

java后端debug排查问题思路

问题排查思路 这里说的是主要是debug以及线上问题排查的思路. 解决问题的步骤 确认环境、确定问题、复现问题、查看日志、定位问题 、解决问题 确认环境/url/参数 确认是哪个环境。 是开发环境&#xff0c;测试环境&#xff0c;还是生产环境。 如果问题是在测试环境&…

工业CT 三维重建 及分割

目录 工业CT介绍 工业CT主要应用于以下领域&#xff1a; CT三维重建软件&#xff1a; 效果&#xff1a; 工业CT介绍 工业CT设备是基于线阵探测器的断层扫描技术&#xff0c;是一种常用的无损检测技术&#xff0c;用于获取物体内部的准确三维结构信息。它通过X射线的投射和接…

希尔排序原理

目录&#xff1a; 一、希尔排序与插入排序 1&#xff09;希尔排序的概念 2&#xff09;插入排序实现 二、希尔排序实现 一、希尔排序与插入排序 1&#xff09;希尔排序的概念 希尔排序(Shells Sort)是插入排序的一种又称“缩小增量排序”&#xff08;Diminishing Incremen…

腾讯觅影数智医疗影像平台获颁世界互联网领先科技成果大奖

11月8日&#xff0c;2023年世界互联网大会乌镇峰会在乌镇举行&#xff0c;腾讯再度获颁“世界互联网领先科技成果”大奖。腾讯健康总裁吴文达在世界互联网领先科技成果发布活动中介绍&#xff0c;“腾讯觅影数智医疗影像平台”已全面开放20多个医疗AI引擎助力科研创新&#xff…

大模型时代的编码习惯

遇到问题&#xff0c;第一个问的再也不是百度&#xff0c;而是Chat-GPT&#xff0c;百度现在适合找佐证的资料&#xff0c;而非找答案&#xff0c;百度不到反而是在浪费时间&#xff0c;代码有问题找Chat-GPT和各类大模型

Jupyter Notebook 内核似乎挂掉了,它很快将自动重启

报错原因&#xff1a; OMP: Error #15: Initializing libiomp5md.dll, but found libiomp5md.dll already initialized. OMP: Hint This means that multiple copies of the OpenMP runtime have been linked into the program. That is dangerous, since it can degrade perfo…

龙迅LT6911GXC,HDMI 2.1转4 PORT MIPI/LVDS支持分辨率高达8K30HZ

描述&#xff1a; LT6911GXC 是一款面向 VR / 显示应用的高性能 HDMI2.1 至 MIPI 或 LVDS 芯片。 高清遥控器RX作为高清电脑中继器的上游&#xff0c;可与其他芯片的高清电脑TX合作&#xff0c;实现直译台功能。 对于 HDMI2.1 输入&#xff0c;LT6911GXC 可配置为 3/4 通道。 …

新手必看:Bitget Wallet 上购买 ETH 的步骤解析

Base 链是一种 Layer 2&#xff08;L2&#xff09;公链&#xff0c;它可以为用户提供以太坊&#xff08;ETH&#xff09;代币&#xff0c;而 Bitget Wallet 是一款多功能加密货币钱包&#xff0c;支持 Base 链以及其他主要区块链。