技术开发 频道

用 JCSP 进行并发编程

        到目前为止,我只有两个独立的进程。下一步就是使用一个用作共享同步机制的公共通道把它们联系在一起,然后从中剔除一个进程。channel 接口是 JCSP 的 ChannelInput 和 ChannelOutput 接口的子接口,是读取和写入对象的公共接口。这个接口有许多可能的实现,就像下面描述的一样:

  类 One2OneChannel,顾名思义,实现了“单一写入器/单一阅读器”类型的通道。

  类 One2AnyChannel 实现了“单一写入器/多阅读器”对象通道。(注意,这不是广播机制,实际上,为了从通道读取对象,多个阅读器要进行相互竞争;在指定时间只有一个阅读器能使用通道和写入器进行沟通。)

  类 Any2OneChannel 实现了 “多写入器/单一阅读器”对象通道。同上面的情况一样,写入进程彼此竞争使用通道。在指定时间,只有阅读器和众多写入器中的一个在实际使用通道。

  类 Any2AnyChannel 实现了“多写入器/多阅读器”对象通道。读取进程彼此竞争使用的通道,写入进程也一样。在指定时间只有一个阅读器和一个写入器在实际使用通道。

  在清单 3 的示例中,我只有一个写入器进程和一个阅读器进程,所以 One2OneChannel 类就足够了。驱动器程序的示例代码如清单 4 所示:

  清单 4. 驱动器程序

1 import jcsp.lang.*;
2 public class DriverProgram
3 {
4     public static void main(String[] args)
5     {
6       One2OneChannel chan = new One2OneChannel();
7       new Parallel
8       (
9         new CSProcess[]
10         {
11           new SendEvenIntsProcess (chan),
12           new ReadEvenIntsProcess (chan)
13         }
14       ).run ();
15     }
16 }
17

  正如代码表示的,我首先实例化一个新的 One2OneChannel 对象,然后把它传递给 SendEvenIntsProcess 和 ReadEventIntsProcess 进程的构造函数。这样做是因为 One2OneChannel 同时实现了两个接口 —— ChannelInput 和 ChannelOutput。

  通道内部

  因为通道在 JCSP 中是重要的概念,所以在进行下一步之前,要确定您确实理解了它们的工作方式。正如我在前面提到的,通道在默认情况下是非缓冲的,但是也可以把它们变成缓冲的。实现方式是:通道本身并不处理缓冲特性,而是把这个责任委托给其他类,其他类必须实现叫作 ChannelDataStore 的接口。JCSP 为这个接口提供了多个内置实现,其中包括以下几个实现:

  ZeroBuffer,对应默认的非缓冲特性。

  Buffer,为与之相关联的通道提供了一个阻塞的先进先出的缓冲语义。

  InfiniteBuffer,也提供先进先出语义,但是如果缓冲为空,那么可以将阅读器阻塞。写入器永远不会阻塞,因为缓冲可以无限扩展,或者至少到了底层内存系统设置的限制为止。

  通道实战

  考虑一个实际使用的通道示例。当我创建了如清单 4 所示的 One2OneChannel 实例时,我把它内部的 ChannelDatasource 设置成 ZeroBuffer 的一个新实例。ZeroBuffer 只能保存一个对象(或整数)。它有一个内部状态变量,该变量的起始值为 EMPTY,只要放进一个对象,该变量的值就变成 FULL 了。

  当 SendEvenIntsProcess 进程在它的 out 通道上进行 write 操作时,会发生什么呢?One2OneChannel 类的 write() 方法是一个 synchronized() 方法。因此,发送方进程运行所在的线程(很快就会看到发送方进程和阅读器进程运行在独立的线程中)就会得到与这个通道实例相关联的监视器锁,并继续处理方法。在该方法中,业务的第一个顺序就是调用内部持有的 ZeroBuffer 实例的 put 方法,把对象(或者在这个示例中是整数)写到 ZeroBuffer 实例。这样就把缓冲的状态变成 FULL。这时,调用线程调用 wait,造成线程进入监视器的 等候集,后面进行的操作是释放监视器锁和阻塞线程。

  稍后,阅读器线程调用通道上的 read 操作(这也是一个同步的方法,所以阅读器线程在继续处理之前必须得到监视器锁)。因为内部缓冲的状态是 FULL,所以可用数据将被返回,并发出一个 notify()。notify() 唤醒发送方线程,然后发送方线程退出监视器等候集,并重新申请监视器锁。

  在反过来的场景中,如果阅读器线程调用通道上的 read 方法时,通道的内部缓冲状态是 EMPTY,那么阅读器线程就不得不 wait,在这种情况下,发送方线程要在把数据对象写入内部缓冲之后通知阅读器线程。

  Parallel 构造

  在 清单 4 中,您可能已经注意到驱动器程序引入了一个新类,叫作 Parallel。Parallel 类是由 JCSP 以预定义 CSProcess 的形式提供的,它接受一组独立的 CSProcess 实例,并“平行地”运行它们 (除了最后一个之外,所有进程都在独立的线程中运行;最后一个进程由 Parallel 对象在自己的控制线程中运行)。 Parallel 进程的 run 方法只有在所有的部件进程终止的时候才终止。所以 Parallel 进程是一种把多个独立进程组织起来的机制,它用通道(在驱动器程序中示例中)作为“线”把进程连在一起。

  了解 Parallel 构造的另一个途径是说:它可以把小的、简单的组件组合成更高层次的进程。实际上,Parallel 允许通过迭代把前面迭代中创建的组件与新的组件连接起来,创建出任意复杂程度的完全连接的进程网络。生成的进程网络可以像一个 CSProcess 对象一样公开和使用。

  Parallel 示例

  JCSP 库提供了一组即插即用的组件,不过仅仅是出于教育的目的,正好适合我的目的:进入其中几个的内部实现,可以很好的表现如何在 JCSP 中组合成网络化的并发进程。我用下面的示例进程来表现 JCSP 中 Parallel 构造的内部工作方式:

  PlusInt 在两个输入流中都接受整数,把整数加在一起,然后把结果输出到输出流。

  Delta2Int 平行地把到达它的输入流的每个整数广播到它的两个输出通道。

  PrefixInt 在它的整数输入流之前加上一个(用户配置的)整数。(也就是说,在这个进程的输出通道上有整数可用之前,第一个输出是预先配置的整数。后面的输出才是从输入流得到的整数。)

  IntegrateInt 是一个用 Parallel 构造组合了前三个进程的进程。它的功能是输出来自它的输入通道的整数的中间汇总值。

  IntegrateInt 类的 run 方法如清单 5 所示:

  清单 5. IntegrateInt 进程

