Flutter 从源码扒一扒Stream机制

server/2024/11/14 22:59:24/

Stream的基本使用

//1、创建一个流控制对象,只要用来控制流的暂停、取消和订阅
StreamController _controller = StreamController();//2、实现对一个流的订阅和监听事件
_controller.stream.listen((event) {
print("event==$event");
});//3、添加一个事件
_controller.add("123");

StreamController类

职责是一个抽象类,用于创建一个可以发送数据和接收数据的可监听对象。
_StreamController 是StreamController的真正实现类
工厂构造方法

factory StreamController({void onListen()?,void onPause()?,void onResume()?,FutureOr<void> onCancel()?,bool sync = false}) {return sync? _SyncStreamController<T>(onListen, onPause, onResume, onCancel): _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);

我们使用StreamController的时候会这样
StreamController controller = StreamController();
但是StreamController是个抽象,不能被实例化,主要他使用了工厂构造,最后使用他的子类去实例化。
还要一个字段 bool sync = false ,false表示的是异步的,true表示同步。不更改的话,默认是异步的

_SyncStreamController和_AsyncStreamController就是StreamController的子类

//异步控制器
class _AsyncStreamController<T> = _StreamController<T>with _AsyncStreamControllerDispatch<T>;//同步控制器
class _SyncStreamController<T> = _StreamController<T>with _SyncStreamControllerDispatch<T>;

可以看到_AsyncStreamController和_SyncStreamController直接继承了_StreamController。

而_StreamController是StreamController的真正的实现类

//继承关系链
abstract class _StreamController<T> implements _StreamControllerBase<T> abstract class _StreamControllerBase<T>implementsStreamController<T>,_StreamControllerLifecycle<T>,_EventSink<T>,_EventDispatch<T> {

_controller.stream就是Stream对象

Stream 提供了一种接收事件序列的方法。每个事件要么是数据事件(也称为流的元素),要么是错误事件(这是某件事情失败的通知)。

这个流机制也算是一个生产者和消费者模式。
生产者就是_controller.add(“123”); 消费者就是listen监听了
那我们来看看stream内部是如何实现这个机制的?

listen监听

我们得先注册一下监听事件,添加对此流的订阅,才能接收到生产者的通知。

