技术开发 频道

利用WAS CE v2.1 创建基于JMS 的应用(1)

   【IT168技术文档】JMS 也是 Java EE 5 规范的一部分,主要用于对应用程序的消息传递及管理。目前 JMS 规范的最新版本为 1.1,规范定义了一组 API, JMS 的实现则留由不同厂商提供。JMS 的实现被称为 JMS Provider。  

JMS 是一种面向消息的中间件(Message Oriented Middleware,MOM)。与 CORBA、DCOM、RMI 等通信方式相比,JMS 具有很多新特点: 

1.消息的发送和接收是异步的,发送者无需等待。

2.可以一对多通信:对于一个消息可以有多个接收者。

3.消息发送者和接收者的生命周期无需相同:发送消息的时,接收者不一定运行;接收消息的时,发送者也不一定运行。

虽然其他通信方式也会具有 JMS 中的特点,但是 JMS 将三种特点集于一身,无疑为应用程序的开发带来了巨大的灵活性。因为 JMS 的良好特性,出现了各种 JMS Provider。其中,既有开源社区的实现,如 ActiveMQ、OpenMQ,也有商业实现,如 Websphere MQ。 

JMS 的消息模型  

JMS 的巨大灵活性与其消息模型有密切关系。JMS 中有两种消息模型:发布/订阅消息模型及点对点消息模型。  

1. 发布/订阅消息模型  

在发布/订阅模型中,消息的发送者与接收者通过主题 (Topic) 来进行通信。发送者在给定的 Topic 下发布消息;接收者订阅该 Topic 后,即可接收 Topic 下所发布的消息。发布/订阅模型的特点是:一个发布者可以将消息发送给多个接收者,甚至可以有多个发布者使用同一个 Topic 发布消息;所有订阅了 Topic 的接收者都可以接收到消息。  

根据发布/订阅消息模型的特点,可以在“一对多”的情形下,使用该消息模型。即当有一个发布者需要向多个接收者发送消息时,可以使用发布/订阅消息模型。这种场景与 RSS 的消息订阅十分相似,若干接收者订阅同一个“Feed”;该“Feed”下的任一消息都会被所有订阅者接收。  

2. 点对点消息模型  

与发布/订阅模型相比,点对点的消息模型较为简单。消息发送者与接收者使用同一个消息队列 (Queue),发送者将消息发送至消息队列中,接收者直接去队列中取出消息。点对点消息模型的特点是:消息队列中的消息,只能被一个接收者取走;如果没有接收者接收消息,消息会继续存放在消息队列中。  

根据点对点消息模型的特点,可以在“一对一”的情形下,使用该消息模型。即当有一个发送者需要一个特定接收者发送消息时,可以使用点对点消息模型。这种场景类似于发送者向接收者发送了一封“邮件”,只有实现指定的接收者,才能在自己的“邮箱”里看见“邮件”。

WAS CE 中的 JMS Provider  

前一篇文章简要介绍了通过一种 Java EE 5 认证的应用服务器-WAS CE (Websphere Application Server Community Edition)。WAS CE 以 ActiveMQ 作为自身的 JMS Provider。通过 ActiveMQ 可以方便的开发 JMS 的应用。ActiveMQ 是 Apache 社区的开源项目,具有以下特性:  

1.自带符合 JCA(Java Connector Architecture)1.5 规范的 Resource Adapter,可以使用在多种应用服务器上,如 Apache Geronimo, Redhat JBoss, GlassFish 及 Oracle WebLogic。

2.使用 JDBC 支持消息的快速持久化。

3.可以用于 Cluster、C/S 构架及 P2P(Peer-to-Peer) 的消息传递。

4.可以作为独立的 JMS 平台(无需应用服务器),适合单元测试。

作为 Apache 社区的优异项目,ActiveMQ 还在不断发展。其良好的特性也为 WAS CE 增色不少。本文余下部分,将扩展前一篇文章中的示例,以阐释在 WAS CE 2.1 中 JMS 应用的开发流程。  