1 import jcsp.lang.*;
2 public class IntegrateInt implements CSProcess
3 {
4   private final ChannelInputInt in;
5   private final ChannelOutputInt out;
6   public IntegrateInt (ChannelInputInt in, ChannelOutputInt out)
7   {
8     this.in = in;
9     this.out = out;
10   }
11   public void run()
12   {
13       One2OneChannelInt a = new One2OneChannelInt ();
14       One2OneChannelInt b = new One2OneChannelInt ();
15       One2OneChannelInt c = new One2OneChannelInt ();
16       new Parallel
17       (
18         new CSProcess[]
19         {
20           new PlusInt (in, c, a),
21           new Delta2Int (a, out, b),
22           new PrefixInt (0, b, c)
23         }
24       ).run ();
25   }
26 }
27

  注意,与 请单 4 中使用的通道相比,这个示例中使用了不同种类的通道。 IntegrateInt 类使用 ChannelInputInt 和 ChannelOutputInt 通道,顾名思义,可以用它们传递 int 类型的整数。相比之下,清单 4 中的驱动器程序使用了 ChannelInput 和 ChannelOutput,它们是 对象 通道,可以用来在通道中从发送方给接收方发送任意对象。出于这个原因,在清单 4 中传递 int 值之前,我不得不把 int 值包装成 Integer 对象。

  在清单 5 中,还需要注意观察什么呢?实际上,PrefixInt 进程的第一个输出是 0,它是通过 PlusInt 进程添加到输入通道到达的第一个整数上的。这个结果被写入通道 a,它构成了 Delta2Int 进程的输入通道。Delta2Int 进程把整数结果写到 out (进程的整体输出通道)并把它发送到 PrefixInt 进程。然后 PrefixInt 进程把整数作为输入发送给 PlusInt 进程,并添加到流中的第二个整数,如此类推。

  IntegrateInt 进程组成的图示如图 1 所示:

  图 1. IntegrateInt 进程

  网络中的网络

  IntegrateInt 进程就是这样由三个小进程组成,它本身可以当作一个复合进程来用。JCSP 库提供了一个叫作 SquaresInt 的进程,顾名思义,它生成一个整数流,整数流是自然数 (1、2、3、4,等等)的平方。这个进程的代码如清单 6 所示:

  清单 6. SquaresInt 进程

