【IT168 专稿】线程门(thread gate)模式是控制线程并发的一个有效工具,但是很多开发者对它并不熟悉。如同交通信号灯可以让汽车在十字路口有序通行一样,线程门可以根据给定条件阻止或允许线程执行。Obi Ezechukwu在本文中为我们介绍了线程门的概念,然后通过一个一个多线程素数生成器告诉我们如何使用它。
多线程和并发编程曾经是编程高手们才涉足的领域,但是随着多核处理器的出现,以及应用程序需求的更复杂,还有javax.util.concurrent包的出现,这种情况已经发生了变化。现在,企业应用开发者需要了解Java语言中不同的并发机制和构思。当面临需要非教科书式、高度创新的并发构思才能解决的问题时,这种需求更显迫切。这种情形下,仅仅理解Java语言和标准SDK的并发机制已经不能满足需要;你必须能够使用这些工具来编写程序,实现定制化的并发控制。
在本篇文章中,我们将了解一个人们较少讨论的并发模式,通常被称为线程门。如同现实世界中门的概念一样,门实例可以打开或关闭,从而实现允许或阻止线程执行。它基于某些判定式的真值来实现这个操作。
下面我将大体介绍一下基于通信流模型的线程门,然后解释一下如何建立示例应用(一个多线程素数生成器)的开发环境。文章的剩余部分将通过实例方式让你了解线程门模式。
线程门概述
与线程门非常类似的一个例子是运行在很多十字路口的交通信号灯系统。当红灯亮的时候,汽车必须停车等候,直到信号改变;当绿灯亮的时候,汽车可以自由通行。 交通信号灯被设计用来实现交通的交叉进行,在不需要交叉通行的地方它是没有用武之地的。对于程序员来说,你可以把交通信号灯看作控制器,它可以让双向交通共享使用同一小部分道路,如果没有它,交通线路交叉的地方将是一个非常危险的地方。
同样,线程门通常最适合的情况是:当一个线程集处于激活状态时,其它线程不能被执行。换句话说,相互竞争的线程集依赖于某些真值判定式的值,判定式的每一个不同的值只触发一个线程集,而强制其它处于挂起状态。注意此处的重点是针对一系列或一组线程而并非单个线程。实际上,我们关注的重点是多个线程共享访问一个底层资源的情形,而且这些线程根据对资源所执行的操作,被划分成不同的集合。
很好的一个例子是生产者—消费者(producer-consumer)流程,某些线程负责制造另一组线程所使用的数据;共享资源最可能是不同线程集所使用的切换机制(数据总线);而决定线程处理的真值判定式是数据量。如果数据作为一个与生产过程类似的流程部分被进行入队操作,然后该数据被不同过程使用或进行出队操作,一个内存请求队列有时候可能适合此类模式。
生产者-消费者模型是描述线程门概念使用的一个很好例子。对于绝大多数程序员来说,通过一个示例程序或许能够更轻松理解一个概念。在本篇文章中,问题还应该是如何轻松的实现任务分解和并行操作,因为重点是对其创建一个多线程解决方案。本篇文章的示例应用将实现上述目标。
多线程素数生成器
本篇文章的示例应用程序将解决一个经典老问题,即在指定范围的数字(例如从1到100万之间)找出所有可能的素数。确切的说,我们的任务就是实现一个软件组件,利用一个方法获得指定范围内的所有素数。假定该组件的客户端要求该方法返回一个线程安全的句柄,一旦其被调用的时候可以访问结果,同时完成在后台或以异步方式找出素数的任务。还有一个条件是该操作必须提供一个阻塞方法来允许素数尽快的被访问或返回,这样该客户端就不用必须在访问结果前等待搜索完成。为了简化这个任务,对句柄返回结果的顺序没有添加限制条件。
在继续阅读本篇文章前,你应该下载本篇文章的代码文件(http://www.javaworld.com/javaworld/jw-03-2009/threadgates-src.zip),然后在自己喜欢使用的IDE中创建一个开发项目。该文件包含十个源文件,其结构如下所示:
2
3 -main\
4
5 -java\
6
7 -com\
8
9 -javaworld\
10
11 -primefinder\
12
13 PrimeNumberSearcher.java
14
15 PrimeNumberSource.java
16
17 PrimeUtil.java
18
19 ThreadGate.java
20
21 PrimeSearchThread.java
22
23 PartitionInfo.java
24
25 PrimeNumberReader.java
26
27 GatedPrimeNumberSearcher.java
28
29 ConcurrentPrimeNumberSource.java
30
31 -src\
32
33 -test\
34
35 -java\
36
37 -com\
38
39 -javaworld\
40
41 -primefinder\
42
43 PrimeFinderTest.java
PrimeFinderTest.java是一个JUnit 4测试包;为了测试验证该示例程序,你需要使用JUnit工具。为了遵循Maven 2命名约定,该应用程序的源文件在src/main/java下,而验证该解决方案的JUnit 4测试包则在src/test/java下。下面我们将对这些文件夹的关键内容进行介绍。
发现素数
接口com.javaworld.primefinder.PrimeNumberSearcher定义了组件必须遵循的契约。该契约的功能有方法findPrimeNumbers()来指定,如列表1所示。
列表1.搜索契约
2 BigInteger upperBound);
com.javaworld.primefinder.PrimeNumberSource是结果句柄必须遵循的接口。它定义了一个方法BigInteger nextPrime(),当被调用时,它应该返回搜索结果缓冲器中的下一个元素。如果结果缓冲器已经被用完,必须返回null值来表示没有结果可用。正如此前所提到的,该方法的执行必须是线程安全的。
需要指出的是,该示例应用程序任务的重点是找出指定搜索范围内的素数;因此我们可以使用来自java.math.BigInteger类的nextProbablePrime()方法。你可以在效用类com.javaworld.primefinder.PrimeUtil的一个静态方法中封装调用该方法,示例程序如列表2所示。
列表2.在指定范围内发现首个素数的效用方法
2
3 {
4
5 BigInteger result;
6
7 BigInteger startPos = BigInteger.valueOf(lowerBound);
8
9 BigInteger nextProbablePrime;
10
11 if (startPos.isProbablePrime(.....)) // some reasonable accuracy
12
13 nextProbablePrime = startPos;
14
15 else nextProbablePrime = startPos.nextProbablePrime();
16
17 if (nextProbablePrime.longValue() >= upperBound)
18
19 result = null;
20
21 else result = nextProbablePrime;
22
23 return result;
24
25 }
现在已经定义了建立该解决方案的公共接口和工具,你现在可以对该任务进行细化了。我们将首先定义线程门的实现,它指定控制访问结果缓冲器的方式。
线程门实现
线程门的目标是确保多个阅读器可以访问素数缓冲器,而且无需阻挡搜索线程对其增加结果。
一个阅读器线程应该能够无需等待就可以访问任何可用结果;但是如果没有结果可用,它必须排队等待,直到至少有一个结果可用。该示例应用使用了一个公平设计原则,因此结果大致按照线程到达的顺序来被提供。这个示例应用的重点不是按照严格队列顺序分发结果;而是在一定时间间隔内请求素数的线程,在下一批线程到来之前都能够都获得它们的数据。
现实世界中能够说明这种设计理念的类似事物是咖啡店情况。为了让顾客满意,在黄金时段咖啡订单是“批量处理”的。服务员取走客户订单后,将选择相同品种咖啡的用户分成一组,由咖啡机分批来制作咖啡,这样该组客户基本上可以同时享用上自己的咖啡;然后咖啡机再制作下一批客户的咖啡,按照如此过程进行重复。按照这种设计方案,在指定时间间隔内进入咖啡制作排队顺序的客户可以同时得到自己咖啡,当然可能不会严格安装下单的顺序来被提供咖啡。设想一下,如果咖啡机每次只准备一杯咖啡,其效率会有多么低下。
回到产生和缓冲素数的任务中,通过一个具有类似二进制锁功能的门,这个简单的应用程序能够控制对结果缓冲器的访问:当这个门打开时,线程可以通过,当它被关闭时,线程必须排队等候直到其被重新打开。在某种意义上来说它比一个门锁更智能,它可以将等待线程进行批处理,可以确保同一批中的所有线程能够同时通过这个门;较早一批线程要比较晚一批线程的优先级高。门实现有其创新性的特点,即无论有多少线程在等待,结果总可以被写入到结果缓冲器中。如果以一个交通信号灯做比喻,那么线程门类可能更像一个铁路交叉口信号灯,火车总可以获得优先通行权。
在com.javaworld.primefinder.ThreadGate中包含了门实现的过程,如列表3所示。
列表3.门实现
2
3 class ThreadGate
4
5 {
6
7 private int waitPosition;
8
9 private int generationSize;
10
11 private int openAttempts;
12
13 private int activeSearchThreads;
14
15 private boolean open;
16
17 public ThreadGate(int searchThreadsCount)
18
19 public synchronized void await() throws InterruptedException
20
21 public synchronized void close() throws InterruptedException
22
23 public synchronized void open()
24
25 }
为了对该类进行解析,首先看一下类的字段,共有以下五个:
·waitPosition:该字段记录了等待线程的创建索引。它可以确保公平,因为被允许通过的线程是自上一次门被打开后持续到达的线程集。每次门的open()方法被调用,它的值就会被增加,这样可以强制以后到达的线程进入一个新的批处理结合中。
·generationSize:记录目前一批等待门打开的线程集的大小。每次门的open()或close()方法被调用,它的值会被重新设定。
·openAttempts:一个整数计数器,表明自上次门被关闭以来所进行的打开门的尝试次数。实际上,它真正记录的数字是自上次门被关闭以来结果被增加到结果缓冲器中的次数。如果你仔细看一下com.javaworld.primefinder.ConcurrentPrimeNumberSource中的结果处理实现过程,就会非常清楚这一点,每次一个新的素数被增加到结果缓冲器中,都会调用一次open()方法。该字段的值不仅仅随着open()方法的调用而改变,而且会在每次调用close()方法后被归零。
·activeSearchThreads:如果没有探测到任何结果可用,读取线程无需等待。探测是否有结果可用的一个关键指示器就是是否有任何活跃的搜索线程。因此,线程门保持记录活跃搜索线程的数量是非常重要的。如果这个字段数值为0,那么await()方法将被阻止,读取线程就可以被允许自由通过该门。
·open:一个二进制值,表明门的状态是被打开还是关闭。
尽管我们无需对门实现的方法进行更多说明,你还是应该花点时间来研究一下await()和open()方法。await()方法的代码如列表4所示。
列表4.强制线程集等待的方法
2
3 {
4
5 int generationIndex = waitPosition;
6
7 generationSize++;
8
9 while (!open && generationIndex ==waitPosition
10
11 && activeSearchThreads!=0)
12
13 wait();
14
15 }
从以上代码中你可以看到,这个方法会获取目前等待位置字段的数值——索引,在门被关闭和索引未被提高的情况下,它将保持处于等待状态。这意味着,自上次门被打开以来到达的线程必须排队等候,直到上一批线程被处理完,并且门经历过另一次close()到open()的循环,或者搜索线程终止。
列表5说明了open()方法的使用。需要明白的一点是,每次门被重新打开,线程组计数器的值将增加1。
列表5.ThreadGate.open()方法实现
2
3 {
4
5 openAttempts++;
6
7 if (generationSize<=openAttempts)
8
9 {
10
11 ++waitPosition; //reset the counter
12
13 open=true;
14
15 generationSize = 0;
16
17 notifyAll();
18
19 }
20
21 }
你能够发现,在上面代码中使用了notifyAll()方法而并非notify()方法,目的是确保在一指定组中的所有线程都有机会被处理。
在下一部分我们将概要介绍PrimeNumberSource接口的实现,该接口被用于实现访问搜索结果。
PrimeNumberSource实现
结果句柄被封装在com.javaworld.primefinder.ConcurrentPrimeNumberSource编译包中;它的内容架构如列表6所示。
列表6.PrimeNumberSource实现
2
3 implements PrimeNumberSource
4
5 {
6
7 private ThreadGate barrier;
8
9 private List resultsBuffer;
10
11 private int resultsBufferAccessIndex;
12
13 protected ConcurrentPrimeNumberSource(
14
15 int searchThreadsCount)
16
17 public synchronized BigInteger nextPrime()
18
19 //delegates to barrier
20
21 protected void searchThreadCompleted()
22
23 //updates results
24
25 protected void addResultToBuffer(
26
27 BigInteger result)
28
29 }
从以上代码中可以看到,这个类的实例中包含一个门引用,可以用来控制并发访问被字段resultBuffer封装的结果缓冲器。
整数计数器resultsBufferAccessIndex被作为一个指针来使用,指向最后一次读取缓冲器的位置。需要指出的是,该字段不是原子型的,因此当nextPrime()方法被定义为同步方式,以防止破坏性更新。在列表7中,你还可以看到,只有在结果缓冲器被用完且门未被关闭的情况下,门的close()方法才会被调用。
列表7.nextPrime()方法的实现
2
3 {
4
5 BigInteger result;
6
7 int resultIndex = resultsBufferAccessIndex++;
8
9 try
10
11 {
12
13 if (!barrier.isClosed() &&
14
15 (resultIndex == resultsBuffer.size()))
16
17 barrier.close();
18
19 barrier.await();
20
21 result = resultsBuffer.get(resultIndex);
22
23 }
24
25 catch (InterruptedException exce)
26
27 {
28
29 ..........................................
30
31 }
32
33 catch (IndexOutOfBoundsException exce)
34
35 {
36
37 //search exhausted
38
39 result = null;
40
41 }
42
43 }
接下来,我们将一起看一下PrimeNumberSearcher返回句柄实例的实现。
搜索执行器
划分搜索空间和分配连续块给不同线程的任务由类com.javaworld.primefinder.GatedPrimeNumberSearcher来完成;这是契约com.javaworld.primefinder.PrimeNumberSearcher的一个实现。
在实例化后,这种类型的对象判断系统处理器的数量,并根据它来将搜索空间划分为近似大小的块,然后将它们分配给不同的任务。块信息被封装在com.javaworld.primefinder.PartitionInfo类中,如列表8所示。
列表8.搜索空间分区信息的封装
2
3 {
4
5 PartitionInfo(int numberOfBuckets, long bucketSize)
6
7 public int getNumberOfBuckets().....
8
9 public long getBucketSize() ........
10
11 }
这个类的实例由列表9中的方法来创建,该操作并定义在com.javaworld.primefinder.GatedPrimeNumberSearcher任务/搜索线程实现中。
列表9.搜索空间块划分的方法
2
3 long upperBound)
4
5 {
6
7 PartitionInfo result;
8
9 int proposedBucketCount = numberOfProcessors;
10
11 long bucketSize = (upperBound-lowerBound) /
12
13 proposedBucketCount;
14
15 result = new PartitionInfo(proposedBucketCount,
16
17 bucketSize);
18
19 return result;
20
21 }
在列表10中你可以发现,PartitionInfo实例被用来创建一个或多个搜索线程,或者更准确的说,创建一个或多个可运行的任务/目标,由com.javaworld.primefinder类来实现。除了搜索范围信息外,每一个线程还具有一个指向结果句柄的指针,以允许它将可用的搜索结果写到该句柄的内部结果缓冲器中。
列表10.契约findPrimeNumbers()的实现
2
3 BigInteger aUpperBound)
4
5 {
6
7 if (aUpperBound.longValue()<=aLowerBound.longValue())
8
9 throw new IllegalArgumentException("Upperbound must be
10
11 greater than lowerbound");
12
13 long lowerBound = aLowerBound.longValue(),
14
15 upperBound = aUpperBound.longValue();
16
17 final ConcurrentPrimeNumberSource result;
18
19 PartitionInfo partitionInfo =getPartitionInfo(lowerBound,
20
21 upperBound);
22
23 result = new ConcurrentPrimeNumberSource(
24
25 partitionInfo.getNumberOfBuckets());
26
27 Thread searchThread;
28
29 long shiftingLowerBound = lowerBound,
30
31 shiftingUpperBound = partitionInfo.getBucketSize();
32
33 while (shiftingUpperBound <= upperBound)
34
35 {
36
37 searchThread =
38
39 new Thread(new PrimeSearchThread(
40
41 BigInteger.valueOf(shiftingLowerBound),
42
43 BigInteger.valueOf(shiftingUpperBound),
44
45 result));
46
47 searchThread.start();
48
49 shiftingLowerBound = shiftingUpperBound;
50
51 shiftingUpperBound+= partitionInfo.getBucketSize();
52
53 }
54
55 return result;
56
57 }
在调试测试程序之前,还有一个应用程序类需要提及:消费者/读取器(consumer/reader)线程实现类com.javaworld.primefinder.PrimeNumberReader,其核心方法如列表11所示。
列表11.素数消费者
2
3 public void run()
4
5 {
6
7 BigInteger nextVal=null;
8
9 do
10
11 {
12
13 nextVal = source.nextPrime();
14
15 if (nextVal!=null)
16
17 System.out.println(nextVal + ", ");
18
19 else break;
20
21 }while (true);
22
23 }
在JUnit 4测试工具中,这个类的几个实例用来模拟多个客户端线程在线程门中排队的效果。
测试工具包
在本篇文章源文件的test文件夹下有一个名为com.javaworld.primefinder.PrimeFinderTest的JUnit 4测试包。这个测试类会创建一个GatedPimeNumberSearcher实例,来搜索从1到适当上限的素数;它还创建几个阅读器线程,来打印出系统输出的结果数据。你需要根据你的计算机的处理能力来调整搜索空间的上限值,然后运行测试来观察线程门的执行。
结论
本篇文章介绍了线程门的概念,并演示了如何实现和使用一个基本的门类。另外还提供了源示例和源代码来让读者进一步理解这一概念。如果想了解本文中所提到的多线程问题,你可以参阅一下并发专家 Brian Goetz所编写的《Java并发实践(http://www.javaworld.com/javaworld/jw-09-2006/jw-0904-threads.html)》一书和其它相关书籍。