与应用集成
同步 MR 数据到 HQ
每个 CP 都会将清关数据及时的上报到 HQ,对于 MQ 来说, 这属于典型的点对点(PTP)消息处理模式。 以 CP1 为例,定义在 HQ 端的 Cluster 共享队列 MR2HQ.Q 对 CP1 来说是可见的, CP1 会周期的将新产生的 MR 数据封装为 ObjectMessage 类型的 JMS 消息, 并发送到本地 CP1.QM 队列管理器中的 MR2HQ.Q 队列,进而传送到 HQ 端。
要发送 JMS 消息到目标队列中,首先需要得到一个 ConnectionFactory 对象, 以及代表队列的 Destination 对象。经过上节的正确配置后, 可以很容易的通过查询 J2EE 容器提供的 Context,来获取这些对象实例。 示例代码如下:
QueueConnectionFactory queueConnectionFactory = null;
Queue queue = null;
Context context = new InitialContext();
queueConnectionFactory = (QueueConnectionFactory) context.lookup("BCP.CP1.QCF");
queue = (Queue) context.lookup("BCP.MR2HQ.Q");
Queue queue = null;
Context context = new InitialContext();
queueConnectionFactory = (QueueConnectionFactory) context.lookup("BCP.CP1.QCF");
queue = (Queue) context.lookup("BCP.MR2HQ.Q");
其次,创建 Connection 和 Session 对象,并新建 MessageSender, 以及用来存放消息的 Message 对象。 最后,使用 MessageSender 将 JMS 消息发送到指定的 Destination 中。示例代码如下:
QueueConnection queueConnection = null;
QueueSession queueSession = null;
try {
queueConnection = queueConnectionFactory.createQueueConnection();
queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSender queueSender = queueSession.createSender(queue);
ObjectMessage message = queueSession.createObjectMessage();
// collect all MR data which has not been synchronized
Serializable mrData = collectNewMR();
message.setObject(mrData);
queueSender.send(message);
} finally {
try {
if (queueSession != null)
queueSession.close();
if (queueConnection != null)
queueConnection.close();
} catch (Exception e) {
//…
}
}
QueueSession queueSession = null;
try {
queueConnection = queueConnectionFactory.createQueueConnection();
queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSender queueSender = queueSession.createSender(queue);
ObjectMessage message = queueSession.createObjectMessage();
// collect all MR data which has not been synchronized
Serializable mrData = collectNewMR();
message.setObject(mrData);
queueSender.send(message);
} finally {
try {
if (queueSession != null)
queueSession.close();
if (queueConnection != null)
queueConnection.close();
} catch (Exception e) {
//…
}
}