Rx
第三个要介绍的是Microsoft Research的Reactive Extension for .NET(Rx)。首先来看看名字:Reactive。这个单词表示什么意思呢?
我们可以看看Wiki上简单的描述:
传统的编程模型中a = b + c这个简单的语句表示将b和c相加的结果赋给a,以后b和c发生任何变化都不会影响到a了。假设现在有一个特殊的赋值操作符<-,我们写出这么个语句a <- b + c。这个操作符的意思是将b和c的和与a绑定,只要b或c的值发生变化,a的值也会发生变化。这就是Reactive Programming(响应式编程)的意思。就好像b和c是一个发布者,而a是它们的订阅者,当b或c发生改变时会发送通知给它们的订阅者。
而Rx就是一个让你可以更容易的使用.NET进行响应式编程的框架。它利用可监视的(observable)集合,将异步和基于事件的编程联合在一起。其使用方式就是LINQ的语法,也可以将其称为LINQ to Events。在本节我们仅仅讨论Rx如何简化BeginXXX/EndXXX模式的异步编程的模型。在Rx中有System.Linq.Observable这个类型,提供了大量扩展方法,对我们现在要讨论有帮助有这么一个:
FromAsyncPattern有很多重载,其作用就是将APM中BeginXXX/EndXXX两个方法传入,然后返回一个Func,运行后得到一个IObservable(实现了该接口的AsyncSubject)。拿Stream举个例子:
var asyncRead = Observable.FromAsyncPattern(stream.BeginRead, stream.EndRead);
byte[] buffer = new byte[1024];
var bytesRead = asyncRead(buffer,0,1024);
从上面的代码可以看出来,FromAsyncPattern的使用方法:它的泛型参数前面几个是BeginRead的参数(除了callback和state外),最后一个泛型参数是EndRead的返回值。那么调用asyncRead的时候传入的参数就像Stream.Read这个同步版本的一样。执行asyncRead后我们得到了一个实现了IObservable接口的对象,该接口提供了丰富的功能可供我们使用。比如其中就有这么一个Run方法:
var asyncRead = Observable.FromAsyncPattern(stream.BeginRead, stream.EndRead);
byte[] buffer = new byte[1024];
var read = asyncRead(buffer,0,1024);
read.Run(bytesRead => Console.WriteLine("读取的字节数:{0}",bytesRead));
其实上面这段代码,等到我们讨论Async CTP的时候会发现隐隐约约有些Rx的影子。要了解Rx的更详细信息,请点击这里。另外Rx也有针对JavaScript的版本,这对于要在网页上实现拖拽等操作提供了很大的便利。