Reactive 编程-Project Reactor

ops/2024/9/19 16:17:55/ 标签: spring boot

Reactive 编程与 Project Reactor

Reactive 编程是一种编程范式,主要用于处理异步数据流。它旨在通过声明式的编程方式处理事件驱动的非阻塞任务,特别适合于构建响应式、可扩展、高并发的应用。随着互联网应用规模的扩大和响应速度的提升需求,Reactive 编程变得越来越流行。

在 Java 生态中,Project Reactor 是支持 Reactive 编程的核心库之一,基于 Reactive Streams 规范,并被 Spring 5 中的 WebFlux 采用为核心反应式框架。它提供了强大的 API 用于处理异步数据流,同时保持良好的性能和响应性。

一、Reactive 编程的核心概念

在传统的编程模型中,异步处理通常会涉及回调函数、线程池和复杂的同步控制,这样的代码不仅难以维护,还容易出现阻塞和性能瓶颈。Reactive 编程的出现解决了这些问题,它通过响应式的数据流,允许程序按需、非阻塞地处理数据和事件。

Reactive 编程的几个重要特点:

  1. 异步与非阻塞:程序可以异步处理任务,而不会因为等待结果而阻塞线程。这种非阻塞机制非常适合处理 I/O 密集型任务。

  2. 事件驱动:通过数据流来处理一系列事件。当有新数据产生时,系统会自动响应和处理这些数据。

  3. 流式处理:处理数据流中的数据项,类似于 Java 8 中的 Stream API,但不同的是,Reactive 流可以处理动态、无限的数据流。

  4. 背压(Backpressure):Reactive Streams 规范提供了背压机制,允许消费者根据自身的能力,按需请求数据,避免生产者产生过多的数据而导致内存溢出。

二、Project Reactor 概述

Project Reactor 是一个支持 Reactive Streams 规范的响应式编程库。它是构建在 Reactive Streams 基础上的高性能框架,提供了几种关键的异步数据流处理类型:MonoFlux

  • Mono:表示 0 或 1 个元素的异步数据流。适用于返回单个结果的场景,例如 HTTP 请求的响应。
  • Flux:表示 0 到 N 个元素的异步数据流,适用于处理多个结果或无限流的场景。

Project Reactor 中的 MonoFlux 提供了强大的操作符(类似于 Java 的 Stream API 中的操作符),用于组合、转换、过滤和操作异步数据流。

三、Mono 和 Flux 的基本用法
1. Mono 的使用

Mono 代表一个包含最多一个元素的异步流。它可以用于表示单个异步任务的结果,如数据库查询、HTTP 请求等。