示例简介  

在前一篇文章的示例中,作者假设了一个商店,应用程序只实现了用户登录和购买商品的功能。本文继续扩展这个示例,假设用户购买商品之后,商店还需要进行以下工作:  

1.通知会计 (Accountant) 更新账目并且告知库房 (Storage) 准备出货

2.库房取货后通知运输队 (Transporter) 送货

显然,上述工作需要传递多个消息,如图 1 所示。  

图 1. 示例流程图  

 示例中所需要的消息传递功能,可以使用 JMS 完成。

   创建 JMS 资源  

从图 1 可以看出,用户购买商品后,会计和库房都需要得到消息。这是一个消息发送者对应两个消息接收者的场景。根据 JMS 消息模型特性,这是一个“一对多”的消息发送方式,此处可以考虑使用发布/订阅模型。为此,需要定义一个 Topic,命名为 OrderTopic。然而,库房对运输队的通知则是明显的一对一消息发送,所以可以使用点对点消息发送模型。因此,需要定义一个 Queue,命名为 OrderQueue。 

与前文定义一个数据源相似,JMS 也可以通过两种方式定义 JMS 资源:通过管理控制台创建和通过资源描述文件创建。在这里我们使用第二种方式,所以需要在 ShopEAR 中的增加文件:jms-resources.xml。jms-resources.xml 中定义 JMS 资源的片段见清单 1。 

清单 1. jms-resources.xml

<resourceadapter>
        <resourceadapter-instance>
            <resourceadapter-name>jms-resources</resourceadapter-name>
            <nam:workmanager xmlns:nam="http://geronimo.apache.org/xml/ns/naming-1.2">
               <nam:gbean-link>DefaultWorkManager</nam:gbean-link>
            </nam:workmanager>
        </resourceadapter-instance>
      ......
     <adminobject>
        <adminobject-interface>javax.jms.Queue</adminobject-interface>
        <adminobject-class>
            org.apache.activemq.command.ActiveMQQueue
        </adminobject-class>
        <adminobject-instance>
            <message-destination-name>OrderQueue</message-destination-name>
            <config-property-setting name="PhysicalName">
               OrderQueue
            </config-property-setting>
        </adminobject-instance>
    </adminobject>
    <adminobject>
        <adminobject-interface>javax.jms.Topic</adminobject-interface>
        <adminobject-class>
                org.apache.activemq.command.ActiveMQTopic
       </adminobject-class>
       <adminobject-instance>
            <message-destination-name>OrderTopic</message-destination-name>
            <config-property-setting name="PhysicalName">
                 OrderTopic
            </config-property-setting>
        </adminobject-instance>
     </adminobject>

上述代码定义了一个 JMS 资源适配器 (Resource Adapter) 的实例,名称为 jms-resources。此处,可以将其理解为一个 JMS 资源组 (group)。其中,包含了三个 JMS 资源,分别为 OrderQueue、OrderTopic 及 OrderConnectionFactory。 

定义上述资源时,文件中引用了一些 ActiveMQ 的类。因此,还需要在 ShopEAR 的部署文件中,添加对 ActiveMQ 资源适配器 (Resource Adapter) 的依赖。同时,将位于 WASCE_HOME/repository/org/apache/geronimo/modules/geronimo-activemq-ra/2.1.3 的 geronimo-activemq-ra-2.1.3.rar 复制到 ShopEAR 中。 

在 application.xml 中添加以下代码: 

清单 2. application.xml

<module>
    <connector>geronimo-activemq-ra-2.1.3.rar</connector>
</module>

在 geronimo-application.xml 中添加以下代码: 

清单 3. geronimo-application.xml

<module>
    <connector>geronimo-activemq-ra-2.1.3.rar</connector>
    <alt-dd>jms-resources.xml</alt-dd>
 </module>

  经过上述配置之后,就可以在代码中使用已定义的 JMS 资源。

   发布/订阅消息  

发布消息 

