重学SpringBoot3-Spring WebFlux之Reactor事件感知 API

ops/2024/11/1 16:58:33/

更多SpringBoot3内容请关注我的专栏:《SpringBoot3》
期待您的点赞👍收藏⭐评论✍

在这里插入图片描述

Spring WebFlux之Reactor事件感知 API

  • 1. 什么是 `doOnXxx` 系列 API?
  • 2. `doOnXxx` API 的常用方法
    • 2.1 `doOnNext()`
      • 示例:
      • 输出:
    • 2.2 `doOnError()`
      • 示例:
      • 输出:
    • 2.3 `doOnComplete()`
      • 示例:
      • 输出:
    • 2.4 `doOnSubscribe()`
      • 示例:
      • 输出:
    • 2.5 `doOnCancel()`
      • 示例:
      • 输出:
    • 2.6 `doFinally()`
      • 示例:
      • 输出:
    • 2.7 `doOnTerminate()`
      • 示例:
      • 输出:
    • 2.8 `doOnEach()`
      • 示例:
      • 输出:
    • 2.9 `doOnDiscard()`
      • 示例:
      • 输出:
    • 2.10 `doOnRequest()`
      • 示例:
      • 输出:
  • 3. `doOnXxx` 的应用场景
  • 4. 总结

在 Spring Boot 3 中,响应式编程通过 Reactor 库得到了广泛应用,提供了强大的流式数据处理能力。为了增强对流式数据流的调试和处理能力,Reactor 提供了一组非常重要的事件感知(side-effect)API,也就是我们常听到的 doOnXxx 系列方法。

这篇博客将详细介绍 doOnXxx 系列 API 的功能和用法,帮助大家更好地理解它们在响应式流中的作用,并展示其在实际开发中的一些应用场景。

1. 什么是 doOnXxx 系列 API?

doOnXxx 系列方法是 Reactor 提供的一组用于在流操作过程中执行副作用的 API。它们不会改变流的内容或数据流本身,而是允许我们在特定的生命周期事件发生时进行操作(如日志记录、调试、监控等)。

doOnXxx

这些 API 名称中的 Xxx 代表不同的事件类型,比如:

  • doOnNext(): 当下一个元素被发出时执行操作。
  • doOnError(): 当流中出现错误时执行操作。
  • doOnComplete(): 当流完成时执行操作。
  • doOnSubscribe(): 当订阅发生时执行操作。

这些方法非常适合用于监控、调试或者记录流的行为。

2. doOnXxx API 的常用方法

下面我们依次介绍常见的 doOnXxx API,并通过简单的示例进行演示。

2.1 doOnNext()

doOnNext() 方法允许你在每个元素被发布时执行操作,通常用于对每个数据元素进行日志记录、调试或者进行某种副作用操作。

示例:

java">        Flux<String> flux = Flux.just("Spring", "Boot", "3","Reactor").doOnNext(value -> System.out.println("Processing value: " + value)).map(String::toUpperCase);flux.subscribe(System.out::println);

输出:

doOnNext()

在这个例子中,doOnNext() 被用于每个元素发出时打印日志。这对于调试非常有用,可以清楚看到每个数据元素何时被处理。

2.2 doOnError()

doOnError() 方法允许你在流中出现异常时执行操作,通常用于记录异常信息、执行错误处理逻辑等。

示例:

java">        Flux<Integer> fluxWithError = Flux.just(1, 2, 0).map(i -> 10 / i)  // 这里会抛出 ArithmeticException: / by zero.doOnError(e -> System.err.println("Error occurred: " + e.getMessage()));fluxWithError.subscribe(System.out::println,error -> System.err.println("Subscriber received error: " + error));

输出:

doOnError()

在这个例子中,Flux 被用来创建一个数据流,并且在这个数据流中执行了一些操作,包括可能抛出异常的操作。下面是对消费者和生产者异常捕获的区别:
生产者异常捕获:

  • 在生产者端,可以使用 doOnError 方法来捕获并处理异常,这个方法会在数据流中发生错误时被调用。
  • doOnError 可以用于记录日志或执行一些清理操作,它不会改变数据流的行为,但数据流会被终止。

消费者异常捕获:

  • 在消费者端,可以通过 subscribe 方法的第二个参数(错误处理回调)来捕获并处理异常。
  • 这个错误处理回调会在数据流中发生错误时被调用,可以用于记录日志或执行其他错误处理逻辑。

2.3 doOnComplete()

doOnComplete() 方法在流完成时(即没有更多元素发出)执行操作。你可以利用它在流结束时执行一些收尾工作,比如关闭资源、统计处理结果等。

