技术开发 频道

.NET Compact Framework下的进程间通信之MSMQ开发

 注册广播模式

 注册广播模式是Observer模式的一种应用,关于Observer模式可参见《实用设计模式之一--Observer模式》一文。

 客户端可以往服务端注册关心的消息,服务端通过MQ自动广播消息到客户端。

 /// <summary>
 /// Server side of the register/broadcast pattern: listens on a private
 /// service queue for register/unregister requests and keeps one response
 /// queue per registered client so that <c>Notify</c> can broadcast to all
 /// of them.
 /// </summary>
 public class MQService
 {
     private const string mMachinePrefix = @".\";
     private const string mPrivateQueueNamePrefix = mMachinePrefix + @"Private$\";
     private const string mServiceQueuePath = mPrivateQueueNamePrefix + "MQServiceQueue$";

     // Message bodies that clients send to (un)register themselves.
     private const string mRegisterCommand = "mq_register_1";
     private const string mUnregisterCommand = "mq_unregister_1";

     private System.Messaging.MessageQueue mServiceQueue;

     // Registered client queues, keyed by the label of the register message.
     private Dictionary<string, MessageQueue> mmClientQueues = new Dictionary<string, MessageQueue>();

     // Guards mmClientQueues: the receive-completed callback mutates it while
     // Notify enumerates it, and the two are not guaranteed to share a thread.
     private readonly object mClientQueuesLock = new object();

     /// <summary>
     /// Opens the service queue (creating it when it does not exist yet),
     /// installs the message formatter and starts the first asynchronous
     /// receive. Queue errors are written to the console.
     /// </summary>
     private void InitServiceQueue()
     {
         try
         {
             if (!MessageQueue.Exists(mServiceQueuePath))
             {
                 // Create(path) without the transactional flag yields a
                 // non-transactional queue.
                 mServiceQueue = MessageQueue.Create(mServiceQueuePath);
             }
             else
             {
                 mServiceQueue = new MessageQueue(mServiceQueuePath);
             }

             // Requests carry plain string bodies.
             mServiceQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
             mServiceQueue.ReceiveCompleted += new ReceiveCompletedEventHandler(MessageListenerEventHandler);

             // Start listening. The queue must stay open from here on:
             // closing it would release the resources the pending
             // asynchronous receive depends on.
             mServiceQueue.BeginReceive();
         }
         catch (MessageQueueException MQException)
         {
             // e.g. an invalid queue path or an unavailable queue manager
             Console.WriteLine(MQException.Message);
         }
     }

     /// <summary>
     /// Completion callback for the service queue. Handles one
     /// register/unregister request and then re-arms the asynchronous
     /// receive, even when processing the request failed.
     /// </summary>
     /// <param name="sender">The queue whose receive operation completed.</param>
     /// <param name="e">Carries the async result to pass to EndReceive.</param>
     private void MessageListenerEventHandler(object sender, System.Messaging.ReceiveCompletedEventArgs e)
     {
         MessageQueue mq = (MessageQueue)sender;

         try
         {
             // End the asynchronous receive operation.
             Message msg = mq.EndReceive(e.AsyncResult);

             // "as" keeps a non-string body from throwing; it simply
             // matches neither command below.
             string command = msg.Body as string;

             if (command == mRegisterCommand)
             {
                 MessageQueue responseQueue = msg.ResponseQueue;
                 if (responseQueue != null)
                 {
                     lock (mClientQueuesLock)
                     {
                         // Indexer assignment instead of Add: a repeated
                         // registration replaces the entry instead of throwing.
                         mmClientQueues[msg.Label] = responseQueue;
                     }
                 }
                 else
                 {
                     Console.WriteLine("Register request without a response queue ignored.");
                 }
             }
             else if (command == mUnregisterCommand)
             {
                 MessageQueue clientQueue = null;
                 lock (mClientQueuesLock)
                 {
                     // Unknown labels are ignored rather than raising
                     // KeyNotFoundException.
                     if (mmClientQueues.TryGetValue(msg.Label, out clientQueue))
                     {
                         mmClientQueues.Remove(msg.Label);
                     }
                 }

                 if (clientQueue != null)
                 {
                     // Drop broadcasts the client has not consumed yet.
                     clientQueue.Purge();
                 }
             }
         }
         catch (MessageQueueException ex)
         {
             Console.WriteLine(ex.Message);
         }
         catch (InvalidOperationException ex)
         {
             // The body could not be deserialized by the formatter.
             Console.WriteLine(ex.Message);
         }

         // Restart the asynchronous receive operation regardless of how the
         // current message was handled, so one bad message does not stop
         // the listener.
         try
         {
             mq.BeginReceive();
         }
         catch (MessageQueueException ex)
         {
             Console.WriteLine(ex.Message);
         }
     }

     /// <summary>
     /// Sends <paramref name="str"/> to every registered client queue.
     /// A failure on one queue is logged and does not stop the broadcast.
     /// </summary>
     /// <param name="str">The message body to broadcast.</param>
     private void Notify(string str)
     {
         // Work on a snapshot so the lock is not held while sending and the
         // collection cannot change underneath the enumeration.
         MessageQueue[] targets;
         lock (mClientQueuesLock)
         {
             targets = new MessageQueue[mmClientQueues.Count];
             mmClientQueues.Values.CopyTo(targets, 0);
         }

         foreach (MessageQueue mq in targets)
         {
             try
             {
                 mq.Send(str);
             }
             catch (MessageQueueException ex)
             {
                 Console.WriteLine(ex.Message);
             }
         }
     }
 }

 /// <summary>
 /// Client side of the register/broadcast pattern: owns a private client
 /// queue on which it receives broadcasts, and registers or unregisters
 /// that queue with the service by sending a command message to the
 /// service queue.
 /// </summary>
 public class MQClient
 {
     private const string mMachinePrefix = @".\";
     private const string mPrivateQueueNamePrefix = mMachinePrefix + @"Private$\";
     private const string mServiceQueuePath = mPrivateQueueNamePrefix + "MQServiceQueue$";
     private const string mClientQueuePath = mPrivateQueueNamePrefix + "MQClientQueue$";

     // Label identifying this client in register/unregister messages.
     private const string mClientLabel = "client1";

     private System.Messaging.MessageQueue mServiceQueue;
     private System.Messaging.MessageQueue mClientQueue;

     /// <summary>
     /// Opens the service queue and the client queue (creating the client
     /// queue when it does not exist yet), installs the formatter and
     /// starts the first asynchronous receive on the client queue.
     /// Queue errors are written to the console.
     /// </summary>
     public void InitQueues()
     {
         try
         {
             mServiceQueue = new MessageQueue(mServiceQueuePath);

             if (!MessageQueue.Exists(mClientQueuePath))
             {
                 // Create(path) without the transactional flag yields a
                 // non-transactional queue.
                 mClientQueue = MessageQueue.Create(mClientQueuePath);
             }
             else
             {
                 mClientQueue = new MessageQueue(mClientQueuePath);
             }

             // Bodies of type string or bool can be deserialized.
             mClientQueue.Formatter =
                 new XmlMessageFormatter(new Type[] { typeof(string), typeof(bool) });

             mClientQueue.ReceiveCompleted +=
                 new ReceiveCompletedEventHandler(ClientQueueReceiveCompleted);

             // Start listening. The queue must stay open from here on:
             // closing it would release the resources the pending
             // asynchronous receive depends on.
             mClientQueue.BeginReceive();
         }
         catch (MessageQueueException MQException)
         {
             // e.g. an invalid queue path or an unavailable queue manager
             Console.WriteLine(MQException.Message);
         }
     }

     /// <summary>
     /// Sends a register request to the service queue, naming the client
     /// queue as the response queue.
     /// </summary>
     private void RegisterService()
     {
         try
         {
             Message message = new Message("mq_register_1");
             message.Label = mClientLabel;
             message.ResponseQueue = mClientQueue;
             mServiceQueue.Send(message);
         }
         catch (MessageQueueException MQException)
         {
             Console.WriteLine(MQException.Message);
         }
     }

     /// <summary>
     /// Sends an unregister request to the service queue and then purges
     /// the client queue.
     /// </summary>
     private void UnregisterService()
     {
         try
         {
             Message message = new Message("mq_unregister_1");
             message.Label = mClientLabel;
             mServiceQueue.Send(message);

             // NOTE(review): the pause presumably gives the service time to
             // process the request before leftovers are purged - confirm.
             Thread.Sleep(500);

             mClientQueue.Purge();
         }
         catch (MessageQueueException MQException)
         {
             Console.WriteLine(MQException.Message);
         }
     }

     /// <summary>
     /// Completion callback for the client queue. Prints string broadcasts
     /// to the console and then re-arms the asynchronous receive.
     /// </summary>
     /// <param name="source">The queue whose receive operation completed.</param>
     /// <param name="asyncResult">Carries the async result to pass to EndReceive.</param>
     private void ClientQueueReceiveCompleted(Object source,
         ReceiveCompletedEventArgs asyncResult)
     {
         // Complete and re-arm on the queue that raised the event, not on
         // whatever mClientQueue refers to at this moment.
         MessageQueue queue = (MessageQueue)source;

         try
         {
             // End the asynchronous receive operation.
             Message message = queue.EndReceive(asyncResult.AsyncResult);

             string text = message.Body as string;
             if (text != null)
             {
                 Console.WriteLine(text);
             }
         }
         catch (MessageQueueException e)
         {
             Console.WriteLine
                 (String.Format(System.Globalization.CultureInfo.CurrentCulture,
                 "Failed to receive Message: {0} ", e.ToString()));
         }
         catch (InvalidOperationException e)
         {
             // The body could not be deserialized by the formatter.
             Console.WriteLine
                 (String.Format(System.Globalization.CultureInfo.CurrentCulture,
                 "Failed to receive Message: {0} ", e.ToString()));
         }

         // Begin the next asynchronous receive. Guarded so that a queue
         // failure here is logged instead of escaping the callback.
         try
         {
             queue.BeginReceive();
         }
         catch (MessageQueueException e)
         {
             Console.WriteLine(e.Message);
         }
     }
 }

 和请求回应模式相比,MQService使用容器保存所有已注册客户端的Q,当需要notify的时候遍历所有客户端Q进行广播。MQClient建立广播Q,然后注册函数ClientQueueReceiveCompleted处理广播事件。MQ的应用能把Observer模式扩展到跨进程和跨系统的场景,消息订阅广播机制可以借助MQ和Observer模式来实现。

查看原文地址

0
相关文章