技术开发 频道

通过线程门实现Java并发编程

  线程门实现

  线程门的目标是确保多个阅读器可以访问素数缓冲器,而且无需阻挡搜索线程对其增加结果。

  一个阅读器线程应该能够无需等待就可以访问任何可用结果;但是如果没有结果可用,它必须排队等待,直到至少有一个结果可用。该示例应用使用了一个公平设计原则,因此结果大致按照线程到达的顺序来被提供。这个示例应用的重点不是按照严格队列顺序分发结果;而是在一定时间间隔内请求素数的线程,在下一批线程到来之前都能够都获得它们的数据。

  现实世界中能够说明这种设计理念的类似事物是咖啡店情况。为了让顾客满意,在黄金时段咖啡订单是“批量处理”的。服务员取走客户订单后,将选择相同品种咖啡的用户分成一组,由咖啡机分批来制作咖啡,这样该组客户基本上可以同时享用上自己的咖啡;然后咖啡机再制作下一批客户的咖啡,按照如此过程进行重复。按照这种设计方案,在指定时间间隔内进入咖啡制作排队顺序的客户可以同时得到自己咖啡,当然可能不会严格安装下单的顺序来被提供咖啡。设想一下,如果咖啡机每次只准备一杯咖啡,其效率会有多么低下。

  回到产生和缓冲素数的任务中,通过一个具有类似二进制锁功能的门,这个简单的应用程序能够控制对结果缓冲器的访问:当这个门打开时,线程可以通过,当它被关闭时,线程必须排队等候直到其被重新打开。在某种意义上来说它比一个门锁更智能,它可以将等待线程进行批处理,可以确保同一批中的所有线程能够同时通过这个门;较早一批线程要比较晚一批线程的优先级高。门实现有其创新性的特点,即无论有多少线程在等待,结果总可以被写入到结果缓冲器中。如果以一个交通信号灯做比喻,那么线程门类可能更像一个铁路交叉口信号灯,火车总可以获得优先通行权。

  在com.javaworld.primefinder.ThreadGate中包含了门实现的过程,如列表3所示。

  列表3.门实现

1   package com.javaworld.primefinder;
2
3   class ThreadGate
4
5   {
6
7   private int waitPosition;
8
9   private int generationSize;
10
11   private int openAttempts;
12
13   private int activeSearchThreads;
14
15   private boolean open;
16
17   public ThreadGate(int searchThreadsCount)
18
19   public synchronized void await() throws InterruptedException
20
21   public synchronized void close() throws InterruptedException
22
23   public synchronized void open()
24
25   }

  为了对该类进行解析,首先看一下类的字段,共有以下五个:

  ·waitPosition:该字段记录了等待线程的创建索引。它可以确保公平,因为被允许通过的线程是自上一次门被打开后持续到达的线程集。每次门的open()方法被调用,它的值就会被增加,这样可以强制以后到达的线程进入一个新的批处理结合中。

  ·generationSize:记录目前一批等待门打开的线程集的大小。每次门的open()或close()方法被调用,它的值会被重新设定。

  ·openAttempts:一个整数计数器,表明自上次门被关闭以来所进行的打开门的尝试次数。实际上,它真正记录的数字是自上次门被关闭以来结果被增加到结果缓冲器中的次数。如果你仔细看一下com.javaworld.primefinder.ConcurrentPrimeNumberSource中的结果处理实现过程,就会非常清楚这一点,每次一个新的素数被增加到结果缓冲器中,都会调用一次open()方法。该字段的值不仅仅随着open()方法的调用而改变,而且会在每次调用close()方法后被归零。

  ·activeSearchThreads:如果没有探测到任何结果可用,读取线程无需等待。探测是否有结果可用的一个关键指示器就是是否有任何活跃的搜索线程。因此,线程门保持记录活跃搜索线程的数量是非常重要的。如果这个字段数值为0,那么await()方法将被阻止,读取线程就可以被允许自由通过该门。

  ·open:一个二进制值,表明门的状态是被打开还是关闭。

  尽管我们无需对门实现的方法进行更多说明,你还是应该花点时间来研究一下await()和open()方法。await()方法的代码如列表4所示。

  列表4.强制线程集等待的方法

1  public synchronized void await() throws InterruptedException
2
3  {
4
5  int generationIndex = waitPosition;
6
7  generationSize++;
8
9  while (!open && generationIndex ==waitPosition
10
11  && activeSearchThreads!=0)
12
13  wait();
14
15  }

  从以上代码中你可以看到,这个方法会获取目前等待位置字段的数值——索引,在门被关闭和索引未被提高的情况下,它将保持处于等待状态。这意味着,自上次门被打开以来到达的线程必须排队等候,直到上一批线程被处理完,并且门经历过另一次close()到open()的循环,或者搜索线程终止。

  列表5说明了open()方法的使用。需要明白的一点是,每次门被重新打开,线程组计数器的值将增加1。

  列表5.ThreadGate.open()方法实现

1  public synchronized void open()
2
3   {
4
5   openAttempts++;
6
7   if (generationSize<=openAttempts)
8
9   {
10
11   ++waitPosition; //reset the counter
12
13   open=true;
14
15   generationSize = 0;
16
17   notifyAll();
18
19   }
20
21   }

  你能够发现,在上面代码中使用了notifyAll()方法而并非notify()方法,目的是确保在一指定组中的所有线程都有机会被处理。

  在下一部分我们将概要介绍PrimeNumberSource接口的实现,该接口被用于实现访问搜索结果。

  PrimeNumberSource实现

  结果句柄被封装在com.javaworld.primefinder.ConcurrentPrimeNumberSource编译包中;它的内容架构如列表6所示。

  列表6.PrimeNumberSource实现

1 public class ConcurrentPrimeNumberSource
2
3   implements PrimeNumberSource
4
5   {
6
7   private ThreadGate barrier;
8
9   private List resultsBuffer;
10
11   private int resultsBufferAccessIndex;
12
13   protected ConcurrentPrimeNumberSource(
14
15   int searchThreadsCount)
16
17   public synchronized BigInteger nextPrime()
18
19   //delegates to barrier
20
21   protected void searchThreadCompleted()
22
23   //updates results
24
25   protected void addResultToBuffer(
26
27   BigInteger result)
28
29   }

  从以上代码中可以看到,这个类的实例中包含一个门引用,可以用来控制并发访问被字段resultBuffer封装的结果缓冲器。

  整数计数器resultsBufferAccessIndex被作为一个指针来使用,指向最后一次读取缓冲器的位置。需要指出的是,该字段不是原子型的,因此当nextPrime()方法被定义为同步方式,以防止破坏性更新。在列表7中,你还可以看到,只有在结果缓冲器被用完且门未被关闭的情况下,门的close()方法才会被调用。

  列表7.nextPrime()方法的实现

1   public synchronized BigInteger nextPrime()
2
3   {
4
5   BigInteger result;
6
7   int resultIndex = resultsBufferAccessIndex++;
8
9   try
10
11   {
12
13   if (!barrier.isClosed() &&
14
15   (resultIndex == resultsBuffer.size()))
16
17   barrier.close();
18
19   barrier.await();
20
21   result = resultsBuffer.get(resultIndex);
22
23   }
24
25   catch (InterruptedException exce)
26
27   {
28
29   ..........................................
30
31   }
32
33   catch (IndexOutOfBoundsException exce)
34
35   {
36
37   //search exhausted
38
39   result = null;
40
41   }
42
43   }

  接下来,我们将一起看一下PrimeNumberSearcher返回句柄实例的实现。

0
相关文章