/// <summary>
/// Closes the queue and the queue manager, then marks this instance as closed.
/// Does nothing when <c>state</c> is already <c>ConnectionState.Closed</c>.
/// </summary>
/// <remarks>
/// The queue manager is closed even when closing the queue throws, so a failing
/// queue close no longer leaves the queue manager handle open. The exception
/// from the queue close still reaches the caller unless closing the queue
/// manager fails as well, in which case that later exception is the one thrown.
/// </remarks>
public void Disconnect()
{
    if (state == ConnectionState.Closed)
    {
        return;
    }

    try
    {
        queue.Close();
    }
    finally
    {
        // Runs on the success and the failure path alike. The state only
        // becomes Closed once the queue manager has actually been closed.
        queueManager.Close();
        state = ConnectionState.Closed;
    }
}
// MQ key/value map: each queue under a queue manager maps to exactly one instance.
private static Dictionary<string, WebSphereMQManagement> mqDictionary = new Dictionary<string, WebSphereMQManagement>();
/// <summary>
/// Returns the cached instance for a queue manager / queue pair, creating and
/// caching it on the first request for that pair.
/// </summary>
/// <param name="qmName">Queue manager name; leading and trailing whitespace is trimmed.</param>
/// <param name="qName">Queue name; leading and trailing whitespace is trimmed.</param>
/// <returns>The instance registered under the trimmed <c>qmName&amp;qName</c> key.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="qmName"/> or <paramref name="qName"/> is <c>null</c>.
/// </exception>
public static WebSphereMQManagement GetInstance(string qmName, string qName)
{
    if (qmName == null)
    {
        throw new ArgumentNullException("qmName");
    }

    if (qName == null)
    {
        throw new ArgumentNullException("qName");
    }

    // The trimmed names are used both for the cache key and for the constructor.
    qmName = qmName.Trim();
    qName = qName.Trim();
    string key = qmName + "&" + qName;

    // NOTE(review): the dictionary is accessed without any synchronization here;
    // confirm that callers never invoke GetInstance concurrently.
    WebSphereMQManagement mq;
    if (!mqDictionary.TryGetValue(key, out mq))
    {
        mq = new WebSphereMQManagement(qmName, qName);
        mqDictionary.Add(key, mq);
    }

    return mq;
}
/// <summary>
/// Reads one message from the queue and returns its content as a string.
/// </summary>
/// <returns>
/// The message text, or <c>null</c> when the queue reports a current depth of
/// zero or when no message becomes available before the wait interval expires.
/// </returns>
/// <exception cref="Exception">
/// Wraps any other <c>MQException</c> raised while getting or reading the
/// message; the original exception is available as <c>InnerException</c>.
/// </exception>
public string Receive()
{
    // Checking the depth first skips the blocking get when the queue is empty.
    if (queue.CurrentDepth <= 0)
    {
        return null;
    }

    MQGetMessageOptions gmo = new MQGetMessageOptions();
    gmo.Options = MQC.MQGMO_WAIT;
    gmo.WaitInterval = 15 * 1000;
    gmo.MatchOptions = MQC.MQMO_NONE;

    MQMessage message = new MQMessage();
    try
    {
        queue.Get(message, gmo);
        return message.ReadString(message.MessageLength);
    }
    catch (MQException ex)
    {
        // Another consumer may drain the queue between the depth check and the
        // get; report that the same way as an empty queue instead of throwing.
        if (ex.ReasonCode == MQC.MQRC_NO_MSG_AVAILABLE)
        {
            return null;
        }

        // Use the MQException itself: its InnerException is not guaranteed to
        // be set, and dereferencing it here would hide the real failure
        // behind a NullReferenceException.
        throw new Exception("访问队列停止" + ex.ToString(), ex);
    }
}
/// <summary>
/// Writes the given text into a new MQ message and puts it on the queue.
/// </summary>
/// <param name="msg">Message content.</param>
/// <exception cref="Exception">
/// Thrown when the put raises an <c>MQException</c>; the original exception is
/// available as <c>InnerException</c>.
/// </exception>
public void Send(string msg)
{
    MQMessage mqMessage = new MQMessage();
    mqMessage.WriteString(msg);
    try
    {
        queue.Put(mqMessage);
    }
    catch (MQException e)
    {
        // Keep the MQException as the inner exception so its reason code and
        // stack trace are not lost when the failure is reported.
        throw new Exception("发送异常终止:" + e.Message, e);
    }
}
/// <summary>
/// Starts listening on the message queue: blocks in a loop that gets messages
/// and raises <c>MessageArrivalEvent</c> with the text of each one. Returns
/// immediately when a listen loop is already running.
/// </summary>
/// <remarks>
/// The loop runs while <c>isListen</c> is true. The flag is cleared whenever
/// the loop ends, including when it ends with an exception, so that
/// <c>Listen</c> can be started again afterwards.
/// </remarks>
/// <exception cref="Exception">
/// Wraps an <c>MQException</c> raised inside the loop; the original exception
/// is available as <c>InnerException</c>.
/// </exception>
public void Listen()
{
    if (isListen) // already listening
    {
        return;
    }

    isListen = true;
    try
    {
        MQGetMessageOptions gmo = new MQGetMessageOptions();
        gmo.Options = MQC.MQGMO_WAIT;
        // NOTE(review): with an unlimited wait, clearing isListen only takes
        // effect after the next message arrives - confirm that is acceptable.
        gmo.WaitInterval = MQC.MQWI_UNLIMITED;
        gmo.MatchOptions = MQC.MQMO_NONE;

        while (isListen)
        {
            System.Threading.Thread.Sleep(20);
            try
            {
                // A fresh message per get keeps one iteration's descriptor
                // and payload from carrying over into the next.
                MQMessage message = new MQMessage();
                queue.Get(message, gmo);
                string text = message.ReadString(message.MessageLength);

                // Read the delegate once so that a handler unsubscribing
                // between the null check and the call cannot cause a
                // NullReferenceException.
                MessageArrivaledHandler handler = MessageArrivalEvent;
                if (handler != null)
                {
                    handler(text);
                }
            }
            catch (MQException e)
            {
                throw new Exception("访问队列停止" + e.ToString(), e);
            }
        }
    }
    finally
    {
        // Without this reset a failed loop would leave the flag set and every
        // later Listen() call would return without listening.
        isListen = false;
    }
}
/// <summary>
/// Event raised after a message has been received: <see cref="Listen"/> invokes
/// it with the text of each message it reads from the queue.
/// </summary>
public event MessageArrivaledHandler MessageArrivalEvent;
}
}