RxJava响应式编程的实现

server/2024/10/9 13:31:12/

❤ 作者主页:李奕赫揍小邰的博客
❀ 个人介绍:大家好,我是李奕赫!( ̄▽ ̄)~*
🍊 记得点赞、收藏、评论⭐️⭐️⭐️
📣 认真学习!!!🎉🎉

文章目录

  • RxJava
    • 什么是响应式编程
    • RxJava介绍
    • RxJava 的核心知识
    • 常用操作符
    • 事件
    • 测试

  在做AI生成题目和回答的时候,可能因为生成时间较长,导致页面停留时间较长,这样会导致用户体验感较差, AI 生成相关的功能是等所有内容全部生成后,再返回给前端,同时用户可能要等待较长的时间。如何进行优化,通过阅读调用AI方的官方文档,提供了 流式 接口调用方式。通过设置 stream 为 true 来开启流式
  官方提供了一段示例代码,如果 stream 设置为 true,需要从返回结果中获取到 flowable 对象

java">/*** 流式请求* @param messages* @param stream* @param temperature* @return*/
public Flowable<ModelData> doRequestFlowable(List<ChatMessage> messages, Boolean stream, Float temperature) {// 构造请求ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest.builder().model(Constants.ModelChatGLM4).stream(stream).invokeMethod(Constants.invokeMethod).temperature(temperature).messages(messages).build();ModelApiResponse invokeModelApiResp = clientV4.invokeModelApi(chatCompletionRequest);return invokeModelApiResp.getFlowable();
}

实际上 Flowable 是 RxJava 响应式编程库中定义的类,为了更好地进行流式开发,我们要先来了解下响应式编程和 RxJava。

RxJava

  RxJava 是一个基于事件驱动的、利用可观测序列来实现异步编程的类库,是响应式编程在 Java 语言上的实现。

什么是响应式编程

  响应式编程(Reactive Programming)是一种编程范式,它专注于 异步数据流 和 变化传播。
响应式编程的几个核心概念:

1)数据流:响应式编程中,数据以流(Streams)的形式存在。流就像一条河,源源不断、有一个流向(比如从 A 系统到 B 系统再到 C 系统),它可以被过滤、观测、或者跟另一条河流合并成一个新的流。
比如用户输入、网络请求、文件读取都可以是数据流,可以很轻松地对流进行处理。
比如 Java 8 的 Stream API就是用数据流处理

2)异步处理:响应式编程是异步的,即操作不会阻塞线程,而是通过回调或其他机制在未来某个时间点处理结果。这提高了应用的响应性和性能。

3)变化传播:当数据源发生变化时,响应式编程模型会自动将变化传播到依赖这些数据源的地方。这种传播是自动的,不需要显式调用。
同时,响应式编程更倾向于声明式编程风格,通过定义数据流的转换和组合来实现复杂的逻辑。比如,可以利用 map、filter 等函数来实现数据转换,而不是将一大堆复杂的逻辑混杂在一个代码块中。
 

RxJava介绍

1、事件驱动
事件可以是任何事情,如用户的点击操作、网络请求的结果、文件的读写等。事件驱动的编程模型是通过事件触发行动。

在 RxJava 中,事件可以被看作是数据流中的数据项,称为“事件流”或“数据流”。每当一个事件发生,这个事件就会被推送给那些对它感兴趣的观察者(Observers)。

2、可观测序列
可观测序列是指一系列按照时间顺序发出的数据项,可以被观察和处理。可观测序列提供了一种将数据流和异步事件建模为一系列可以订阅和操作的事件的方式。

可以理解为在数据流的基础上封装了一层,多加了一点方法。
 

RxJava 的核心知识

观察者模式
  RxJava 是基于 观察者模式 实现的,分别有观察者和被观察者两个角色,被观察者会实时传输数据流,观察者可以观测到这些数据流。

基于传输和观察的过程,用户可以通过一些操作方法对数据进行转换或其他处理。

在 RxJava 中,观察者就是 Observer,被观察者是 Observable 和 Flowable。

Observable 适合处理相对较小的、可控的、不会迅速产生大量数据的场景。它不具备背压处理能力,也就是说,当数据生产速度超过数据消费速度时,可能会导致内存溢出或其他性能问题。

Flowable 是针对背压(反向压力)问题而设计的可观测类型。背压问题出现于数据生产速度超过数据消费速度的场景。Flowable 提供了多种背压策略来处理这种情况,确保系统在处理大量数据时仍然能够保持稳定。

被观察者.subscribe(观察者),它们之间就建立的订阅关系,被观察者传输的数据或者发出的事件会被观察者观察到。

 

常用操作符

  RxJava 提供了很多操作符供我们使用,这块其实和 Java8 的 Stream 类似,概念上都是一样的。
1)变换类操作符,对数据流进行变换,如 map、flatMap 等。

比如利用 map 将 int 类型转为 string

java">Flowable<String> flowable = Flowable.range(0, Integer.MAX_VALUE).map(i -> String.valueOf(i))

2)聚合类操作符,对数据流进行聚合,如 toList、toMap 等。

将数据转成一个 list

java"> Flowable.range(0, Integer.MAX_VALUE).toList()

3)过滤操作符,过滤或者跳过一些数据,如 filter、skip 等。

将大于 10 的数据转成一个 list

java">Flowable.range(0, Integer.MAX_VALUE).filter(i -> i > 10).toList();

