Reactive编程:数据流和观察者

news/2025/4/2 6:53:07/

在这里插入图片描述

文章目录

  • **第二部分:Reactive编程核心概念**
    • **2.1 数据流(Data Stream)**
      • **2.1.1 数据流的基本概念**
      • **2.1.2 数据流的类型**
        • **(1) 冷流(Cold Stream)**
        • **(2) 热流(Hot Stream)**
        • **(3) 有限流(Finite Stream)**
        • **(4) 无限流(Infinite Stream)**
      • **2.1.3 数据流的操作符**
      • **2.1.4 数据流的实际应用**
        • **(1) 实时搜索建议**
        • **(2) 游戏开发(角色移动)**
        • **(3) 金融交易数据聚合**
      • **2.1.5 数据流的背压问题(Backpressure)**
    • **2.2 观察者模式(Observer Pattern)**
      • **2.2.1 观察者模式基础**
        • **模式角色**
      • **2.2.2 观察者模式实现**
        • **经典实现(Java)**
        • **Reactive扩展(RxJS)**
      • **2.2.3 观察者模式与Reactive编程的关系**
      • **2.2.4 观察者模式的实际应用**
        • **(1) 用户界面事件处理**
        • **(2) 状态管理(Redux Store)**
        • **(3) WebSocket实时通信**
      • **2.2.5 观察者模式的优缺点**
        • **优点**
        • **缺点**
      • **2.2.6 观察者模式的变体**
    • **总结**

在这里插入图片描述

第二部分:Reactive编程核心概念

2.1 数据流(Data Stream)

2.1.1 数据流的基本概念

数据流(Data Stream)是Reactive编程的核心概念之一,它代表一系列按时间顺序排列的事件或数据项的集合。在传统的命令式编程中,数据通常是静态的,程序通过顺序执行指令来处理数据。而在Reactive编程中,数据被视为动态的、持续变化的流,程序通过订阅这些流来响应数据的变化。

数据流的特点包括:

  1. 时间连续性:数据流中的事件是按时间顺序发生的。
  2. 异步性:数据项可能在任何时间点到达,不受程序控制流约束。
  3. 不可变性:流中的数据一旦发出就不能更改,只能通过转换操作生成新流。
  4. 可组合性:多个数据流可以通过操作符(Operators)进行组合、转换和过滤。

2.1.2 数据流的类型

在Reactive编程中,数据流可以分为以下几种类型:

(1) 冷流(Cold Stream)
  • 特点:数据流的生成由订阅者触发,每个订阅者都会收到完整的数据序列。
  • 示例:从文件读取数据、HTTP请求返回的响应流。
  • 代码示例(RxJS)
    import { of } from 'rxjs';const coldStream$ = of(1, 2, 3); // 冷流
    coldStream$.subscribe(value => console.log(`Subscriber 1: ${value}`));
    coldStream$.subscribe(value => console.log(`Subscriber 2: ${value}`));// 输出:
    // Subscriber 1: 1
    // Subscriber 1: 2
    // Subscriber 1: 3
    // Subscriber 2: 1
    // Subscriber 2: 2
    // Subscriber 2: 3
    
(2) 热流(Hot Stream)
  • 特点:数据流独立于订阅者存在,订阅者只能收到订阅后发出的数据。
  • 示例:鼠标移动事件、WebSocket实时消息。
  • 代码示例(RxJS)
    import { fromEvent, interval } from 'rxjs';const hotStream$ = fromEvent(document, 'click'); // 热流(鼠标点击事件)
    setTimeout(() => {hotStream$.subscribe(event => console.log(`Subscriber: Click at ${event.timeStamp}`));
    }, 3000);// 3秒后订阅,只能收到3秒后的点击事件
    
(3) 有限流(Finite Stream)
  • 特点:数据流会在某个时刻结束(如HTTP请求完成)。
  • 示例:API请求返回的单个响应。
(4) 无限流(Infinite Stream)
  • 特点:数据流可能永远不会结束(如传感器数据、实时股票行情)。
  • 示例interval 生成的定时数据流。

2.1.3 数据流的操作符

Reactive编程提供丰富的操作符(Operators)来处理数据流,常见的操作符包括:

类别操作符示例功能描述
创建流of, from, interval从静态数据、数组或时间间隔创建流
转换流map, scan, buffer对数据项进行转换或累积计算
过滤流filter, take, skip根据条件筛选或限制数据项
组合流merge, concat, zip合并多个流的数据
错误处理catchError, retry捕获和处理流中的错误
高级调度debounceTime, throttle控制数据流的发射频率

代码示例(RxJS 操作符组合)

