【IT168 专稿】在前一篇文章中(.NET异步编程:使用CPS及yield实现异步)我们看到了一些关于CPS的讨论,并利用C# 2新增的迭代器yield来实现CPS的编码方式,简化异步编程。不过异步并不是你想象的那么简单,也不是我那个随手写的几行代码能够解决的,特别是当并发环境下你会看到到处是异常。所幸的是在.NET平台中已经有一些第三方类库能够简化我们的异步编程,而且还更可靠。在本文中我想介绍的第三方类库有三个:Jeffrey Ritcher的AsyncEnumerator,微软的CCR以及Microsoft Research的Rx。不过我这里只是浅尝辄止的介绍,要了解更多内容可以参照她们的官方文档,我在后面会列出链接。
AsyncEnumerator
把上一篇文章的代码改用AsyncEnumerator实现就是下面这个样子:
2: {
3: var request = HttpWebRequest.Create("http://www.google.com");
4: request.BeginGetResponse(ae.End(),null);
5: yield return 1;
6: var response = request.EndGetResponse(ae.DequeueAsyncResult());
7: using(var stream = response.GetResponseStream())
8: {
9: stream.BeginRead(buffer,1,1024,ae.End(),null);
10: yield return 1;
11: var actualRead = stream.EndRead(ae.DequeueAsyncResult());
12: }
13: }
14:
15: public void btnDownload_Click(object sender,EventArgs e)
16: {
17: AsyncEnumerator ae = new AsyncEnumerator();
18: ae.EndExecute(ae.BeginExecute(Download(),null));
19: }
可以看到,这里的代码几乎与上一篇文章使用yield实现异步的代码一模一样,实际上站在AsyncEnumerator背后的正是yield。但是AsyncEnumerator对异步的一些问题做了增强。比如大家经常碰到的在不是创建UI的线程里更新UI上的元素会抛出异常的问题在使用AsyncEnumerator中就不存在。所以对于上面的代码我们可以放心的在异步接收到服务器端返回的内容后更新UI上的元素:
2: this.txtContent.Text = content;
关于这方面的细节你可以使用“SynchronizationContext”作为关键字搜索,MSDN杂志的二月刊上也有一篇相关文章:It's All About the SynchronizationContext
AsyncEnumerator还有很多其他便利性,在本文我就不一一讨论了。感兴趣的可以去其官方网站,下载详细的示例。
另外,虽然AsyncEnumerator使用同步方式的编写异步代码,样子看起来是同步的,但它并没有避免它异步的本质。异步的代码调试起来是非常困难的,你会发现执行流程乱序五章。而AsyncEnumerator对调试也提供了一些支持,让你调试的时候有一条线索可以跟踪。
CCR
你可能要问,第三方的开发者都提供了这些改进异步编程的类库,难道微软就没有么?
微软的CCR(Concurrency and Coordination Runtime)是为机器人开发平台中需要并行协作而开发的类库。下面我会先简单介绍一下CCR对并发编程的支持,然后介绍一下其对异步编程模型的简化。
在CCR中有一个核心类型Dispatcher,每个Dispatcher都管理着一些线程,相当于一个线程池。与CLR的ThreadPool不同的是,每个Dispatcher都对应着一个自己的“线程池”,而每个CLR中只有一个ThreadPool。一般会将一个Dispatcher与一个DispatcherQueue绑定,任务会进入DispatcherQueue这个队列,然后Dispatcher管理的线程就会执行这个队列里的任务。从这一点看很像我们在上一篇已经接触过的IO完成端口的行为。
CCR中对并发的实现应该算actor-based模型,熟悉Erlang的同学应该会很清楚这个东西。在CCR中,我们创建一个Port
{
//创建一个任务队列与一个Dispatcher绑定
DispatcherQueue taskQueue = new DispatcherQueue("task queue",dispatcher);
Port sender = new Port();
//创建一个接受者,监视sender
Receiver receiver = Arbiter.Receive(true, sender,delegate(String message) { Console.WriteLine(message);})
//该接受者接收到消息后会向taskQueue推入一个任务
Arbiter.Active(taskQueue,receiver);
//发送消息
sender.Post("Hello");
}
上面是使用CCR进行message-based并发编程,message-based的并发编程的好处是避免了Shared Memory,也就不需要锁机制了,简化了并发编程的模型。
而CCR中还提供了FromIteratorHandler这么一个方法,其接受一个IEnumerable参数,表示IEnumerable里每一项都是都是一个操作。有了这个我们再和“神奇的yield”结合起来,又可以大大简化异步开发了:
2: {
3: var resultPort = new Port();
4:
5: var request = WebRequest.Create("http://www.google.com");
6: request.BeginGetResponse(resultPort.Post,null);
7:
8: yield return resultPort.Receive();
9:
10: var response = request.EndGetResponse((IAsyncResult)resultPort.Test());
11:
12: using(var stream = response.GetResponseStream())
13: {
14: var buffer = new byte[1024];
15:
16: stream.BeginRead(buffer,0,1024,resultPort.Post,null);
17:
18: yield return resultPort.Receive();
19:
20: var actualRead = stream.EndRead((IAsyncResult)resultPort.Test());
21:
22: Console.WriteLine(Encoding.Default.GetString(buffer,0,actualRead));
23: }
24: }
然后使用下面的代码运行:
2: DispatcherQueue queue = new DispatcherQueue("download", dispatcher);
3: Arbiter.Activate(queue, Arbiter.FromIteratorHandler(Download));
关于CCR更详细的资料参见这里。Jeffrey Ritcher也在MSDN杂志上有一篇文章介绍CCR,该文的示例代码中还有对各种异步IO的封装代码。
Rx
第三个要介绍的是Microsoft Research的Reactive Extension for .NET(Rx)。首先来看看名字:Reactive。这个单词表示什么意思呢?
我们可以看看Wiki上简单的描述:
传统的编程模型中a = b + c这个简单的语句表示将b和c相加的结果赋给a,以后b和c发生任何变化都不会影响到a了。假设现在有一个特殊的赋值操作符<-,我们写出这么个语句a <- b + c。这个操作符的意思是将b和c的和与a绑定,只要b或c的值发生变化,a的值也会发生变化。这就是Reactive Programming(响应式编程)的意思。就好像b和c是一个发布者,而a是它们的订阅者,当b或c发生改变时会发送通知给它们的订阅者。
而Rx就是一个让你可以更容易的使用.NET进行响应式编程的框架。它利用可监视的(observable)集合,将异步和基于事件的编程联合在一起。其使用方式就是LINQ的语法,也可以将其称为LINQ to Events。在本节我们仅仅讨论Rx如何简化BeginXXX/EndXXX模式的异步编程的模型。在Rx中有System.Linq.Observable这个类型,提供了大量扩展方法,对我们现在要讨论有帮助有这么一个:
FromAsyncPattern有很多重载,其作用就是将APM中BeginXXX/EndXXX两个方法传入,然后返回一个Func,运行后得到一个IObservable(实现了该接口的AsyncSubject)。拿Stream举个例子:
var asyncRead = Observable.FromAsyncPattern(stream.BeginRead, stream.EndRead);
byte[] buffer = new byte[1024];
var bytesRead = asyncRead(buffer,0,1024);
从上面的代码可以看出来,FromAsyncPattern的使用方法:它的泛型参数前面几个是BeginRead的参数(除了callback和state外),最后一个泛型参数是EndRead的返回值。那么调用asyncRead的时候传入的参数就像Stream.Read这个同步版本的一样。执行asyncRead后我们得到了一个实现了IObservable接口的对象,该接口提供了丰富的功能可供我们使用。比如其中就有这么一个Run方法:
var asyncRead = Observable.FromAsyncPattern(stream.BeginRead, stream.EndRead);
byte[] buffer = new byte[1024];
var read = asyncRead(buffer,0,1024);
read.Run(bytesRead => Console.WriteLine("读取的字节数:{0}",bytesRead));
其实上面这段代码,等到我们讨论Async CTP的时候会发现隐隐约约有些Rx的影子。要了解Rx的更详细信息,请点击这里。另外Rx也有针对JavaScript的版本,这对于要在网页上实现拖拽等操作提供了很大的便利。