  StreamSubscription<T> listen(void onData(T event)?,{Function? onError, void onDone()?, bool? cancelOnError});StreamSubscription<T> listen(void onData(T data)?,{Function? onError, void onDone()?, bool? cancelOnError}) {cancelOnError ??= false;StreamSubscription<T> subscription =_createSubscription(onData, onError, onDone, cancelOnError);_onListen(subscription);return subscription;}这个listen方法是Stream类的方法,返回的是一个订阅对象StreamSubscription。并且是空的实现,需要子类去实现它。我们看看是哪个类实现了Stream类。它有很多的实现类,我选一个主要的类_StreamImpl。_StreamImpl 是 Stream的继承类,它也是个抽象类,我去找它的最终实现类_ControllerStreamabstract class _StreamImpl<T> extends Stream<T>class _ControllerStream<T> extends _StreamImpl<T>我们可以发现在_StreamController 有个方法,用来获取流对象的,正好是Stream的子类_ControllerStreamStream<T> get stream => _ControllerStream<T>(this);我们来看看_ControllerStream类StreamSubscription<T> _createSubscription(void onData(T data)?,Function? onError, void onDone()?, bool cancelOnError) =>_controller._subscribe(onData, onError, onDone, cancelOnError);

_controller就是抽象类_StreamControllerLifecycle的实例对象,
_subscribe是个抽象方法,需要子类去实现它。

abstract class _StreamController<T> implements _StreamControllerBase<T>
abstract class _StreamControllerBase<T>implementsStreamController<T>,_StreamControllerLifecycle<T>,EventSink<T>,_EventDispatch<T>

从上面的代码可以看出,最终是_StreamController实现了_subscribe方法
我们把_subscribe方法代码拿出来

StreamSubscription<T> _subscribe(void onData(T data)?, Function? onError,void onDone()?, bool cancelOnError) {if (!_isInitialState) {throw StateError("Stream has already been listened to.");}//创建一个订阅对象_ControllerSubscription<T> subscription = _ControllerSubscription<T>(this, onData, onError, onDone, cancelOnError);_PendingEvents<T>? pendingEvents = _pendingEvents;_state |= _STATE_SUBSCRIBED;if (_isAddingStream) {_StreamControllerAddStreamState<T> addState = _varData as dynamic;addState.varData = subscription;addState.resume();} else {_varData = subscription;}subscription._setPendingEvents(pendingEvents);subscription._guardCallback(() {_runGuarded(onListen);});return subscription;}

这个方法是关键,从方法名字我们就可以知道这是一个订阅方法,实现对一个事件的订阅。
我们看一下_ControllerSubscription,它的父类是StreamSubscription。

StreamSubscription 类

StreamSubscription 是一个抽象接口类
文档上描述 订阅向listen提供事件,并保存用于处理事件的回调。订阅还可用于取消订阅事件,或暂时暂停流中的事件。

下面是它的一些抽象方法

    ///取消订阅事件Future<void> cancel();///处理订阅事件void onData(void handleData(T data)?);///完成订阅事件void onDone(void handleDone()?);///暂停订阅事件void pause([Future<void>? resumeSignal]);///恢复订阅事件void resume();
class _BufferingStreamSubscription<T>implements StreamSubscription<T>, _EventSink<T>, _EventDispatch<T>class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> 

发起数据通知

接着我们再看看数据添加方法,这个方法就是用来发起数据通知的

   if (hasListener) {_sendData(value);} else if (_isInitialState) {_ensurePendingEvents().add(_DelayedData<T>(value));}}// 这个_sendData方法是抽象类_EventDispatch的方法abstract class _EventDispatch<T> {void _sendData(T data);void _sendError(Object error, StackTrace stackTrace);void _sendDone();}

经过查找发现 _AsyncStreamControllerDispatch类实现了_sendData的方法

继承关系链

abstract class _StreamController<T> implements _StreamControllerBase<T>
abstract class _StreamControllerBase<T> implements StreamController<T>, _StreamControllerLifecycle<T>,_EventSink<T>,_EventDispatch<T> {}_AsyncStreamControllerDispatch类实现的方法如下void _sendData(T data) {_subscription._addPending(_DelayedData<T>(data));}_subscription 就是_ControllerSubscription的实例对象
如下代码所示_ControllerSubscription<T> get _subscription {assert(hasListener);Object? varData = _varData;if (_isAddingStream) {_StreamControllerAddStreamState<Object?> streamState = varData as dynamic;varData = streamState.varData;}return varData as dynamic;}varData 这个很熟悉就是上面的_subscribe方法里面实例化ControllerSubscription对象赋值给varData,这个varData实际就是ControllerSubscription对象。

继承关系链

class _ControllerSubscription<T> extends _BufferingStreamSubscription<T>
class _BufferingStreamSubscription<T> implements StreamSubscription<T>, _EventSink<T>, _EventDispatch<T>

接着我们再来看看 _subscription._addPending(_DelayedData(data));

代码如下,这个方法是_BufferingStreamSubscription类实现的

   void _addPending(_DelayedEvent event) {var pending = _pending ??= _PendingEvents<T>();pending.add(event);if (!_hasPending) {_state |= _STATE_HAS_PENDING;if (!_isPaused) {pending.schedule(this);}}}

这个方法主要是添加一个处理事件,接着我们看看schedule方法

   void schedule(_EventDispatch<T> dispatch) {if (isScheduled) return;assert(!isEmpty);if (_eventScheduled) {assert(_state == stateCanceled);_state = stateScheduled;return;}scheduleMicrotask(() {int oldState = _state;_state = stateUnscheduled;if (oldState == stateCanceled) return;handleNext(dispatch);});_state = stateScheduled;}放到微任务中执行handleNextvoid handleNext(_EventDispatch<T> dispatch) {assert(!isScheduled);assert(!isEmpty);_DelayedEvent event = firstPendingEvent!;_DelayedEvent? nextEvent = event.next;firstPendingEvent = nextEvent;if (nextEvent == null) {lastPendingEvent = null;}event.perform(dispatch);}

_DelayedEvent是个抽象类,perform方法是需要子类去实现
它的子类就是_DelayedData

class _DelayedData<T> extends _DelayedEvent<T> {final T value;_DelayedData(this.value);void perform(_EventDispatch<T> dispatch) {dispatch._sendData(value);}
}

我们看_sendData这个方法,这个方法也是在_BufferingStreamSubscription类中实现的。
整个代码看下来我们只需要关心 _zone.runUnaryGuarded(_onData, data); 这个是实现数据发送的关键。
也是订阅者和被订阅者之间的中间角色,起着承上启下的作用。

