注册广播模式
注册广播模式是Observer模式的一种应用;关于Observer模式,可参见《实用设计模式之一--Observer模式》。
客户端可以往服务端注册关心的消息,服务端通过MQ自动广播消息到客户端。
/// <summary>
/// Server side of the register/broadcast pattern. Clients send a register or
/// unregister control message to the service queue; the service keeps the
/// response queue of every registered client and sends broadcast strings to
/// each of them.
/// </summary>
public class MQService
{
    private const string mMachinePrefix = @".\";
    private const string mPrivateQueueNamePrefix = mMachinePrefix + @"Private$\";
    private const string mServiceQueuePath = mPrivateQueueNamePrefix + "MQServiceQueue$";

    // Message bodies recognised as control commands on the service queue.
    private const string mRegisterCommand = "mq_register_1";
    private const string mUnregisterCommand = "mq_unregister_1";

    private System.Messaging.MessageQueue mServiceQueue;

    // Registered client queues, keyed by the label of the register message.
    private Dictionary<string, MessageQueue> mmClientQueues = new Dictionary<string, MessageQueue>();

    // Guards mmClientQueues: the receive callback mutates the dictionary while
    // Notify enumerates it, and the two may run on different threads.
    private readonly object mClientQueuesLock = new object();

    /// <summary>
    /// Opens the service queue (creating it when it does not exist yet), hooks up
    /// the receive handler and starts the first asynchronous receive.
    /// Queue errors are written to the console.
    /// </summary>
    private void InitServiceQueue()
    {
        try
        {
            if (!System.Messaging.MessageQueue.Exists(mServiceQueuePath))
            {
                // Create(path) is called without the transactional flag, so the
                // queue is a plain (non-transactional) private queue.
                mServiceQueue = System.Messaging.MessageQueue.Create(mServiceQueuePath);
            }
            else
            {
                mServiceQueue = new System.Messaging.MessageQueue(mServiceQueuePath);
            }

            // Control messages carry a plain string body.
            mServiceQueue.Formatter = new System.Messaging.XmlMessageFormatter(new Type[] { typeof(string) });
            mServiceQueue.ReceiveCompleted += new System.Messaging.ReceiveCompletedEventHandler(MessageListenerEventHandler);

            // Begin the asynchronous receive operation. The queue is deliberately
            // NOT closed here: closing releases the queue resources that the
            // pending receive depends on.
            mServiceQueue.BeginReceive();
        }
        catch (System.Messaging.MessageQueueException MQException)
        {
            // e.g. an invalid queue name or MSMQ not being available.
            Console.WriteLine(MQException.Message);
        }
    }

    /// <summary>
    /// Completion callback for the service queue: finishes the receive, applies
    /// the register/unregister command and starts the next receive.
    /// </summary>
    /// <param name="sender">The queue that raised the event.</param>
    /// <param name="e">Carries the async result to complete.</param>
    private void MessageListenerEventHandler(object sender, System.Messaging.ReceiveCompletedEventArgs e)
    {
        try
        {
            System.Messaging.MessageQueue mq = (System.Messaging.MessageQueue)sender;

            // End the asynchronous receive operation.
            System.Messaging.Message msg = mq.EndReceive(e.AsyncResult);

            // Failures while handling one message are contained inside the helper
            // so that a single bad request cannot stop the listener.
            ProcessControlMessage(msg);

            // Restart the asynchronous receive operation.
            mq.BeginReceive();
        }
        catch (System.Messaging.MessageQueueException ex)
        {
            Console.WriteLine(ex.Message);
        }
    }

    // Applies one register/unregister request to the client table; any other
    // body is ignored.
    private void ProcessControlMessage(System.Messaging.Message msg)
    {
        try
        {
            string command = msg.Body as string;
            if (command == mRegisterCommand)
            {
                MessageQueue responseQueue = msg.ResponseQueue;
                if (responseQueue == null)
                {
                    // Without a response queue there is nowhere to broadcast to.
                    Console.WriteLine("Register request '" + msg.Label + "' has no response queue; ignored.");
                    return;
                }
                lock (mClientQueuesLock)
                {
                    // Indexer assignment: a repeated registration replaces the
                    // stored queue instead of throwing like Add would.
                    mmClientQueues[msg.Label] = responseQueue;
                }
            }
            else if (command == mUnregisterCommand)
            {
                MessageQueue clientQueue;
                lock (mClientQueuesLock)
                {
                    if (!mmClientQueues.TryGetValue(msg.Label, out clientQueue))
                    {
                        // Unknown label: nothing to remove.
                        return;
                    }
                    mmClientQueues.Remove(msg.Label);
                }
                // Drop broadcasts still pending for the departing client.
                clientQueue.Purge();
            }
        }
        catch (System.Messaging.MessageQueueException ex)
        {
            Console.WriteLine(ex.Message);
        }
        catch (InvalidOperationException ex)
        {
            // NOTE(review): reading Body is expected to raise this when the
            // formatter cannot deserialize the message - confirm against the docs.
            Console.WriteLine(ex.Message);
        }
    }

