技术开发 频道

用 JCSP 进行并发编程

  JCSP 中的确定性

  以上示例演示了 CSP 的复合语言 —— 即如何用 Parallel 构造把细致的无状态的组件组成分层的网络。所有这类相互通信的平行进程的分层网络的卖点就是:它们是完全确定的。在这个上下文环境中 确定 意味着什么呢?它意味着这类分层网络的输出只取决于提供给它的输入,而不用考虑网络运行的运行时环境(JVM)的特性。也就是说,进程网络独立于 JVM 的调度策略,也独立于它所分布的多处理器。(我在这里假设的是个单一节点,但是,没有什么固有的东西会防碍把这个讨论引入物理上分布在多个节点上而在进程之间通过线路进行通信的进程网络上。)

  确定性会是工具包中的强大工具,因为它可以让您清晰地推断出程序的行为,不必担心运行时环境对它可能产生的影响。同时,确定性不是并发性编程惟一可能的技术或必需的技术。因为下一个(也是最后一个)实例将显示,非确定性在 JSP 中也是同样强大的实用概念。

  JCSP 中的非确定性

  非确定是许多真实的应用程序的因子,在这些应用程序,可见的输出是某个功能或者事件发生的顺序。换句话说,当结果取决于设计的调度,而 不是 取决于事件时,就是非确定性在并发应用程序中发挥作用的地方了。您将会看到,JCSP 显式地处理这类问题。

  例如,假设一个进程对于 下面要做什么 有许多备选项,每个备选项都有一个与之关联的 警卫(guard),警卫必须处于“就绪(ready)”状态,这样才能让备选项得以考虑。进程可以从可用的备选项(也就是就绪的)中选择一个选项;选择本身可能基于不同的策略,可能是任意选择、最高优先级选择或者公平选择。

  事件选择策略

  在 JCSP 的特定上下文中,提供了一个叫作 Guard 的抽象类,竞争进程选择的事件必须继续它。进程本身使用另一个预先提供的类,叫作 Alternative,这些警卫对象必须以对象数组的形式传递给它的构造函数。Alternative 类为三种事件选择策略提供了方法。

  Alternative 类的 select() 方法对应着 任意选择 策略。select() 方法调用一直受阻塞,直到一个或多个警卫就绪为止(请记住,所有竞争的警卫对于 Alternative 类来说都是已知的)。其中一个就绪的警卫被随机选中,它的索引(在传递进去的警卫数组中)也被返回。

  priSelect() 方法对应着 最高优先级 策略。也就是说,如果不止一个警卫就绪,则返回索引值最低的那个;这里面的假设是:在数组中传递给 Alternative 构造函数的警卫已经按照优先级顺序进行降序排序了。

  最后,方法 fairSelect 是在多个就绪警卫中进行 公平 选择:在这个方法的连续调用中,在其他就绪而且可用的警卫没被选中之前,不会有某个就绪的警卫被选中两次。所以,如果警卫的总数是 n,那么在最坏的情况下,就绪的警卫没获得选中的次数不会连续超过 n 次。

  如果进程不关心如何选择多个就绪警卫,那么任意选择策略最合适;如果进程想保证没有资源耗尽或者最差服务次数,例如在实时系统中,那么任意选择就不太适用了。在前面的情况下,推荐使用 fairSelect 方法,而在后面的情况下,用 priSelect() 方法最好。

  警卫类型

  大体来说,JCSP 提供了三类警卫:

  通道警卫 总是对应着进程等候从中读取数据的通道。也就是说,只有在通道另一端的进程已经输出数据,而该数据还没有被进程输入的时候,警卫才就绪。

  计时器警卫 总是和设置(绝对)超时对应。也就是说,如果超时,则计时器警卫就会就绪。

  跳过警卫 总是就绪。

  JCSP 中的通道警卫 可以是以下类型:AltingChannelInput/AltingChannelInputInt,只要在对应的通道中有了对象或整数数据,则这两个通道将就绪;或者 AltingChannelAccept,如果在通道中出现不可接受的“CALL”(这一点后面有更多介绍),则通道就会就绪。这些都是抽象类,它们拥有 One2One 和 Any2One 类型通道形式的具体实现。JCSP 中的计时器 警卫属于 CSTimer 类型,而 跳过警卫 则是以 Skip 类的形式提供的。

  运作中的警卫

  我用一个简单的示例,演示如何用 JCSP 警卫实现并发应用程序中的非确定性,借此总结对 JCSP 的介绍。假设您必须开发一个乘法(或者 倍增) 设计,读取的整数在输出通道以固定速率到达,可以用某个乘数乘以它们,然后把它们写入其输出通道。设备可以用一个初始乘数开始,但是这个乘数每 5 秒钟自动加倍。

  这个故事中介绍的方法是这样的:系统中存在着第二个控制器进程,它能通过专用通道向设备发送 suspend operation 信号。这使设备中止自身,并把乘数的当前值通过第二个通道发送给控制器。

  在中止的时候,设备只应当允许全部进入的整数不经变化地通过它的输出通道。控制器进程 —— 可能在用设备发送给它的乘数做了某些计算中的一种 —— 通过专用通道把一个新乘数发送回设备。(请注意:只要设备处于 中止 状态,就会被迫接受这个乘数。)

  更新过的乘数插入到设备,并充当设备的唤醒信号。设备现在继续执行它的放大操作,用新更新的乘数乘上输入的整数。计时器这时也重置,所以新的乘数也在 5 秒之后被设置成加倍数值,如此类推。

  图 3 中的图表说明了这个放大设备:

  图 3. 放大设备

  ScaleInt 进程

  放大设备的源代码在清单 7 中显示。这个示例中的非确定性是因为:output 的值基于 in 和 inject 流的值(同时还基于这些值到达的顺序)。

  清单 7. ScaleInt 进程

  1 import jcsp.lang.*;
  2 import jcsp.plugNplay.ints.*;
  3 public class ScaleInt implements CSProcess
  4 {
  5   private int s;
  6   private final ChannelOutputInt out, factor;
  7   private final AltingChannelInputInt in, suspend, inject;
  8   public ScaleInt (int s, AltingChannelInputInt suspend, AltingChannelInputInt in,
  9     ChannelOutputInt factor, AltingChannelInputInt inject, ChannelOutputInt out)
10   {
11     this.s = s;
12     this.in = in;
13     this.out = out;
14     this.suspend = suspend;
15     this.factor = factor;
16     this.inject = inject;
17   }
18   public void run()
19   {
20     final long second = 1000;               // Java timings are in millisecs
21     final long doubleInterval = 5*second;
22     final CSTimer timer = new CSTimer ();
23     final Alternative normalAlt = new Alternative (new Guard[] {suspend, timer, in});
24     
25     final int NORMAL_SUSPEND=0, NORMAL_TIMER=1, NORMAL_IN = 2;
26     final Alternative suspendedAlt = new Alternative (new Guard[] {inject, in});
27     
28     final int SUSPENDED_INJECT=0, SUSPENDED_IN = 1;
29     
30     long timeout = timer.read () + doubleInterval;
31     timer.setAlarm (timeout);
32     while (true)
33     {
34       switch (normalAlt.priSelect ())
35       {
36         case NORMAL_SUSPEND:
37           suspend.read ();              // don't care what's sent
38           factor.write (s);             // reply with the crucial information
39           boolean suspended = true;
40           while (suspended)
41           {
42             switch (suspendedAlt.priSelect ())
43             {
44               case SUSPENDED_INJECT:    // this is the resume signal as well
45                 s = inject.read ();     // get the new scaling factor
46                 suspended = false;      // and resume normal operations
47                 timeout = timer.read () + doubleInterval;
48                 timer.setAlarm (timeout);
49                 break;
50               case SUSPENDED_IN:
51                 out.write (in.read ());
52                 break;
53             }
54           }
55           break;
56         case NORMAL_TIMER:
57           timeout = timer.read () + doubleInterval;
58           timer.setAlarm (timeout);
59           s = s*2;
60           break;
61         case NORMAL_IN:
62           out.write (s * in.read ());
63           break;
64       }
65     }
66   }
67 }
68 import jcsp.lang.*;
69 import jcsp.plugNplay.ints.*;
70 public class Controller implements CSProcess
71 {
72   private long interval;
73   private final ChannelOutputInt suspend, inject;
74   private final ChannelInputInt factor;
75   public Controller (long interval, ChannelOutputInt suspend, ChannelOutputInt inject,
76     ChannelInputInt factor)
77   {
78     this.interval = interval;
79     this.suspend = suspend;
80     this.inject = inject;
81     this.factor = factor;
82   }
83   public void run ()
84   {
85     int currFactor = 0;
86     final CSTimer tim = new CSTimer ();
87     long timeout = tim.read ();
88     while (true)
89     {
90       timeout += interval;
91       tim.after (timeout);        // blocks until timeout reached
92       suspend.write (0);          // suspend signal (value irrelevant)
93       currFactor = factor.read ();            
94       currFactor ++;              // compute new factor
95       inject.write (currFactor);  // inject new factor
96     }
97   }
98 }
99 import jcsp.lang.*;
100 import jcsp.plugNplay.ints.*;
101 public class DriverProgram
102 {
103   public static void main(String args[])
104   {
105     try
106     {
107       final One2OneChannelInt temp = new One2OneChannelInt ();
108       final One2OneChannelInt in = new One2OneChannelInt ();
109       final One2OneChannelInt suspend = new One2OneChannelInt ();
110       final One2OneChannelInt factor = new One2OneChannelInt ();
111       final One2OneChannelInt inject = new One2OneChannelInt ();
112       final One2OneChannelInt out = new One2OneChannelInt ();
113         
114       new Parallel
115       (
116         new CSProcess[]
117         {
118           new NumbersInt (temp),
119           new FixedDelayInt (1000, temp, in),
120           new ScaleInt (2, suspend, in, factor, inject, out),
121           new Controller (6000, suspend, inject, factor),
122           new PrinterInt (out, "--> ", "\n")
123         }
124       ).run ();
125     }
126     catch (Exception e)
127     {
128         e.printStackTrace();
129     }
130   }
131 }
132

  上面的类 ScaleInt 对应着放大设备。正如前面提到的,这个类必须实现 CSProcess 接口。因为上面的代码演示了许多概念,所以我将逐个讨论它的不同方面。

  两个备选项

  在 ScaleInt 类中,我们感兴趣的第一个方法是 run()。在 run() 方法中,要做的第一件事是创建 Alternative 类的两个实例,每个都有一组不同的 Guard 对象。

  第一个 Alternative 实例由变量 normalAlt 表示,它是为设备正常操作的时候使用的。与之关联的警卫列表如下所示:

  suspend 是 One2OneChannelInt 的实例。正如前面提到过的,One2OneChannelInt 实现了单一阅读器/写入器整数通道,通道是零缓冲、完全同步的。这是控制器进程向设备发送中止信号的通道。

  timer 是 CSTimer 的实例,它被设置成每 5 秒触发一次,每次触发时,设备会把乘数的当前值加倍。

  in 是 One2OneChannelInt 的实例,设备通过它接收输入的整数。

  第二个 Alternative 实例由 suspendedAlt 表示,它是供设备在已经被 Controller 中止的情况下使用的。与之关联的警卫如下如示:

  inject 是 One2OneChannelInt 的实例,由控制器进程使用,用来向设备发送新的乘数(也充当唤醒信号)。

  in 是前面已经看到的 One2OneChannelInt 相同的实例;设备通过这个通道接收输入整数。

  两个 Alternative 实例被用在不同的情况下等候警卫就绪,列表的顺序是隐式的优先级顺序。例如,如果 normalAlt 的 suspend 和 timer 警卫恰好同时就绪,那么和 suspend 警卫对应的事件首先被处理。

  警卫就绪

  下一个我们感兴趣的是在每个警卫就绪的时候,发生了什么。我首先研究 normalSelect,假设设备操作正常(也就是说,还没有被中止):

  如果控制器向设备发送了 suspend 信号,那么这个事件以最高优先级得到处理。作为响应,设备把乘数的当前值通过叫作 factor 的通道发送给控制器。然后将叫作 suspended 的内部标志设置为 true,然后进入循环,等候别人发送信号,以继续其操作。在循环内部,设备调用第二个 Alternative 实例上的 priSelect() 方法 (suspendedAlt)。

  这个 Alternative 实例包含两个警卫:第一个表示控制器向设备发送乘数的事件,第二个表示整数到达设备的输入通道。在前一种情况下,设备用从 inject 通道读取的值来更新乘数(保存在变量 s 中),并将 suspended 标志设置回 false (这样就保证了在下一次迭代时可以退出内部循环),用当前计时器的值作为基值重新设置闹钟。在后一种情况下,设备只是从它的输入通道读取整数,并把整数写入输出通道(也就是说,在设备中止时,不许使用乘数的要求)。

  具有下一个优先级得到处理的事件是闹钟到期事件。这造成设备把当前乘数加倍,用当前计时器的值作为基值重新设置闹钟,然后返回,继续等候下一个事件。

  第三个可能是事件是从设备的输入通道接收整数的事件。与之对应的是,设备读取整数,用当前乘数 s 乘上它,并将结果写入设备的输出通道。

  Controller 类

  下一个要考虑的类是 Controller 类。请记住,控制器类的任务是周期性地(大概是基于复杂的计算)向设备进程插入乘数值。在这个示例中,周期基础只是一个计时器,该计时器按照规律的、配置好的间隔到期。每次到期时,控制器就在 suspend 上写一个 0(也就是说,它将中止设备),并在叫作 factor 的输入通道上读取当前的乘数。

  这时,控制器只是把这个值加一,然后通过一对一通道 (叫作 inject,专门用于为这个目的) 将它插回设备。这就通知设备继续工作的方式,这时计时器被重新设置成在适当间隔后到期。

  DriverProgram 类

  最后剩下的类是驱动器类 DriverProgram。这个类创建适当的通道和 CSProcess 实例数组。它用 JCSP 提供的类 NumbersInt 生成一系列自然数,通过temp 通道传递给另一个叫作 FixedDelayInt 的内置类。顾名思义,FixedDelayInt 将来自其输入通道的值在固定延迟(在示例代码中,该延迟是 1 秒)之后发送到它的输出通道。

  这个自然数的流每隔一秒就被发送到 ScaleInt 进程的 in 通道。ScaleInt 进程的 out 通道的输出传递给 JCSP 提供的 PrinterInt 进程,然后该进程再接着把整数值输出到 System.out。

  结束语

  在介绍适用于 Java 程序员的 CSP 的文章中,我解释并演示了并发编程中的 CSP 理论。然后是对 CSP 构造的概述,其中介绍了最流行的基于 Java 的 CSP 库 —— JCSP。由于 Java 语言没有对 CSP 构造提供自带的支持,所以 JCSP 库内部采 Java 支持 的自带构造,例如 synchronized()、wait() 和 notify()。为了帮助您正确地理解 JCSP 是如何工作的,我从 Java 构造的角度解释了一些 JCSP 类库的内部实现,然后在几个实际示例中演示了它们的用法。

0
相关文章