 void _sendData(T data) {assert(!_isCanceled);assert(!_isPaused);assert(!_inCallback);bool wasInputPaused = _isInputPaused;_state |= _STATE_IN_CALLBACK;_zone.runUnaryGuarded(_onData, data);_state &= ~_STATE_IN_CALLBACK;_checkState(wasInputPaused);}

Zone 被称做是沙箱,在 Dart 中,Zone 是用于隔离代码执行环境的概念。Zone 可以看作是一种执行上下文,它可以用于控制代码执行过程中的一些行为,比如异常处理、日志记录、资源管理等。Zone 中的 run 方法允许你在指定的 Zone 中运行一段代码块。
这里不继续深究下去,有兴趣的可以看看。我们只需要知道调用runUnaryGuarded这个方法。它就会回调_onData函数,并将参数data传给_onData方法。
_onData这个函数就是我们listen监听里的匿名函数,最终会回调那里去。

最后结语

虽然看不懂源码是一件枯燥无聊的事,但是多看几遍或许会变成一件有趣的事。如果要想深入学习flutter,阅读源码是必须要跨过的坎。
当然我写的也不是很好,只是纯粹记录学习从步骤一步步看源码的过程。有不满的,可以提提意见,但不要乱喷。


http://www.ppmy.cn/server/18283.html

相关文章

采购数据分析驾驶舱分享,照着它抄作业

今天我们来看一张采购管理驾驶舱。这是一张充分运用了多种数据可视化图表、智能分析功能&#xff0c;从物料和供应商的角度全面分析采购情况的BI数据可视化报表&#xff0c;主要分为三个部分&#xff0c;接下来就分部分来了解一下。 第一部分&#xff1a;关键指标计算及颜色预…

【Redis】集群

文章目录 一、集群的功能二、Redis集群的槽位slot和分片2.1、概念2.2、slot槽位映射的算法2.2.1 哈希取余分区2.2.2 一致性hash算法2.2.3 哈希槽分区面试题&#xff1a;为什么redis集群的最大槽数是16384&#xff1f;面试题&#xff1a;redis集群会不会写丢失&#xff1f; redi…

盘点安防监控市场常见的AI视频智能分析边缘计算硬件及其特点分析

在当今数字化时代&#xff0c;视频智能分析边缘计算技术及其硬件产品正逐渐崭露头角&#xff0c;成为众多行业领域的得力助手。视频AI智能分析边缘计算硬件是一种专门设计用于实现视频分析和边缘计算的硬件设备。它通常具有高性能的处理器、专门的图形处理单元&#xff08;GPU&…

【JavaScript】localStorage

1. 什么是 localStorage&#xff1f; localStorage 是一个 JavaScript API&#xff0c;允许在浏览器中存储键值对数据。它提供了一个简单的持久性存储方案&#xff0c;数据保存在用户的本地浏览器中&#xff0c;并且在页面刷新或关闭后依然存在。它只能存储字符串类型的数据。…

APP开发_ js 控制手机是否显示状态栏

1 Android 控制手机显示或隐藏状态栏的方法 1.1 使用 View 的 setSystemUiVisibility 方法 这个方法允许设置系统 UI 的可见性&#xff0c;包括状态栏。你可以通过组合不同的标志来实现不同的效果。 示例代码&#xff1a; Override protected void onCreate(Bundle saved…

<c++基础(6)>宏定义与函数的区别

最近需要使用openvino封装动态库&#xff0c;其中存在一个问题是openvino和windows.h之间的冲突&#xff0c;其主要是minwaindef.h中的宏定义引起的。所以好奇为什么使用宏定义处理函数过程。 //minwaindef.h中关于min、max的宏定义 #ifndef max #define max(a,b) …

ELK 日志分析系统(二)

一、ELK Kibana 部署 1.1 安装Kibana软件包 #上传软件包 kibana-5.5.1-x86_64.rpm 到/opt目录 cd /opt rpm -ivh kibana-5.5.1-x86_64.rpm 1.2 设置 Kibana 的主配置文件 vim /etc/kibana/kibana.yml --2--取消注释&#xff0c;Kiabana 服务的默认监听端口为5601 server.po…

设计模式之命令模式

一、详细介绍 命令模式是一种行为型设计模式&#xff0c;它将“请求”封装为一个对象&#xff0c;使得使用请求、参数化请求、队列请求、撤销请求、日志请求等多种请求变得简单。命令模式通过将“行为请求者”与“行为实现者”解耦&#xff0c;使得请求的发送者和接收者完全分离…