示例:

java">        Flux<String> flux = Flux.just("Spring", "Boot", "3","Reactor").doOnComplete(() -> System.out.println("Stream completed"));flux.subscribe(System.out::println);

输出:

doOnComplete()

这里,doOnComplete() 用于在数据流结束时打印一条日志,通知处理完成。

2.4 doOnSubscribe()

doOnSubscribe() 允许你在流被订阅时执行操作。它通常用于监控订阅事件,适合用于统计订阅数或进行相关的初始化操作。

示例:

java">        Flux<String> flux = Flux.just("A", "B", "C").doOnSubscribe(subscription -> System.out.println("Subscription started"));flux.subscribe(System.out::println);

输出:

doOnSubscribe()

在这个例子中,当流被订阅时,doOnSubscribe() 被调用,打印订阅开始的日志。

2.5 doOnCancel()

doOnCancel() 方法在取消订阅时执行操作。取消订阅通常是在消费者不再需要流数据时发生的(例如手动取消订阅或者发生超时等情况),可以用于处理一些资源释放的操作。

示例:

java">Flux<String> flux = Flux.just("A", "B", "C").doOnCancel(() -> System.out.println("Subscription canceled")).take(2);  // 只取前两个元素,第三个元素将被跳过(取消)flux.subscribe(System.out::println);

输出:

doOnCancel()

这里 doOnCancel() 在流被取消时执行了取消订阅的操作。

2.6 doFinally()

doFinally() 是一个非常有用的方法,它在流结束时始终会被调用(无论是正常完成、错误还是取消订阅)。它类似于 try-finally 语句中的 finally,适合做一些无论流如何结束都需要执行的操作,如清理资源等。

示例:

java">        Flux<String> flux = Flux.just("A", "B", "C").doFinally(signalType -> System.out.println("Stream ended with signal: " + signalType));flux.subscribe(System.out::println);

输出:

doFinally()

doFinally() 可以捕捉到不同类型的信号,包括 onComplete, onErroronCancel

2.7 doOnTerminate()

doOnTerminate() 在流完成或出错时执行操作。它是 doOnComplete()doOnError() 的组合,但不区分流是正常完成还是出现错误,只要流结束了,它就会被调用。

示例:

java">        Flux<String> flux = Flux.just("A", "B", "C").doOnTerminate(() -> System.out.println("Stream terminated"));flux.subscribe(System.out::println);

输出:

doOnTerminate()

它在流结束时总会执行,不管是否出现错误。

2.8 doOnEach()

doOnEach() 是一个非常通用的事件感知 API,它允许对流中的每一个信号(包括 onNextonErroronCompleteonSubscribe)进行统一处理。这个方法会接收一个 Signal 对象,表示当前发生的事件类型,从而可以处理不同的信号类型。

示例:

java">        Flux<String> flux = Flux.just("Spring", "Boot", "3", "Reactor").doOnEach(signal -> {if (signal.isOnNext()) {System.out.println("Element received: " + signal.get());} else if (signal.isOnError()) {System.err.println("Error occurred: " + signal.getThrowable().getMessage());} else if (signal.isOnComplete()) {System.out.println("Stream completed");}});flux.subscribe(System.out::println);

输出:

doOnEach()

2.9 doOnDiscard()

doOnDiscard() 方法用于处理被 丢弃的元素。当某些元素由于某种原因(例如 filter() 操作或上游取消)没有被使用时,可以通过 doOnDiscard() 来感知这些元素的丢弃,并执行相关的操作(如清理资源、记录日志等)。

可能使用 doOnDiscard 钩子的例子包括以下情况:

  • filter: 不符合过滤器的项被视为 “丢弃”。
  • skip:跳过的项将被丢弃。
  • buffer(maxSize, skip)maxSize < skip:“丢弃的缓冲区” — 缓冲区之间的元素被丢弃。

示例:

java">        Flux<String> flux = Flux.just("AA", "BB", "C", "D", "E").filter(s -> s.length() > 1).doOnDiscard(String.class, discardedValue ->System.out.println("Discarded: " + discardedValue));flux.subscribe(System.out::println);

输出:

doOnDiscard()

2.10 doOnRequest()

doOnRequest() 是一个用于处理 背压请求(request signals) 的 API,它允许你在下游请求元素时执行操作。响应式流中上游发送元素的数量通常由下游通过请求背压机制控制,因此 doOnRequest() 可以帮助我们监控下游对元素的需求。

示例:

