第一部分 Rx快速入门

news/2024/11/25 17:41:20/

核心类型Key types

要理解Rx运行机制需要知道两个核心类型及其他辅助类型,有助于更好的学习Rx。

IObserver<T>和IObservable<T>是构建Rx的基础,而ISubject<TSource, TResult>接口的实现降低开发者学习Rx的曲线。

LinQ的用法与LinQ to Object、LinQ to Sql、LinQ to XML类似。这些通用的功能可让用户查询静态数据;Rx提供了查询动态数据的能力。本质上Rx构建于观察者模式之上。.NET已经提供了一些观察者模式的实现,如多播代理或事件(也是多播代理的一种),多播代理不是很理想的实现:

  1. 在C#中使用了奇特的接口进行操作,如+=、-=这种不自然的方式注册回调函数
  2. 事件难于编写(定义代码)
  3. 事件没有实现超时机制
  4. 事件存在内存泄露风险
  5. 事件没有支持完成信号的标准模式
  6. 事件没有针对并发和多线程提供特殊支持,例如在子线程中触发事件是需要用户自己处理跨线程问题

Rx解决了上述问题,这里将介绍Rx的基本构成及一些基本类型。

 

There are two key types to understand when working with Rx, and a subset of auxiliary types that will help you to learn Rx more effectively. The IObserver<T> and IObservable<T>form the fundamental building blocks for Rx, while implementations of ISubject<TSource, TResult> reduce the learning curve for developers new to Rx.

Many are familiar with LINQ and its many popular forms like LINQ to Objects, LINQ to SQL & LINQ to XML. Each of these common implementations allows you query data at rest; Rx offers the ability to query data in motion. Essentially Rx is built upon the foundations of the Observer pattern. .NET already exposes some other ways to implement the Observer pattern such as multicast delegates or events (which are usually multicast delegates). Multicast delegates are not ideal however as they exhibit the following less desirable features;

In C#, events have a curious interface. Some find the += and -= operators an unnatural way to register a callback

Events are difficult to compose

Events don't offer the ability to be easily queried over time

Events are a common cause of accidental memory leaks

Events do not have a standard pattern for signaling completion

Events provide almost no help for concurrency or multithreaded applications. e.g. To raise an event on a separate thread requires you to do all of the plumbing

Rx looks to solve these problems. Here I will introduce you to the building blocks and some basic types that make up Rx.

IObservable<T>

IObservable<T>是Rx两个核心接口之一。非常简单,只有一个函数Subscribe。微软在.NET基础类型4.0中提供了这个接口。对于实现了IObservable<T>的对象,可以看作是T类型对象的连续的流。因此如果一个方法返回了IObservable<Price>,则可以看作是一个价格的数据流。

IObservable<T> is one of the two new core interfaces for working with Rx. It is a simple interface with just a Subscribe method. Microsoft is so confident that this interface will be of use to you it has been included in the BCL as of version 4.0 of .NET. You should be able to think of anything that implements IObservable<T> as a streaming sequence of T objects. So if a method returned an IObservable<Price> I could think of it as a stream of Prices.

//Defines a provider for push-based notification.

public interface IObservable<out T>

{

//Notifies the provider that an observer is to receive notifications.

IDisposable Subscribe(IObserver<T> observer);

}

.NET在System.IO.Stream类或其子类实现了流。System.IO.Stream用于数据流(通常为字节)来操作IO设备,如文件、网络或内存块。System.IO.Stream可用于读、写数据,以及定位功能(如向前定位、向后定位)。下面谈到的IObservable<T>类型的流,不提供定位和写功能。这个基本区别使Rx独立于System.IO.Stream体系进行构建。Rx提供了流的读取数据、关闭、结束的概念。Rx同时扩展出了对并发操作、查询操作(如变换、合并、统计、扩展)的支持。已存的System.IO.Stream类不支持这些特性。另外IObservable<T>实例可看作是可观察序列的集合,这很难理解。可观察序列部分可以理解,但并没有发现他们像集合。通常无法像集合一样对IObservable<T>实例进行排序、插入移除数据项的操作。集合通常有后备存储空间如内部数组。IObservable<T>中的数据项与集合中的数据项不同,没有提前实例化。在WPF和Silverlight中也叫做ObservableCollection<T>,其提供了类似于集合的行为,对这种情况描述的比较形象。事实上,IObservable<T>实例与ObservableCollection<T>实例整合的很好。为了解决这个冲突我们将IObservable<T>实例叫做序列。而IEnumerable<T>也叫做序列,但这是静态的数据序列,IObservable<T>实例是动态数据序列。

