技术开发 频道

用 JCSP 进行并发编程

  【IT168 技术文章】CSP 是对并发对象之间的复杂交互进行建模的范式。使用 CSP 的主要优势之一是:对程序每一阶段所包含对象的行为进行精确地指定和验证。CSP 的理论和实践对于并发设计和编程领域有深远的影响。它是 occam 这样的编程语言的基础,对其他语言(例如 Ada)的设计也有影响。

  CSP 基础

  CSP 的基本构造是进程和进程之间各种形式的通信。CSP 中的每件事都是进程,甚至(子)进程网络也是进程。但是,在进程之间没有直接交互 —— 所有交互都通过 CSP 的同步对象(例如各级进程订阅的通信通道和事件边界)实现的。

  CSP 进程 与典型的 Java 对象不同:封装在进程组件中的数据 和 操纵数据的算法都是私有的。也就是说,进程没有对外可以调用的方法(除了启动进程必须调用的方法之外),算法只能在进程自己的控制线程内执行。如果把这种方法与 Java 语言中的方法调用进行对比,就可以立即看出 CSP 是如何消除显式锁定的需求的:

  在 Java 语言中,在对象上调用的方法总是在调用者的线程中运行。但也有一个特殊的控制线程是通过系统中的多个对象进行工作的。对于大部分情况来说,对象没有自己的生命 —— 它们只是在运行线程调用它们的方法时才存在。因此,不同的执行线程可以在同一时间试图调用同一对象的同一方法,显然,这种情况在 CSP 中永远不会发生。

  通信通道和进程网络

  进程间通信最简单的机制就是通过通道读写数据。CSP 中基本的通道构造是同步的(synchronous) 和 点对点的(point-to-point);也就是说,它不包含内部缓冲,并且把一个进程连接到另外一个进程。从这个基本通道开始,有可能构建多个阅读器/写入器通道(即一对多、多对一和多对多)。

  CSP 中的进程构成了复杂系统的基本构造块 —— 一个进程可以同一个或多个其他进程连接起来(全都设置成并行的),从而构成一个进程网络。可以把这个网络本身想像成一个进程,这个进程还可以递归地与其他进程、它们自己的网络或者其他类似东西组合在一起,形成一个为了最好地解决手上问题而设计的复杂排列的金字塔。

  如果单独考虑,那么进程仅仅是一个独立的串行程序,它只与外部 I/O 设备交互。这个程序本身并不需要考虑在 I/O 通道另一端的进程是否存在或对方的性质。

  CSP 理论已经在许多基于 Java 的框架中实现了,包括面向 Java 的通信顺序进程(Communicating Sequential Processes for Java,JCSP) 库。

  JCSP 库

  JCSP 库由英国坎特伯雷市肯特大学的 Peter Welch 教授和 Paul Austin 开发。对于本文余下的大部分内容来说,我会把重点放在 CSP 概念在 JCSP 中的实现方式上。因为 Java 语言没有提供对 CSP 构造的自带支持,所以 JCSP 库内部使用 Java 语言 实际 支持的、自带的并发构造,例如 synchronized、wait 和 notify。为了帮助您正确地理解 JCSP 的工作方式,我将从这些 Java 构造的角度对 JCSP 库中某些类的内部实现进行了解释。

  JCSP 中的进程

  在 JCSP 中,进程实际上就是实现了 CSProcess 接口的类。清单 1 显示了这个接口:

  清单 1. CSProcess 接口

1 package jcsp.lang;
2 public interface CSProcess
3 {
4     public void run();
5 }
6

  注意,CSProcess 接口看起来就像 Java 语言的 Runnable 接口,而且它也充当着类似的角色。虽然 JCSP 目前是用标准 Java API 实现的,但是并不需要这样,而且在未来可能真的不需要这样。出于这个原因,在 JCSP 中没有直接使用 Runnable 接口。

  JCSP 定义了两个接口用于从通道读取对象和向通道写入对象。从通道读取对象的接口叫作 ChannelInput ,它只有一个方法,叫作 read()。如果进程调用一个实现 ChannelInput 接口的对象的这个方法,那么进程会阻塞,直到在通道另一端的进程实际向通道写入了一个对象。 一旦在通道上有对象可用,对象就被返回给调用进程。类似地,ChannelOutput 接口也只有一个方法,叫作 write(Object o)。如果进程调用 一个实现 ChannelOutput 接口的对象的这个方法,进程也会阻塞,直到通道接受对象。正如前面提到过的,最简单的通道类型没有缓冲,所以它在另一端(读取)的进程调用 read() 之前不会接受对象。

  从现在开始,我将使用代码示例来演示这些和其他 JCSP 构造如何工作。在清单 2 中,可以看到一个非常简单的进程,它输出 1 到 100 之间的所有偶数:

  清单 2. 生成 1 到 100 之间偶数的进程