1 public class SquaresInt implements CSProcess
2 {
3   private final ChannelOutputInt out;
4   public SquaresInt (ChannelOutputInt out)
5   {
6     this.out = out;
7   }
8   public void run()
9   {
10       One2OneChannelInt a = new One2OneChannelInt ();
11       One2OneChannelInt b = new One2OneChannelInt ();
12       new Parallel
13       (
14         new CSProcess[]
15         {
16           new NumbersInt (a),
17           new IntegrateInt (a, b),
18           new PairsInt (b, out)
19         }
20       ).run ();
21   }
22 }
23

   我可以肯定您已经注意到清单 6 显示的两个新进程。NumbersInt 是一个内置进程,它只是在其输出通道中输出从 0 开始的自然数。PairsInt 进程则把连续的一对输入值相加并输出结果。这两个新进程和 IntegrateInt 一起构成了 SquaresInt 进程,如图 2 中的图表所示:

         图 2. SquaresInt 进程

  SquaresInt 的工作方式

  在进入下一部分之前,先来考虑 SquaresInt 进程的内部工作方式。在下面可以看到 SquaresInt 内部每个通道上的交通流向:

1 Channel "a":    [0, 1, 2, 3, 4, 5, 6, 7, 8, ...ad infinitum]
2 Channel "b":    [0, 1, 3, 6, 10, 15, 21, 28, 36, ...ad infinitum]
3 Channel "out":    [1, 4, 9, 16, 25, 36, 49, 64, 81 ...ad infinitum]
4

  您有没有看这样的模式:写入通道 a 的整数造成它们也被写入通道 b,因此也写到通道 out?在第一次“滴答”当中,NumbersInt 进程把整数 0 写入通道 a。IntegrateInt 进程也把整数 0 (是当前汇总的值)写入通道 b。PairsInt 进程在这次滴答中什么都不产生,因为它需要处理两个输入。在第二次滴答中,NumbersInt 进程在它的输出通道上写入整数 1。这造成 IntegrateInt 进程把汇总值修改成 0+1=1,所以把整数 1 写入通道 b。

  这时, PairsInt 有了两个整数输入可以处理 —— 整数 0 来自前一次滴答,整数 1 来自当前滴答。它把它们加在一起,并把输出 0+1=1 写到通道 out。请注意 1 是 1 的平方,所以我们现在可能是在正确的轨道上。继续把示例前进到下一个(第三个)滴答,NumbersInt 进程把把整数 2 写入通道 a。这使 IntegrateInt 进程把汇总值更新为 1 (前一个汇总值) + 2 (新值) = 3 并把这个整数写入通道 b。

  PairsInt 进程看到最后两个整数是什么?它们是 1 (在前一次滴答期间) 和 3 (在当前滴答期间)。所以,进程把这两个整数加在一起,并把 1+3=4 写入通道 out。您会注意到 4 是 2 的平方,这意味着 SquaresInt 工作起来就像它应当工作的那样。实际上,应当继续运行这个程序到任意数量的滴答,这样就可以验证写入通道 out 的整数总是在序列中的下一个整数的平方。我在下一节精确地这一操作。

  数学问题

  就在您纳闷的时候,我想解释一下生成平方值的数学基础。假设在 NumbersInt 进程已经把整数输出到某个 n-1 的时候,您偷看到了箱子内部。IntegrateInt 进程最后生成(而且通过共享通道 b 放到 PairsInt 进程)的中间汇总会是 [1+2+3+...+(n-1)] = (n-1)(n-2)/2。

  在下一次滴答期间,NumbersInt 会输出 n,这造成 IntegrateInt 进程的中间汇总增长为 (1+2+3+...+n) = n(n-1)/2。然后这个汇总会通过共享通道 b 传给 PairsInt 进程。 PairsInt 会把这两个数加在一起,生成 [(n-1)(n-2)/2 + n(n-1)/2] = [(n-2) + n](n-1)/2 = (2n-2)(n-1)/2 = (n-1)exp2。

  接下来,NumbersInt 进程会产生(n+1)。与之对应,IntegrateInt 进程会把 n(n+1)/2 送到 PairsInt 进程。然后 PairsInt 会生成 [n(n-1)/2 + n(n+1)/2] = nexp2。针对所有的 n 对这进行通用化,就会按照期望的那样产生全部平方。

0
相关文章