Spring WebFlux之流式输出

devtools/2025/3/28 17:00:29/

🎉🎉🎉🎉🎉🎉
欢迎访问的个人博客:https://swzbk.site/,加好友,拉你入福利群
🎉🎉🎉🎉🎉🎉

流式输出(Streaming Output)是指将数据分块逐步发送给客户端,而不是一次性发送所有数据。这种方式特别适合处理大文件、实时数据或需要逐步展示的场景(如deepseek响应、语音、视频、日志等)。在springboot中通过Spring WebFlux实现。

1. Flux是什么?

  • 定义:Flux是一个异步数据流处理库,用于生成、操作和消费数据流(类似集合,但支持异步和非阻塞操作)。
  • 核心特点
    • 背压(Backpressure):消费者可以控制生产者的速度,避免数据过载。
    • 函数式编程:通过链式操作(如mapfilterflatMap)处理数据流。
    • 异步非阻塞:基于Reactor Netty实现高性能I/O,适合高并发场景。

2. Flux在后端Java中的作用

(1)处理异步与高并发
  • 场景:微服务通信、实时数据处理(如消息队列、日志监控)、长连接(WebSocket)。
  • 优势:通过异步非阻塞方式减少线程资源消耗,提升系统吞吐量。
(2)响应式Web开发
  • 与Spring WebFlux结合:构建响应式REST API,支持HTTP/2和Server-Sent Events(SSE)。
  • 示例代码
    java">@GetMapping("/events")
    public Flux<Event> getEvents() {return Flux.interval(Duration.ofSeconds(1)).map(sequence -> new Event("Event " + sequence));
    }
    
    (每秒推送一个事件给客户端)
(3)背压管理
  • 避免内存溢出:当消费者处理速度慢于生产者时,Flux通过背压机制暂停生产,确保系统稳定。
(4)数据流操作
  • 丰富的操作符:支持过滤、转换、合并、重试等复杂逻辑。
    java">Flux.just(1, 2, 3).map(n -> n * 2).filter(n -> n % 3 == 0).subscribe(System.out::println); // 输出:6
    

3. 为什么选择Flux?

  • 与Spring生态集成:无缝衔接Spring Boot、Spring Cloud,适合企业级应用。
  • 轻量高效:相比传统阻塞IO,资源占用更少,适合云原生环境。
  • 对比RxJava:Flux是Spring官方推荐的响应式库,更注重与Java生态的兼容性。

4. 案例实现

在 Java 中使用 OkHttp3 发送请求,并通过 Project Reactor 的 Flux 获取实时响应,通常适用于处理流式数据,比如服务器发送的实时更新或者大型数据块的逐步传输。以下为你详细介绍实现步骤并给出示例代码。

实现思路
  1. 引入依赖:需要在项目中引入 OkHttp3 和 Project Reactor 的依赖。
  2. 发送请求:使用 OkHttp3 发送 HTTP 请求。
  3. 处理响应:将 OkHttp3 的响应流转换为 Flux 进行响应式处理。
4.1 添加依赖

如果你使用的是 Maven 项目,在 pom.xml 中添加以下依赖:

<dependencies><!-- OkHttp3 --><dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp</artifactId><version>4.9.3</version></dependency><!--1: webflux 会默认引入下面的单独reactor-core--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><!-- 2:Project Reactor Core、单独引入 --><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId><version>3.4.16</version></dependency>
</dependencies>
4.2 编写代码

你提供的代码片段是结合了 OkHttp3 发起异步请求和 Project Reactor 的 Flux 来处理响应结果。下面为你完善这个示例,并详细解释代码逻辑。

示例代码
java">import okhttp3.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;import java.io.IOException;public class OkHttpFluxExample {private static final OkHttpClient client = new OkHttpClient();public static Flux<String> makeApiCall(Request apiRequest) {return Flux.create(emitter -> {try {// 发起异步请求client.newCall(apiRequest).enqueue(new Callback() {@Overridepublic void onFailure(Call call, IOException e) {// 处理请求失败的情况System.err.println("请求 API 失败: " + e.getMessage());emitter.error(e);}@Overridepublic void onResponse(Call call, Response response) throws IOException {try (ResponseBody responseBody = response.body()) {if (response.isSuccessful() && responseBody != null) {// 假设响应是逐行文本数据,逐行发布到 Flux 中String[] lines = responseBody.string().split("\n");for (String line : lines) {emitter.next(line);}// 数据发布完成,发送完成信号emitter.complete();} else {// 处理响应不成功的情况String errorMessage = "请求 API 失败,响应码: " + response.code();System.err.println(errorMessage);emitter.error(new IOException(errorMessage));}}}});} catch (Exception e) {// 处理请求过程中发生的异常System.err.println("请求 API 时发生异常: " + e.getMessage());emitter.error(e);}});}public static void main(String[] args) {// 构建请求Request apiRequest = new Request.Builder().url("https://example.com/api") // 替换为实际的 API 地址.build();// 调用 makeApiCall 方法获取 FluxFlux<String> responseFlux = makeApiCall(apiRequest);// 订阅 Flux 并处理响应数据responseFlux.subscribe(// 处理每个接收到的元素line -> System.out.println("Received: " + line),// 处理错误error -> System.err.println("Error: " + error.getMessage()),// 处理完成信号() -> System.out.println("API 请求处理完成"));}
}
代码解释
  1. makeApiCall 方法
    • 该方法接收一个 Request 对象作为参数,返回一个 Flux<String> 对象。
    • 使用 Flux.create 创建 Flux,在其回调中使用 OkHttp3 的 enqueue 方法发起异步请求。
    • onFailure 方法:当请求失败时,打印错误信息并调用 emitter.error 方法将错误信号发送给 Flux 的订阅者。
    • onResponse 方法
      • 若响应成功且响应体不为空,将响应体按行分割,逐行调用 emitter.next 方法将每行数据发布到 Flux 中。
      • 数据发布完成后,调用 emitter.complete 方法发送完成信号。
      • 若响应不成功,打印错误信息并调用 emitter.error 方法发送错误信号。
  2. main 方法
    • 构建一个 Request 对象,指定要请求的 API 地址。
    • 调用 makeApiCall 方法获取 Flux
    • 使用 subscribe 方法订阅 Flux,处理接收到的元素、错误和完成信号。