1 import jcsp.lang.*;
2 public class SendEvenIntsProcess implements CSProcess
3 {
4     private ChannelOutput out;
5     public SendEvenIntsProcess(ChannelOutput out)
6     {
7       this.out = out;
8     }
9     public void run()
10     {
11       for (int i = 2; i <= 100; i = i + 2)
12       {
13         out.write (new Integer (i));
14       }
15     }
16 }
17

  与每一个写进程对应,必须有一个读进程。如果不存在这样的进程,则会造成 SendEvenIntsProcess 在 ChannelOutput 对象的 out 进行第一次写操作之后立即无限期阻塞。清单 3 演示了一个简单的读进程,该进程与清单 2 介绍的写进程对应:

  清单 3. 对应的消费者进程

1 import jcsp.lang.*;
2 public class ReadEvenIntsProcess implements CSProcess
3 {
4     private ChannelInput in;
5     public ReadEvenIntsProcess(ChannelInput in)
6     {
7       this.in = in;
8     }
9     public void run()
10     {
11       while (true)
12       {
13         Integer d = (Integer)in.read();
14         System.out.println("Read: " + d.intValue());
15       }
16     }
17 }
18

        到目前为止,我只有两个独立的进程。下一步就是使用一个用作共享同步机制的公共通道把它们联系在一起,然后从中剔除一个进程。channel 接口是 JCSP 的 ChannelInput 和 ChannelOutput 接口的子接口,是读取和写入对象的公共接口。这个接口有许多可能的实现,就像下面描述的一样:

  类 One2OneChannel,顾名思义,实现了“单一写入器/单一阅读器”类型的通道。

  类 One2AnyChannel 实现了“单一写入器/多阅读器”对象通道。(注意,这不是广播机制,实际上,为了从通道读取对象,多个阅读器要进行相互竞争;在指定时间只有一个阅读器能使用通道和写入器进行沟通。)

  类 Any2OneChannel 实现了 “多写入器/单一阅读器”对象通道。同上面的情况一样,写入进程彼此竞争使用通道。在指定时间,只有阅读器和众多写入器中的一个在实际使用通道。

  类 Any2AnyChannel 实现了“多写入器/多阅读器”对象通道。读取进程彼此竞争使用的通道,写入进程也一样。在指定时间只有一个阅读器和一个写入器在实际使用通道。

  在清单 3 的示例中,我只有一个写入器进程和一个阅读器进程,所以 One2OneChannel 类就足够了。驱动器程序的示例代码如清单 4 所示:

  清单 4. 驱动器程序

