【IT168 技术文章】CSP 是对并发对象之间的复杂交互进行建模的范式。使用 CSP 的主要优势之一是:对程序每一阶段所包含对象的行为进行精确地指定和验证。CSP 的理论和实践对于并发设计和编程领域有深远的影响。它是 occam 这样的编程语言的基础,对其他语言(例如 Ada)的设计也有影响。
CSP 基础
CSP 的基本构造是进程和进程之间各种形式的通信。CSP 中的每件事都是进程,甚至(子)进程网络也是进程。但是,在进程之间没有直接交互 —— 所有交互都通过 CSP 的同步对象(例如各级进程订阅的通信通道和事件边界)实现的。
CSP 进程 与典型的 Java 对象不同:封装在进程组件中的数据 和 操纵数据的算法都是私有的。也就是说,进程没有对外可以调用的方法(除了启动进程必须调用的方法之外),算法只能在进程自己的控制线程内执行。如果把这种方法与 Java 语言中的方法调用进行对比,就可以立即看出 CSP 是如何消除显式锁定的需求的:
在 Java 语言中,在对象上调用的方法总是在调用者的线程中运行。但也有一个特殊的控制线程是通过系统中的多个对象进行工作的。对于大部分情况来说,对象没有自己的生命 —— 它们只是在运行线程调用它们的方法时才存在。因此,不同的执行线程可以在同一时间试图调用同一对象的同一方法,显然,这种情况在 CSP 中永远不会发生。
通信通道和进程网络
进程间通信最简单的机制就是通过通道读写数据。CSP 中基本的通道构造是同步的(synchronous) 和 点对点的(point-to-point);也就是说,它不包含内部缓冲,并且把一个进程连接到另外一个进程。从这个基本通道开始,有可能构建多个阅读器/写入器通道(即一对多、多对一和多对多)。
CSP 中的进程构成了复杂系统的基本构造块 —— 一个进程可以同一个或多个其他进程连接起来(全都设置成并行的),从而构成一个进程网络。可以把这个网络本身想像成一个进程,这个进程还可以递归地与其他进程、它们自己的网络或者其他类似东西组合在一起,形成一个为了最好地解决手上问题而设计的复杂排列的金字塔。
如果单独考虑,那么进程仅仅是一个独立的串行程序,它只与外部 I/O 设备交互。这个程序本身并不需要考虑在 I/O 通道另一端的进程是否存在或对方的性质。
CSP 理论已经在许多基于 Java 的框架中实现了,包括面向 Java 的通信顺序进程(Communicating Sequential Processes for Java,JCSP) 库。
JCSP 库
JCSP 库由英国坎特伯雷市肯特大学的 Peter Welch 教授和 Paul Austin 开发。对于本文余下的大部分内容来说,我会把重点放在 CSP 概念在 JCSP 中的实现方式上。因为 Java 语言没有提供对 CSP 构造的自带支持,所以 JCSP 库内部使用 Java 语言 实际 支持的、自带的并发构造,例如 synchronized、wait 和 notify。为了帮助您正确地理解 JCSP 的工作方式,我将从这些 Java 构造的角度对 JCSP 库中某些类的内部实现进行了解释。
JCSP 中的进程
在 JCSP 中,进程实际上就是实现了 CSProcess 接口的类。清单 1 显示了这个接口:
清单 1. CSProcess 接口
2 public interface CSProcess
3 {
4 public void run();
5 }
6
注意,CSProcess 接口看起来就像 Java 语言的 Runnable 接口,而且它也充当着类似的角色。虽然 JCSP 目前是用标准 Java API 实现的,但是并不需要这样,而且在未来可能真的不需要这样。出于这个原因,在 JCSP 中没有直接使用 Runnable 接口。
JCSP 定义了两个接口用于从通道读取对象和向通道写入对象。从通道读取对象的接口叫作 ChannelInput ,它只有一个方法,叫作 read()。如果进程调用一个实现 ChannelInput 接口的对象的这个方法,那么进程会阻塞,直到在通道另一端的进程实际向通道写入了一个对象。 一旦在通道上有对象可用,对象就被返回给调用进程。类似地,ChannelOutput 接口也只有一个方法,叫作 write(Object o)。如果进程调用 一个实现 ChannelOutput 接口的对象的这个方法,进程也会阻塞,直到通道接受对象。正如前面提到过的,最简单的通道类型没有缓冲,所以它在另一端(读取)的进程调用 read() 之前不会接受对象。
从现在开始,我将使用代码示例来演示这些和其他 JCSP 构造如何工作。在清单 2 中,可以看到一个非常简单的进程,它输出 1 到 100 之间的所有偶数:
清单 2. 生成 1 到 100 之间偶数的进程
2 public class SendEvenIntsProcess implements CSProcess
3 {
4 private ChannelOutput out;
5 public SendEvenIntsProcess(ChannelOutput out)
6 {
7 this.out = out;
8 }
9 public void run()
10 {
11 for (int i = 2; i <= 100; i = i + 2)
12 {
13 out.write (new Integer (i));
14 }
15 }
16 }
17
与每一个写进程对应,必须有一个读进程。如果不存在这样的进程,则会造成 SendEvenIntsProcess 在 ChannelOutput 对象的 out 进行第一次写操作之后立即无限期阻塞。清单 3 演示了一个简单的读进程,该进程与清单 2 介绍的写进程对应:
清单 3. 对应的消费者进程
2 public class ReadEvenIntsProcess implements CSProcess
3 {
4 private ChannelInput in;
5 public ReadEvenIntsProcess(ChannelInput in)
6 {
7 this.in = in;
8 }
9 public void run()
10 {
11 while (true)
12 {
13 Integer d = (Integer)in.read();
14 System.out.println("Read: " + d.intValue());
15 }
16 }
17 }
18