【IT168 技术文档】如果你想利用多核机器的强大计算能力,你需要使用PLINQ(并行LINQ),任务并行库(Task Parallel Library,TPL)和Visual Studio2010中的新功能创建应用程序。
以前,如果你创建的多线程应用程序有BUG,那要跟踪起来是很麻烦的,但现在情况完全变了,感谢微软为我们带来了Microsoft Parallel Extensions for .NET(.NET并行扩展),它在.NET框架线程模型上提供了一个抽象层。
并行扩展遵循微软在COM应用程序中建立的事务管理和在数据访问领域建立的实体框架和LINQ模型,它试图通过给.NET框架中的复杂过程建立高级支持,以便将先进的技术带给大众,随着多核处理器的普及,开发人员渴望他们的应用程序可以利用所有处理器核心的计算能力。
你可以通过并行LINQ(PLINQ)和任务并行库(Task Parallel Library,TPL)使用并行扩展的功能,它们都允许你为单核和多核计算机写一套代码,依靠.NET框架,最大限度利用代码执行平台的计算能力,并防止自行创建多线程应用程序时常见的陷阱。
PLINQ扩展了LINQ查询,它将单个查询分解成多个并行运行的子查询,TPL允许你创建并行运行的循环,而不是一个接一个地运行,虽然PLINQ的声明语法使创建并行进程更加简单,但一般情况下,面向TPL的操作比PLINQ查询更轻量级。
许多时候,选择TPL还是PLINQ只是一种生活方式,如果喜欢并行循环,而不是并行查询,那么设计一个TPL解决方案比设计一个PLINQ解决方案更容易。
PLINQ简介
对于商业应用程序,只要LINQ查询涉及到多个子查询时,PLINQ就像金子一样发光,如果你要连接本地数据库某张表中的行和另一个远程数据库某张表中的行,PLINQ将非常有用,在这种情况下,LINQ必须在每个数据源上独立运行子查询,然后调和结果,PLINQ将会把这些子查询分配给多个处理器核心,这些子查询就可以同时执行。实际上,你使用的处理器周期不是少了,而是更多了,当然好处就是你可以更早得到结果,请阅读“并行处理不会让你的应用程序变得更快”了解更多关于多线程应用程序的行为。
并行处理不会让你的应用程序变得更快
关于多线程应用程序最常见的一个误解是,应用程序线程越多,运行速度就越快,多启动一个线程并不会导致Windows给你的应用程序更多的处理周期,它只是把这些周期划分给更多线程了,实际上,在单处理器计算机上,开启多线程只会让你的应用程序变得更慢。
多线程只是让你的应用程序响应更快,但它仍然要等待其它阻塞任务完成先,不过在等待期间,你可以利用多线程应用程序的特点让其它线程做一些别的事情。在单核机器上,如果线程未被阻塞,多个线程只能相互争夺有限的处理周期。
多核处理器改变了这种状况,在多核环境中,你可以让Windows给你的应用程序分配更多的处理周期,你不需要阻塞线程,所有线程都在它们自己的核心上执行。并行扩展提供了编程结构,允许你告诉.NET框架应用程序那些部分可以并行执行。
即使在多核机器上,PLINQ也并不总是并行的查询,有两个原因,一是你的应用程序并行运行不会总是更快,第二个原因是,即使你有一个抽象层管理你的线程,在并行处理时总会出现脚步不一致的情况,PLINQ会检查一些不安全的条件,如果检测到就不会进行并行查询。我会指出PLINQ不会检查的问题和条件,但使用PLINQ出了问题只有你自己负责处理。
处理PLINQ
调用PLINQ很简单,只需要在你的数据源中添加AsParallel扩展,下面是一个从本地Northwind数据库连接远程Northwind数据库,根据客户(customer)信息查询订单(Orders)的示例:
On c.CustomerID Equals o.CustomerID
Where c.CustomerID = "ALFKI"
Select o
因为两个数据源都标记了AsParallel(在连接时,如果一个数据源使用了AsParallel,另一个也必须使用),因此将会使用PLINQ。
和普通的LINQ查询一样,PLINQ查询使用延迟处理,即等到你要真正使用数据时,它才会开始检索,这意味着即使LINQ查询声明了是并行的,在你要处理结果前不会发生并行处理,除非使用下面这样的代码块:
ord.RequiredDate.Value.AddDays(2)
Next
在后台,PLINQ将使用一个线程执行For …Each循环中的代码,而其它线程可能被用来执行子查询,最大可以使用64个线程,请阅读“并行控制”材料了解这种行为的更多信息。
并行控制
本文认为并行LINQ(PLINQ)总是好的,例如,首先选择是否要并行运行,然后决定如何将多个子查询分配给多个线程,你可以使用With*扩展控制PLINQ的行为。
在使用调试工具的时候,你会发现PLINQ不是并行执行查询的,你可以传递ParallelExecutionMode .ForceParallelism值给WithExecutionMode方法让其强制并行执行查询。
WithExecutionMode(ParallelExecutionMode.ForceParallelism)
如果你想指定线程的数量(例如,你想让一或多个处理核心闲置),你可以使用WithDegreeOfParallelism方法,下面的代码示例将线程数限制为3。
WithDegreeOfParallelism(3)
你也可以使用cancellation结束处理过程,首先创建一个CancellationTokenSource对象,然后将其传递给WithCancellation扩展。
ords = From o In le.Orders.AsParallel.
WithCancellation(ctx.Token)
Where o.RequiredDate > Now
Select o
For Each ord As Order In ords
totFreight += ord.Freight
If totFreight > FreightChargeLimit Then
ctx.Cancel()
End If
Next
如果你正在处理For…Each循环中的PLINQ查询结果,调用cancellation会自动退出循环。
如果在一个订单(Order)上的处理过程不和另一个订单上的处理过程共享状态,可以使用ForAll循环进一步提高响应,ForAll可以用于支持Lambda表达式的PLINQ查询结果集,它和For…Each循环不一样,For…Each只在程序的主线程中执行的,而传递给ForAll方法的操作是在PLINQ查询产生的独立查询线程上执行的。
ord.RequiredDate.Value.AddDays(2)
End Sub)
此外,For…Each循环是在它自己的线程中串行执行的,而ForAll中的代码是在检索订单的线程上并行执行的。
管理顺序
虽然和SQL类似,但PLINQ不保证顺序,PLINQ子查询返回结果的顺序依赖于各个线程不可预知的响应时间,例如下面这个查询是为了获得将要先发货的五个订单。
Where o.RequiredDate > Now
Select o
Take (5)
图 1 PLINQ给TPL中的功能添加查询分析和标准查询操作,TPL提供管理操作系统底层线程需要的基本的结构和调度
如果不保证顺序,我将获得一个随机的订单(Orders)数据集,它们可能是(也可能不是)应该先发货的五个订单,为了确保得到前五个订单,我需要在查询中增加一个Order By子句,按照日期对查询结果进行排序,当然这样就会丢掉PLINQ的一些好处。
因为结果来自多个线程,难免不会出现异常,PLINQ不能明白“上一条”和“下一条”的概念,如果在你的循环中刚好要用到下一条项目的值时,完全有可能会遭遇错误的处理,为了让订单中的项目按照原始数据源中的顺序处理,你需要在查询中增加AsOrdered扩展。
例如,如果我想将低于某一运费的所有订单打包到一起处理,我可能会写下面这样一个循环:
totFreight += ord.Freight
If totFreight > FreightChargeLimit Then
Exit For
End If
shipOrders.Add(ord)
Next
由于并行处理返回的项目顺序不可预知,因此进入批处理的订单可能是随机的,为了保证按照原始数据源中的顺序处理返回的结果,我必须给数据源加上AsOrdered扩展。
Where o.RequiredDate > Now
Select o
TPL(任务并行库)介绍
如果你的处理不是由LINQ查询驱动的,你可以使用借鉴了PLINQ的TPL技术,从根本上看,TPL让你创建可并行执行的循环,如果你的计算机是四核的,一个循环可能用1/3的时间就完成了。
如果不使用TPL,你可能会像下面这样处理Orders集合中的所有元素:
o.RequiredDate.Value.AddDays(2)
Next
如果使用TPL,你调用Parallel类的ForEach方法,通过Lambda表达式来处理集合中的项目:
le.Orders, Sub(o)
o.RequiredDate.Value.AddDays(2)
End Sub)
通过使用Parallel ForEach,每个方法的实例可以在独立的处理器上同时处理,如果每个操作需要1毫秒,并且有足够的处理器存在,所有的订单就可以在1毫秒内处理,而不是1毫秒乘以订单数量的时间。
任何复杂的处理放在Lambda表达式中都会变得很难阅读,因此你要经常想到在你的Lambda表达式中调用下面这样一些方法:
le.Orders, Sub(o)
ExtendOrders(o)
End Sub)
...
Sub ExtendOrders(ByVal o As Order)
o.RequiredDate.Value.AddDays(2)
End Sub
从本质上讲,TPL将集合中的成员分配给独立的任务,这些任务又被分配到所有处理核心上执行,每个任务完成时释放掉代码,TPL调度器从执行队列中取出另一个任务开始执行,你也可以根据索引值使用For方法创建一个循环。
当你创建自定义任务时你才会感觉到TPL的强大之处,任务创建好后使用它的Start方法启动,但它更容易使用Task类的静态工厂对象(Factory),它的StartNew方法可以创建并启动任务(Task),你只需要通过一个Lambda表达式就可以使用StartNew方法,如果你的函数要返回一个值,你可以使用Task对象的Generic版本指定返回的类型。
下面的示例为计算订单总价的Order Detail对象创建并启动了一个Task,Task被添加到一个列表(List)中,后面的代码循环检索List中的结果,如果我需要一个未计算的结果,第二个循环将会暂停,直到Task完成。
Tasks.Task(Of Decimal)
Dim CalcTasks As New List(Of System.
Threading.Tasks.Task(Of Decimal))
For Each ord As Order_Detail In le.Order_Details
Dim od As Order_Detail = ord
CalcTask = System.Threading.
Tasks.Task(Of Decimal).
Factory.StartNew(Function() CalcValue(od))
CalcTasks.Add(CalcTask)
Next
Dim totResult As Decimal
For Each ct As System.Threading.Tasks.Task(Of Decimal) In CalcTasks
totResult += ct.Result
Next
如果我足够幸运,在我需要结果前,Task总是先完成,即使不走运,也要比按顺序运行每个Task更早得到结果。
凡是遇到一个Task的输出要依赖于另一个Task先完成的情况,你可以在Task之间创建依赖或将Task分组,最简单的办法是使用Wait方法,但它会导致你的应用程序停止执行,直到所有Task全部完成。
Task(Of Decimal).Factory.StartNew(Function() CalcValue(le.Order_Details(0))),
Task(Of Decimal).Factory.StartNew(Function() CalcValue(le.Order_Details(1)))
}
System.Threading.Tasks.Task.WaitAll(tsks)
一个更复杂的方法是使用Task对象的ContinueWith方法,当其它Task完成时,它触发一个Task继续运行。下面的例子启动了多个线程,每个都计算订单明细(Order Detail)的值,但都只有等到订单明细上的其它操作完成后才能执行。
Dim od As Order_Detail = ordd
Dim adjustedDiscount As New Task(Sub() AdjustDiscount(od))
Dim calcedValue As Task(Of Long) =
adjustedDiscount.ContinueWith(Of Long)(Function() CalcValue(od))
adjustedDiscount.Start
Next
图 2 并行堆栈窗口提供了一个可视化视图,显示了当前执行的线程的附加信息
出错时如何处理
在多个处理器上同时执行多个线程也会造成异常出现得更频繁,任何线程上一旦发生异常,整个应用程序都将挂起,给AggregateException对象添加的错误处理也会增加,通过这个对象的InnerExceptions属性允许你查看每个线程的异常。
'PLINQ or TPL processing Catch aex As AggregateException
For Each ex As Exception In aex.InnerExceptions
Messages.Append(ex.Message & "; ")
Next
End Try
注意这里没有使用Catch语句,你需要检查InnerExceptions的类型,确定每个线程究竟抛出的是什么异常。
调试并发线程变得更加有趣,因为异常可能随一个PLINQ查询中的循环出现,解决这个问题可能需要重构PLINQ查询,幸运的是,Visual Studio 2010包括了额外的工具调式并行错误。
并行堆栈窗口(Parallel Stacks)超越了旧的线程窗口,线程窗口只能提供一个视图,而并行堆栈窗口可以显示所有正在执行的线程,例如,它默认允许你同时查看多个线程的调用堆栈,你可以放大显示内容,也可以过滤只显示指定的线程,更重要的是,如果你使用TPL,你可以切换到基于任务的视图(对应于你代码中的Task对象),或方法视图(显示调用方法的任务),但使用并行任务窗口(Parallel Tasks)可能更有用,因为它围绕Task组织任务,这个窗口不仅显示当前运行的任务,已调度和等待运行的任务也会显示(显示在状态[Status]列),你可以通过检查当前运行的Task是否在等待其它任务,从而确定Task之间的依赖关系。
在早期的Visual Studio版本中,要一步一步调式多线程程序是一场噩梦,因为调试器要从一个线程中的当前语句跳转到另一个线程的当前语句,并行任务(Parallel Task)允许你冻结或解冻与Task相关的线程,在调试时控制哪一个线程先运行。
一起使用这两个窗口可以简化并行处理问题的诊断,例如,Visual Studio现在检测到一个死锁时,它会自动打破死锁,当调式器检测到两个或多个Task不能处理时(因为相互都在等待对方释放锁定的对象),Visual Studio将实施冻结处理,就好像你遇到一个断点似的,并行任务窗口将显示每个Task在等待的对象,以及它占有的线程,并行堆栈窗口的方法视图可视化显示了发生死锁时哪个Task调用了哪个方法。
其它调试功能
除了这些工具外,Visual Studio还包含了其它几个用于调式并行处理的功能,在遍历你的代码时,当你的鼠标移到一个Task对象上时,弹出一个提示窗口,显示该任务的Id,关联的方法和它当前的状态(如,等待执行)等详细信息,进一步展开该提示,可以看到该Task的属性值,包括它的结果。在观察窗口(Watch)中检查Task的InternalCurrent属性,可以得到当前正在执行的Task的信息,任务调度器(TaskScheduler)的提示展开后可以看到它管理的所有Task。
合理使用PLINQ,TPL和Visual Studio提供的功能,无论你的应用程序运行在什么计算机上,你都可以利用所有处理器的计算能力。