1 import jcsp.lang.*;
2 public class DriverProgram
3 {
4     public static void main(String[] args)
5     {
6       One2OneChannel chan = new One2OneChannel();
7       new Parallel
8       (
9         new CSProcess[]
10         {
11           new SendEvenIntsProcess (chan),
12           new ReadEvenIntsProcess (chan)
13         }
14       ).run ();
15     }
16 }
17

  正如代码表示的,我首先实例化一个新的 One2OneChannel 对象,然后把它传递给 SendEvenIntsProcess 和 ReadEventIntsProcess 进程的构造函数。这样做是因为 One2OneChannel 同时实现了两个接口 —— ChannelInput 和 ChannelOutput。

  通道内部

  因为通道在 JCSP 中是重要的概念,所以在进行下一步之前,要确定您确实理解了它们的工作方式。正如我在前面提到的,通道在默认情况下是非缓冲的,但是也可以把它们变成缓冲的。实现方式是:通道本身并不处理缓冲特性,而是把这个责任委托给其他类,其他类必须实现叫作 ChannelDataStore 的接口。JCSP 为这个接口提供了多个内置实现,其中包括以下几个实现:

  ZeroBuffer,对应默认的非缓冲特性。

  Buffer,为与之相关联的通道提供了一个阻塞的先进先出的缓冲语义。

  InfiniteBuffer,也提供先进先出语义,但是如果缓冲为空,那么可以将阅读器阻塞。写入器永远不会阻塞,因为缓冲可以无限扩展,或者至少到了底层内存系统设置的限制为止。

  通道实战

  考虑一个实际使用的通道示例。当我创建了如清单 4 所示的 One2OneChannel 实例时,我把它内部的 ChannelDatasource 设置成 ZeroBuffer 的一个新实例。ZeroBuffer 只能保存一个对象(或整数)。它有一个内部状态变量,该变量的起始值为 EMPTY,只要放进一个对象,该变量的值就变成 FULL 了。

  当 SendEvenIntsProcess 进程在它的 out 通道上进行 write 操作时,会发生什么呢?One2OneChannel 类的 write() 方法是一个 synchronized() 方法。因此,发送方进程运行所在的线程(很快就会看到发送方进程和阅读器进程运行在独立的线程中)就会得到与这个通道实例相关联的监视器锁,并继续处理方法。在该方法中,业务的第一个顺序就是调用内部持有的 ZeroBuffer 实例的 put 方法,把对象(或者在这个示例中是整数)写到 ZeroBuffer 实例。这样就把缓冲的状态变成 FULL。这时,调用线程调用 wait,造成线程进入监视器的 等候集,后面进行的操作是释放监视器锁和阻塞线程。

  稍后,阅读器线程调用通道上的 read 操作(这也是一个同步的方法,所以阅读器线程在继续处理之前必须得到监视器锁)。因为内部缓冲的状态是 FULL,所以可用数据将被返回,并发出一个 notify()。notify() 唤醒发送方线程,然后发送方线程退出监视器等候集,并重新申请监视器锁。

  在反过来的场景中,如果阅读器线程调用通道上的 read 方法时,通道的内部缓冲状态是 EMPTY,那么阅读器线程就不得不 wait,在这种情况下,发送方线程要在把数据对象写入内部缓冲之后通知阅读器线程。

  Parallel 构造

  在 清单 4 中,您可能已经注意到驱动器程序引入了一个新类,叫作 Parallel。Parallel 类是由 JCSP 以预定义 CSProcess 的形式提供的,它接受一组独立的 CSProcess 实例,并“平行地”运行它们 (除了最后一个之外,所有进程都在独立的线程中运行;最后一个进程由 Parallel 对象在自己的控制线程中运行)。 Parallel 进程的 run 方法只有在所有的部件进程终止的时候才终止。所以 Parallel 进程是一种把多个独立进程组织起来的机制,它用通道(在驱动器程序中示例中)作为“线”把进程连在一起。

  了解 Parallel 构造的另一个途径是说:它可以把小的、简单的组件组合成更高层次的进程。实际上,Parallel 允许通过迭代把前面迭代中创建的组件与新的组件连接起来,创建出任意复杂程度的完全连接的进程网络。生成的进程网络可以像一个 CSProcess 对象一样公开和使用。

  Parallel 示例

  JCSP 库提供了一组即插即用的组件,不过仅仅是出于教育的目的,正好适合我的目的:进入其中几个的内部实现,可以很好的表现如何在 JCSP 中组合成网络化的并发进程。我用下面的示例进程来表现 JCSP 中 Parallel 构造的内部工作方式:

  PlusInt 在两个输入流中都接受整数,把整数加在一起,然后把结果输出到输出流。

  Delta2Int 平行地把到达它的输入流的每个整数广播到它的两个输出通道。

  PrefixInt 在它的整数输入流之前加上一个(用户配置的)整数。(也就是说,在这个进程的输出通道上有整数可用之前,第一个输出是预先配置的整数。后面的输出才是从输入流得到的整数。)

  IntegrateInt 是一个用 Parallel 构造组合了前三个进程的进程。它的功能是输出来自它的输入通道的整数的中间汇总值。

  IntegrateInt 类的 run 方法如清单 5 所示:

  清单 5. IntegrateInt 进程

