【IT168 专稿】在本系列的第一篇文章“Visual Studio 2010对并行计算的支持”中,我们简要地介绍了微软为并行计算提供了完整的解决方案。如果你是一个.NET程序员,同时又在进行并行计算方面的开发,那么即将发布的.NET Framework 4.0将是微软送给你的一份大礼。
在.NET Framework 4.0中,在库的层次上,微软提供了大量的新内容来帮助程序员完成应用程序的并行化,其中包括Parallel LINQ(PLINQ),Task Parallel Library(TPL)和Coordination Data Structures。这里我们就先来介绍一下最简单最常用的TPL。
将跟随Visual Studio 2010一起发布的.NET Framework 4.0将包含很多基于库的对并行计算的支持。包括数据的并行化,任务的并行化等等,这一切都通过一个共同的工作调度器进行管理。这些新的类型和类,将在System.Threading, System.Threading.Tasks, System.Linq, 和 System.Collections.Concurrent这些名字空间中提供。通过这些新的类型和类,开发人员将无需面对如今复杂的多线程开发模式,而可以直接使用.NET Framework,更加高效简便地开发支持并行计算的应用程序,从而更加充分地利用多核CPU的优势,随着计算核心或者处理器的增加,以提升应用程序的性能。
而在.NET Framework中,Task Parallel Library (TPL)是其Parallel Extensions中一个重要组成部分,它提供了一种简便的多线程开发方式,通过它所提供的类或者函数,可以让程序员轻松地实现并行计算。其中,最简单的就是它的Parallel类
Parallel类
Parallel类就是TPL中的一个用于支持并行计算的类。Parallel类提供了诸多的静态函数,只需要简单的函数调用,我们就可以对常用的for循环,foreach循环进行并行化。下面我们通过一些实际的例子,来看看如何利用这个类将我们的应用程序并行化,以吃上多核这“免费的午餐”。
创建示例项目
为了演示如何将一个现有的项目并行化,我们需要先创建一个示例项目。在这个项目中,我们将模拟对数据的串行操作,然后介绍如何利用Parallel类将对数据的串行操作并行化,以充分利用多核CPU的优势,从而提升应用程序的性能。
在Visual Studio 2010中,我们新创建一个Visual C#的控制台应用程序。然后在这个项目中添加一个类Employee,其实现代码如下:
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ParallelDemo
{
// 职员类
public class Employee
{
public string FirstName
{
get;
set;
}
public string LastName
{
get;
set;
}
public string Address
{
get;
set;
}
public DateTime HireDate
{
get;
set;
}
public int EmployeeID
{
get;
set;
}
// 模拟对数据的处理
public static decimal Process(Employee employee)
{
Console.WriteLine("Processing {0}", employee.EmployeeID);
// 产生一个随机数
// 用以表示处理当前数据需要的时间
var rand = new Random(DateTime.Now.Millisecond);
var delay = rand.Next(1, 5);
var count = 0;
var process = true;
while (process)
{
System.Threading.Thread.Sleep(1000);
count++;
if (count >= delay)
process = false;
}
return delay;
}
}
// 职员列表类
// 这是我们需要处理的数据
public class EmployeeList : List
{
public EmployeeList()
{
// 将职员添加到列表中
Add(new Employee { EmployeeID = 1, FirstName = "张", LastName = "三", HireDate = DateTime.Parse("1/1/2007") });
Add(new Employee { EmployeeID = 2, FirstName = "李", LastName = "四", HireDate = DateTime.Parse("3/15/2006") });
Add(new Employee { EmployeeID = 3, FirstName = "王", LastName = "麻子", HireDate = DateTime.Parse("6/17/2005") });
Add(new Employee { EmployeeID = 4, FirstName = "赵", LastName = "匡胤", HireDate = DateTime.Parse("3/19/2000") });
Add(new Employee { EmployeeID = 5, FirstName = "钱", LastName = "进", HireDate = DateTime.Parse("7/17/2003") });
Add(new Employee { EmployeeID = 6, FirstName = "孙", LastName = "俊鹏", HireDate = DateTime.Parse("9/13/2005") });
Add(new Employee { EmployeeID = 7, FirstName = "李", LastName = "明", HireDate = DateTime.Parse("12/3/2002") });
Add(new Employee { EmployeeID = 8, FirstName = "周", LastName = "大勇", HireDate = DateTime.Parse("7/1/2008") });
Add(new Employee { EmployeeID = 9, FirstName = "吴", LastName = "明子", HireDate = DateTime.Parse("1/7/2008") });
Add(new Employee { EmployeeID = 10, FirstName = "郑", LastName = "邦万", HireDate = DateTime.Parse("11/1/2001") });
Add(new Employee { EmployeeID = 11, FirstName = "王", LastName = "朝", HireDate = DateTime.Parse("4/21/2006") });
Add(new Employee { EmployeeID = 12, FirstName = "冯", LastName = "玛丽", HireDate = DateTime.Parse("7/19/2006") });
Add(new Employee { EmployeeID = 13, FirstName = "陈", LastName = "良乔", HireDate = DateTime.Parse("3/9/2001") });
Add(new Employee { EmployeeID = 14, FirstName = "褚", LastName = "春晖", HireDate = DateTime.Parse("7/15/2005") });
Add(new Employee { EmployeeID = 15, FirstName = "卫", LastName = "斯理", HireDate = DateTime.Parse("8/6/2003") });
Add(new Employee { EmployeeID = 16, FirstName = "蒋", LastName = "中正", HireDate = DateTime.Parse("5/18/2005") });
Add(new Employee { EmployeeID = 17, FirstName = "沈", LastName = "洋洋", HireDate = DateTime.Parse("8/5/2002") });
Add(new Employee { EmployeeID = 18, FirstName = "韩", LastName = "斌", HireDate = DateTime.Parse("10/1/2006") });
Add(new Employee { EmployeeID = 19, FirstName = "杨", LastName = "雪", HireDate = DateTime.Parse("12/7/2002") });
Add(new Employee { EmployeeID = 20, FirstName = "朱", LastName = "辉", HireDate = DateTime.Parse("3/30/2001") });
DateTime.Parse("12/7/2002") });
Add(new Employee { EmployeeID = 20, FirstName = "朱", LastName = "辉", HireDate = DateTime.Parse("3/30/2001") });
}
}
}
}
}
}
在这段代码中,我们新创建了一个表示公司职员信息的类Employee以及管理公司所有职员的类EmployeeList,这里为了简便,我们只是在构造函数中人为地添加了一些要处理的数据,在实际应用中,这些数据可以来自文件,也可以来自数据库。EmployeeList这就是我们需要处理的数据集合。
使用Parallel.For进行并行化
通常我们在处理大量相互独立的同类数据的时候,比如vector,list这些容器中的数据,都会用到for循环。利用for循环,我们遍历数据集中的每一个数据并进行处理。在以前的单核时代,我们要处理EmployeeList中的所有数据,通常会这样做:
private static void Start(string strMethod)
{
start = DateTime.Now;
Console.WriteLine("{0} process started at {1}", strMethod, start);
}
// 输出函数结束执行信息,报告函数执行的时间
private static void End(string strMethod)
{
end = DateTime.Now;
TimeSpan jobTime = end.Subtract(start);
// 输出函数执行耗时
Console.WriteLine("{0} finished at {1} and took {2}",
strMethod, end, jobTime);
Console.WriteLine();
}
// 标准的for循环
private static void StandardForLoop()
{
// 输出函数开始执行
Start("StandardForLoop");
for (int i = 0; i < employeeData.Count; i++)
{
Console.WriteLine("Starting process for employee id {0}",
employeeData[i].EmployeeID);
decimal span = Employee.Process(employeeData[i]);
Console.WriteLine("Completed process for employee id {0} process took {1} seconds",
employeeData[i].EmployeeID, span);
Console.WriteLine();
}
// 输出函数结束执行
End("StandardForLoop");
}
在StandardForLoop函数中,我们利用标准的for循环,遍历了employeeData中的每一个数据,并对数据进行了处理。普通的for循环是串行的,只能运行在CPU的一个运算核心上,如果这段程序运行在多核CPU上,在程序执行for循环的时候,你会发现CPU只有一个运算核心在工作,其他基本上都处于空闲状态。这是对宝贵的CPU资源的极大浪费。
面对这种简单独立的for循环,我们可以利用TPL提供的Parallel.For将其并行化,以充分利用多核CPU的运算能力。我们利用Parallel.For将上面的普通for循环改写如下:
using System.Threading;
//…
// 并行的for循环
private static void ParallelForMethod()
{
Start("ParallelForMethod");
// 并行的for循环
Parallel.For(0, employeeData.Count, i =>
{
Console.WriteLine("Starting process for employee id {0}",
employeeData[i].EmployeeID);
decimal span = Employee.Process(employeeData[i]);
Console.WriteLine("Completed process for employee id {0}",
employeeData[i].EmployeeID);
Console.WriteLine();
});
End("ParallelForMethod");
}
为了使用Parallel类,我们首先需要声明System.Threading名字空间,Parallel类就在这个名字空间中。Parallel.For实际上是Parallel类提供的一个静态方法,它的第3个参数是类型为Action的委托,在这段代码中,我们直接使用Lambda表达式来将一个匿名函数直接“内联”作为Parallel.For方法的参数。从上面的代码我们也可以看出,将一个普通的串行的for循环改造为一个并行的for循环非常的容易,只要按照相应的规则调用Parallel.For函数就可以了,而运行时中的任务调度器会自己去进行任务的调度,硬件的分配等复杂而繁琐的事情,开发者则坐享其成。
通过使用Parallel.For对程序进行改写,可以充分利用多核CPU的运算能力,让应用程序的性能有大幅度的提升。在我的双核CPU上测试的结果是,程序性能有将近两倍的提升。
图1,Parallel.For所带来的性能提升
图2,充分利用CPU的计算能力
在Parallel.For中访问共享变量
使用Parallel.For需要注意的是,Parallel.For循环中的循环次序是乱的,并不像标准的for循环那样按照从小到大或者从大到小的顺序执行。从上面的截图中我们也可以发现,Parallel.For所处理的Emplee的ID是混乱的,并不像标准for循环那样从0到20的顺序处理。这一点我们在使用Parallel.For循环的时候要特别注意,如果你的循环对次序有要求,上一次循环是下一次循环的输入,或者是每次循环之间有相同的数据要处理,那么就不能简单地使用Parallel.For循环来并行化程序,需要做一些特殊的处理。比如,我们要统计某一个数据范围内的所有素数的个数,利用串行算法,我们可以实现如下:
static bool IsPrime(int valueToTest)
{
int upperBound = (int)Math.Sqrt(valueToTest);
for (int i = 2; i <= upperBound; i++)
{
if (valueToTest % i == 0) return false;
}
return true;
}
// 获取素数的数量
private static void GetPrimeCount()
{
// 定义范围
const int UPPER_BOUND = 4000000;
while (true)
{
int totalPrimes = 0;
var sw = System.Diagnostics.Stopwatch.StartNew();
for (int i = 2; i < UPPER_BOUND; i++)
{
if (IsPrime(i)) totalPrimes++;
}
// 输出耗时
Console.WriteLine("Sequential: {0} found in {1}",
totalPrimes, sw.Elapsed);
Console.ReadLine();
}
}
根据我们前面的示例代码,以上的并行化代码是不是看起来都很正确呢?当我们编译执行这个应用程序的时候,才会发现结果不对。这是因为程序并行化之后,多个执行单元访问同一个共享变量,当其中某一个执行单元在修改这个共享变量时,如果这时另外一个执行单元也恰好想访问这个变量,第二个执行单元的访问就会失败。针对这种情况,我们需要使用Interlocked类(或者其他的类)的函数来访问相应的共享变量,修改共享变量的值,就不会有这个问题了。我们将算法修改如下:
{
// 正确的共享变量访问方式
if (IsPrime(i)) Interlocked.Increment(ref totalPrimes);
});
经过这样的修改,我们就可以得到正确的计算结果了。
控制Parallel.For循环的执行
我们都知道,在普通for循环中,我们可以使用break或者continue关键词来控制for循环的执行,在一些特殊的情况下跳出for循环的执行或者是继续for循环的下一次执行。遗憾的是,这两个关键字不能使用在Parallel.For循环中。那么我们又如何控制Parallel.For循环的执行呢?Parallel.For函数提供了一些重载版本,这些重载的Parallel.For函数可以接受一个Action作为参数,而我们可以利用ParallelLoopState对象来控制Parallel.For函数的执行,ParallelLoopState对象是由运行时在后台创建的,我们还可以将ParallelLoopState命名为我们喜欢的名字,比如PrimeLoopState等等。这个对象有两个函数,Stop和Break,可以分别用来控制Parallel.For的执行。
其中,如果我们调用Stop,表示Parallel.For的执行立刻停止,无论其他执行单元是否达到停止的条件。而如果我们使用Break,则表示满足条件的当前执行单元立刻停止,而对于其他执行单元,可能会满足停止条件而通过Break停止,也可能在其执行过程中始终无法满足停止条件,从而全部执行完毕,自然停止。当所有执行单元停止后,Parallel.For函数才停止执行并退出。下面的代码演示了Break和Stop两个函数的差别:
private static void StopParallelFor()
{
int nTotal = 0;
Parallel.For(1, 100, (i, loopState) =>
{
// 当某一个循环单元的数大于30,
// 则停止Parallel.For的执行
if ( i > 30)
{
// 停止并退出Parallel.For
loopState.Stop();
return;
}
Console.WriteLine("Current Nummber: {0} ", i);
Interlocked.Increment(ref nTotal);
});
// 输出结果
Console.WriteLine("Total Accessed: {0} ", nTotal);
}
// 输出:
/* Current Nummber: 1
Current Nummber: 2
Total Accessed: 2
*/
// 跳出Parallel.For的循环
private static void BreakParallelFor()
{
int nTotal = 0;
Parallel.For(1, 100, (i, loopState) =>
{
// 当某一个循环单元的数大于30,
// 则跳出当前执行单元,等待其他执行单元结束
// 所有执行单元结束后退出Parallel.For的执行
if (i > 30)
{
// 跳出当前执行单元
loopState.Break();
return;
}
Console.WriteLine("Current Nummber: {0} ", i);
Interlocked.Increment(ref nTotal);
});
// 输出结果
Console.WriteLine("Total Accessed: {0} ", nTotal);
}
// 输出:
/* Current Nummber: 1
Current Nummber: 2
Current Nummber: 22
Current Nummber: 23
Current Nummber: 27
...
Current Nummber: 20
Current Nummber: 21
Total Accessed: 30
*/
从代码中我们可以看出,调用Break函数,只会停止满足条件的当前执行单元的执行,而其他执行单元是否停止,则要靠他们自己判断。而调用Stop函数,则会立刻停止所有执行单元,退出整个Parallel.For函数。
另外,Parallel.For方法有一个ParallelLoopResult类型的返回值,可以通过此返回值的IsCompleted属性来判断Parallel.For方法启动的所有任务是否运行都结束
我们都知道,在普通for循环中,我们可以使用break或者continue关键词来控制for循环的执行,在一些特殊的情况下跳出for循环的执行或者是继续for循环的下一次执行。遗憾的是,这两个关键字不能使用在Parallel.For循环中。那么我们又如何控制Parallel.For循环的执行呢?Parallel.For函数提供了一些重载版本,这些重载的Parallel.For函数可以接受一个Action作为参数,而我们可以利用ParallelLoopState对象来控制Parallel.For函数的执行,ParallelLoopState对象是由运行时在后台创建的,我们还可以将ParallelLoopState命名为我们喜欢的名字,比如PrimeLoopState等等。这个对象有两个函数,Stop和Break,可以分别用来控制Parallel.For的执行。
其中,如果我们调用Stop,表示Parallel.For的执行立刻停止,无论其他执行单元是否达到停止的条件。而如果我们使用Break,则表示满足条件的当前执行单元立刻停止,而对于其他执行单元,可能会满足停止条件而通过Break停止,也可能在其执行过程中始终无法满足停止条件,从而全部执行完毕,自然停止。当所有执行单元停止后,Parallel.For函数才停止执行并退出。下面的代码演示了Break和Stop两个函数的差别:
private static void StopParallelFor()
{
int nTotal = 0;
Parallel.For(1, 100, (i, loopState) =>
{
// 当某一个循环单元的数大于30,
// 则停止Parallel.For的执行
if ( i > 30)
{
// 停止并退出Parallel.For
loopState.Stop();
return;
}
Console.WriteLine("Current Nummber: {0} ", i);
Interlocked.Increment(ref nTotal);
});
// 输出结果
Console.WriteLine("Total Accessed: {0} ", nTotal);
}
// 输出:
/* Current Nummber: 1
Current Nummber: 2
Total Accessed: 2
*/
// 跳出Parallel.For的循环
private static void BreakParallelFor()
{
int nTotal = 0;
Parallel.For(1, 100, (i, loopState) =>
{
// 当某一个循环单元的数大于30,
// 则跳出当前执行单元,等待其他执行单元结束
// 所有执行单元结束后退出Parallel.For的执行
if (i > 30)
{
// 跳出当前执行单元
loopState.Break();
return;
}
Console.WriteLine("Current Nummber: {0} ", i);
Interlocked.Increment(ref nTotal);
});
// 输出结果
Console.WriteLine("Total Accessed: {0} ", nTotal);
}
// 输出:
/* Current Nummber: 1
Current Nummber: 2
Current Nummber: 22
Current Nummber: 23
Current Nummber: 27
...
Current Nummber: 20
Current Nummber: 21
Total Accessed: 30
*/
码中我们可以看出,调用Break函数,只会停止满足条件的当前执行单元的执行,而其他执行单元是否停止,则要靠他们自己判断。而调用Stop函数,则会立刻停止所有执行单元,退出整个Parallel.For函数。
另外,Parallel.For方法有一个ParallelLoopResult类型的返回值,可以通过此返回值的IsCompleted属性来判断Parallel.For方法启动的所有任务是否运行都结束
使用Parallel.ForEach进行并行化
Parallel.For函数对应于for循环,同样的,在TPL中也提供了一个Parallel.ForEach函数用于并行化foreach循环。例如,在本文的开始部分,我们用Parallel.For函数并行地处理了EmpleeList中的所有数据,实际上对于这种相互独立的数据,我们也可以使用Parallel.ForEach来进行并行处理:
private static void ParallelForEach()
{
Start("ParallelForEach");
// 使用Parallel.ForEach函数,
// 并行处理数据集employeeData中的每一个数据
Parallel.ForEach(employeeData, ed =>
{
Console.WriteLine("Starting process for employee id {0}",
ed.EmployeeID);
decimal span = Employee.Process(ed);
Console.WriteLine("Completed process for employee id {0}",
ed.EmployeeID);
Console.WriteLine();
});
End("ParallelForEach");
}
在后台,TPL“悄悄地”把整个集合分成若干个不相交的子集,然后,针对每个集合从线程池中选择一个线程对集合中的对象进行处理。由于每个子集都只对应着一个线程,因此,无需担心发生多线程访问共享资源的问题,而且多个子集的处理工作可以并行执行。
通过Parallel.For函数和Parallel.ForEach函数,在利用循环处理并行数据的时候,我们可以非常简便地将一个串行的for循环和foreach并行化,从而充分利用多核CPU的资源,提高应用程序的性能。这就像天上突然掉下一个大馅饼,不吃都有点过意不去啊。
(陈良乔)