import reactor.core.publisher.Mono;public class MonoExample {public static void main(String[] args) {// 创建一个包含数据的 MonoMono<String> mono = Mono.just("Hello, Reactive World!");// 订阅并消费 Monomono.subscribe(System.out::println);}
}

在上述示例中,Mono.just("Hello, Reactive World!") 创建了一个包含单个元素的 Mono。当调用 subscribe() 方法时,Mono 开始执行并将数据输出到控制台。

2. Flux 的使用

Flux 是表示 0 到 N 个元素的异步流,适合处理多个数据项或无限的数据流。

import reactor.core.publisher.Flux;public class FluxExample {public static void main(String[] args) {// 创建一个包含多个元素的 FluxFlux<String> flux = Flux.just("Spring", "Reactor", "WebFlux");// 订阅并消费 Fluxflux.subscribe(System.out::println);}
}

在这个例子中,Flux.just() 创建了一个包含多个元素的 Flux,当 subscribe() 被调用时,Flux 会依次发射每个元素并输出它们。

四、背压(Backpressure)机制

背压是 Reactive Streams 中的一个关键概念,旨在解决生产者与消费者之间速率不匹配的问题。当生产者产生数据的速度快于消费者处理的速度时,背压允许消费者按自己的能力请求数据,避免数据积压导致内存问题。

在 Project Reactor 中,背压由订阅者(即消费者)通过 request(n) 方法来控制。例如,Flux 通过 onBackpressureBuffer()onBackpressureDrop() 来处理背压。

import reactor.core.publisher.Flux;public class BackpressureExample {public static void main(String[] args) {Flux.range(1, 1000).onBackpressureBuffer(10)  // 当消费者处理不过来时,使用缓冲区.subscribe(data -> {System.out.println("Processing " + data);try {Thread.sleep(100);  // 模拟处理时间} catch (InterruptedException e) {e.printStackTrace();}},error -> System.err.println("Error: " + error),() -> System.out.println("Complete"));}
}

在该示例中,Flux 将发射 1000 个元素,但消费者处理的速度较慢,因此我们使用 onBackpressureBuffer(10) 将多余的数据存放到缓冲区,以便消费者可以按需处理数据。

五、常用的操作符

Project Reactor 提供了许多操作符,用于处理和转换数据流。以下是一些常用的操作符示例:

1. map()

map() 操作符用于将每个元素进行转换,类似于 Java 8 的 Stream.map()

Flux<Integer> numbers = Flux.just(1, 2, 3, 4);
numbers.map(n -> n * 2).subscribe(System.out::println);  // 输出 2, 4, 6, 8
2. flatMap()

flatMap() 可以用于异步转换,并返回新的 MonoFlux

Flux<String> names = Flux.just("Tom", "Jerry", "Spike");
names.flatMap(name -> Flux.just(name.toUpperCase())).subscribe(System.out::println);  // 输出 TOM, JERRY, SPIKE
3. filter()

filter() 用于过滤掉不满足条件的元素。

Flux<Integer> numbers = Flux.range(1, 10);
numbers.filter(n -> n % 2 == 0).subscribe(System.out::println);  // 输出 2, 4, 6, 8, 10
4. reduce()

reduce() 用于将多个值组合成一个值,类似于 Stream.reduce()

Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
numbers.reduce((a, b) -> a + b).subscribe(System.out::println);  // 输出 15
5. zip()

zip() 操作符用于合并多个流,并将其元素“打包”成一个对象或元组。

Flux<String> names = Flux.just("Tom", "Jerry");
Flux<Integer> ages = Flux.just(5, 6);
Flux.zip(names, ages).subscribe(tuple -> System.out.println("Name: " + tuple.getT1() + ", Age: " + tuple.getT2()));
六、Project Reactor 的错误处理

Reactive 编程中的错误处理非常重要,因为异步流程中的异常不会像同步代码那样直接抛出。在 Project Reactor 中,有几种方式处理错误:

1. onErrorReturn()

在发生错误时返回默认值。

Flux<Integer> flux = Flux.just(1, 2, 0, 4).map(i -> 10 / i)  // 会产生除以 0 的异常.onErrorReturn(-1);
flux.subscribe(System.out::println);  // 输出 10, 5, -1
2. onErrorResume()

onErrorResume() 允许你在发生错误时切换到一个新的 FluxMono

Flux<Integer> flux = Flux.just(1, 2, 0, 4).map(i -> 10 / i).onErrorResume(e -> Flux.just(-1, -2));
flux.subscribe(System.out::println);  // 输出 10, 5, -1, -2
七、Reactive 编程的优势与适用场景

Reactive 编程在处理高并发、I/O 密集型任务时表现尤为出色,特别适用于以下场景:

  1. 微服务架构:在微服务中,服务之间的通信常常需要通过非阻塞 I/O,Reactive 编程能够显著提升系统的吞吐量。

  2. **高并发的

Web 应用**:Reactive 编程可以处理大量同时进行的用户请求,而不会因为线程阻塞而限制系统性能。

  1. 事件驱动系统:如物联网(IoT)系统,Reactive 编程可以很好地处理流式数据和异步事件。
八、总结

Reactive 编程为构建高性能、响应式系统提供了一种全新的方式,而 Project Reactor 则是 Java 生态中实现 Reactive Streams 规范的强大工具。通过 MonoFlux 的流式 API,开发者可以简洁高效地处理异步任务,并借助背压机制避免过度生产数据导致的资源问题。

Project Reactor 的丰富操作符、错误处理机制和与 Spring WebFlux 的无缝集成,使其成为开发现代高并发应用的得力助手。掌握 Reactive 编程及其在 Project Reactor 中的实现,能够显著提高应用的扩展性、性能和响应性。


http://www.ppmy.cn/ops/110924.html

相关文章

Leetcode面试经典150题-79.搜索单词

题目比较简单&#xff0c;回溯最基础的题&#xff0c;记得除非覆盖&#xff0c;否则一定要恢复现场就行 解法都在代码里&#xff0c;不懂就留言或者私信 class Solution {public boolean exist(char[][] board, String word) {int m board.length; int n board[0].length;i…

JS 扩展运算符有哪些使用场景?

你好&#xff0c;我是沐爸&#xff0c;欢迎点赞、收藏、评论和关注。 扩展运算符有哪些使用场景&#xff1f;直接进入正题 一、复制数组 const a1 [1, 2];// 写法一 const a2 [...a1]; // 写法二 const [...a2] a1;二、合并数组 const part1 [1, 2, 3]; const part2 …

Python 数学建模——ARMA 时间序列分析

文章目录 前言使用前提平稳性检验白噪声检验 用法代码实例第一步——平稳性分析方法一方法二方法三 第二步——白噪声分析第三步——确定参数第四步——模型构建与检验检验模型效果预测未来数据 前言 常见的时间序列分析方法有很多&#xff0c;之前介绍了一个稍微新颖的 Prophe…

HarmonyOS开发者基础认证试题

文章目录 一、HarmonyOS介绍二、DevEco Studio的使用三、ArkTS语法介绍四、应用程序框架基础五、从简单的页面开始六、构建更加丰富的页面七、从网络获取数据八、保存应用数据 一、HarmonyOS介绍 判断题&#xff1a; 1.“一次开发&#xff0c;多端部署”指的是一个工程&#x…

【GBase 8c V5_3.0.0 分布式部署(单机安装)】

GBase 8c数据库分布式形态采用share nothing的分布式架构&#xff0c;计算节点和存储节点分离。节点间通过高速网络进行通信&#xff0c;所有节点都有主从互备&#xff0c;确保系统的极致高可用。 本文主要包含GBase 8c V5_3.0.0 分布式部署(单机安装)的简单流程&#xff0c;供…

AI创意引擎:优化Prompt提示词的高效提问技巧

AI内容创作的精髓&#xff1a;提示词&#xff08;Prompt&#xff09; 在AI领域中&#xff0c;提示词&#xff08;Prompt&#xff09;是与模型沟通的关键工具。提示词不仅决定了AI生成内容的方向和质量&#xff0c;还在优化模型输出、提升用户体验中扮演着至关重要的角色。因此…

【pgAdmin4】使用psql命令行执行查询时,使用占位符(:v1)传入参数

目录 0.环境 1.相关知识点 2.举例 3.详细操作 0.环境 windows11 pgAdmin4 8.10 1.相关知识点 占位符如何理解&#xff1f; SQL语句中的占位符是一种用于代表参数的特殊符号&#xff0c;通常以"?"或":"开头。它们被用来构建可重用的SQL语句&#xff0…

Grafana 汉化

点击 Home -> Administration 点击 Default preferences 点击 中文&#xff08;简体&#xff09;后点击 Save 即可

遥感技术在生态系统碳储量、碳收支、碳排放、碳循环以及人为源排放反演等领域的技术发展,实践角度解决遥感技术在生态、能源、大气等领域的碳排放监测及模拟问题

卫星遥感具有客观、连续、稳定、大范围、重复观测的优点&#xff0c;已成为监测全球碳盘查不可或缺的技术手段&#xff0c;卫星遥感也正在成为新一代 、国际认可的全球碳核查方法。本教程的目的就是梳理碳中和与碳达峰对卫星遥感的现实需求&#xff0c;系统总结遥感技术在生态系…

唯徳知识产权管理系统 DownloadFileWordTemplate 文件读取漏洞复现

0x01 产品简介 唯徳知识产权管理系统,由深圳市唯德科创信息有限公司精心打造,旨在为企业及代理机构提供全方位、高效、安全的知识产权管理解决方案。该系统集成了专利、商标、版权等知识产权的全面管理功能,并通过云平台实现远程在线办公,提升工作效率。是一款集知识产权申…

Radware Alteon 负载均衡-基于URL Filetype的七层负载均衡

作者&#xff1a;Xiaolei Ren Radware Alteon作为一款高性能的负载均衡器&#xff0c;其基于URL Filetype的七层负载均衡功能为众多企业提供了灵活、高效的解决方案。 该案例实现如下需求&#xff1a;当客户端访问服务器时&#xff0c;默认访问10.200.1.100&#xff0c;在ht…

快手自研Spark向量化引擎正式发布,性能提升200%

Blaze 是快手自研的基于Rust语言和DataFusion框架开发的Spark向量化执行引擎&#xff0c;旨在通过本机矢量化执行技术来加速Spark SQL的查询处理。Blaze在快手内部上线的数仓生产作业也观测到了平均30%的算力提升&#xff0c;实现了较大的降本增效。本文将深入剖析blaze的技术原…

Anaconda 安装与使用教程

Anaconda 安装与使用教程 介绍 Anaconda 是一个用于科学计算的 Python 和 R 的发行版&#xff0c;它包含了众多流行的科学计算、数据分析、机器学习等领域的库。本教程旨在帮助初学者快速上手 Anaconda&#xff0c;并学会如何使用其管理环境以及安装包。 第一步&#xff1a;…

Linux下的gcc与gdb

目录 Linux下的gcc与gdb 代码编译与链接 函数库 gdb介绍和安装 gdb基本使用指令 示例代码 debug模式和release模式 基本指令 进入gdb调试与显示调试代码 创建断点与删除断点 启用和禁用断点 执行代码 逐语句和逐过程调试 断点跳转 显示指定变量以及对应内容 打印变量的值 执行到…

代码随想录算法day30 | 动态规划算法part03 | 01背包问题 二维 ,01背包问题 一维,416. 分割等和子集

正式开始背包问题&#xff0c;背包问题还是挺难的&#xff0c;虽然大家可能看了很多背包问题模板代码&#xff0c;感觉挺简单&#xff0c;但基本理解的都不够深入。 背包问题&#xff0c;力扣上没有原题&#xff0c;题目是在卡码网找的 动态规划&#xff1a;01背包理论基础 本…

简单易懂的方式来解释机器学习(ML)和深度学习(DL)的区别与联系

让我们从多个角度出发&#xff0c;用一些简单易懂的方式来解释机器学习&#xff08;ML&#xff09;和深度学习&#xff08;DL&#xff09;的区别与联系。 1. 概念上的区别 机器学习 想象一下你在教一个小孩子如何分辨猫和狗。你可能会给这个孩子看很多猫和狗的照片&#xff…

Unity-Time类

目录 Time.timeScale Time.deltaTime Time.unscaledDeltaTime Time.time Time.frameCount Time.fixedDeltaTime Time.timeScale 时间缩放比例 时间停止 Time.timeScale 0; //回复正常 //Time.timeScale 1; //2倍速 …

VSCode 离线安装中文语言包

1.插件市场 Extensions for Visual Studio family of products | Visual Studio Marketplace 输入&#xff1a; language 在version history里面下载相应的版本&#xff0c;若没有就下载最新的 在下面安装 安装完重启就可以了。 可能会提示的失败&#xff1a; Unable to ins…

【vscode】 快速生成react组件

在VSCode中快速生成React组件的方法主要包括使用内置的代码片段和安装第三方插件。以下是具体的步骤和方法&#xff1a; 使用内置代码片段‌ VSCode支持用户自定义代码片段&#xff0c;你可以通过输入特定的前缀&#xff0c;然后按下Tab键&#xff0c;来快速生成React组件的基…

Cenos7镜像+Docker问题

前言 好用的镜像 阿里 curl -o /etc/yum.repos.d/Centos7-aliyun.repo https://mirrors.wlnmp.com/centos/Centos7-aliyun-x86_64.repo科大 curl -o /etc/yum.repos.d/Centos7-ustc.repo https://mirrors.wlnmp.com/centos/Centos7-ustc-x86_64.repo网易 curl -o /etc/yum…