❤ 作者主页:李奕赫揍小邰的博客
❀ 个人介绍:大家好,我是李奕赫!( ̄▽ ̄)~*
🍊 记得点赞、收藏、评论⭐️⭐️⭐️
📣 认真学习!!!🎉🎉
文章目录
- RxJava
- 什么是响应式编程?
- RxJava介绍
- RxJava 的核心知识
- 常用操作符
- 事件
- 测试
在做AI生成题目和回答的时候,可能因为生成时间较长,导致页面停留时间较长,这样会导致用户体验感较差, AI 生成相关的功能是等所有内容全部生成后,再返回给前端,同时用户可能要等待较长的时间。如何进行优化,通过阅读调用AI方的官方文档,提供了 流式 接口调用方式。通过设置 stream 为 true 来开启流式
官方提供了一段示例代码,如果 stream 设置为 true,需要从返回结果中获取到 flowable 对象
java">/*** 流式请求* @param messages* @param stream* @param temperature* @return*/
public Flowable<ModelData> doRequestFlowable(List<ChatMessage> messages, Boolean stream, Float temperature) {// 构造请求ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest.builder().model(Constants.ModelChatGLM4).stream(stream).invokeMethod(Constants.invokeMethod).temperature(temperature).messages(messages).build();ModelApiResponse invokeModelApiResp = clientV4.invokeModelApi(chatCompletionRequest);return invokeModelApiResp.getFlowable();
}
实际上 Flowable 是 RxJava 响应式编程库中定义的类,为了更好地进行流式开发,我们要先来了解下响应式编程和 RxJava。
RxJava
RxJava 是一个基于事件驱动的、利用可观测序列来实现异步编程的类库,是响应式编程在 Java 语言上的实现。
什么是响应式编程?
响应式编程(Reactive Programming)是一种编程范式,它专注于 异步数据流 和 变化传播。
响应式编程的几个核心概念:
1)数据流:响应式编程中,数据以流(Streams)的形式存在。流就像一条河,源源不断、有一个流向(比如从 A 系统到 B 系统再到 C 系统),它可以被过滤、观测、或者跟另一条河流合并成一个新的流。
比如用户输入、网络请求、文件读取都可以是数据流,可以很轻松地对流进行处理。
比如 Java 8 的 Stream API就是用数据流处理
2)异步处理:响应式编程是异步的,即操作不会阻塞线程,而是通过回调或其他机制在未来某个时间点处理结果。这提高了应用的响应性和性能。
3)变化传播:当数据源发生变化时,响应式编程模型会自动将变化传播到依赖这些数据源的地方。这种传播是自动的,不需要显式调用。
同时,响应式编程更倾向于声明式编程风格,通过定义数据流的转换和组合来实现复杂的逻辑。比如,可以利用 map、filter 等函数来实现数据转换,而不是将一大堆复杂的逻辑混杂在一个代码块中。
RxJava介绍
1、事件驱动
事件可以是任何事情,如用户的点击操作、网络请求的结果、文件的读写等。事件驱动的编程模型是通过事件触发行动。
在 RxJava 中,事件可以被看作是数据流中的数据项,称为“事件流”或“数据流”。每当一个事件发生,这个事件就会被推送给那些对它感兴趣的观察者(Observers)。
2、可观测序列
可观测序列是指一系列按照时间顺序发出的数据项,可以被观察和处理。可观测序列提供了一种将数据流和异步事件建模为一系列可以订阅和操作的事件的方式。
可以理解为在数据流的基础上封装了一层,多加了一点方法。
RxJava 的核心知识
观察者模式
RxJava 是基于 观察者模式 实现的,分别有观察者和被观察者两个角色,被观察者会实时传输数据流,观察者可以观测到这些数据流。
基于传输和观察的过程,用户可以通过一些操作方法对数据进行转换或其他处理。
在 RxJava 中,观察者就是 Observer,被观察者是 Observable 和 Flowable。
Observable 适合处理相对较小的、可控的、不会迅速产生大量数据的场景。它不具备背压处理能力,也就是说,当数据生产速度超过数据消费速度时,可能会导致内存溢出或其他性能问题。
Flowable 是针对背压(反向压力)问题而设计的可观测类型。背压问题出现于数据生产速度超过数据消费速度的场景。Flowable 提供了多种背压策略来处理这种情况,确保系统在处理大量数据时仍然能够保持稳定。
被观察者.subscribe(观察者),它们之间就建立的订阅关系,被观察者传输的数据或者发出的事件会被观察者观察到。
常用操作符
RxJava 提供了很多操作符供我们使用,这块其实和 Java8 的 Stream 类似,概念上都是一样的。
1)变换类操作符,对数据流进行变换,如 map、flatMap 等。
比如利用 map 将 int 类型转为 string
java">Flowable<String> flowable = Flowable.range(0, Integer.MAX_VALUE).map(i -> String.valueOf(i))
2)聚合类操作符,对数据流进行聚合,如 toList、toMap 等。
将数据转成一个 list
java"> Flowable.range(0, Integer.MAX_VALUE).toList()
3)过滤操作符,过滤或者跳过一些数据,如 filter、skip 等。
将大于 10 的数据转成一个 list
java">Flowable.range(0, Integer.MAX_VALUE).filter(i -> i > 10).toList();
4)组合/合并操作符,将多个数据流连接到一起,如 concat、zip 等。
创建两个 Flowable,通过 concat 连接得到一个被观察者,进行统一处理
java">// 创建两个 Flowable 对象
Flowable<String> flowable1 = Flowable.just("A", "B", "C");
Flowable<String> flowable2 = Flowable.just("D", "E", "F");// 使用 concat 操作符将两个 Flowable 合并
Flowable<String> flowable = Flowable.concat(flowable1, flowable2);
5)排序操作符,对数据流内的数据进行排序,如 sorted
java">Flowable<String> flowable = Flowable.concat(flowable1, flowable2).sorted();
事件
RxJava 也是一个基于事件驱动的框架,我们来看看一共有哪些事件,分别在什么时候触发:
1)onNext,被观察者每发送一次数据,就会触发此事件。
2)onError,如果发送数据过程中产生意料之外的错误,那么被观察者可以发送此事件。
3)onComplete,如果没有发生错误,那么被观察者在最后一次调用 onNext 之后发送此事件表示完成数据传输。
对应的观察者得到这些事件后,可以进行一定处理,例如:
java">flowable.observeOn(Schedulers.io()).doOnNext(item -> {System.out.println("来数据啦" + item.toString());}).doOnError(e -> {System.out.println("出错啦" + e.getMessage());}).doOnComplete(() -> {System.out.println("数据处理完啦");}).subscribe();
测试
1)引入依赖
java"><dependency><groupId>org.reactivestreams</groupId><artifactId>reactive-streams</artifactId><version>1.0.4</version>
</dependency>
2)编写单元测试
java">@Test
void rxJavaDemo() throws InterruptedException {// 创建一个流,每秒发射一个递增的整数(数据流变化)Flowable<Long> flowable = Flowable.interval(1, TimeUnit.SECONDS).map(i -> i + 1).subscribeOn(Schedulers.io()); // 指定创建流的线程池// 订阅 Flowable 流,并打印每个接受到的数字flowable.observeOn(Schedulers.io()).doOnNext(item -> System.out.println(item.toString())).subscribe();// 让主线程睡眠,以便观察输出Thread.sleep(10000L);
}