1 import jcsp.lang.*;
2 public class IntegrateInt implements CSProcess
3 {
4   private final ChannelInputInt in;
5   private final ChannelOutputInt out;
6   public IntegrateInt (ChannelInputInt in, ChannelOutputInt out)
7   {
8     this.in = in;
9     this.out = out;
10   }
11   public void run()
12   {
13       One2OneChannelInt a = new One2OneChannelInt ();
14       One2OneChannelInt b = new One2OneChannelInt ();
15       One2OneChannelInt c = new One2OneChannelInt ();
16       new Parallel
17       (
18         new CSProcess[]
19         {
20           new PlusInt (in, c, a),
21           new Delta2Int (a, out, b),
22           new PrefixInt (0, b, c)
23         }
24       ).run ();
25   }
26 }
27

  注意,与 请单 4 中使用的通道相比,这个示例中使用了不同种类的通道。 IntegrateInt 类使用 ChannelInputInt 和 ChannelOutputInt 通道,顾名思义,可以用它们传递 int 类型的整数。相比之下,清单 4 中的驱动器程序使用了 ChannelInput 和 ChannelOutput,它们是 对象 通道,可以用来在通道中从发送方给接收方发送任意对象。出于这个原因,在清单 4 中传递 int 值之前,我不得不把 int 值包装成 Integer 对象。

  在清单 5 中,还需要注意观察什么呢?实际上,PrefixInt 进程的第一个输出是 0,它是通过 PlusInt 进程添加到输入通道到达的第一个整数上的。这个结果被写入通道 a,它构成了 Delta2Int 进程的输入通道。Delta2Int 进程把整数结果写到 out (进程的整体输出通道)并把它发送到 PrefixInt 进程。然后 PrefixInt 进程把整数作为输入发送给 PlusInt 进程,并添加到流中的第二个整数,如此类推。

  IntegrateInt 进程组成的图示如图 1 所示:

  图 1. IntegrateInt 进程

  网络中的网络

  IntegrateInt 进程就是这样由三个小进程组成,它本身可以当作一个复合进程来用。JCSP 库提供了一个叫作 SquaresInt 的进程,顾名思义,它生成一个整数流,整数流是自然数 (1、2、3、4,等等)的平方。这个进程的代码如清单 6 所示:

  清单 6. SquaresInt 进程

1 public class SquaresInt implements CSProcess
2 {
3   private final ChannelOutputInt out;
4   public SquaresInt (ChannelOutputInt out)
5   {
6     this.out = out;
7   }
8   public void run()
9   {
10       One2OneChannelInt a = new One2OneChannelInt ();
11       One2OneChannelInt b = new One2OneChannelInt ();
12       new Parallel
13       (
14         new CSProcess[]
15         {
16           new NumbersInt (a),
17           new IntegrateInt (a, b),
18           new PairsInt (b, out)
19         }
20       ).run ();
21   }
22 }
23

   我可以肯定您已经注意到清单 6 显示的两个新进程。NumbersInt 是一个内置进程,它只是在其输出通道中输出从 0 开始的自然数。PairsInt 进程则把连续的一对输入值相加并输出结果。这两个新进程和 IntegrateInt 一起构成了 SquaresInt 进程,如图 2 中的图表所示:

         图 2. SquaresInt 进程

  SquaresInt 的工作方式

  在进入下一部分之前,先来考虑 SquaresInt 进程的内部工作方式。在下面可以看到 SquaresInt 内部每个通道上的交通流向:

1 Channel "a":    [0, 1, 2, 3, 4, 5, 6, 7, 8, ...ad infinitum]
2 Channel "b":    [0, 1, 3, 6, 10, 15, 21, 28, 36, ...ad infinitum]
3 Channel "out":    [1, 4, 9, 16, 25, 36, 49, 64, 81 ...ad infinitum]
4

  您有没有看这样的模式:写入通道 a 的整数造成它们也被写入通道 b,因此也写到通道 out?在第一次“滴答”当中,NumbersInt 进程把整数 0 写入通道 a。IntegrateInt 进程也把整数 0 (是当前汇总的值)写入通道 b。PairsInt 进程在这次滴答中什么都不产生,因为它需要处理两个输入。在第二次滴答中,NumbersInt 进程在它的输出通道上写入整数 1。这造成 IntegrateInt 进程把汇总值修改成 0+1=1,所以把整数 1 写入通道 b。

  这时, PairsInt 有了两个整数输入可以处理 —— 整数 0 来自前一次滴答,整数 1 来自当前滴答。它把它们加在一起,并把输出 0+1=1 写到通道 out。请注意 1 是 1 的平方,所以我们现在可能是在正确的轨道上。继续把示例前进到下一个(第三个)滴答,NumbersInt 进程把把整数 2 写入通道 a。这使 IntegrateInt 进程把汇总值更新为 1 (前一个汇总值) + 2 (新值) = 3 并把这个整数写入通道 b。

  PairsInt 进程看到最后两个整数是什么?它们是 1 (在前一次滴答期间) 和 3 (在当前滴答期间)。所以,进程把这两个整数加在一起,并把 1+3=4 写入通道 out。您会注意到 4 是 2 的平方,这意味着 SquaresInt 工作起来就像它应当工作的那样。实际上,应当继续运行这个程序到任意数量的滴答,这样就可以验证写入通道 out 的整数总是在序列中的下一个整数的平方。我在下一节精确地这一操作。

  数学问题

  就在您纳闷的时候,我想解释一下生成平方值的数学基础。假设在 NumbersInt 进程已经把整数输出到某个 n-1 的时候,您偷看到了箱子内部。IntegrateInt 进程最后生成(而且通过共享通道 b 放到 PairsInt 进程)的中间汇总会是 [1+2+3+...+(n-1)] = (n-1)(n-2)/2。

  在下一次滴答期间,NumbersInt 会输出 n,这造成 IntegrateInt 进程的中间汇总增长为 (1+2+3+...+n) = n(n-1)/2。然后这个汇总会通过共享通道 b 传给 PairsInt 进程。 PairsInt 会把这两个数加在一起,生成 [(n-1)(n-2)/2 + n(n-1)/2] = [(n-2) + n](n-1)/2 = (2n-2)(n-1)/2 = (n-1)exp2。

  接下来,NumbersInt 进程会产生(n+1)。与之对应,IntegrateInt 进程会把 n(n+1)/2 送到 PairsInt 进程。然后 PairsInt 会生成 [n(n-1)/2 + n(n+1)/2] = nexp2。针对所有的 n 对这进行通用化,就会按照期望的那样产生全部平方。

  JCSP 中的确定性

  以上示例演示了 CSP 的复合语言 —— 即如何用 Parallel 构造把细致的无状态的组件组成分层的网络。所有这类相互通信的平行进程的分层网络的卖点就是:它们是完全确定的。在这个上下文环境中 确定 意味着什么呢?它意味着这类分层网络的输出只取决于提供给它的输入,而不用考虑网络运行的运行时环境(JVM)的特性。也就是说,进程网络独立于 JVM 的调度策略,也独立于它所分布的多处理器。(我在这里假设的是个单一节点,但是,没有什么固有的东西会防碍把这个讨论引入物理上分布在多个节点上而在进程之间通过线路进行通信的进程网络上。)

  确定性会是工具包中的强大工具,因为它可以让您清晰地推断出程序的行为,不必担心运行时环境对它可能产生的影响。同时,确定性不是并发性编程惟一可能的技术或必需的技术。因为下一个(也是最后一个)实例将显示,非确定性在 JSP 中也是同样强大的实用概念。

  JCSP 中的非确定性

  非确定是许多真实的应用程序的因子,在这些应用程序,可见的输出是某个功能或者事件发生的顺序。换句话说,当结果取决于设计的调度,而 不是 取决于事件时,就是非确定性在并发应用程序中发挥作用的地方了。您将会看到,JCSP 显式地处理这类问题。

  例如,假设一个进程对于 下面要做什么 有许多备选项,每个备选项都有一个与之关联的 警卫(guard),警卫必须处于“就绪(ready)”状态,这样才能让备选项得以考虑。进程可以从可用的备选项(也就是就绪的)中选择一个选项;选择本身可能基于不同的策略,可能是任意选择、最高优先级选择或者公平选择。

  事件选择策略

  在 JCSP 的特定上下文中,提供了一个叫作 Guard 的抽象类,竞争进程选择的事件必须继续它。进程本身使用另一个预先提供的类,叫作 Alternative,这些警卫对象必须以对象数组的形式传递给它的构造函数。Alternative 类为三种事件选择策略提供了方法。

  Alternative 类的 select() 方法对应着 任意选择 策略。select() 方法调用一直受阻塞,直到一个或多个警卫就绪为止(请记住,所有竞争的警卫对于 Alternative 类来说都是已知的)。其中一个就绪的警卫被随机选中,它的索引(在传递进去的警卫数组中)也被返回。

  priSelect() 方法对应着 最高优先级 策略。也就是说,如果不止一个警卫就绪,则返回索引值最低的那个;这里面的假设是:在数组中传递给 Alternative 构造函数的警卫已经按照优先级顺序进行降序排序了。

  最后,方法 fairSelect 是在多个就绪警卫中进行 公平 选择:在这个方法的连续调用中,在其他就绪而且可用的警卫没被选中之前,不会有某个就绪的警卫被选中两次。所以,如果警卫的总数是 n,那么在最坏的情况下,就绪的警卫没获得选中的次数不会连续超过 n 次。

  如果进程不关心如何选择多个就绪警卫,那么任意选择策略最合适;如果进程想保证没有资源耗尽或者最差服务次数,例如在实时系统中,那么任意选择就不太适用了。在前面的情况下,推荐使用 fairSelect 方法,而在后面的情况下,用 priSelect() 方法最好。

  警卫类型

  大体来说,JCSP 提供了三类警卫:

  通道警卫 总是对应着进程等候从中读取数据的通道。也就是说,只有在通道另一端的进程已经输出数据,而该数据还没有被进程输入的时候,警卫才就绪。

  计时器警卫 总是和设置(绝对)超时对应。也就是说,如果超时,则计时器警卫就会就绪。

  跳过警卫 总是就绪。

  JCSP 中的通道警卫 可以是以下类型:AltingChannelInput/AltingChannelInputInt,只要在对应的通道中有了对象或整数数据,则这两个通道将就绪;或者 AltingChannelAccept,如果在通道中出现不可接受的“CALL”(这一点后面有更多介绍),则通道就会就绪。这些都是抽象类,它们拥有 One2One 和 Any2One 类型通道形式的具体实现。JCSP 中的计时器 警卫属于 CSTimer 类型,而 跳过警卫 则是以 Skip 类的形式提供的。

  运作中的警卫

  我用一个简单的示例,演示如何用 JCSP 警卫实现并发应用程序中的非确定性,借此总结对 JCSP 的介绍。假设您必须开发一个乘法(或者 倍增) 设计,读取的整数在输出通道以固定速率到达,可以用某个乘数乘以它们,然后把它们写入其输出通道。设备可以用一个初始乘数开始,但是这个乘数每 5 秒钟自动加倍。

  这个故事中介绍的方法是这样的:系统中存在着第二个控制器进程,它能通过专用通道向设备发送 suspend operation 信号。这使设备中止自身,并把乘数的当前值通过第二个通道发送给控制器。

  在中止的时候,设备只应当允许全部进入的整数不经变化地通过它的输出通道。控制器进程 —— 可能在用设备发送给它的乘数做了某些计算中的一种 —— 通过专用通道把一个新乘数发送回设备。(请注意:只要设备处于 中止 状态,就会被迫接受这个乘数。)

  更新过的乘数插入到设备,并充当设备的唤醒信号。设备现在继续执行它的放大操作,用新更新的乘数乘上输入的整数。计时器这时也重置,所以新的乘数也在 5 秒之后被设置成加倍数值,如此类推。

  图 3 中的图表说明了这个放大设备:

  图 3. 放大设备

  ScaleInt 进程

  放大设备的源代码在清单 7 中显示。这个示例中的非确定性是因为:output 的值基于 in 和 inject 流的值(同时还基于这些值到达的顺序)。

  清单 7. ScaleInt 进程

  1 import jcsp.lang.*;
  2 import jcsp.plugNplay.ints.*;
  3 public class ScaleInt implements CSProcess
  4 {
  5   private int s;
  6   private final ChannelOutputInt out, factor;
  7   private final AltingChannelInputInt in, suspend, inject;
  8   public ScaleInt (int s, AltingChannelInputInt suspend, AltingChannelInputInt in,
  9     ChannelOutputInt factor, AltingChannelInputInt inject, ChannelOutputInt out)
10   {
11     this.s = s;
12     this.in = in;
13     this.out = out;
14     this.suspend = suspend;
15     this.factor = factor;
16     this.inject = inject;
17   }
18   public void run()
19   {
20     final long second = 1000;               // Java timings are in millisecs
21     final long doubleInterval = 5*second;
22     final CSTimer timer = new CSTimer ();
23     final Alternative normalAlt = new Alternative (new Guard[] {suspend, timer, in});
24     
25     final int NORMAL_SUSPEND=0, NORMAL_TIMER=1, NORMAL_IN = 2;
26     final Alternative suspendedAlt = new Alternative (new Guard[] {inject, in});
27     
28     final int SUSPENDED_INJECT=0, SUSPENDED_IN = 1;
29     
30     long timeout = timer.read () + doubleInterval;
31     timer.setAlarm (timeout);
32     while (true)
33     {
34       switch (normalAlt.priSelect ())
35       {
36         case NORMAL_SUSPEND:
37           suspend.read ();              // don't care what's sent
38           factor.write (s);             // reply with the crucial information
39           boolean suspended = true;
40           while (suspended)
41           {
42             switch (suspendedAlt.priSelect ())
43             {
44               case SUSPENDED_INJECT:    // this is the resume signal as well
45                 s = inject.read ();     // get the new scaling factor
46                 suspended = false;      // and resume normal operations
47                 timeout = timer.read () + doubleInterval;
48                 timer.setAlarm (timeout);
49                 break;
50               case SUSPENDED_IN:
51                 out.write (in.read ());
52                 break;
53             }
54           }
55           break;
56         case NORMAL_TIMER:
57           timeout = timer.read () + doubleInterval;
58           timer.setAlarm (timeout);
59           s = s*2;
60           break;
61         case NORMAL_IN:
62           out.write (s * in.read ());
63           break;
64       }
65     }
66   }
67 }
68 import jcsp.lang.*;
69 import jcsp.plugNplay.ints.*;
70 public class Controller implements CSProcess
71 {
72   private long interval;
73   private final ChannelOutputInt suspend, inject;
74   private final ChannelInputInt factor;
75   public Controller (long interval, ChannelOutputInt suspend, ChannelOutputInt inject,
76     ChannelInputInt factor)
77   {
78     this.interval = interval;
79     this.suspend = suspend;
80     this.inject = inject;
81     this.factor = factor;
82   }
83   public void run ()
84   {
85     int currFactor = 0;
86     final CSTimer tim = new CSTimer ();
87     long timeout = tim.read ();
88     while (true)
89     {
90       timeout += interval;
91       tim.after (timeout);        // blocks until timeout reached
92       suspend.write (0);          // suspend signal (value irrelevant)
93       currFactor = factor.read ();            
94       currFactor ++;              // compute new factor
95       inject.write (currFactor);  // inject new factor
96     }
97   }
98 }
99 import jcsp.lang.*;
100 import jcsp.plugNplay.ints.*;
101 public class DriverProgram
102 {
103   public static void main(String args[])
104   {
105     try
106     {
107       final One2OneChannelInt temp = new One2OneChannelInt ();
108       final One2OneChannelInt in = new One2OneChannelInt ();
109       final One2OneChannelInt suspend = new One2OneChannelInt ();
110       final One2OneChannelInt factor = new One2OneChannelInt ();
111       final One2OneChannelInt inject = new One2OneChannelInt ();
112       final One2OneChannelInt out = new One2OneChannelInt ();
113         
114       new Parallel
115       (
116         new CSProcess[]
117         {
118           new NumbersInt (temp),
119           new FixedDelayInt (1000, temp, in),
120           new ScaleInt (2, suspend, in, factor, inject, out),
121           new Controller (6000, suspend, inject, factor),
122           new PrinterInt (out, "--> ", "\n")
123         }
124       ).run ();
125     }
126     catch (Exception e)
127     {
128         e.printStackTrace();
129     }
130   }
131 }
132

  上面的类 ScaleInt 对应着放大设备。正如前面提到的,这个类必须实现 CSProcess 接口。因为上面的代码演示了许多概念,所以我将逐个讨论它的不同方面。

  两个备选项

  在 ScaleInt 类中,我们感兴趣的第一个方法是 run()。在 run() 方法中,要做的第一件事是创建 Alternative 类的两个实例,每个都有一组不同的 Guard 对象。

  第一个 Alternative 实例由变量 normalAlt 表示,它是为设备正常操作的时候使用的。与之关联的警卫列表如下所示:

  suspend 是 One2OneChannelInt 的实例。正如前面提到过的,One2OneChannelInt 实现了单一阅读器/写入器整数通道,通道是零缓冲、完全同步的。这是控制器进程向设备发送中止信号的通道。

  timer 是 CSTimer 的实例,它被设置成每 5 秒触发一次,每次触发时,设备会把乘数的当前值加倍。

  in 是 One2OneChannelInt 的实例,设备通过它接收输入的整数。

  第二个 Alternative 实例由 suspendedAlt 表示,它是供设备在已经被 Controller 中止的情况下使用的。与之关联的警卫如下如示:

  inject 是 One2OneChannelInt 的实例,由控制器进程使用,用来向设备发送新的乘数(也充当唤醒信号)。

  in 是前面已经看到的 One2OneChannelInt 相同的实例;设备通过这个通道接收输入整数。

  两个 Alternative 实例被用在不同的情况下等候警卫就绪,列表的顺序是隐式的优先级顺序。例如,如果 normalAlt 的 suspend 和 timer 警卫恰好同时就绪,那么和 suspend 警卫对应的事件首先被处理。

  警卫就绪

  下一个我们感兴趣的是在每个警卫就绪的时候,发生了什么。我首先研究 normalSelect,假设设备操作正常(也就是说,还没有被中止):

  如果控制器向设备发送了 suspend 信号,那么这个事件以最高优先级得到处理。作为响应,设备把乘数的当前值通过叫作 factor 的通道发送给控制器。然后将叫作 suspended 的内部标志设置为 true,然后进入循环,等候别人发送信号,以继续其操作。在循环内部,设备调用第二个 Alternative 实例上的 priSelect() 方法 (suspendedAlt)。

  这个 Alternative 实例包含两个警卫:第一个表示控制器向设备发送乘数的事件,第二个表示整数到达设备的输入通道。在前一种情况下,设备用从 inject 通道读取的值来更新乘数(保存在变量 s 中),并将 suspended 标志设置回 false (这样就保证了在下一次迭代时可以退出内部循环),用当前计时器的值作为基值重新设置闹钟。在后一种情况下,设备只是从它的输入通道读取整数,并把整数写入输出通道(也就是说,在设备中止时,不许使用乘数的要求)。

  具有下一个优先级得到处理的事件是闹钟到期事件。这造成设备把当前乘数加倍,用当前计时器的值作为基值重新设置闹钟,然后返回,继续等候下一个事件。

  第三个可能是事件是从设备的输入通道接收整数的事件。与之对应的是,设备读取整数,用当前乘数 s 乘上它,并将结果写入设备的输出通道。

  Controller 类

  下一个要考虑的类是 Controller 类。请记住,控制器类的任务是周期性地(大概是基于复杂的计算)向设备进程插入乘数值。在这个示例中,周期基础只是一个计时器,该计时器按照规律的、配置好的间隔到期。每次到期时,控制器就在 suspend 上写一个 0(也就是说,它将中止设备),并在叫作 factor 的输入通道上读取当前的乘数。

  这时,控制器只是把这个值加一,然后通过一对一通道 (叫作 inject,专门用于为这个目的) 将它插回设备。这就通知设备继续工作的方式,这时计时器被重新设置成在适当间隔后到期。

  DriverProgram 类

  最后剩下的类是驱动器类 DriverProgram。这个类创建适当的通道和 CSProcess 实例数组。它用 JCSP 提供的类 NumbersInt 生成一系列自然数,通过temp 通道传递给另一个叫作 FixedDelayInt 的内置类。顾名思义,FixedDelayInt 将来自其输入通道的值在固定延迟(在示例代码中,该延迟是 1 秒)之后发送到它的输出通道。

  这个自然数的流每隔一秒就被发送到 ScaleInt 进程的 in 通道。ScaleInt 进程的 out 通道的输出传递给 JCSP 提供的 PrinterInt 进程,然后该进程再接着把整数值输出到 System.out。

  结束语

  在介绍适用于 Java 程序员的 CSP 的文章中,我解释并演示了并发编程中的 CSP 理论。然后是对 CSP 构造的概述,其中介绍了最流行的基于 Java 的 CSP 库 —— JCSP。由于 Java 语言没有对 CSP 构造提供自带的支持,所以 JCSP 库内部采 Java 支持 的自带构造,例如 synchronized()、wait() 和 notify()。为了帮助您正确地理解 JCSP 是如何工作的,我从 Java 构造的角度解释了一些 JCSP 类库的内部实现,然后在几个实际示例中演示了它们的用法。

0
相关文章