【IT168 技术文章】
最近一个项目中要用到WebSphere MQ进行消息转发。整个项目由多个子系统组成,由于其他子系统是基于VS2005,为了统一,老大也希望这部分开发能在VS2005下进行。
在网上搜索了一下,COPY了一些代码在办公机器上做测试,总的来说还比较顺利。为了方便回家测试,就把代码COPY到了NB上。回家发现居然运行出错,错误代码2059。
网上搜了一下,发现有这个问题的人很多。IBM提供的文档也比较水:先检查网络是否连通,队列管理器是否启动等等。
看到也有很多人提出2059是CCSID的问题,看了一下觉得不太可能,因为应用场景不一样。网上一般都是MQ Client连MQ Server,这样才会出现两台机器的CCSID不一致。本项目中的场景是在MQ Server上跑一个程序,读本机的MQ消息。
试着用JAVA访问,居然收发正常。想想会不会是VS环境的问题呢?因为我NB上装的是VS2008。就把2008卸了,换成2005,还是错误代码2059。后来试着把办公机上的程序COPY到NB上,发现运行出错。而NB的VS2005开发的程序COPY到办公机上能正常运行。估计就是NB的软件环境问题了。
看了一下.NET Framework的版本,发现一个是2.0,一个是2.0SP1+3.5。估计就是.NET Framework的问题了。后面没有再仔细求证了。猜测是3.5的原因大一些,因为在鬼佬的网站上也有人提出VS2008访问MQ总是不通,返回2059。
程序代码:
文件名:IMQ.cs
using System;
using System.Collections.Generic;
using System.Text;
namespace CNMC.WYB.MQ
{
public delegate void MessageArrivaledHandler(string mqMessage);
interface IMQ
{
void Connect();
void Disconnect();
void Send(string msg);
string Receive();
void Listen();
event MessageArrivaledHandler MessageArrivalEvent;
}
}
文件名:WebSphereMQManagement .cs
using System;
using System.Collections.Generic;
using System.Text;
using IBM.WMQ;
using IBM.WMQ.PCF;
namespace CNMC.WYB.MQ
{
public enum ConnectionState { Open, Closed }
class WebSphereMQManagement : IMQ
{
private bool isListen = false;//用来标识是否持续监听消息队列
/// <summary>
/// 获取或设置是否持续监听消息队列标识
/// </summary>
public bool IsListen
{
get { return isListen; }
set { isListen = value; }
}
private ConnectionState state = ConnectionState.Closed;
/// <summary>
/// 获取连接状态
/// </summary>
public ConnectionState State
{
get { return state; }
}
private string queueManagerName;
private string queueName;
private MQQueueManager queueManager = null;
private MQQueue queue = null;
private WebSphereMQManagement(string qmName, string qName)
{
queueManagerName = qmName.Trim();
queueName = qName.Trim();
}
/// <summary>
/// 连接MQ
/// </summary>
public void Connect()
{
if (state == ConnectionState.Open)
return;
try
{
queueManager = new MQQueueManager(queueManagerName);
queue = queueManager.AccessQueue(queueName, MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_FAIL_IF_QUIESCING);
}
catch (MQException e)
{
throw new Exception("连接队列管理器或打开队列出错: 结束码:" + e.CompletionCode + ";错误原因代码:" + e.ReasonCode);
}
state = ConnectionState.Open;
}
/// <summary>
/// 断开MQ连接
/// </summary>
public void Disconnect()
{
if (state == ConnectionState.Closed)
return;
queue.Close();
queueManager.Close();
state = ConnectionState.Closed;
} //MQ键值对,一个队列管理器下的队列对应一个实例
private static Dictionary<string, WebSphereMQManagement> mqDictionary = new Dictionary<string, WebSphereMQManagement>();
/// <summary>
///
/// </summary>
/// <param name="qmName">队列管理器名</param>
/// <param name="qName">队列名</param>
/// <returns></returns>
public static WebSphereMQManagement GetInstance(string qmName, string qName)
{
qmName = qmName.Trim();
qName = qName.Trim();
string key = qmName + "&" + qName;
if (mqDictionary.ContainsKey(key))
return mqDictionary[key];
WebSphereMQManagement mq = new WebSphereMQManagement(qmName, qName);
mqDictionary.Add(key, mq);
return mq;
}
/// <summary>
/// 接收消息
/// </summary>
/// <returns></returns>
public string Receive()
{
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.Options = MQC.MQGMO_WAIT;
gmo.WaitInterval = 15 * 1000;
gmo.MatchOptions = MQC.MQMO_NONE;
MQMessage message = new MQMessage();
int queueCurrentDepth = queue.CurrentDepth;
string returnValue = null;
if (queueCurrentDepth > 0)
{
try
{
queue.Get(message, gmo);
returnValue = message.ReadString(message.MessageLength);
}
catch (MQException ex)
{
throw new Exception("访问队列停止" + ex.InnerException.ToString());
}
}
return returnValue;
}
/// <summary>
/// 发送消息
/// </summary>
/// <param name="msg">消息内容</param>
public void Send(string msg)
{
MQMessage mqMessage = new MQMessage();
mqMessage.WriteString(msg);
try
{
queue.Put(mqMessage);
}
catch (MQException e)
{
throw new Exception("发送异常终止:" + e.Message);
}
}
/// <summary>
/// 开始监听消息队列
/// </summary>
public void Listen()
{
if (isListen)//已经在监听
return;
isListen = true;
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.Options = MQC.MQGMO_WAIT;
gmo.WaitInterval = MQC.MQWI_UNLIMITED;
gmo.MatchOptions = MQC.MQMO_NONE;
MQMessage message = new MQMessage();
string returnValue = null;
while (isListen)
{
System.Threading.Thread.Sleep(20);
try
{
queue.Get(message, gmo);
returnValue = message.ReadString(message.MessageLength);
if (MessageArrivalEvent != null)
MessageArrivalEvent(returnValue);
}
catch (MQException e)
{
throw new Exception("访问队列停止" + e.ToString());
}
}
}
/// <summary>
/// 收到消息后的处理事件
/// </summary>
public event MessageArrivaledHandler MessageArrivalEvent;
}
}