为了在用户触发“购买”动作后,向会计及库房传递消息,需要在 ShopWEB 中增加一个 OrderSenderBean。同时,需要对原有 goods.jsp 进行修改,增加链接,用于触发“购买”动作。 

图 2. 添加链接后的 goods.jsp 页面内容 

 

因为 OrderSenderBean 中引用了 JMS 资源,所以还需要在 ShopWEB 的部署文件中,添加对 JMS 资源引用。 

在 web.xml 中添加清单 5 所示代码。 

清单 5. web.xml

<resource-ref>
    <res-ref-name>OrderConnectionFactory</res-ref-name>
    <res-type>javax.jms.TopicConnectionFactory</res-type>
    <res-auth>Container</res-auth>
    <res-sharing-scope>Shareable</res-sharing-scope>
</resource-ref>
<message-destination-ref>
    <message-destination-ref-name>OrderTopic</message-destination-ref-name>
    <message-destination-type>javax.jms.Topic</message-destination-type>
    <message-destination-usage>Produces</message-destination-usage>
    <message-destination-link>OrderTopic</message-destination-link>
</message-destination-ref>

从代码中可以看出,此处引用的 JMS 资源正是在 jms-resources.xml 中定义的 OrderConnectionFactory 与 OrderTopic。当应用程序部署至 WAS CE 2.1 后,就可以使用这些 JMS 资源。 

在 OrderSenderBean 中,有如下代码片段: 

清单 6. OrderSenderBean.java

public class OrderSenderBean {
    @Resource(name = "OrderConnectionFactory")
    private ConnectionFactory factory;
    @Resource(name = "OrderTopic")
    private Topic orderTopic;
    public String orderGoods() {
     ......
    // sending message to accountant and storage
    Connection connection;
    try {
    connection = factory.createConnection();
    Session sess = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
    TextMessage msg = sess.createTextMessage("<CustomerId=" + 
    customerId + "GoodsId=" + goodsId+">");
    MessageProducer messageProducer = sess.createProducer(orderTopic);
    messageProducer.send(msg);
    System.out.println("OrderSenderBean: Order request sent to accountant and storage");
     } catch (JMSException e) {
     System.out.println("Error " + e);
     e.printStackTrace();
     }
     return "ok";
     }

   OrderSenderBean 使用了 @Resource 来绑定 JMS 资源。此段代码也展示了利用 JMS 队列发送消息的流程。 

创建 MDB(Message Driven Bean) 

Java EE 中的 MDB 可以用于接收 JMS 消息。为此,创建两个 MDB-ShopAccountant 与 ShopStorage,分别表示会计与仓库。参见清单 7 和清单 8。 

清单 7. ShopAccountant 代码片段

@MessageDriven(activationConfig = {
   @ActivationConfigProperty(propertyName = "destinationType",
    propertyValue =  "javax.jms.Topic"),
   @ActivationConfigProperty(propertyName = "destination", 
    propertyValue = "OrderTopic")})
   public class ShopAccountant implements MessageListener { 	
     public void onMessage(Message message) {
     TextMessage textMessage = (TextMessage) message;
     try {
       System.out.println("ShopAccountant: Order Received \n" + 
       textMessage.getText());
       //do accountant work
        ......
        } catch (JMSException e) {
        e.printStackTrace();
        }
      }
    }

 

   清单 8. ShopStorage 代码片段

@MessageDriven(activationConfig = {
      @ActivationConfigProperty(propertyName = "destinationType",
       propertyValue = "javax.jms.Topic"),
      @ActivationConfigProperty(propertyName = "destination", 
      propertyValue = "OrderTopic")
        })
public class ShopStorage implements MessageListener {
      public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
           System.out.println("ShopStorage:  Order Received \n" + textMessage.getText());
           //find good in storage
           ......
           } catch (JMSException e) {
              e.printStackTrace();
           }
        }
}

   从代码中可见,MDB 继承了 java.jms.MessageListeneron 接口,并在 onMessage() 方法中接收并处理消息。当有消息到达时,MDB 的 onMessage() 方法会被自动执行,参数 message 就是所接收到的消息。通过 @MessageDriven 及 @ActivationConfigProperty 对 JMS 资源引用,指定了该 MDB 可以接收的消息来源。ShopAccountant 与 ShopStorage 都只接收 OrderTopic 中传递的消息。 