import { fromEvent } from 'rxjs';
import { map, filter, debounceTime } from 'rxjs/operators';const input = document.querySelector('input');
fromEvent(input, 'input').pipe(map(event => event.target.value), // 提取输入值filter(text => text.length > 3), // 过滤长度≤3的输入debounceTime(500) // 防抖(500ms内无新输入才发射)).subscribe(value => console.log(`Search for: ${value}`));

2.1.4 数据流的实际应用

(1) 实时搜索建议
// 使用RxJS实现搜索框自动补全
searchInput$.pipe(debounceTime(300),distinctUntilChanged(),switchMap(query => fetch(`/api/search?q=${query}`))
).subscribe(results => updateUI(results));
(2) 游戏开发(角色移动)
// 使用键盘事件流控制角色移动
const keyPress$ = fromEvent(document, 'keydown');
const move$ = keyPress$.pipe(filter(key => ['ArrowUp', 'ArrowDown'].includes(key.code)),map(key => key.code === 'ArrowUp' ? 1 : -1)
);move$.subscribe(delta => character.y += delta * SPEED);
(3) 金融交易数据聚合
// 合并多个股票行情流
const stock1$ = stockFeed('AAPL');
const stock2$ = stockFeed('GOOG');merge(stock1$, stock2$).pipe(bufferTime(1000)) // 每1秒聚合一次.subscribe(prices => calculatePortfolioValue(prices));

2.1.5 数据流的背压问题(Backpressure)

当数据生产速度超过消费速度时,系统可能因资源耗尽而崩溃。Reactive编程通过以下策略处理背压:

  1. 丢弃策略throttlesample
  2. 缓冲策略bufferwindow
  3. 反馈控制:响应式流规范(Reactive Streams)的request(n)机制

代码示例(背压处理)

// Reactor(Java)中的背压控制
Flux.range(1, 1000).onBackpressureBuffer(100) // 缓冲100个元素.subscribe(value -> processSlowly(value),err -> handleError(err),() -> System.out.println("Done"));

2.2 观察者模式(Observer Pattern)

2.2.1 观察者模式基础

观察者模式是Reactive编程的底层设计模式,它定义了一对多的依赖关系:当一个对象(Subject)的状态改变时,所有依赖它的对象(Observers)会自动收到通知并更新。

模式角色
角色描述
Subject维护观察者列表,提供注册/注销方法,通知状态变化
Observer定义更新接口,接收Subject的通知并执行响应逻辑
ConcreteSubject具体的被观察对象,存储状态并在变化时通知观察者
ConcreteObserver具体的观察者,实现更新逻辑

2.2.2 观察者模式实现

经典实现(Java)
// 主题接口
interface Subject {void registerObserver(Observer o);void removeObserver(Observer o);void notifyObservers();
}// 具体主题(温度传感器)
class TemperatureSensor implements Subject {private List<Observer> observers = new ArrayList<>();private float temperature;public void setTemperature(float temp) {this.temperature = temp;notifyObservers();}@Overridepublic void registerObserver(Observer o) {observers.add(o);}@Overridepublic void notifyObservers() {for (Observer o : observers) {o.update(temperature);}}
}// 观察者接口
interface Observer {void update(float temperature);
}// 具体观察者(温度显示器)
class TemperatureDisplay implements Observer {@Overridepublic void update(float temp) {System.out.println("当前温度: " + temp + "°C");}
}// 使用示例
public class Main {public static void main(String[] args) {TemperatureSensor sensor = new TemperatureSensor();sensor.registerObserver(new TemperatureDisplay());sensor.setTemperature(25.5f); // 输出:当前温度: 25.5°C}
}
Reactive扩展(RxJS)
// 创建一个可观察对象(Subject)
const subject = new rxjs.Subject();// 订阅观察者
const subscription1 = subject.subscribe(value => console.log(`Observer 1: ${value}`)
);const subscription2 = subject.subscribe(value => console.log(`Observer 2: ${value}`)
);// 发送数据
subject.next('Hello');
subject.next('World');// 输出:
// Observer 1: Hello
// Observer 2: Hello
// Observer 1: World
// Observer 2: World

2.2.3 观察者模式与Reactive编程的关系

  1. 数据流即Subject:Reactive框架中的Observable本质上是增强版的Subject,支持多播和操作符。
  2. 观察者即Subscriber:订阅者通过subscribe()方法注册回调,相当于观察者模式的update()
  3. 扩展能力
    • 多播(Multicast):一个数据流被多个观察者共享
    • 生命周期管理:complete()error()通知
    • 操作符链式调用:mapfilter等转换操作

2.2.4 观察者模式的实际应用

(1) 用户界面事件处理
// 按钮点击事件观察
const button = document.getElementById('myButton');
const click$ = fromEvent(button, 'click');click$.subscribe(event => {console.log('Button clicked at:', event.timeStamp);
});
(2) 状态管理(Redux Store)
// Redux的Store本质上是一个Subject
const store = createStore(reducer);
store.subscribe(() => {console.log('State changed:', store.getState());
});store.dispatch({ type: 'INCREMENT' });
(3) WebSocket实时通信
const socket = new WebSocket('ws://example.com');
const message$ = new Subject();socket.onmessage = event => {message$.next(event.data);
};message$.subscribe(data => {console.log('Received:', data);
});

2.2.5 观察者模式的优缺点

优点
  • 松耦合:Subject和Observer之间无直接依赖
  • 动态关系:可运行时添加/删除观察者
  • 广播通信:一次状态变更可通知多个观察者
缺点
  • 内存泄漏风险:未正确注销观察者会导致引用残留
  • 通知顺序不可控:观察者被调用的顺序可能影响系统行为
  • 调试困难:数据流动的隐式传播可能增加调试复杂度

2.2.6 观察者模式的变体

变体描述Reactive实现
发布-订阅模式引入事件通道,解耦发布者和订阅者RxJS的Subject
响应式属性(Binding)自动同步对象属性变化(如Vue的v-model)MobX的observable
数据总线(Event Bus)全局事件中心,任意组件可发布/订阅事件Vue的EventEmitter

总结

  • 数据流是Reactive编程的核心抽象,代表随时间变化的事件序列,可通过操作符进行灵活转换。
  • 观察者模式是Reactive系统的底层机制,通过订阅/通知机制实现数据变化的自动传播。
  • 两者结合使得Reactive编程能够高效处理异步、实时数据,适用于从UI交互到分布式系统的广泛场景。

http://www.ppmy.cn/news/1584587.html

相关文章

解决 Android AGP 最新版本中 BuildConfig 报错问题

在最新版本的 Android Gradle Plugin (AGP) 中&#xff0c;Google 对构建系统做了不少改动&#xff0c;可能会导致一些与 BuildConfig 相关的问题。以下是常见问题及解决方案&#xff1a; 常见问题及修复方法 1. BuildConfig 类完全缺失 原因&#xff1a;AGP 8.0 默认不再为库模…

WPF 附加属性

在WPF&#xff08;Windows Presentation Foundation&#xff09;中&#xff0c;附加属性&#xff08;Attached Properties&#xff09;是一种特殊的依赖属性机制&#xff0c;它允许父元素为子元素提供额外的属性支持。这种特性特别适用于布局系统、输入处理和其他需要跨多个控件…

vue对音频添加背景音乐

说明: 用vue&#xff0c;将name.mp3这段录音文件&#xff0c;添加背景音乐&#xff0c;bg.mp3&#xff0c;然后生成新的文件 提前准备好两个mp3文件&#xff0c;一个录音文件&#xff0c;一个背景音乐&#xff0c;放在public目录里 step1:下载依赖 {"name": "u…

【STL】stack

s t a c k stack stack 是一种容器适配器&#xff0c;设计为先进后出&#xff08; F i r s t I n L a s t O u t , F I L O First\ In\ Last\ Out,\ FILO First In Last Out, FILO&#xff09;的数据结构&#xff0c;只有一个出口&#xff0c;将元素推入栈的操作称为 p u s h …

微服务架构中的精妙设计:服务注册/服务发现-Eureka

一.使用注册中心背景 1.1服务远程调用问题 服务之间远程调⽤时, 我们的URL是写死的 String url "http://127.0.0.1:9090/product/" orderInfo.getProductId(); 缺点&#xff1a; 当更换机器, 或者新增机器时, 这个URL就需要跟着变更, 就需要去通知所有的相关服…

STM32H743学习记录

2025/03/30 SRAM速率计算方式 MCU主频 乘以 单片机位数 除以 每个字节的位数&#xff08;8&#xff09;即可得出单片机的SRAM速率 如72M主频32位单片机速率 72 * 32 / 8 288 M/s FLASH速率计算方式 FLASH大小 乘以 单片机位数 除以 每个字节位数&#xff08;8&#xff09…

RK3588使用笔记:设置程序/服务开机自启

一、前言 一般将系统用作嵌入式设备时肯定要布置某些程序&#xff0c;这时候就需要对程序设置开机自己&#xff0c;否则每次都要人为启动&#xff0c;当有些嵌入式系统未连接显示屏或者无桌面环境去操作启动程序时&#xff0c;程序自启就是必须的了&#xff0c;本文介绍在纯li…

简易版“异步多线程服务器”

1.引用 和复制(不用&做参数) 的区别 (1)第一个:肯定是一个原参数操作,另外一个是对原参数 复制一个副本 去操作&#xff08;开销问题&#xff09; (2)第二个:如果是公共参数,比如io_context,他始终只有一个(与内核相关),只能由& 简介 本文基于c,重点是来掌握c的异步概…