技术开发 频道

.NET开发中的异步编程:第三方类库

     Rx

  第三个要介绍的是Microsoft Research的Reactive Extension for .NET(Rx)。首先来看看名字:Reactive。这个单词表示什么意思呢?

  我们可以看看Wiki上简单的描述:

  传统的编程模型中a = b + c这个简单的语句表示将b和c相加的结果赋给a,以后b和c发生任何变化都不会影响到a了。假设现在有一个特殊的赋值操作符<-,我们写出这么个语句a <- b + c。这个操作符的意思是将b和c的和与a绑定,只要b或c的值发生变化,a的值也会发生变化。这就是Reactive Programming(响应式编程)的意思。就好像b和c是一个发布者,而a是它们的订阅者,当b或c发生改变时会发送通知给它们的订阅者。

  而Rx就是一个让你可以更容易的使用.NET进行响应式编程的框架。它利用可监视的(observable)集合,将异步和基于事件的编程联合在一起。其使用方式就是LINQ的语法,也可以将其称为LINQ to Events。在本节我们仅仅讨论Rx如何简化BeginXXX/EndXXX模式的异步编程的模型。在Rx中有System.Linq.Observable这个类型,提供了大量扩展方法,对我们现在要讨论有帮助有这么一个:

Func> FromAsyncPattern(Func begin, Func end)

  FromAsyncPattern有很多重载,其作用就是将APM中BeginXXX/EndXXX两个方法传入,然后返回一个Func,运行后得到一个IObservable(实现了该接口的AsyncSubject)。拿Stream举个例子:

Stream stream = ...

  var asyncRead
= Observable.FromAsyncPattern(stream.BeginRead, stream.EndRead);

  
byte[] buffer = new byte[1024];

  var bytesRead
= asyncRead(buffer,0,1024);

  从上面的代码可以看出来,FromAsyncPattern的使用方法:它的泛型参数前面几个是BeginRead的参数(除了callback和state外),最后一个泛型参数是EndRead的返回值。那么调用asyncRead的时候传入的参数就像Stream.Read这个同步版本的一样。执行asyncRead后我们得到了一个实现了IObservable接口的对象,该接口提供了丰富的功能可供我们使用。比如其中就有这么一个Run方法:

  

var stream = new FileStream("bigfile.txt",FileMode.Open,FileAccess.Read,FileShare.Read,2<<16,true);

  var asyncRead
= Observable.FromAsyncPattern(stream.BeginRead, stream.EndRead);

  
byte[] buffer = new byte[1024];

  var read
= asyncRead(buffer,0,1024);

  read.Run(bytesRead
=> Console.WriteLine("读取的字节数:{0}",bytesRead));

  其实上面这段代码,等到我们讨论Async CTP的时候会发现隐隐约约有些Rx的影子。要了解Rx的更详细信息,请点击这里。另外Rx也有针对JavaScript的版本,这对于要在网页上实现拖拽等操作提供了很大的便利。

  

0
相关文章