点对点消息 

发送消息 

示例中,仓库完成提货后需要通知运输部门。因此,需要在 ShopStorage 中添加相应代码,以完成发送消息的动作。 

以下是 ShopStorage 中添加的代码: 

清单 9. ShopStorage.java

public class ShopStorage implements MessageListener {
         @Resource(name = "OrderConnectionFactory")
         private ConnectionFactory factory;
         @Resource(name = "OrderQueue")
         private Queue orderQueue;
         public void onMessage(Message message) {
           TextMessage textMessage = (TextMessage) message;
           try {
              ......
              Connection connection;
              connection = factory.createConnection();
              Session sess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
              TextMessage msg = sess.createTextMessage();
              msg.setStringProperty("CustomerId", customerId);
              msg.setStringProperty("GoodsId", goodsId);
              MessageProducer messageProducer = sess.createProducer(orderQueue);
              messageProducer.send(msg);
              System.out.println("ShopStorage: get ready to deliver goods.");
              } catch (JMSException e) {
                   e.printStackTrace();
              }
         }
}

 上述代码与 OrderSenderBean 中发送消息的代码十分相似。不同点在于这里使用了 JMS 的消息队列 (Queue) 资源 -OrderQueue,而 OrderSenderBean 则使用了 JMS 的主题 (Topic) 资源。 

接收消息 

创建 ShopTransporter 类表示运输部门。与之前接收消息的方式相似,ShopTransporter 也是一个消息驱动组件 (MDB)。可见,MDB 可以方便的使用于接收任何一种 JMS 消息模型。 

清单 10. ShopTransporter 的代码

@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = 
    "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = 
    "destination", propertyValue = "OrderQueue"),})
public class ShopTransporter implements MessageListener {
        public void onMessage(Message message) {
                   ......
        }
}

此段代码与之前的 MDB 非常相似,同样使用了 @ActivationConfigProperty 与 @MessageDriven 来绑定 JMS 资源。不同点在于,ShopTransporter 使用了消息队列 (Queue) 而不再是主题 (Topic), 表示 ShopTransporter 直接收 OrderQueue 中传递的消息。 

运行 

与前一篇文章相同,运行应用程序前,需要将 ShopEAR 部署至应用服务器上。登录后,用户可以点击“buy”链接,购买商品。上述代码中的输出信息将显示在应用服务器的终端中。 

应用程序的输出信息如图 3 所示: 

图 3. 运行结果 

 

总结 

本文以 WAS CE v2.1 为平台,通过一个示例向您阐述了 JMS 应用的基本开发过程,包括 JMS 资源及 Resouces Adapter 的创建、两类 JMS 消息 ( 通过 Queue 及 Topic) 的发送、JMS 消息的接收。无论采用何种消息模型,利用 WAS CE 2.1 开发 JMS 应用的基本流程可以归纳如下: 

定义 JMS 资源 

•创建 JMS 资源定义文件,定义所需要的 JMS 资源

•加入 ActiveMQ 资源适配器,作为应用程序的一个模块

发送消息 

•利用 ConnectionFactory 创建 Connection,连接至 JMS Provider

•利用 Connection 创建 Session

•利用 Session 创建发送者及消息

•发送者发送消息

接收消息 

•创建 MDB,实现 java.jms.MessageListener 接口

•实现 onMessage() 方法,接收并处理消息

WAS CE v2.1 为各种 Java EE 技术提供了强大的支持。通过 ActiveMQ 作为 JMS Provider,开发人员可以在 WAS CE 上便捷地开发出各种复杂应用。

0
相关文章