深入理解Reactor Flux的生成方法

embedded/2025/3/3 18:13:24/

在Reactor框架中,Flux 是一个非常重要的概念,它用于表示一个可以产生多个事件的响应式流。通过 Flux 提供的多种生成方法,我们可以灵活地创建各种类型的流。本文将详细介绍 Flux.generate 方法的使用,并通过实例帮助读者更好地理解其原理和应用场景。

Flux.generate 方法概述

Flux.generate 方法允许我们通过编程方式创建一个 Flux。它提供了三种重载形式,分别适用于不同的场景:

  1. 无状态生成

    public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)
    

    这种方式通过一个 Consumer<SynchronousSink<T>> 回调函数逐个生成信号。

  2. 有状态生成

    public static <T,S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator)
    

    这种方式在生成信号时引入了状态管理,stateSupplier 提供初始状态,generator 根据当前状态生成信号并返回下一个状态。

  3. 有状态生成并带清理回调

    public static <T,S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator, Consumer<? super S> stateConsumer)
    

    在有状态生成的基础上,增加了 stateConsumer,用于在流结束时对状态进行清理。

示例 1:无状态生成

我们可以通过 Consumer<SynchronousSink<T>> 回调函数逐个生成信号。以下是一个简单的示例:

package com.example;import reactor.core.publisher.Flux;
import java.util.concurrent.atomic.AtomicInteger;public class GenerateViaConsumerSyncSink {public static void main(String[] args) {AtomicInteger ai = new AtomicInteger(0);Flux<Integer> flux = Flux.generate(sink -> {sink.next(ai.incrementAndGet());if (ai.get() == 5) {sink.complete();}});flux.subscribe(System.out::println);}
}

输出:

1
2
3
4
5

在这个示例中,我们使用 AtomicInteger 来生成从 1 到 5 的数字,并在生成到 5 时结束流。

示例 2:有状态生成

当需要引入状态时,可以使用第二种重载形式。以下是一个示例:

package com.example;import reactor.core.publisher.Flux;public class GenerateViaSyncSink {public static void main(String[] args) {Flux<String> flux = Flux.generate(() -> 1, // 初始状态(state, sink) -> {sink.next("state = " + state);if (state > 10) {sink.complete();}return state + 2; // 返回下一个状态});flux.subscribe(System.out::println);}
}

输出:

state = 1
state = 3
state = 5
state = 7
state = 9
state = 11

在这个示例中,我们定义了一个初始状态为 1,并在每次生成信号时将状态加 2,直到状态大于 10 时结束流。

示例 3:有状态生成并带清理回调

如果需要在流结束时对状态进行清理,可以使用第三种重载形式。以下是一个示例:

package com.example;import reactor.core.publisher.Flux;
import java.util.function.Consumer;public class GenerateViaSyncSinkWithLastConsumer {public static void main(String[] args) {Flux<String> flux = Flux.generate(() -> "apple", // 初始状态(state, sink) -> {sink.next("other " + state);if (state.length() > 10) {sink.complete();}return state + " more"; // 返回下一个状态},new Consumer<String>() { // 清理回调@Overridepublic void accept(String s) {System.out.println("state consumer-> " + s);}});flux.subscribe(System.out::println);}
}

输出:

other apple
other apple more
other apple more more
state consumer-> apple more more more

在这个示例中,我们定义了一个初始状态为 "apple",并在每次生成信号时将状态追加 " more"。当状态长度超过 10 时,流结束,并通过清理回调输出最终状态。

总结

Flux.generate 方法为我们提供了灵活的流生成方式,无论是无状态还是有状态的场景,都可以轻松实现。通过引入状态和清理回调,我们可以更好地管理流的生成过程和资源清理。希望本文的示例能帮助你更好地理解和使用 Flux.generate 方法。


http://www.ppmy.cn/embedded/169682.html

相关文章

前缀和算法 算法4

算法题中帮助复习的知识 vector<int > dp( n ,k); n为数组大小 ,k为初始化 哈希表unordered_map<int ,int > hash; hash.find(k)返回值是迭代器 ,找到k返回其迭代器 没找到返回hash.end() hash.count(k)返回值是数字 ,找到k返回1 ,没找到返回0. C和java中 负数…

C语言实现双向链表

1、概念 单向链表的构成使得节点的访问要按照链表的方向进行,某一单元的后继单元可以直接通过链指针(next指针)找到,但是想要找到其前驱单元,必须从链头重新开始查找。如果在节点中增加一个指针域指向其前驱节点,可以在牺牲空间代价的前提下,减少操作时间的代价。在单向…

SVN 简介

SVN 简介 引言 版本控制系统(Version Control System,VCS)是软件开发过程中不可或缺的工具之一。它能够帮助开发者管理代码的版本,追踪代码变更,协同工作,以及确保代码的稳定性和安全性。Subversion(简称SVN)是一种流行的版本控制系统,本文将为您详细介绍SVN的基本概…

LeetCode 0132.分割回文串 II:动态规划

【LetMeFly】132.分割回文串 II&#xff1a;动态规划 力扣题目链接&#xff1a;https://leetcode.cn/problems/palindrome-partitioning-ii/ 给你一个字符串 s&#xff0c;请你将 s 分割成一些子串&#xff0c;使每个子串都是回文串。 返回符合要求的 最少分割次数 。 示例 …

阿里管理三板斧课程和管理工具包(视频精讲+工具文档).zip

阿里管理三板斧课程和管理工具包&#xff08;视频精讲工具文档&#xff09;&#xff0c;共18课。 阿里管理三板斧工具包 阿里绩效考核文档 阿里人力资源实践全集文档 阿里文化构建工具包 阿里正委体系工具包 阿里三板斧.pdf 阿里三板斧-学员手册.pdf 第1集 三板斧的底层逻辑.…

版图自动化连接算法开发 00001 ------ 直接连接两个给定的坐标点

版图自动化连接算法开发 00001 ------ 直接连接两个给定的坐标点 引言正文定义坐标点的类绘图显示代码直接连接两个坐标点引言 由于人工智能的加速普及,每次手动绘制版图都会觉得特别繁琐,作者本人在想可否搞一个自动化连接器件端口的算法,后期可以根据一些设定的限制进行避…

代码随想录算法训练营第33天 | 62. 不同路径 63. 不同路径 II 343. 整数拆分 96. 不同的二叉搜索树

62. 不同路径 题目链接&#xff1a; 62. 不同路径 - 力扣&#xff08;LeetCode&#xff09; 代码 class Solution:def uniquePaths(self, m: int, n: int) -> int:dp [[1]*n for _ in range(m)]for i in range(1,m):for j in range(1,n):dp[i][j] dp[i-1][j] dp[i][j…

【OpenCV C++】图像清晰度增强:拉普拉斯锐化,SUM锐化,普通锐化

文章目录 1 普通锐化2. 拉普拉斯 锐化3 SUM锐化1 普通锐化 void sharpenImage(const cv::Mat& frame,float a, float b) {定义一个3x3的锐化核cv