对Parallel.Invoke进行控制
Parallel.Invoke提供了一个重载版本,它可以接受一个ParallelOptions对象作为参数,对Parallel.Invoke的执行进行控制。通过这个对象,我们可以控制并行的最大线程数,各个任务是否取消执行等等。例如,在一个智能化的家中,系统会判断主人是否离开房间,如果主人离开了房间,则自动关闭屋子里的各种电器。利用Parallel.Invoke我们可以实现如下:
{
// 创建取消对象
CancellationTokenSource cts = new CancellationTokenSource();
// 利用取消对象,创建ParallelOptions
ParallelOptions pOption = new ParallelOptions() { CancellationToken = cts.Token };
// 设置最大线程数
pOption.MaxDegreeOfParallelism = 2;
// 创建一个守护监视进程
Task.Factory.StartNew(() =>
{
Console.WriteLine("Cancellation in 5 sec.");
Thread.Sleep(5000);
// 取消,结束任务的执行
cts.Cancel();
Console.WriteLine("Canceled requested");
});
try
{
// 以ParallelOptions作为参数,
// 调用Parallel.Invoke
Parallel.Invoke(pOption, () => ShutdownLights(pOption.CancellationToken),
() => ShutdownComputer(pOption.CancellationToken));
//输出执行结果
Console.WriteLine("Lights and computer are tuned off.");
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
}
private static void ShutdownLights(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
Console.WriteLine("Light is on. " );
Thread.Sleep(1000);
}
}
private static void ShutdownComputer(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
Console.WriteLine("Computer is on." );
Thread.Sleep(1000);
}
}
除了这种方式之外,ParallelOptions更多地应用在取消任务队列中还未来得及执行的任务。当我们限制了最大并发线程数的时候,如果需要通过Parallel.Invoke执行的任务较多,则有可能部分任务在队列中排队而得不到及时的执行,如果到了一定的条件这些任务还没有执行,我们可能取消这些任务。一个恰当的现实生活中的例子就是火车站买票。火车站买票的人很多,但是售票的窗口有限,当到了下班时间后,窗口就不再售票了,也就是剩下的售票任务需要取消掉。我们可以用下面的代码来模拟这样一个场景:
{
// 创建取消对象
CancellationTokenSource cts = new CancellationTokenSource();
// 利用取消对象,创建ParallelOptions
ParallelOptions pOption = new ParallelOptions() { CancellationToken = cts.Token };
// 设置最大线程数,也就相当于20个售票窗口
pOption.MaxDegreeOfParallelism = 20;
// 创建一个守护监视进程
// 当到下班时间后就取消剩下的售票活动
Task.Factory.StartNew(() =>
{
Console.WriteLine("Cancellation in 5 sec.");
Thread.Sleep(5000);
// 取消,结束任务的执行
cts.Cancel();
Console.WriteLine("Canceled requested");
});
try
{
// 创建售票活动
Action[] CustomerServices = CreateCustomerService(1000);
// 以ParallelOptions作为参数,
// 调用Parallel.Invoke
Parallel.Invoke(pOption, CustomerServices);
}
catch (Exception e)
{
// 当任务取消后,抛出一个异常
Console.WriteLine(e.Message);
}
}
// 创建售票的活动
static Action[] CreateCustomerService(int n)
{
Action[] result = new Action[n];
for (int i = 0; i < n; i++)
{
result[i] = () =>
{
Console.WriteLine("Customer Service {0}", Task.CurrentId);
// 模拟售票需要的时间
Thread.Sleep(2000);
};
}
return result;
}
并行任务之间的同步
有时候我们在处理并行任务的时候,各个任务之间需要同步,也就是同时执行的并行任务,需要在共同到达某一个状态的后再一共继续执行。我们可以举一个现实生活中的例子。陈良乔,贾玮和单春晖是好朋友,他们相约到电影院看《建国大业》。他们三个住在不同的地方,为了能一起买票进电影院,他们约好先在电影院门口的KFC会合,然后再一起进电影院。这其中就涉及到一个同步的问题:他们需要先在KFC会合。他们是从家里分别到KFC的,但是需要在KFC进行同步,等到三个人都到齐后在完成后后继的动作,进电影院看电影。
为了完成并行任务之间的同步,.NET Framework中提供了一个类Barrier。顾名思义,Barrier就像一个关卡或者是剪票口一样,通过Barrier类,我们可以管理并行任务的执行,完成他们之间的同步。Barrier类的使用非常简单,我们只需要在主线程中声明一个Barrier对象,同时指明需要同步的任务数。然后,在需要进行同步的地方调用Barrier类的SignalAndWait函数就可以了。 当一个并行任务到达SignalAndWait后,它会暂停执行,等待所有并行任务都到达同步点之后再继续往下执行。下面我们以一个实际的例子,来看看如何利用Barrier类完成看电影的同步问题。
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ParallelBarrier
{
class Program
{
// 用于同步的Barrier对象
static Barrier sync;
static void Main(string[] args)
{
// 创建Barrier对象,这里我们需要同步
// 任务有三个
sync = new Barrier(3);
// 开始执行并行任务
var steps = new Action[] { () => gotothecinema("陈良乔", TimeSpan.FromSeconds(5) ),
() => gotothecinema("贾玮", TimeSpan.FromSeconds(2) ),
() => gotothecinema("单春晖", TimeSpan.FromSeconds(4) )};
Parallel.Invoke(steps);
Console.ReadKey();
}
// 任务
static void gotothecinema(string strName, TimeSpan timeToKFC )
{
Console.WriteLine("[{0}] 从家里出发。", strName);
// 从家里到KFC
Thread.Sleep(timeToKFC);
Console.WriteLine("[{0}] 到达KFC。", strName);
// 等待其他人到达
sync.SignalAndWait();
// 同步后,进行后继动作
Console.WriteLine("[{0}] 买票进电影院。", strName);
}
}
}