java">        Flux<Integer> flux = Flux.range(1, 5).doOnRequest(request ->System.out.println("Request for: " + request + " elements"));flux.subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {s.request(3);}@Overridepublic void onNext(Integer integer) {System.out.println("Received: " + integer);}@Overridepublic void onError(Throwable t) {}@Overridepublic void onComplete() {}});  // 请求 3 个元素

输出:

doOnRequest()

3. doOnXxx 的应用场景

  1. 日志记录与调试:在流的不同阶段插入 doOnXxx,帮助我们记录每个阶段的状态变化或异常情况,从而更好地调试响应式流。
  2. 监控和统计:我们可以使用 doOnSubscribe()doOnComplete() 结合监控系统来统计订阅的数量、完成的流数量,分析流的性能。
  3. 资源管理:使用 doFinally() 进行资源释放和清理,确保无论流如何结束都能进行相应的收尾工作。
  4. 错误处理:使用 doOnError() 可以在发生错误时记录日志、发送通知或者做出其他相应的处理。

4. 总结

Reactor 的 doOnXxx 系列 API 是在响应式流中进行事件感知和副作用处理的强大工具。它们的主要作用是让开发者能够在不干扰流式数据处理的情况下,插入额外的操作,如调试、监控、资源清理等。通过合理使用 doOnNext()doOnError()doFinally() 等方法,我们可以更好地理解和控制响应式流的执行过程,从而构建更加健壮和高效的应用程序。

希望这篇文章能帮助你更好地掌握 doOnXxx 系列方法。如果你有任何问题或建议,欢迎讨论!


http://www.ppmy.cn/ops/130182.html

相关文章

水利水电安全员考试真题题库及答案

水利水电安全员考试真题题库及答案 58.水利水电施工企业三类人员每年再培训时间不少于&#xff08;&#xff09;学时。 A.12 B.20 C.32 D.36 答案&#xff1a;B 59.双重绝缘指同时具备&#xff08;&#xff09;。 A.工作绝缘和保护绝缘 B.工作绝缘 C.保护绝缘 D.加强…

开发之翼:划时代的原生鸿蒙应用市场开发者服务

前言 随着"纯血鸿蒙" HarmonyOS NEXT在原生鸿蒙之夜的正式发布&#xff0c;鸿蒙生态正以前所未有的速度蓬勃发展。据知已有超过15000个鸿蒙原生应用和元服务上架&#xff0c;覆盖18个行业&#xff0c;通用办公应用覆盖全国3800万多家企业。原生鸿蒙操作系统降低了接…

PHP海外矿物矿机理财投资源码-金融理财投资源码

PHP海外矿物矿机理财投资源码/金融理财投资源码 海外矿物矿机理财投资源码 测试不错,可以做其他产品理财,功能都没啥太大问题

清仓和斩仓有什么不一样?

在股票市场中&#xff0c;清仓和斩仓是两种常见的操作策略&#xff0c;它们各自具有不同的含义和应用场景。以下是对这两种策略的详细解析&#xff1a; 一、清仓 清仓&#xff0c;从字面意思上理解&#xff0c;即清理仓库&#xff0c;但在股票市场中&#xff0c;它引申为投资…

Python自动化测试中的Mock与单元测试实战

在软件开发过程中&#xff0c;自动化测试是确保代码质量和稳定性的关键一环。而Python作为一门灵活且强大的编程语言&#xff0c;提供了丰富的工具和库来支持自动化测试。本文将深入探讨如何结合Mock与单元测试&#xff0c;利用Python进行自动化测试&#xff0c;以提高代码的可…

flask第一个应用

文章目录 安装一、编程第一步二、引入配置三、代码解析 安装 python环境安装的过程就不重复赘述了&#xff0c;flask安装使用命令pip install Flask即可&#xff0c;使用命令pip show Flask查看flask版本信息 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供…

3dsMax 展管道UV

3dsMax 展管道UV 创建管道模型 https://blog.csdn.net/GoodCooking/article/details/140876371有管道模型之后&#xff0c;进行展UV 展开UV之后 旋转UV&#xff0c;大致靠左 挨个拉直拐角 挨个拉直拐角 缩放到UV里面&#xff0c;不要拖拽点。 水平缩放&#xff0c;将U…

鸿蒙生态:机遇与挑战

在数字化浪潮的推动下&#xff0c;操作系统作为智能设备的灵魂&#xff0c;正经历着前所未有的变革。华为鸿蒙系统&#xff08;HarmonyOS&#xff09;的推出&#xff0c;不仅标志着中国在操作系统领域的自主创新&#xff0c;也为全球开发者带来了新的机遇与挑战。本文将探讨鸿蒙…