4)组合/合并操作符,将多个数据流连接到一起,如 concat、zip 等。

创建两个 Flowable,通过 concat 连接得到一个被观察者,进行统一处理

java">// 创建两个 Flowable 对象
Flowable<String> flowable1 = Flowable.just("A", "B", "C");
Flowable<String> flowable2 = Flowable.just("D", "E", "F");// 使用 concat 操作符将两个 Flowable 合并
Flowable<String> flowable = Flowable.concat(flowable1, flowable2);

5)排序操作符,对数据流内的数据进行排序,如 sorted

java">Flowable<String> flowable = Flowable.concat(flowable1, flowable2).sorted();

事件

RxJava 也是一个基于事件驱动的框架,我们来看看一共有哪些事件,分别在什么时候触发:

1)onNext,被观察者每发送一次数据,就会触发此事件。

2)onError,如果发送数据过程中产生意料之外的错误,那么被观察者可以发送此事件。

3)onComplete,如果没有发生错误,那么被观察者在最后一次调用 onNext 之后发送此事件表示完成数据传输。

对应的观察者得到这些事件后,可以进行一定处理,例如:

java">flowable.observeOn(Schedulers.io()).doOnNext(item -> {System.out.println("来数据啦" + item.toString());}).doOnError(e -> {System.out.println("出错啦" + e.getMessage());}).doOnComplete(() -> {System.out.println("数据处理完啦");}).subscribe();

测试

1)引入依赖

java"><dependency><groupId>org.reactivestreams</groupId><artifactId>reactive-streams</artifactId><version>1.0.4</version>
</dependency>

2)编写单元测试

java">@Test
void rxJavaDemo() throws InterruptedException {// 创建一个流,每秒发射一个递增的整数(数据流变化)Flowable<Long> flowable = Flowable.interval(1, TimeUnit.SECONDS).map(i -> i + 1).subscribeOn(Schedulers.io()); // 指定创建流的线程池// 订阅 Flowable 流,并打印每个接受到的数字flowable.observeOn(Schedulers.io()).doOnNext(item -> System.out.println(item.toString())).subscribe();// 让主线程睡眠,以便观察输出Thread.sleep(10000L);
}

http://www.ppmy.cn/server/126141.html

相关文章

代码工艺:Spring Boot 防御式编程实践

防御式编程是一种编程实践&#xff0c;其核心理念是编写代码时要假设可能会发生错误、异常或非法输入&#xff0c;并通过各种手段防止这些问题引发系统崩溃、错误行为或安全漏洞。该编程方法的目的是让程序在面对不可预测的情况&#xff08;如输入数据异常、硬件故障、意外的用…

[极客大挑战 2019]RCE ME1

<?php error_reporting(0); if(isset($_GET[code])){$code$_GET[code];if(strlen($code)>40){die("This is too Long.");}if(preg_match("/[A-Za-z0-9]/",$code)){die("NO.");}eval($code); } else{highlight_file(__FILE__); }// ?>…

Android 利用OSMdroid开发GIS 添加点、线、面和标记点

部署看这个&#xff1a;Android 利用OSMdroid开发GIS-CSDN博客 直接上代码 activity_main.xml&#xff1a; <?xml version"1.0" encoding"utf-8"?> <RelativeLayout xmlns:android"http://schemas.android.com/apk/res/android"xm…

HBase 性能优化的高频面试题及答案

目录 高频面试题及答案1. 如何通过数据建模优化 HBase 性能?2. 如何优化 HBase 的写入性能?3. 如何通过压缩优化 HBase 的存储性能?4. 如何通过调整 RegionServer 配置优化性能?5. 如何优化 HBase 的读取性能?6. 如何通过使用 HBase 的版本控制优化性能?7. 如何通过使用合…

ping基本使用详解

在网络中ping是一个十分强大的TCP/IP工具。它的作用主要为&#xff1a; 用来检测网络的连通情况和分析网络速度根据域名得到服务器 IP根据 ping 返回的 TTL 值来判断对方所使用的操作系统及数据包经过路由器数量。我们通常会用它来直接 ping ip 地址&#xff0c;来测试网络的连…

【分布式微服务云原生】 RPC协议:超越HTTP的远程通信艺术

RPC协议&#xff1a;超越HTTP的远程通信艺术 摘要 RPC&#xff08;远程过程调用&#xff09;协议是分布式系统中实现远程调用的关键技术。本文将探讨RPC协议是否必须依赖HTTP协议来完成远程通信&#xff0c;并介绍几种常见的RPC实现方式。通过本文&#xff0c;你将了解RPC的核…

js逆向——webpack实战案例(一)

今日受害者网站&#xff1a;https://www.iciba.com/translate?typetext 首先通过跟栈的方法找到加密位置 我们跟进u函数&#xff0c;发现是通过webpack加载的 向上寻找u的加载位置&#xff0c;然后打上断点&#xff0c;刷新网页&#xff0c;让程序断在加载函数的位置 u r.n…

Linux —— Socket编程(三)

一、本章重点 1. tcp服务器实现思路&#xff0c;进一步了解和总结相关的接口 2. 了解日志和守护进程 二、tcp服务器核心思路 tcp版的服务器与udp的不同在于&#xff0c;udp是面向数据报传输数据&#xff0c;在数据传输中不需要建立与客户端的链接&#xff0c;直接用recvfrom…