请求回应模式
public class MQService
{
private const string mMachinePrefix = @".\";
private const string mPrivateQueueNamePrefix = mMachinePrefix + @"Private$\";
private const string mServiceQueuePath = mPrivateQueueNamePrefix + "MQServiceQueue$";
private System.Messaging.MessageQueue mServiceQueue;
private void InitServiceQueue()
{
// create the message queue
try
{
// check to make sure the message queue does not exist already
if (!System.Messaging.MessageQueue.Exists(mServiceQueuePath))
{
// create the new message queue and make it transactional
mServiceQueue = System.Messaging.MessageQueue.Create(mServiceQueuePath);
mServiceQueue.Close();
}
else
{
mServiceQueue = new System.Messaging.MessageQueue(mServiceQueuePath);
}
Type[] types = new Type[1];
types[0] = typeof(string);
mServiceQueue.Formatter = new System.Messaging.XmlMessageFormatter(types);
mServiceQueue.ReceiveCompleted += new System.Messaging.ReceiveCompletedEventHandler(MessageListenerEventHandler);
// Begin the asynchronous receive operation.
mServiceQueue.BeginReceive();
mServiceQueue.Close();
}
// show message if we used an invalid message queue name;
catch (System.Messaging.MessageQueueException MQException)
{
Console.WriteLine(MQException.Message);
}
return;
}
private void MessageListenerEventHandler(object sender, System.Messaging.ReceiveCompletedEventArgs e)
{
try
{
// Connect to the queue.
System.Messaging.MessageQueue mq = (System.Messaging.MessageQueue)sender;
// End the asynchronous receive operation.
System.Messaging.Message msg = mq.EndReceive(e.AsyncResult);
if (msg.Body.ToString() == "mq_reques_1")
{
msg.ResponseQueue.Send("mq_respond_1");
}
else if (msg.Body.ToString() == "mq_reques_2")
{
msg.ResponseQueue.Send(true);
}
// Restart the asynchronous receive operation.
mq.BeginReceive();
}
catch (System.Messaging.MessageQueueException ex)
{
// Handle sources of MessageQueueException.
Console.WriteLine(ex.Message);
}
return;
}
}
public class MQClient
{
private const string mMachinePrefix = @".\";
private const string mPrivateQueueNamePrefix = mMachinePrefix + @"Private$\";
private const string mServiceQueuePath = mPrivateQueueNamePrefix + "MQServiceQueue$";
private const string mClientQueuePath = mPrivateQueueNamePrefix + "MQClientQueue$";
private System.Messaging.MessageQueue mServiceQueue;
private System.Messaging.MessageQueue mClientQueue;
public void InitQueues()
{
// create the message queue
try
{
mServiceQueue = new System.Messaging.MessageQueue(mServiceQueuePath);
// check to make sure the message queue does not exist already
if (!System.Messaging.MessageQueue.Exists(mClientQueuePath))
{
// create the new message queue and make it transactional
mClientQueue = System.Messaging.MessageQueue.Create(mClientQueuePath);
mClientQueue.Close();
}
else
{
mClientQueue = new System.Messaging.MessageQueue(mClientQueuePath);
}
Type[] types = new Type[2];
types[0] = typeof(string);
types[1] = typeof(bool);
mClientQueue.Formatter = new System.Messaging.XmlMessageFormatter(types);
mClientQueue.Close();
}
// show message if we used an invalid message queue name;
catch (System.Messaging.MessageQueueException MQException)
{
Console.WriteLine(MQException.Message);
}
return;
}
private void SendRequest()
{
try
{
System.Messaging.Message message = new System.Messaging.Message("mq_reques_1");
message.ResponseQueue = mClientQueue;
mClientQueue.Purge();
mServiceQueue.Send(message);
System.Messaging.Message msg = mClientQueue.Receive(new TimeSpan(0, 0, 4));
//handle the result.
Console.WriteLine(msg.Body.ToString());
}
// show message if we used an invalid message queue name;
catch (System.Messaging.MessageQueueException MQException)
{
Console.WriteLine(MQException.Message);
}
}
}
MQService是服务程序,负责服务队列".\Private$\MQServiceQueue$"的建立和管理,当有新消息发送到该服务队列时MessageListenerEventHandler函数就会callback,取出消息进行分析处理和发送返回,返回是通过client原先建立的Q进行返回,不是通过原服务Q返回,因为MQ的队列是单向的。MQClient负责客户端队列".\Private$\MQClientQueue$"的建立,在发送请求的时候把客户端队列赋值到properties ResponseQueue里,让服务程序可以返回到这个客户端的队列里面,同时在等待返回的时候有超时控制。