技术开发 频道

VS2010中Parallel类实现并行计算

  【IT168 专稿】在本系列的第一篇文章“Visual Studio 2010对并行计算的支持”中,我们简要地介绍了微软为并行计算提供了完整的解决方案。如果你是一个.NET程序员,同时又在进行并行计算方面的开发,那么即将发布的.NET Framework 4.0将是微软送给你的一份大礼。

  在.NET Framework 4.0中,在库的层次上,微软提供了大量的新内容来帮助程序员完成应用程序的并行化,其中包括Parallel LINQ(PLINQ),Task Parallel Library(TPL)和Coordination Data Structures。这里我们就先来介绍一下最简单最常用的TPL。

  将跟随Visual Studio 2010一起发布的.NET Framework 4.0将包含很多基于库的对并行计算的支持。包括数据的并行化,任务的并行化等等,这一切都通过一个共同的工作调度器进行管理。这些新的类型和类,将在System.Threading, System.Threading.Tasks, System.Linq, 和 System.Collections.Concurrent这些名字空间中提供。通过这些新的类型和类,开发人员将无需面对如今复杂的多线程开发模式,而可以直接使用.NET Framework,更加高效简便地开发支持并行计算的应用程序,从而更加充分地利用多核CPU的优势,随着计算核心或者处理器的增加,以提升应用程序的性能。

  而在.NET Framework中,Task Parallel Library (TPL)是其Parallel Extensions中一个重要组成部分,它提供了一种简便的多线程开发方式,通过它所提供的类或者函数,可以让程序员轻松地实现并行计算。其中,最简单的就是它的Parallel类

  Parallel类

  Parallel类就是TPL中的一个用于支持并行计算的类。Parallel类提供了诸多的静态函数,只需要简单的函数调用,我们就可以对常用的for循环,foreach循环进行并行化。下面我们通过一些实际的例子,来看看如何利用这个类将我们的应用程序并行化,以吃上多核这“免费的午餐”。

  创建示例项目

  为了演示如何将一个现有的项目并行化,我们需要先创建一个示例项目。在这个项目中,我们将模拟对数据的串行操作,然后介绍如何利用Parallel类将对数据的串行操作并行化,以充分利用多核CPU的优势,从而提升应用程序的性能。

  在Visual Studio 2010中,我们新创建一个Visual C#的控制台应用程序。然后在这个项目中添加一个类Employee,其实现代码如下:

 using System;

  using System.Collections.Generic;

  using System.Linq;

  using System.Text;

  namespace ParallelDemo

  {

  
// 职员类

  
public class Employee

  {

  
public string FirstName

  {

  
get;

  
set;

  }

  
public string LastName

  {

  
get;

  
set;

  }

  
public string Address

  {

  
get;

  
set;

  }

  
public DateTime HireDate

  {

  
get;

  
set;

  }

  
public int EmployeeID

  {

  
get;

  
set;

  }

  
// 模拟对数据的处理

  
public static decimal Process(Employee employee)

  {

  Console.WriteLine(
"Processing {0}", employee.EmployeeID);

  
// 产生一个随机数

  
// 用以表示处理当前数据需要的时间

  var rand
= new Random(DateTime.Now.Millisecond);

  var delay
= rand.Next(1, 5);

  var count
= 0;

  var process
= true;

  
while (process)

  {

  System.Threading.Thread.Sleep(
1000);

  count
++;

  
if (count >= delay)

  process
= false;

  }

  return delay;

  }

  }

  
// 职员列表类

  
// 这是我们需要处理的数据

  
public class EmployeeList : List

  {

  
public EmployeeList()

  {

  
// 将职员添加到列表中

  Add(
new Employee { EmployeeID = 1, FirstName = "", LastName = "", HireDate = DateTime.Parse("1/1/2007") });

  Add(
new Employee { EmployeeID = 2, FirstName = "", LastName = "", HireDate = DateTime.Parse("3/15/2006") });

  Add(
new Employee { EmployeeID = 3, FirstName = "", LastName = "麻子", HireDate = DateTime.Parse("6/17/2005") });

  Add(
new Employee { EmployeeID = 4, FirstName = "", LastName = "匡胤", HireDate = DateTime.Parse("3/19/2000") });

  Add(
new Employee { EmployeeID = 5, FirstName = "", LastName = "", HireDate = DateTime.Parse("7/17/2003") });

  Add(
new Employee { EmployeeID = 6, FirstName = "", LastName = "俊鹏", HireDate = DateTime.Parse("9/13/2005") });

  Add(
new Employee { EmployeeID = 7, FirstName = "", LastName = "", HireDate = DateTime.Parse("12/3/2002") });

  Add(
new Employee { EmployeeID = 8, FirstName = "", LastName = "大勇", HireDate = DateTime.Parse("7/1/2008") });

  Add(
new Employee { EmployeeID = 9, FirstName = "", LastName = "明子", HireDate = DateTime.Parse("1/7/2008") });

  Add(
new Employee { EmployeeID = 10, FirstName = "", LastName = "邦万", HireDate = DateTime.Parse("11/1/2001") });

  Add(
new Employee { EmployeeID = 11, FirstName = "", LastName = "", HireDate = DateTime.Parse("4/21/2006") });

  Add(
new Employee { EmployeeID = 12, FirstName = "", LastName = "玛丽", HireDate = DateTime.Parse("7/19/2006") });

  Add(
new Employee { EmployeeID = 13, FirstName = "", LastName = "良乔", HireDate = DateTime.Parse("3/9/2001") });

  Add(
new Employee { EmployeeID = 14, FirstName = "", LastName = "春晖", HireDate = DateTime.Parse("7/15/2005") });

  Add(
new Employee { EmployeeID = 15, FirstName = "", LastName = "斯理", HireDate = DateTime.Parse("8/6/2003") });

  Add(
new Employee { EmployeeID = 16, FirstName = "", LastName = "中正", HireDate = DateTime.Parse("5/18/2005") });

  Add(
new Employee { EmployeeID = 17, FirstName = "", LastName = "洋洋", HireDate = DateTime.Parse("8/5/2002") });

  Add(
new Employee { EmployeeID = 18, FirstName = "", LastName = "", HireDate = DateTime.Parse("10/1/2006") });

  Add(
new Employee { EmployeeID = 19, FirstName = "", LastName = "", HireDate = DateTime.Parse("12/7/2002") });

  Add(
new Employee { EmployeeID = 20, FirstName = "", LastName = "", HireDate = DateTime.Parse("3/30/2001") });

  DateTime.Parse(
"12/7/2002") });

  Add(
new Employee { EmployeeID = 20, FirstName = "", LastName = "", HireDate = DateTime.Parse("3/30/2001") });

  }

  }

  }

  }

  }

  }

   在这段代码中,我们新创建了一个表示公司职员信息的类Employee以及管理公司所有职员的类EmployeeList,这里为了简便,我们只是在构造函数中人为地添加了一些要处理的数据,在实际应用中,这些数据可以来自文件,也可以来自数据库。EmployeeList这就是我们需要处理的数据集合。

  使用Parallel.For进行并行化

  通常我们在处理大量相互独立的同类数据的时候,比如vector,list这些容器中的数据,都会用到for循环。利用for循环,我们遍历数据集中的每一个数据并进行处理。在以前的单核时代,我们要处理EmployeeList中的所有数据,通常会这样做:

// 输出函数开始执行信息
        
private static void Start(string strMethod)
        {
            start
= DateTime.Now;
            Console.WriteLine(
"{0} process started at {1}", strMethod, start);
        }

        
// 输出函数结束执行信息,报告函数执行的时间
        
private static void End(string strMethod)
        {
            
end = DateTime.Now;
            TimeSpan jobTime
= end.Subtract(start);

            
// 输出函数执行耗时
            Console.WriteLine(
"{0} finished at {1} and took {2}",
                strMethod,
end, jobTime);
            Console.WriteLine();
        }

        
// 标准的for循环
        
private static void StandardForLoop()
        {
            
// 输出函数开始执行
            Start(
"StandardForLoop");

            
for (int i = 0; i < employeeData.Count; i++)
            {
                Console.WriteLine(
"Starting process for employee id {0}",
                    employeeData[i].EmployeeID);
                decimal span
= Employee.Process(employeeData[i]);
                Console.WriteLine(
"Completed process for employee id {0} process took {1} seconds",
                    employeeData[i].EmployeeID, span);
                Console.WriteLine();
            }

            
// 输出函数结束执行
            
End("StandardForLoop");
        }

  在StandardForLoop函数中,我们利用标准的for循环,遍历了employeeData中的每一个数据,并对数据进行了处理。普通的for循环是串行的,只能运行在CPU的一个运算核心上,如果这段程序运行在多核CPU上,在程序执行for循环的时候,你会发现CPU只有一个运算核心在工作,其他基本上都处于空闲状态。这是对宝贵的CPU资源的极大浪费。

  面对这种简单独立的for循环,我们可以利用TPL提供的Parallel.For将其并行化,以充分利用多核CPU的运算能力。我们利用Parallel.For将上面的普通for循环改写如下:

// 添加相应的名字空间
using System.Threading;

//

// 并行的for循环
        
private static void ParallelForMethod()
        {
            Start(
"ParallelForMethod");
            
            
// 并行的for循环
            Parallel.For(
0, employeeData.Count, i =>
            {
                Console.WriteLine(
"Starting process for employee id {0}",
employeeData[i].EmployeeID);
                decimal span
= Employee.Process(employeeData[i]);
                Console.WriteLine(
"Completed process for employee id {0}",
employeeData[i].EmployeeID);
                Console.WriteLine();
            });

            
End("ParallelForMethod");
        }

    为了使用Parallel类,我们首先需要声明System.Threading名字空间,Parallel类就在这个名字空间中。Parallel.For实际上是Parallel类提供的一个静态方法,它的第3个参数是类型为Action的委托,在这段代码中,我们直接使用Lambda表达式来将一个匿名函数直接“内联”作为Parallel.For方法的参数。从上面的代码我们也可以看出,将一个普通的串行的for循环改造为一个并行的for循环非常的容易,只要按照相应的规则调用Parallel.For函数就可以了,而运行时中的任务调度器会自己去进行任务的调度,硬件的分配等复杂而繁琐的事情,开发者则坐享其成。

  通过使用Parallel.For对程序进行改写,可以充分利用多核CPU的运算能力,让应用程序的性能有大幅度的提升。在我的双核CPU上测试的结果是,程序性能有将近两倍的提升。

  图1,Parallel.For所带来的性能提升

  图2,充分利用CPU的计算能力

  在Parallel.For中访问共享变量

  使用Parallel.For需要注意的是,Parallel.For循环中的循环次序是乱的,并不像标准的for循环那样按照从小到大或者从大到小的顺序执行。从上面的截图中我们也可以发现,Parallel.For所处理的Emplee的ID是混乱的,并不像标准for循环那样从0到20的顺序处理。这一点我们在使用Parallel.For循环的时候要特别注意,如果你的循环对次序有要求,上一次循环是下一次循环的输入,或者是每次循环之间有相同的数据要处理,那么就不能简单地使用Parallel.For循环来并行化程序,需要做一些特殊的处理。比如,我们要统计某一个数据范围内的所有素数的个数,利用串行算法,我们可以实现如下:

// 判断某一个数是否是素数
        static bool IsPrime(
int valueToTest)
        {
            
int upperBound = (int)Math.Sqrt(valueToTest);
            
            
for (int i = 2; i <= upperBound; i++)
            {
                
if (valueToTest % i == 0) return false;
            }
            return
true;
        }
        
        
// 获取素数的数量
        
private static void GetPrimeCount()
        {
            
// 定义范围
            
const int UPPER_BOUND = 4000000;
            
while (true)
            {
                
int totalPrimes = 0;
                var sw
= System.Diagnostics.Stopwatch.StartNew();
                
                
for (int i = 2; i < UPPER_BOUND; i++)
                {
                    
if (IsPrime(i)) totalPrimes++;
                }
                
                
// 输出耗时
                Console.WriteLine(
"Sequential: {0} found in {1}",
                    totalPrimes, sw.Elapsed);

                Console.ReadLine();
            }
        }

    根据我们前面的示例代码,以上的并行化代码是不是看起来都很正确呢?当我们编译执行这个应用程序的时候,才会发现结果不对。这是因为程序并行化之后,多个执行单元访问同一个共享变量,当其中某一个执行单元在修改这个共享变量时,如果这时另外一个执行单元也恰好想访问这个变量,第二个执行单元的访问就会失败。针对这种情况,我们需要使用Interlocked类(或者其他的类)的函数来访问相应的共享变量,修改共享变量的值,就不会有这个问题了。我们将算法修改如下:

Parallel.For(2, UPPER_BOUND, i =>
                {
                    
// 正确的共享变量访问方式
                    
if (IsPrime(i)) Interlocked.Increment(ref totalPrimes);

                });

    经过这样的修改,我们就可以得到正确的计算结果了。

  控制Parallel.For循环的执行

  我们都知道,在普通for循环中,我们可以使用break或者continue关键词来控制for循环的执行,在一些特殊的情况下跳出for循环的执行或者是继续for循环的下一次执行。遗憾的是,这两个关键字不能使用在Parallel.For循环中。那么我们又如何控制Parallel.For循环的执行呢?Parallel.For函数提供了一些重载版本,这些重载的Parallel.For函数可以接受一个Action作为参数,而我们可以利用ParallelLoopState对象来控制Parallel.For函数的执行,ParallelLoopState对象是由运行时在后台创建的,我们还可以将ParallelLoopState命名为我们喜欢的名字,比如PrimeLoopState等等。这个对象有两个函数,Stop和Break,可以分别用来控制Parallel.For的执行。

  其中,如果我们调用Stop,表示Parallel.For的执行立刻停止,无论其他执行单元是否达到停止的条件。而如果我们使用Break,则表示满足条件的当前执行单元立刻停止,而对于其他执行单元,可能会满足停止条件而通过Break停止,也可能在其执行过程中始终无法满足停止条件,从而全部执行完毕,自然停止。当所有执行单元停止后,Parallel.For函数才停止执行并退出。下面的代码演示了Break和Stop两个函数的差别:

// 停止Parallel.For
        
private static void StopParallelFor()
        {
            
int nTotal = 0;
            Parallel.For(
1, 100, (i, loopState) =>
            {
                
// 当某一个循环单元的数大于30,
                
// 则停止Parallel.For的执行
                
if ( i > 30)
                {
                    
// 停止并退出Parallel.For
                    loopState.Stop();
                    return;
                }
                Console.WriteLine(
"Current Nummber: {0} ", i);
                Interlocked.Increment(ref nTotal);
            });

            
// 输出结果
            Console.WriteLine(
"Total Accessed: {0} ", nTotal);
        }
        
// 输出:
        
/*  Current Nummber: 1
            Current Nummber:
2
            Total Accessed:
2
        
*/

        
// 跳出Parallel.For的循环
        
private static void BreakParallelFor()
        {
            
int nTotal = 0;
            Parallel.For(
1, 100, (i, loopState) =>
            {  
                
// 当某一个循环单元的数大于30,
                
// 则跳出当前执行单元,等待其他执行单元结束
                
// 所有执行单元结束后退出Parallel.For的执行
                
if (i > 30)
                {
                    
// 跳出当前执行单元
                    loopState.Break();
                    return;
                }
                Console.WriteLine(
"Current  Nummber: {0} ", i);
                Interlocked.Increment(ref nTotal);
            });

            
// 输出结果
            Console.WriteLine(
"Total Accessed: {0} ", nTotal);
        }
        
// 输出:
        
/*  Current Nummber: 1
            Current Nummber:
2
            Current  Nummber:
22
            Current  Nummber:
23
            Current  Nummber:
27
            ...
            Current  Nummber:
20
            Current  Nummber:
21
            Total Accessed:
30
        
*/

    从代码中我们可以看出,调用Break函数,只会停止满足条件的当前执行单元的执行,而其他执行单元是否停止,则要靠他们自己判断。而调用Stop函数,则会立刻停止所有执行单元,退出整个Parallel.For函数。

  另外,Parallel.For方法有一个ParallelLoopResult类型的返回值,可以通过此返回值的IsCompleted属性来判断Parallel.For方法启动的所有任务是否运行都结束

         我们都知道,在普通for循环中,我们可以使用break或者continue关键词来控制for循环的执行,在一些特殊的情况下跳出for循环的执行或者是继续for循环的下一次执行。遗憾的是,这两个关键字不能使用在Parallel.For循环中。那么我们又如何控制Parallel.For循环的执行呢?Parallel.For函数提供了一些重载版本,这些重载的Parallel.For函数可以接受一个Action作为参数,而我们可以利用ParallelLoopState对象来控制Parallel.For函数的执行,ParallelLoopState对象是由运行时在后台创建的,我们还可以将ParallelLoopState命名为我们喜欢的名字,比如PrimeLoopState等等。这个对象有两个函数,Stop和Break,可以分别用来控制Parallel.For的执行。

  其中,如果我们调用Stop,表示Parallel.For的执行立刻停止,无论其他执行单元是否达到停止的条件。而如果我们使用Break,则表示满足条件的当前执行单元立刻停止,而对于其他执行单元,可能会满足停止条件而通过Break停止,也可能在其执行过程中始终无法满足停止条件,从而全部执行完毕,自然停止。当所有执行单元停止后,Parallel.For函数才停止执行并退出。下面的代码演示了Break和Stop两个函数的差别:

// 停止Parallel.For
        
private static void StopParallelFor()
        {
            
int nTotal = 0;
            Parallel.For(
1, 100, (i, loopState) =>
            {
                
// 当某一个循环单元的数大于30,
                
// 则停止Parallel.For的执行
                
if ( i > 30)
                {
                    
// 停止并退出Parallel.For
                    loopState.Stop();
                    return;
                }
                Console.WriteLine(
"Current Nummber: {0} ", i);
                Interlocked.Increment(ref nTotal);
            });

            
// 输出结果
            Console.WriteLine(
"Total Accessed: {0} ", nTotal);
        }
        
// 输出:
        
/*  Current Nummber: 1
            Current Nummber:
2
            Total Accessed:
2
        
*/

        
// 跳出Parallel.For的循环
        
private static void BreakParallelFor()
        {
            
int nTotal = 0;
            Parallel.For(
1, 100, (i, loopState) =>
            {  
                
// 当某一个循环单元的数大于30,
                
// 则跳出当前执行单元,等待其他执行单元结束
                
// 所有执行单元结束后退出Parallel.For的执行
                
if (i > 30)
                {
                    
// 跳出当前执行单元
                    loopState.Break();
                    return;
                }
                Console.WriteLine(
"Current  Nummber: {0} ", i);
                Interlocked.Increment(ref nTotal);
            });

            
// 输出结果
            Console.WriteLine(
"Total Accessed: {0} ", nTotal);
        }
        
// 输出:
        
/*  Current Nummber: 1
            Current Nummber:
2
            Current  Nummber:
22
            Current  Nummber:
23
            Current  Nummber:
27
            ...
            Current  Nummber:
20
            Current  Nummber:
21
            Total Accessed:
30
        
*/

  码中我们可以看出,调用Break函数,只会停止满足条件的当前执行单元的执行,而其他执行单元是否停止,则要靠他们自己判断。而调用Stop函数,则会立刻停止所有执行单元,退出整个Parallel.For函数。

  另外,Parallel.For方法有一个ParallelLoopResult类型的返回值,可以通过此返回值的IsCompleted属性来判断Parallel.For方法启动的所有任务是否运行都结束

  使用Parallel.ForEach进行并行化

  Parallel.For函数对应于for循环,同样的,在TPL中也提供了一个Parallel.ForEach函数用于并行化foreach循环。例如,在本文的开始部分,我们用Parallel.For函数并行地处理了EmpleeList中的所有数据,实际上对于这种相互独立的数据,我们也可以使用Parallel.ForEach来进行并行处理:

// 并行处理数据
        
private static void ParallelForEach()
        {    
            Start(
"ParallelForEach");
            
// 使用Parallel.ForEach函数,
            
// 并行处理数据集employeeData中的每一个数据
            Parallel.ForEach(employeeData, ed
=>
            {
                Console.WriteLine(
"Starting process for employee id {0}",
                    ed.EmployeeID);
                decimal span
= Employee.Process(ed);
                Console.WriteLine(
"Completed process for employee id {0}",
                    ed.EmployeeID);
                Console.WriteLine();
            });

            
End("ParallelForEach");
        }

    在后台,TPL“悄悄地”把整个集合分成若干个不相交的子集,然后,针对每个集合从线程池中选择一个线程对集合中的对象进行处理。由于每个子集都只对应着一个线程,因此,无需担心发生多线程访问共享资源的问题,而且多个子集的处理工作可以并行执行。

  通过Parallel.For函数和Parallel.ForEach函数,在利用循环处理并行数据的时候,我们可以非常简便地将一个串行的for循环和foreach并行化,从而充分利用多核CPU的资源,提高应用程序的性能。这就像天上突然掉下一个大馅饼,不吃都有点过意不去啊。

  (陈良乔)

0
相关文章