技术开发 频道

.NET Compact Framework下的进程间通信之MSMQ开发

 请求回应模式

 /// <summary>
 /// Service side of the request/response sample. Owns the private queue
 /// ".\Private$\MQServiceQueue$", listens on it asynchronously and answers each
 /// request on the queue named in the request's ResponseQueue property.
 /// </summary>
 public class MQService
 {
     private const string mMachinePrefix = @".\";

     private const string mPrivateQueueNamePrefix = mMachinePrefix + @"Private$\";

     private const string mServiceQueuePath = mPrivateQueueNamePrefix + "MQServiceQueue$";

     // Kept open for the lifetime of the listener: an asynchronous receive is
     // always pending on it once InitServiceQueue has succeeded.
     private System.Messaging.MessageQueue mServiceQueue;

     /// <summary>
     /// Opens the service queue (creating it as a non-transactional private queue
     /// when it does not exist yet), configures it to carry string bodies and
     /// starts the first asynchronous receive.
     /// MessageQueueException is caught and its message written to the console.
     /// </summary>
     private void InitServiceQueue()
     {
         try
         {
             if (!System.Messaging.MessageQueue.Exists(mServiceQueuePath))
             {
                 // Create(path) without the 'transactional' argument yields a
                 // non-transactional queue.
                 mServiceQueue = System.Messaging.MessageQueue.Create(mServiceQueuePath);
             }
             else
             {
                 mServiceQueue = new System.Messaging.MessageQueue(mServiceQueuePath);
             }

             // Requests are plain strings.
             mServiceQueue.Formatter = new System.Messaging.XmlMessageFormatter(new Type[] { typeof(string) });

             mServiceQueue.ReceiveCompleted += new System.Messaging.ReceiveCompletedEventHandler(MessageListenerEventHandler);

             // Begin the asynchronous receive operation. The queue is deliberately
             // NOT closed afterwards: Close() releases the queue's resources, and
             // doing so while the receive is still pending would tear down the
             // listener that was just started.
             mServiceQueue.BeginReceive();
         }
         catch (System.Messaging.MessageQueueException MQException)
         {
             // Typically an invalid queue name or an unavailable MSMQ service.
             Console.WriteLine(MQException.Message);
         }
     }

     /// <summary>
     /// ReceiveCompleted callback: completes the pending receive, answers the
     /// request and re-arms the asynchronous receive for the next message.
     /// </summary>
     /// <param name="sender">The MessageQueue that raised the event.</param>
     /// <param name="e">Carries the IAsyncResult of the completed receive.</param>
     private void MessageListenerEventHandler(object sender, System.Messaging.ReceiveCompletedEventArgs e)
     {
         System.Messaging.MessageQueue mq = (System.Messaging.MessageQueue)sender;

         System.Messaging.Message msg;

         try
         {
             // End the asynchronous receive operation.
             msg = mq.EndReceive(e.AsyncResult);
         }
         catch (System.Messaging.MessageQueueException ex)
         {
             // The receive itself failed; as before, listening is not restarted.
             Console.WriteLine(ex.Message);

             return;
         }

         try
         {
             SendReply(msg);
         }
         catch (System.Messaging.MessageQueueException ex)
         {
             // A reply that cannot be delivered must not stop the service.
             Console.WriteLine(ex.Message);
         }
         catch (InvalidOperationException ex)
         {
             // Raised when the message body cannot be deserialized by the formatter.
             Console.WriteLine(ex.Message);
         }

         RestartReceive(mq);
     }

     // Sends the answer that matches the request body to the request's response queue.
     private static void SendReply(System.Messaging.Message msg)
     {
         // Convert.ToString maps a null body to "" instead of throwing.
         string request = Convert.ToString(msg.Body);

         System.Messaging.MessageQueue responseQueue = msg.ResponseQueue;

         if (responseQueue == null)
         {
             // The sender did not say where the answer should go.
             Console.WriteLine("Request '" + request + "' has no ResponseQueue; no reply sent.");

             return;
         }

         if (request == "mq_reques_1")
         {
             responseQueue.Send("mq_respond_1");
         }
         else if (request == "mq_reques_2")
         {
             responseQueue.Send(true);
         }
     }

     // Re-arms the asynchronous receive so the next request triggers the handler again.
     private static void RestartReceive(System.Messaging.MessageQueue mq)
     {
         try
         {
             mq.BeginReceive();
         }
         catch (System.Messaging.MessageQueueException ex)
         {
             Console.WriteLine(ex.Message);
         }
     }
 }

 /// <summary>
 /// Client side of the request/response sample. Sends requests to
 /// ".\Private$\MQServiceQueue$" and reads the answers from its own private
 /// queue ".\Private$\MQClientQueue$".
 /// </summary>
 public class MQClient
 {
     private const string mMachinePrefix = @".\";

     private const string mPrivateQueueNamePrefix = mMachinePrefix + @"Private$\";

     private const string mServiceQueuePath = mPrivateQueueNamePrefix + "MQServiceQueue$";

     private const string mClientQueuePath = mPrivateQueueNamePrefix + "MQClientQueue$";

     // Queue the requests are sent to.
     private System.Messaging.MessageQueue mServiceQueue;

     // Queue the service answers on.
     private System.Messaging.MessageQueue mClientQueue;

     /// <summary>
     /// Binds to the service queue and opens the client queue, creating the
     /// client queue first when it does not exist. The client queue is set up to
     /// read string and bool bodies.
     /// MessageQueueException is caught and its message written to the console.
     /// </summary>
     public void InitQueues()
     {
         try
         {
             mServiceQueue = new System.Messaging.MessageQueue(mServiceQueuePath);

             bool clientQueueExists = System.Messaging.MessageQueue.Exists(mClientQueuePath);

             if (clientQueueExists)
             {
                 mClientQueue = new System.Messaging.MessageQueue(mClientQueuePath);
             }
             else
             {
                 // Create(path) produces a non-transactional private queue.
                 mClientQueue = System.Messaging.MessageQueue.Create(mClientQueuePath);

                 mClientQueue.Close();
             }

             // Answers are either a string or a bool.
             Type[] responseBodyTypes = new Type[] { typeof(string), typeof(bool) };

             mClientQueue.Formatter = new System.Messaging.XmlMessageFormatter(responseBodyTypes);

             mClientQueue.Close();
         }
         catch (System.Messaging.MessageQueueException queueError)
         {
             // Typically an invalid queue name.
             Console.WriteLine(queueError.Message);
         }
     }

     /// <summary>
     /// Sends the "mq_reques_1" request with the client queue as response queue,
     /// waits up to four seconds for the answer and writes its body to the console.
     /// MessageQueueException (including a receive timeout) is caught and its
     /// message written to the console.
     /// </summary>
     private void SendRequest()
     {
         try
         {
             // Empty the client queue before the request goes out.
             mClientQueue.Purge();

             System.Messaging.Message request = new System.Messaging.Message("mq_reques_1");

             request.ResponseQueue = mClientQueue;

             mServiceQueue.Send(request);

             TimeSpan replyTimeout = TimeSpan.FromSeconds(4);

             System.Messaging.Message reply = mClientQueue.Receive(replyTimeout);

             // Handle the result.
             string replyText = reply.Body.ToString();

             Console.WriteLine(replyText);
         }
         catch (System.Messaging.MessageQueueException queueError)
         {
             Console.WriteLine(queueError.Message);
         }
     }
 }

 MQService是服务端程序,负责服务队列".\Private$\MQServiceQueue$"的建立和管理。当有新消息到达该服务队列时,MessageListenerEventHandler函数会被回调,取出消息进行分析处理并发送应答。应答通过客户端事先建立的队列返回,而不是通过服务队列返回,因为MSMQ的队列是单向的。MQClient负责客户端队列".\Private$\MQClientQueue$"的建立,发送请求时把客户端队列赋值给消息的ResponseQueue属性,使服务程序可以把应答发送到该客户端队列;客户端在等待应答时带有超时控制(示例中为4秒)。

0
相关文章