.NET already has the concept of Streams with the type and sub types of System.IO.Stream. The System.IO.Stream implementations are commonly used to stream data (generally bytes) to or from an I/O device like a file, network or block of memory. System.IO.Stream implementations can have both the ability to read and write, and sometimes the ability to seek (i.e. fast forward through a stream or move backwards). When I refer to an instance of IObservable<T> as a stream, it does not exhibit the seek or write functionality that streams do. This is a fundamental difference preventing Rx being built on top of the System.IO.Stream paradigm. Rx does however have the concept of forward streaming (push), disposing (closing) and completing (eof). Rx also extends the metaphor by introducing concurrency constructs, and query operations like transformation, merging, aggregating and expanding. These features are also not an appropriate fit for the existing System.IO.Stream types. Some others refer to instances of IObservable<T> as Observable Collections, which I find hard to understand.

While the observable part makes sense to me, I do not find them like collections at all. You generally cannot sort, insert or remove items from an IObservable<T> instance like I would expect you can with a collection. Collections generally have some sort of backing store like an internal array. The values from an IObservable<T> source are not usually pre-materialized as you would expect from a normal collection. There is also a type in WPF/Silverlight called an ObservableCollection<T>that does exhibit collection-like behavior, and is very well suited to this description. In fact IObservable<T> integrates very well with ObservableCollection<T> instances. So to save on any confusion we will refer to instances of IObservable<T> as sequences. While instances of IEnumerable<T> are also sequences, we will adopt the convention that they are sequences of data at rest, andIObservable<T> instances are sequences of data in motion.

IObserver<T>

IObserver<T>是Rx的另一个核心接口。通用在.NET基础类库4.0中引入。如果没有使用.NET4.0也没关系,Rx为.NET3.5和Silverlight用户将这两个接口封装到了单独的程序集中。IObservable<T>是IEnumerable<T>的加强版。如要理解这句话的意思,请看视频Channel9,其中讨论了这个类型的准确定义。一般我们只需将其理解为可以抛出三种数据(next值、异常通知、序列结束通知)的特殊IEnumerable<T>类型,因此IObservable<T>需要调用IObServer<T>的三个方法OnNext、OnError、OnCompleted。

IObserver<T> is the other one of the two core interfaces for working with Rx. It too has made it into the BCL as of .NET 4.0. Don't worry if you are not on .NET 4.0 yet as the Rx team have included these two interfaces in a separate assembly for .NET 3.5 and Silverlight users. IObservable<T> is meant to be the "functional dual of IEnumerable<T>". If you want to know what that last statement means, then enjoy the hours of videos on Channel9 where they discuss the mathematical purity of the types. For everyone else it means that where an IEnumerable<T> can effectively yield three things (the next value, an exception or the end of the sequence), so too can IObservable<T> via IObserver<T>'s three methods OnNext(T)OnError(Exception) and OnCompleted().

//Provides a mechanism for receiving push-based notifications.

public interface IObserver<in T>

{

//Provides the observer with new data.

void OnNext(T value);

//Notifies the observer that the provider has experienced an error condition.

void OnError(Exception error);

//Notifies the observer that the provider has finished sending push-based notifications.

void OnCompleted();

}

Rx有如下隐式的契约。IObserver<T>实现会调用0至多次OnNext方法,调用一次OnError或一次OnCompleted。这个协定要求如果序列终止,一定调用了OnError或者调用了OnCompleted。约定没有要求必须调用OnNextOnErrorOnCompleted方法,因此就有可空序列、无穷序列的概念。后面将会看到。

有趣的是,如果基于Rx进行编程我们将会频繁的使用IObservable<T>接口,但很少关心IObserver<T>接口。这是由于Rx提供了像Subscribe方法的匿名实现。

Rx has an implicit contract that must be followed. An implementation of IObserver<T> may have zero or more calls to OnNext(T) followed optionally by a call to either OnError(Exception) or OnCompleted(). This protocol ensures that if a sequence terminates, it is always terminated by an OnError(Exception)or an OnCompleted(). This protocol does not however demand that an OnNext(T)OnError(Exception) or OnCompleted() ever be called. This enables to concept of empty and infinite sequences. We will look into this more later.

