RxJava2 背压

news/2024/12/2 21:57:54/

1 背压

在RxJava中,会遇到被观察者发送消息太快以至于它的操作符或者订阅者不能及时处理相关的消息,这就是典型的背压(Back Pressure)场景。

BackPressure经常被翻译为背压,背压的字面意思比较晦涩,难以理解。它是指在异步场景下,被观察者发送事件速度远快于观察者处理的速度,从而导致下游的buffer溢出,这种现象叫做背压。

产生条件:

  1. 异步,被观察者和观察者处于不同的线程中。
  2. 被观察者发送消息的速度远远快于观察者处理的的数据

在RxJava2.x中,只有Flowable类型是支持背压的,并且Flowable很多操作符内部都使用了背压策略,从而避免过多的数据填满内部的队列。

解决背压问题的方法:

1.1 过滤限流

通过使用限流操作符将被观察者产生的大部分事件过滤并抛弃,以达到限流的目的,间接降低事件发射的速度,例如使用以下操作符:

  • sample:在一段时间内,只处理最后一个数据。
  • throttleFirst:在一段时间内,只处理第一个数据。
  • debounce:发送一个数据,开始计时,到了规定时间,若没有再发送数据,则开始处理数据,反之重新开始计时。

1.2 打包缓存

在被观察者发射事件过快导致观察者来不及处理的情况下,可以使用缓存类操作符将其中一部分打包缓存起来,再一点一点的处理其中的事件。

  • buffer:将多个事件打包放入一个List中,再一起发射。
  • window:将多个事件打包放入一个Observable中,再一起发射。

1.3 使用背压操作符

通过一些操作符来转化成支持背压的Observable

2. RxJava2.x的背压策略

在RxJava2.x中,Observable不在支持背压,而是改用Flowable来专门支持背压。默认队列大小为128,并且要求所有的操作符强制支持背压。

Flowable一共有5中背压策略(BackpressureStategy中定义):MISSING,ERROR,BUFFER,DROP,LATEST

2.1 MISSING

通过Create方法创建Flowable没有指定背压策略,不会通过onNext发射的数据做缓存或丢弃处理,需要下游通过飞呀操作符(onBackpressureBuffer()/onBackpressureDrop()/onBackpressureLatest())指定背压策略。

        Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {for (int i = 0; i <1000 ; i++) {emitter.onNext(i);}}},BackpressureStrategy.MISSING).onBackpressureBuffer().subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println(integer);}});

上面的代码执行的是buffer的背压策略。

2.2 ERROR

如果放入Flowable的异步缓存池中的数据超限了,则会抛出MissingBackpressureException异常

        Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {for (int i = 0; i < 129; i++) {emitter.onNext(i);}}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println(integer);}});

在Android中运行以上代码,会立即引起App Crash,引起以下Exception:

W/System.err: io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
W/System.err:     at io.reactivex.internal.operators.flowable.FlowableCreate$ErrorAsyncEmitter.onOverflow(FlowableCreate.java:438)at io.reactivex.internal.operators.flowable.FlowableCreate$NoOverflowBaseAsyncEmitter.onNext(FlowableCreate.java:406)

因为Flowable的默认队列是128,所以讲上述代码的129改成128,程序就可以正常运行了。

2.3 BUFFER

Flowable的异步缓存池同Observable的一样,没有固定大小,可以无限制添加数据,不会抛出MissingBackpressureException异常,但会导致OOM(Out of Memory).

        Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {for (int i = 0;; i++) {emitter.onNext(i);}}}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println(integer);}});

上述代码不会导致崩溃,但会引起ANR。

2.4 DROR

如果Flowable的异步缓存池满了,则会丢掉将要放入缓存池中的数据。

        Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {for (int i = 0;i<129; i++) {emitter.onNext(i);}}}, BackpressureStrategy.DROP).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println(integer);}});

在Android中运行这段代码,不会引起Crash,但只会打印出0~127,第128则被丢弃,因为Flowable的内部队列已经满了。

2.5 LATEST