    /// <summary>
    /// Sends <paramref name="str"/> to every registered client queue. A failure
    /// on one client queue is logged and does not stop the remaining sends.
    /// </summary>
    /// <param name="str">The payload to broadcast.</param>
    private void Notify(string str)
    {
        // Snapshot under the lock so sending happens outside it and the
        // enumeration cannot be invalidated by a concurrent (un)registration.
        List<MessageQueue> targets;
        lock (mClientQueuesLock)
        {
            targets = new List<MessageQueue>(mmClientQueues.Values);
        }

        foreach (MessageQueue mq in targets)
        {
            try
            {
                mq.Send(str);
            }
            catch (System.Messaging.MessageQueueException ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
    }
}
/// <summary>
/// Client side of the register/broadcast pattern. The client owns a private
/// queue for incoming broadcasts, registers that queue with the service queue
/// and prints every string message it receives.
/// </summary>
public class MQClient
{
    private const string mMachinePrefix = @".\";
    private const string mPrivateQueueNamePrefix = mMachinePrefix + @"Private$\";
    private const string mServiceQueuePath = mPrivateQueueNamePrefix + "MQServiceQueue$";
    private const string mClientQueuePath = mPrivateQueueNamePrefix + "MQClientQueue$";

    // Label sent with the register and unregister requests; both must match.
    private const string mClientLabel = "client1";

    private System.Messaging.MessageQueue mServiceQueue;
    private System.Messaging.MessageQueue mClientQueue;

    /// <summary>
    /// Opens the service queue and the client queue (creating the client queue
    /// when it does not exist yet), hooks up the receive handler and starts the
    /// first asynchronous receive. Queue errors are written to the console.
    /// </summary>
    public void InitQueues()
    {
        try
        {
            mServiceQueue = new System.Messaging.MessageQueue(mServiceQueuePath);

            if (!System.Messaging.MessageQueue.Exists(mClientQueuePath))
            {
                // Create(path) is called without the transactional flag, so the
                // queue is a plain (non-transactional) private queue.
                mClientQueue = System.Messaging.MessageQueue.Create(mClientQueuePath);
            }
            else
            {
                mClientQueue = new System.Messaging.MessageQueue(mClientQueuePath);
            }

            // Body types the formatter is allowed to deserialize.
            Type[] types = new Type[] { typeof(string), typeof(bool) };
            mClientQueue.Formatter = new System.Messaging.XmlMessageFormatter(types);

            // Start receiving asynchronously; the handler is notified when a
            // message arrives. The queue is deliberately NOT closed here: closing
            // releases the queue resources that the pending receive depends on.
            mClientQueue.ReceiveCompleted +=
                new System.Messaging.ReceiveCompletedEventHandler(ClientQueueReceiveCompleted);
            mClientQueue.BeginReceive();
        }
        catch (System.Messaging.MessageQueueException MQException)
        {
            // e.g. an invalid queue name or MSMQ not being available.
            Console.WriteLine(MQException.Message);
        }
    }

    /// <summary>
    /// Sends the register request to the service queue, naming the client queue
    /// as the response queue.
    /// </summary>
    private void RegisterService()
    {
        try
        {
            System.Messaging.Message message = new System.Messaging.Message("mq_register_1");
            message.Label = mClientLabel;
            message.ResponseQueue = mClientQueue;
            mServiceQueue.Send(message);
        }
        catch (System.Messaging.MessageQueueException MQException)
        {
            Console.WriteLine(MQException.Message);
        }
    }

    /// <summary>
    /// Sends the unregister request to the service queue, waits briefly and then
    /// purges the client queue.
    /// </summary>
    private void UnregisterService()
    {
        try
        {
            System.Messaging.Message message = new System.Messaging.Message("mq_unregister_1");
            message.Label = mClientLabel;
            mServiceQueue.Send(message);

            // NOTE(review): the pause presumably gives the service time to handle
            // the unregister request before the leftovers are purged - confirm.
            Thread.Sleep(500);
            mClientQueue.Purge();
        }
        catch (System.Messaging.MessageQueueException MQException)
        {
            Console.WriteLine(MQException.Message);
        }
    }

    /// <summary>
    /// Completion callback for the client queue: prints a received string body,
    /// then starts the next receive whether or not this one succeeded.
    /// </summary>
    /// <param name="source">The queue that raised the event (unused).</param>
    /// <param name="asyncResult">Carries the async result to complete.</param>
    private void ClientQueueReceiveCompleted(Object source,
        ReceiveCompletedEventArgs asyncResult)
    {
        try
        {
            // End the asynchronous receive operation.
            Message message = mClientQueue.EndReceive(asyncResult.AsyncResult);
            string text = message.Body as string;
            if (text != null)
            {
                Console.WriteLine(text);
            }
        }
        catch (MessageQueueException e)
        {
            Console.WriteLine
                (String.Format(System.Globalization.CultureInfo.CurrentCulture,
                "Failed to receive Message: {0} ", e.ToString()));
        }
        catch (InvalidOperationException e)
        {
            // NOTE(review): reading Body is expected to raise this when the
            // formatter cannot deserialize the message - confirm against the docs.
            Console.WriteLine
                (String.Format(System.Globalization.CultureInfo.CurrentCulture,
                "Failed to receive Message: {0} ", e.ToString()));
        }

        // Begin the next asynchronous receive. Guarded so that a queue error
        // raised here does not escape the callback unhandled.
        try
        {
            mClientQueue.BeginReceive();
        }
        catch (MessageQueueException e)
        {
            Console.WriteLine
                (String.Format(System.Globalization.CultureInfo.CurrentCulture,
                "Failed to receive Message: {0} ", e.ToString()));
        }
    }
}
和请求回应模式相比,MQService使用容器(Dictionary)保存所有已注册客户端的队列,需要通知(Notify)时遍历所有客户端队列进行广播。MQClient建立自己的广播接收队列,并注册回调函数ClientQueueReceiveCompleted来处理广播消息。借助MQ,Observer模式可以跨进程、跨系统应用,从而实现消息的订阅与广播机制。