Interestingly, while you will be exposed to the IObservable<T> interface frequently if you work with Rx, in general you will not need to be concerned with IObserver<T>. This is due to Rx providing anonymous implementations via methods like Subscribe.

IObserver<T> 和IObservable<T>的实现

实现这两个接口都很简单。如果要创建一个观察者向控制台打印值,可如下所示:

It is quite easy to implement each interface. If we wanted to create an observer that printed values to the console it would be as easy as this.

public class MyConsoleObserver<T> : IObserver<T>

{

public void OnNext(T value)

{

Console.WriteLine("Received value {0}", value);

}

public void OnError(Exception error)

{

Console.WriteLine("Sequence faulted with {0}", error);

}

public void OnCompleted()

{

Console.WriteLine("Sequence terminated");

}

}

实现可观察序列稍微复杂。返回序列值的简单实现如下所示:

Implementing an observable sequence is a little bit harder. An overly simplified implementation that returned a sequence of numbers could look like this.

public class MySequenceOfNumbers : IObservable<int>

{

public IDisposable Subscribe(IObserver<int> observer)

{

observer.OnNext(1);

observer.OnNext(2);

observer.OnNext(3);

observer.OnCompleted();

return Disposable.Empty;

}

}

将两个实现进行关联获取如下输出:

We can tie these two implementations together to get the following output

var numbers = new MySequenceOfNumbers();

var observer = new MyConsoleObserver<int>();

numbers.Subscribe(observer);

输出:

Received value 1

Received value 2

Received value 3

Sequence terminated

问题是这种实现不是真正的响应式编程。这是阻塞的实现,用IEnumerable<T>的子类型List<T>或数组更好。

接口实现问题我们无需涉及太多。在使用Rx过程中,不必真正的去实现接口,Rx提供了必要的实现,无需关心细节。下面讲解一个简单的对象。

The problem we have here is that this is not really reactive at all. This implementation is blocking, so we may as well use an IEnumerable<T> implementation like a List<T> or an array.

This problem of implementing the interfaces should not concern us too much. You will find that when you use Rx, you do not have the need to actually implement these interfaces, Rx provides all of the implementations you need out of the box. Let's have a look at the simple ones.

Subject<T>

我喜欢将IObserver<T>IObservable<T>看做读对象和写对象,或消费者和生产者接口。如果要自己实现IObservable<T>接口,需要实现IObservable特性,即向订阅者发送数据项、抛出异常、通知序列结束。听起来这和IObserver<T>的定义类似。同时实现这两个接口似乎很奇怪,但可以简化应用。这既是Subject类提供的功能。Subject<T>是最基本的Subject实现。事实上,可以通过Subject<T>的方法来提供IObservable<T>对象,并可使用OnNextOnErrorOnCompleted方法控制序列。