如果缓存池满了,会丢掉将要放入缓存池中的数据。这一点与DROP策略一样,不同的是,不管缓存池的状态如何,LATEST策略会将最后一条数据强行放入缓存池中。

        Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {for (int i = 0;i<1000; i++) {emitter.onNext(i);}}}, BackpressureStrategy.LATEST).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println(integer);}});

执行结果:

0
1
...
127
516
519
521
523
526
528
530
...
734
737
740
743
745
748
999

3. 其他

Flowable不仅可以通过create创建时需要制定背压策略,还可以在通过其他创建操作符,例如just、fromArray等创建后通过背压操作符指定背压策略。例如,

onBackpressreBuffer()对应BackpressureStategy.BUFFER,

onBackpressreDrop()对应BackpressureStategy.DROP,

onBackpressreLatest()对应BackpressureStategy.LATEST.

示例:

Flowable.interval(1, TimeUnit.MILLISECONDS).onBackpressureBuffer().subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {System.out.println(aLong);}});

http://www.ppmy.cn/news/471341.html

相关文章

如何让Ubuntu系统支持LDAC,APTX,AAC编码(提升蓝牙音质)

开始 一开始一直都以为电脑对LDA&#xff0c;ATPX&#xff0c;AAC等编码是硬件需要支持才可以的&#xff0c;不过在搜索了reddit和stackoverflow才知道这只是一种编码格式&#xff0c;所以理论上来说&#xff0c;所有的发射信号的设备都可以以这种编码格式进行数据封装。只是在…

关于Walkman NW-ZX300A利用Music Center分析歌曲使用sensme频道的方法(填坑)

前言 (2019/06/02 Music Center更新之后我也来跟新了一下文章) 现在Music Center更新之后&#xff0c;我们可以直接用这个功能Acquire unknown Properties来分析歌曲 之后确保所有歌曲都是Analyzed的 之后把歌曲拖到你的Walkman里面就可以了&#xff0c;这里不一定非要用那个…

spark 和 flink 的对比

一、设计理念 Spark 的数据模型是 弹性分布式数据集 RDD(Resilient Distributed Dattsets)&#xff0c;这个内存数据结构使得spark可以通过固定内存做大批量计算。初期的 Spark Streaming 是通过将数据流转成批 (micro-batches)&#xff0c;即收集一段时间(time-window)内到达的…

【AI实战】开源可商用的中英文大语言模型baichuan-7B,从零开始搭建

【AI实战】开源可商用的中英文大语言模型baichuan-7B&#xff0c;从零开始搭建 baichuan-7B 简介baichuan-7B 中文评测baichuan-7B 搭建参考 baichuan-7B 简介 baichuan-7B 是由百川智能开发的一个开源可商用的大规模预训练语言模型。基于 Transformer 结构&#xff0c;在大约…

【深入浅出 Spring Security(十)】权限管理的概述和使用详情

权限管理 一、授权的核心概念二、权限管理策略权限表达式&#xff08;SpEL Spring EL&#xff09;1. 基于 URL 的权限管理&#xff08;过滤器&#xff09;基本用法 2. 基于 方法 的权限管理&#xff08;AOP&#xff09;EnableGlobalMethodSecurity基本用法 三、权限管理之版本问…

寻找牛人了

③具备移动端页面和小程序开发经验&#xff1b; ④熟悉 Es6/css3/html5 最新规范&#xff1b; ⑤对用户体验、交互操作及用户需求分析等有一定了解&#xff0c;有产品或界面设计经验者优先。

python牛啊

测试测试进行一些测试发布了

大牛书单 | 搜索大牛都读什么书?

导语&#xff1a;读书&#xff0c;伴随技术人的一生。技术的发展日新月异&#xff0c;技术人应该永远在学习的路上&#xff0c;才能始终跟紧时代的步伐。阅读&#xff0c;是学习最重要的途径之一。 又到了新的一期《大牛书单》推荐&#xff0c;今天来和大家分享鹅厂几位搜索大牛…