注意事项
  • 请将 https://example.com/api 替换为实际的 API 地址。
  • 该示例假设响应是逐行文本数据,你可以根据实际情况调整数据处理逻辑。
  • 在实际应用中,需要注意资源管理,例如关闭 OkHttp 客户端等。

🎉🎉🎉🎉🎉🎉
欢迎访问的个人博客:https://swzbk.site/,加好友,拉你入福利群
🎉🎉🎉🎉🎉🎉


http://www.ppmy.cn/devtools/168774.html

相关文章

辉视SIP:编织酒店智慧沟通的“声”动网络

在酒店这一追求极致服务与体验的行业中&#xff0c;辉视SIP广播对讲系统以其卓越的性能和广泛的应用场景&#xff0c;成为酒店内部沟通协作、应急响应及日常运营管理的得力助手&#xff0c;为提升酒店服务质量、创造卓越宾客体验开辟了新的路径。 一、即时通讯&#xff0c;构建…

【视频】H.264的码率和图像质量

1、简述 分辨率、帧率、I帧设置不变的情况下,码率过低时,IP摄像机如果没有足够的带宽来传输高质量的图像,便会抹掉一些细节,导致出现马赛克,尤其是动态性强的画面(比如:运动的云台、非固定的摄像头)。 2、计算码率 1)码率的基础理论公式为: 码率 (bps) = 分辨率像…

【eNSP实战】三层交换机使用ACL实现网络安全

拓图 要求&#xff1a; vlan1可以访问Internetvlan2和vlan3不能访问Internet和vlan1vlan2和vlan3之间可以互相访问PC配置如图所示&#xff0c;这里不展示 LSW1接口vlan配置 vlan batch 10 20 30 # interface Vlanif1ip address 192.168.40.2 255.255.255.0 # interface Vla…

工程化与框架系列(35)--前端微服务架构实践

前端微服务架构实践 &#x1f3d7;️ 引言 随着前端应用规模的不断扩大&#xff0c;微服务架构在前端领域的应用越来越广泛。本文将深入探讨前端微服务架构的实现方案、最佳实践和相关工具。 微服务架构概述 前端微服务架构主要包括以下方面&#xff1a; 应用拆分&#xf…

mybatis集合映射association与collection

官方文档&#xff1a;MyBatis的一对多关联关系 一、用途 一对一&#xff1a;association 一对多&#xff1a;collection 二、association 比较容易理解&#xff0c;可参考官方文档 三、collection <?xml version"1.0" encoding"UTF-8"?> &l…

Redis高级结构-布隆过滤器

可以将布隆过滤器看成一个set&#xff0c;但是这个set可能不太准&#xff0c;当你使用它的contains方法判断时&#xff0c;他可能会误判。但只要设置的参数合理&#xff0c;精确度还是非常高的。当布隆过滤器说某个值存在的时候&#xff0c;那这个值可能不存在。但是当其判断某…

深蕾半导体IP-KVM产品方案解析

2025 深蕾半导体IP-KVM产品方案解析 引言 随着信息技术的飞速发展&#xff0c;远程访问和控制技术在各行各业中的应用日益广泛。根据Market Research Intellect的调研报告&#xff0c;2023年&#xff0c;KVM切换器全球市场规模已经达到了100亿美元&#xff0c;预计到2031年…

repo init 错误 Permission denied (publickey)

一、已经生成ssh-key并设置到gerrit上 二、已经设置.gitconfig &#xff08;此步骤是公司要求&#xff0c;设置gerrit地址为一个别名之类的&#xff0c;有的公司不需要&#xff09; 然后出现下面的错误&#xff0c;最后发现忘记设置git的用户名和邮箱 1. git config --globa…