下面是一个基础的范例,创建了一个Subject对象,进行订阅,向序列发送值(调用subject.OnNext(T).

I like to think of the IObserver<T> and the IObservable<T> as the 'reader' and 'writer' or, 'consumer' and 'publisher' interfaces. If you were to create your own implementation of IObservable<T> you may find that while you want to publicly expose the IObservable characteristics you still need to be able to publish items to the subscribers, throw errors and notify when the sequence is complete. Why that sounds just like the methods defined in IObserver<T>! While it may seem odd to have one type implementing both interfaces, it does make life easy. This is what subjects can do for you. Subject<T> is the most basic of the subjects. Effectively you can expose your Subject<T> behind a method that returns IObservable<T> but internally you can use the OnNextOnErrorand OnCompleted methods to control the sequence.

In this very basic example, I create a subject, subscribe to that subject and then publish values to the sequence (by calling subject.OnNext(T)).

static void Main(string[] args)

{

var subject = new Subject<string>();

WriteSequenceToConsole(subject);

subject.OnNext("a");

subject.OnNext("b");

subject.OnNext("c");

Console.ReadKey();

}

//Takes an IObservable<string> as its parameter.

//Subject<string> implements this interface.

static void WriteSequenceToConsole(IObservable<string> sequence)

{

//The next two lines are equivalent.

//sequence.Subscribe(value=>Console.WriteLine(value));

sequence.Subscribe(Console.WriteLine);

}

注意WriteSequenceToConsole 方法带有一个IObservable<string>类型的参数,仅有一个订阅方法。Subscribe方法需要一个IObserver<string>类型的参数吗?当然Comsole.WriteLine不匹配这个接口。但Rx项目组给IObservable<T>提供了一个扩展方法,仅接收Action<T>类型的参数。这个函数在每个数据项被发射的时候都会调用。另一个重载的Subsribe扩展方法可以接受一个委托的组合体(传递两个或三个委托),在OnNextOnCompletedOnError时被调用。这样就不必在去实现IObserver<T>接口了,库!

可见,Subject<T>适用于Rx开发入门阶段。然而Subject<T>只是一个基本的实现。Subject<T>还有三个其他版本的实现,可以彻底的改变程序运行方式。

Note that the WriteSequenceToConsole method takes an IObservable<string> as it only wants access to the subscribe method. Hang on, doesn't the Subscribe method need an IObserver<string> as an argument? Surely Console.WriteLine does not match that interface. Well it doesn't, but the Rx team supply me with an Extension Method to IObservable<T> that just takes an Action<T>. The action will be executed every time an item is published. There are other overloads to the Subscribe extension method that allows you to pass combinations of delegates to be invoked for OnNextOnCompleted and OnError. This effectively means I don't need to implement IObserver<T>. Cool.

As you can see, Subject<T> could be quite useful for getting started in Rx programming. Subject<T> however, is a basic implementation. There are three siblings to Subject<T> that offer subtly different implementations which can drastically change the way your program runs.

ReplaySubject<T>

ReplaySubject<T>提供了一个对发送值的缓存,对于后来的订阅,也能接收到全部序列值。为查看效果,我们在订阅前就开始发送第一个数据项。

ReplaySubject<T> provides the feature of caching values and then replaying them for any late subscriptions. Consider this example where we have moved our first publication to occur before our subscription

static void Main(string[] args)

{

var subject = new Subject<string>();

subject.OnNext("a");

WriteSequenceToConsole(subject);

subject.OnNext("b");

subject.OnNext("c");

Console.ReadKey();

}

这里只有bc的值显示在控制台上,a没有输出。如果做一点小改变,使用ReplaySubject<T>来创建subject对象,则可以看到全部的发送数据项。

The result of this would be that 'b' and 'c' would be written to the console, but 'a' ignored. If we were to make the minor change to make subject a ReplaySubject<T> we would see all publications again.

var subject = new ReplaySubject<string>();

subject.OnNext("a");

WriteSequenceToConsole(subject);

subject.OnNext("b");

subject.OnNext("c");

这样可以很方便的解决竞争问题。注意,ReplaySubject<T>的默认构造函数创建的实例会缓存发送过的所有值。很多情况下会产生不必要的内存压力。ReplaySubject<T>可以指定一个缓存逾期设置,可以缓解内存问题。一种方式是可以设置缓冲区大小。本例我们将ReplaySubject<T>的缓冲区大小设置为2,因此只能获取订阅前发送的两个值:

This can be very handy for eliminating race conditions. Be warned though, the default constructor of the ReplaySubject<T> will create an instance that caches every value published to it. In many scenarios this could create unnecessary memory pressure on the application. ReplaySubject<T> allows you to specify simple cache expiry settings that can alleviate this memory issue. One option is that you can specify the size of the buffer in the cache. In this example we create the ReplaySubject<T> with a buffer size of 2, and so only get the last two values published prior to our subscription:

public void ReplaySubjectBufferExample()

{

var bufferSize = 2;

var subject = new ReplaySubject<string>(bufferSize);

subject.OnNext("a");

subject.OnNext("b");

subject.OnNext("c");

subject.Subscribe(Console.WriteLine);

subject.OnNext("d");

}

从输出可见a已经在缓冲区中被清除,但保留了bcd是在订阅后发送的,因此也显示在控制台上。

Here the output would show that the value 'a' had been dropped from the cache, but values 'b' and 'c' were still valid. The value 'd' was published after we subscribed so it is also written to the console.

Output:

b

c

d

另一个选择是防止数据项一直缓存下去。本例指定数据项缓存的时间窗口:

Another option for preventing the endless caching of values by the ReplaySubject<T>, is to provide a window for the cache. In this example, instead of creating a ReplaySubject<T> with a buffer size, we specify a window of time that the cached values are valid for.

public void ReplaySubjectWindowExample()

{

var window = TimeSpan.FromMilliseconds(150);

var subject = new ReplaySubject<string>(window);

subject.OnNext("w");

Thread.Sleep(TimeSpan.FromMilliseconds(100));

subject.OnNext("x");

Thread.Sleep(TimeSpan.FromMilliseconds(100));

subject.OnNext("y");

subject.Subscribe(Console.WriteLine);

subject.OnNext("z");

}

上面时间窗口指定为150毫秒。每100毫秒发送一次数据项。订阅时第一个发送的数据项已经过了200毫秒,已过期并被移除缓冲区。

In the above example the window was specified as 150 milliseconds. Values are published 100 milliseconds apart. Once we have subscribed to the subject, the first value is 200ms old and as such has expired and been removed from the cache.

Output:

x

y

z

BehaviorSubject<T>

BehaviorSubject<T>与ReplaySubject<T>类似,但只缓存最近的数据项。BehaviorSubject<T>同时需要用户提供一个默认的T类型值。这样所有的订阅都可以立即接收到一个值(除非已经结束)。

本例控制台输出a:

BehaviorSubject<T> is similar to ReplaySubject<T> except it only remembers the last publication. BehaviorSubject<T> also requires you to provide it a default value of T. This means that all subscribers will receive a value immediately (unless it is already completed).

In this example the value 'a' is written to the console:

public void BehaviorSubjectExample()

{

//Need to provide a default value.

var subject = new BehaviorSubject<string>("a");

subject.Subscribe(Console.WriteLine);

}

这个范例控制台输出b而不是a

In this example the value 'b' is written to the console, but not 'a'.

public void BehaviorSubjectExample2()

{

var subject = new BehaviorSubject<string>("a");

subject.OnNext("b");

subject.Subscribe(Console.WriteLine);

}

本例控制台输出bcd,而不是a

In this example the values 'b', 'c' & 'd' are all written to the console, but again not 'a'

public void BehaviorSubjectExample3()

{

var subject = new BehaviorSubject<string>("a");

subject.OnNext("b");

subject.Subscribe(Console.WriteLine);

subject.OnNext("c");

subject.OnNext("d");

}

最后范例,序列已经结束没有值输出。

Finally in this example, no values will be published as the sequence has completed. Nothing is written to the console.

public void BehaviorSubjectCompletedExample()

{

var subject = new BehaviorSubject<string>("a");

subject.OnNext("b");

subject.OnNext("c");

subject.OnCompleted();

subject.Subscribe(Console.WriteLine);

}

注意BehaviorSubject<T>和缓冲区大小为1ReplaySubject<T>(叫做重复1次的subject)是不同的。BehaviorSubject<T>需要一个初始值。这里假定如果BehaviorSubject<T>没有结束就一定会发送一个数据项。但ReplaySubject<T>是无法保证的。处于这种考虑,结束BehaviorSubject<T>应谨慎。另外重复1次的Subject在结束时也会缓存一个数据项。因此订阅到结束的BehaviorSubject<T>不会收到数据项,但ReplaySubject<T>可能会收到数据项。

BehaviorSubject<T>通常与类的属性关联使用。用于通知属性值的变化,并可以缓存属性的原始值。

That note that there is a difference between a ReplaySubject<T> with a buffer size of one (commonly called a 'replay one subject') and a BehaviorSubject<T>. A BehaviorSubject<T> requires an initial value. With the assumption that neither subjects have completed, then you can be sure that the BehaviorSubject<T> will have a value. You cannot be certain with the ReplaySubject<T> however. With this in mind, it is unusual to ever complete a BehaviorSubject<T>. Another difference is that a replay-one-subject will still cache its value once it has been completed. So subscribing to a completed BehaviorSubject<T> we can be sure to not receive any values, but with a ReplaySubject<T> it is possible.

BehaviorSubject<T>s are often associated with class properties. As they always have a value and can provide change notifications, they could be candidates for backing fields to properties.

AsyncSubject<T>

AsyncSubject<T>与Replay和BehaviorSubject类似都会缓存值,然而他只会存储最近的值,而且在完成的时候才会发送出去。AsyncSubject<T>用于完成时立刻发送数据项的情况。就相当于一个Task<T>变量。

本例中序列没有结束就不会发送数据项。控制台没有输出值。

AsyncSubject<T> is similar to the Replay and Behavior subjects in the way that it caches values, however it will only store the last value, and only publish it when the sequence is completed. The general usage of the AsyncSubject<T> is to only ever publish one value then immediately complete. This means that is becomes quite comparable to Task<T>.

In this example no values will be published as the sequence never completes. No values will be written to the console.

static void Main(string[] args)

{

var subject = new AsyncSubject<string>();

subject.OnNext("a");

WriteSequenceToConsole(subject);

subject.OnNext("b");

subject.OnNext("c");

Console.ReadKey();

}

这里调用了OnCompleted方法后控制台输出了c值:

In this example we invoke the OnCompleted method so the last value 'c' is written to the console:

static void Main(string[] args)

{

var subject = new AsyncSubject<string>();

subject.OnNext("a");

WriteSequenceToConsole(subject);

subject.OnNext("b");

subject.OnNext("c");

subject.OnCompleted();

Console.ReadKey();

}

隐式约定Implicit contracts

使用上面提到的Rx类时需要遵守一些隐式约定。一旦一个序列完成,就不会在被激活。结束序列有两种方法,OnCompleted或OnError。

本文描述的四个Subject都遵守这个隐式的约定,如果序列终止则会忽略发送数据项、发送错误通知、完成通知的操作。

这里看一下向结束的序列发送数据项c。仅a和b在控制台上显示。

There are implicit contacts that need to be upheld when working with Rx as mentioned above. The key one is that once a sequence is completed, no more activity can happen on that sequence. A sequence can be completed in one of two ways, either by OnCompleted() or by OnError(Exception).

The four subjects described in this chapter all cater for this implicit contract by ignoring any attempts to publish values, errors or completions once the sequence has already terminated.

Here we see an attempt to publish the value 'c' on a completed sequence. Only values 'a' and 'b' are written to the console.

public void SubjectInvalidUsageExample()

{

var subject = new Subject<string>();

subject.Subscribe(Console.WriteLine);

subject.OnNext("a");

subject.OnNext("b");

subject.OnCompleted();

subject.OnNext("c");

}

ISubject 接口

本文描述的四个Subject都实现了IObservable<T>IObserver<T>接口,即实现了另一个接口:

While each of the four subjects described in this chapter implement the IObservable<T> and IObserver<T>interfaces, they do so via another set of interfaces:

//Represents an object that is both an observable sequence as well as an observer.

public interface ISubject<in TSource, out TResult>

: IObserver<TSource>, IObservable<TResult>

{

}

上面提到的Subject都有相同的类型TSourceTResult,他们实现了上面提到的接口的超集。

As all the subjects mentioned here have the same type for both TSource and TResult, they implement this interface which is the superset of all the previous interfaces:

//Represents an object that is both an observable sequence as well as an observer.

public interface ISubject<T> : ISubject<T, T>, IObserver<T>, IObservable<T>

{

}

这些接口用处不多,但对未开放的基类很有用。在后面的文档hot and cold observables将会看到ISubject接口。

These interfaces are not widely used, but prove useful as the subjects do not share a common base class. We will see the subject interfaces used later when we discover hot and cold observables.

Subject工厂

最后可以通过工厂方法创建Subject。考虑到Subject组合了IObservable<T>IObserver<T>,通过工厂方法来组合接口是由很有必要的。

工厂方法Subject.Create(IObserver<TSource>, IObservable<TResult>)就是为此功能提供的。

Finally it is worth making you aware that you can also create a subject via a factory method. Considering that a subject combines the IObservable<T> and IObserver<T> interfaces, it seems sensible that there should be a factory that allows you to combine them yourself. The Subject.Create(IObserver<TSource>, IObservable<TResult>)factory method provides just this.

//Creates a subject from the specified observer used to publish messages to the subject

//  and observable used to subscribe to messages sent from the subject

public static ISubject<TSource, TResult> Create<TSource, TResult>(

IObserver<TSource> observer,

IObservable<TResult> observable)

{...}

Subject提供了方便的方式使用Rx,但对于老用户却不推荐使用。解释见Usage Guidelines文档。比Subject更好的工厂方法见Part 2

基本类型IObserver<T>IObservable<T>及其辅助的Subject类组成了读者对Rx 认识。了解这些简单类型和隐式约定很重要。在产品开发中你会发现很少直接使用IObserver<T>接口和Subject类,但理解他们以及将其融入到Rx生态圈还是很重要的。IObservable<T>接口是占统治地位的类型,需要为动态数据序列开放这个接口,构成了使用Rx过程中涉及的核心。

Subjects provide a convenient way to poke around Rx, however they are not recommended for day to day use. An explanation is in the Usage Guidelines in the appendix. Instead of using subjects, favor the factory methods we will look at in Part 2.

The fundamental types IObserver<T> and IObservable<T> and the auxiliary subject types create a base from which to build your Rx knowledge. It is important to understand these simple types and their implicit contracts. In production code you may find that you rarely use the IObserver<T> interface and subject types, but understanding them and how they fit into the Rx eco-system is still important. The IObservable<T> interface is the dominant type that you will be exposed to for representing a sequence of data in motion, and therefore will comprise the core concern for most of your work with Rx and most of this book.


http://www.ppmy.cn/news/112510.html

相关文章

爱普生EPSON实时时钟芯片-RX8111CE

1.翻译记录 •内置频率调整32.768 kHz晶体单元 •接口类型&#xff1a;I2C&#xff08;高达400 kHz&#xff09; 普通 100khz •备份时的低电流消耗&#xff1a;100毫安/3.0伏典型值。 •宽工作电压范围&#xff1a;1.6 V至5.5 V •宽时间保持器电压范围&#xff1a;1.1 V…

串口接收模块uart_rx详解

接收模块要比发送模块稍微复杂一丢丢&#xff0c;但有助于锻炼自己对于时序模块的逻辑性的掌握能力。 本文是学习了小梅哥的串口接收模块教程的总结整理。 1. 原理介绍 详见《串口发送模块uart_tx详解》 https://blog.csdn.net/m0_37921318/article/details/105913390 2. 串…

分享一个RX8025T时钟芯片的Arduino代码

分享一个RX8025T时钟芯片的Arduino代码 背景 之前做那个点阵时钟使用的是DS3231的时钟芯片&#xff0c;这个时钟芯片最大的有点就是高精度&#xff0c;缺点就是有点贵&#xff0c;现在淘宝一颗这样的芯片最便宜的都要十几块钱&#xff0c;大大的增加了我整个点阵时钟的成本造…

4rx4 服务器内存2rx4_服务器内存条上写8GB,2RX4,其中2RX4是什么意思?是单条8G的。...

展开全部 其实62616964757a686964616fe58685e5aeb931333365663535这个是内存计算方法,有个前提是根据CPU的关系。 1、CPU数据总线的位宽,现在一般是64bit,这个位宽就称之为物理Bank。 2、那么memory 1RX4则表示1个64bit,X4则表示memory每颗内存颗粒的位数。 从这里我们就可…

[RK3288][Android6.0] RTC模块RX8010SJ驱动添加及改动

Platform: Rockchip OS: Android 6.0 Kernel: 3.10.92 设备部分驱动部分改动部分参考 在网上的其他branch上找到一份RX8010SJ的驱动&#xff0c;但是有点问题&#xff0c;先贴代码&#xff0c;再说明修改部分。 设备部分&#xff1a; 比较简单&#xff0c;确定I2C端口以及…

linux中的rx作用,linux – ifconfig输出中RX’错误’和’帧’的确切含义?

我看到(RHEL)节点出现网络问题(数据包丢失),这似乎也表现为ifconfig输出中’error’和’frame’字段的非零计数&#xff1a; eth2 Link encap:Ethernet HWaddr xx:xx:xx:xx:xx:xx ... RX packets:277593775 errors:1049 dropped:0 overruns:0 frame:536 在某处有详细描述’错误…

PCIe_TX/RX测试步骤

TX测试 一: GEN1、2、3采用自动化测试&#xff1a; 1. 点击infinum软件中Analyze->Automated Test Apps->D9050PCIC PCIExpress Gen5 Test App&#xff0c;进入自动化测试界面&#xff1b; 2. Set Up窗口&#xff1a;PCIE5.0->CEM-End Point Tests->Device Defi…

摄影小白入门相机选择(个人出发)

1.微单与卡片机 在产品质量上&#xff0c;相机的感光器件CMOS这些&#xff0c;可以一概认为&#xff0c;同价同质。 两者的区别主要在镜头的设计&#xff0c;黑卡被设计为不可更换镜头式无反相机&#xff0c;入门级别的一般搭配标准变焦镜头 如16-55这种焦距&#xff0c;旗舰级…