技术开发 频道

应用 fork-join 框架

  清单 3. 使用 fork-join 框架解决选择最大值问题

public class MaxWithFJ extends RecursiveAction {

  
private final int threshold;

  
private final SelectMaxProblem problem;

  
public int result;

  
public MaxWithFJ(SelectMaxProblem problem, int threshold) {

  this.problem
= problem;

  this.threshold
= threshold;

  }

  protected void compute() {

  
if (problem.size < threshold)

  result
= problem.solveSequentially();

  
else {

  
int midpoint = problem.size / 2;

  MaxWithFJ
left = new MaxWithFJ(problem.subproblem(0, midpoint), threshold);

  MaxWithFJ
right = new MaxWithFJ(problem.subproblem(midpoint +

  
1, problem.size), threshold);

  coInvoke(
left, right);

  result
= Math.max(left.result, right.result);

  }

  }

  
public static void main(String[] args) {

  SelectMaxProblem problem
= ...

  
int threshold = ...

  
int nThreads = ...

  MaxWithFJ mfj
= new MaxWithFJ(problem, threshold);

  ForkJoinExecutor fjPool
= new ForkJoinPool(nThreads);

  fjPool.invoke(mfj);

  
int result = mfj.result;

  }

  }

 

    表 1 显示了在不同系统上从 500,000 个元素的数组中选择最大元素的结果,以及改变阙值,使顺序方法优于并行方法。对于大多数运行,fork-join 池中的线程数量与可用的硬件线程(内核数乘以每个内核中的线程数)相等。与顺序方法相比,这些线程数量呈现一种加速比(speedup)。

  表 1. 在不同系统上从 500k 个元素的数组中运行 select-max 的结果

 

阙值 = 500k

阙值 = 50k

阙值 = 5k

阙值 = 500

阙值 = -50

Pentium-4 HT2 个线程)

1.0

1.07

1.02

.82

.2

Dual-Xeon HT4 个线程)

.88

3.02

3.2

2.22

.43

8-way Opteron8 个线程)

1.0

5.29

5.73

4.53

2.03

8-core Niagara32 个线程)

.98

10.46

17.21

15.34

6.49

    结果很令人振奋,因为它们在选择各种参数时显示了不错的加速比。因此,只要避免为问题和底层硬件选择完全不合理的参数,就会获得不错的结果。使用 chip-multithreading 技术,最优的加速比不太明显;像 Hyperthreading 这样的 CMT 方法所提供的性能要低于等价数量的实际内核提供的性能,但是性能损失取决于许多因素,包括正在执行的代码的缓存丢失率(miss rate)。

  此处选择的顺序阙值范围从 500K(数组的大小,表示没有并行性)到 50。在这个例子中,阙值为 50 实在太小,有些不切实际,而且结果显示,顺序阙值太低时,fork-join 任务管理开销将起决定作用。但是表 1 也显示只要避免这种不切实际的过高和过低的参数,就会获得不错的结果。选择 Runtime.availableProcessors() 作为工作线程的数量通常会获得与理想结果相近的结果,因为在 fork-join 池中执行的任务都是和 CPU 绑定的,但是,只要避免设置过大或过小的池,这个参数对结果就不会有太大影响。

  MaxWithFJ 类中不需要显式同步。它操作的数据对于问题的生命周期来说是不变的,并且 ForkJoinExecutor 中有足够的内部同步可以保证问题数据对于子任务的可视性,也可以保证子任务结果对于跟它们结合的任务的可视性。

  fork-join 框架剖析

  可以有很多方法实现 清单 3 中演示的 fork-join 框架。可以使用原始的线程;Thread.start() 和 Thread.join() 提供了所有必要的功能。然而,这种方法需要的线程数可能比 VM 所能支持的数量更多。对于大小为 N(假设为一个很小的顺序阙值)的问题,将需要 O(N) 个线程来解决问题(问题树深度为 log2N,深度为 k 的二进制树有 2k个节点)。在这些线程中,半数线程会用几乎整个生命周期来等待子任务的完成。创建线程会占用许多内存,这使得这种方法受到限制(尽管这种方法也能工作,但是代码非常复杂,并且需要仔细针对问题大小和硬件进行参数调优)。

  使用传统的线程池来实现 fork-join 也具有挑战性,因为 fork-join 任务将线程生命周期的大部分时间花费在等待其他任务上。这种行为会造成线程饥饿死锁(thread starvation deadlock),除非小心选择参数以限制创建的任务数量,或者池本身非常大。传统的线程池是为相互独立的任务设计的,而且设计中也考虑了潜在的阻塞、粗粒度任务 — fork-join 解决方案不会产生这两种情况。对于传统线程池的细粒度任务,也存在所有工作线程共享的任务队列发生争用的情况。

  工作窃取

  fork-join 框架通过一种称作工作窃取(work stealing) 的技术减少了工作队列的争用情况。每个工作线程都有自己的工作队列,这是使用双端队列(或者叫做 deque)来实现的(Java 6 在类库中添加了几种 deque 实现,包括 ArrayDeque 和 LinkedBlockingDeque)。当一个任务划分一个新线程时,它将自己推到 deque 的头部。当一个任务执行与另一个未完成任务的合并操作时,它会将另一个任务推到队列头部并执行,而不会休眠以等待另一任务完成(像 Thread.join() 的操作一样)。当线程的任务队列为空,它将尝试从另一个线程的 deque 的尾部 窃取另一个任务。

  可以使用标准队列实现工作窃取,但是与标准队列相比,deque 具有两方面的优势:减少争用和窃取。因为只有工作线程会访问自身的 deque 的头部,deque 头部永远不会发生争用;因为只有当一个线程空闲时才会访问 deque 的尾部,所以也很少存在线程的 deque 尾部的争用(在 fork-join 框架中结合 deque 实现会使这些访问模式进一步减少协调成本)。跟传统的基于线程池的方法相比,减少争用会大大降低同步成本。此外,这种方法暗含的后进先出(last-in-first-out,LIFO)任务排队机制意味着最大的任务排在队列的尾部,当另一个线程需要窃取任务时,它将得到一个能够分解成多个小任务的任务,从而避免了在未来窃取任务。因此,工作窃取实现了合理的负载平衡,无需进行协调并且将同步成本降到了最小。

  结束语

  fork-join 方法提供了一种表示可并行化算法的简单方式,而不用提前了解目标系统将提供多大程度的并行性。所有的排序、搜索和数字算法都可以进行并行分解(以后,像 Arrays.sort() 这样的标准库机制将会使用 fork-join 框架,允许应用程序免费享有并行分解的益处)。随着处理器数量的增长,我们将需要在程序内部使用更多的并行性,以有效利用这些处理器;对计算密集型操作(比如排序)进行并行分解,使程序能够更容易利用未来的硬件。

0
相关文章