技术开发 频道

VS2005下进行MQ应用开发

    【IT168 技术文章】

    最近一个项目中要用到WebSphere MQ进行消息转发。整个项目由多个子系统组成,由于其他子系统是基于VS2005,为了统一,老大也希望这部分开发能在VS2005下进行。

    在网上搜索了一下,COPY了一些代码在办公机器上做测试,总的来说还比较顺利。为了方便回家测试,就把代码COPY到了NB上。回家发现居然运行出错,错误代码2059。

    网上搜了一下,发现有这个问题的人很多。IBM提供的文档也比较水:先检查网络是否连通,队列管理器是否启动等等。

    看到也有很多人提出2059是CCSID的问题,看了一下觉得不太可能,因为应用场景不一样。网上一般都是MQ Client连MQ Server,这样才会出现两台机器的CCSID不一致。本项目中的场景是在MQ Server上跑一个程序,读本机的MQ消息。

    试着用JAVA访问,居然收发正常。想想会不会是VS环境的问题呢?因为我NB上装的是VS2008。就把2008卸了,换成2005,还是错误代码2059。后来试着把办公机上的程序COPY到NB上,发现运行出错。而NB的VS2005开发的程序COPY到办公机上能正常运行。估计就是NB的软件环境问题了。

    看了一下.NET Framework的版本,发现一个是2.0,一个是2.0SP1+3.5。估计就是.NET Framework的问题了。后面没有再仔细求证了。猜测是3.5的原因大一些,因为在鬼佬的网站上也有人提出VS2008访问MQ总是不通,返回2059。

    程序代码:

    文件名:IMQ.cs

    using System;

    using System.Collections.Generic;

    using System.Text;

    namespace CNMC.WYB.MQ

    {

    public delegate void MessageArrivaledHandler(string mqMessage);

    interface IMQ

    {

    void Connect();

    void Disconnect();

    void Send(string msg);

    string Receive();

    void Listen();

    event MessageArrivaledHandler MessageArrivalEvent;

    }

    }

    文件名:WebSphereMQManagement .cs

    using System;

    using System.Collections.Generic;

    using System.Text;

    using IBM.WMQ;

    using IBM.WMQ.PCF;

    namespace CNMC.WYB.MQ

    {

    public enum ConnectionState { Open, Closed }

    class WebSphereMQManagement : IMQ

    {

    private bool isListen = false;//用来标识是否持续监听消息队列

    /// <summary>

    /// 获取或设置是否持续监听消息队列标识

    /// </summary>

    public bool IsListen

    {

    get { return isListen; }

    set { isListen = value; }

    }

    private ConnectionState state = ConnectionState.Closed;

    /// <summary>

    /// 获取连接状态

    /// </summary>

    public ConnectionState State

    {

    get { return state; }

    }

    private string queueManagerName;

    private string queueName;

    private MQQueueManager queueManager = null;

    private MQQueue queue = null;

    private WebSphereMQManagement(string qmName, string qName)

    {

    queueManagerName = qmName.Trim();

    queueName = qName.Trim();

    }

    /// <summary>

    /// 连接MQ

    /// </summary>

    public void Connect()

    {

    if (state == ConnectionState.Open)

    return;

    try

    {

    queueManager = new MQQueueManager(queueManagerName);

    queue = queueManager.AccessQueue(queueName, MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_FAIL_IF_QUIESCING);

    }

    catch (MQException e)

    {

    throw new Exception("连接队列管理器或打开队列出错: 结束码:" + e.CompletionCode + ";错误原因代码:" + e.ReasonCode);

    }

    state = ConnectionState.Open;

    }

 

  /// <summary>

            /// 断开MQ连接

            /// </summary>

            public void Disconnect()

            {

                if (state == ConnectionState.Closed)

                    return;

                queue.Close();

                queueManager.Close();

                state = ConnectionState.Closed;

            }        //MQ键值对,一个队列管理器下的队列对应一个实例

            private static Dictionary<string, WebSphereMQManagement> mqDictionary = new Dictionary<string, WebSphereMQManagement>();

            /// <summary>

            ///

            /// </summary>

            /// <param name="qmName">队列管理器名</param>

            /// <param name="qName">队列名</param>

            /// <returns></returns>

            public static WebSphereMQManagement GetInstance(string qmName, string qName)

            {

                qmName = qmName.Trim();

                qName = qName.Trim();

                string key = qmName + "&" + qName;

                if (mqDictionary.ContainsKey(key))

                    return mqDictionary[key];

                WebSphereMQManagement mq = new WebSphereMQManagement(qmName, qName);

                mqDictionary.Add(key, mq);

                return mq;

            }

            /// <summary>

            /// 接收消息

            /// </summary>

            /// <returns></returns>

            public string Receive()

            {

                MQGetMessageOptions gmo = new MQGetMessageOptions();

                gmo.Options = MQC.MQGMO_WAIT;

                gmo.WaitInterval = 15 * 1000;

                gmo.MatchOptions = MQC.MQMO_NONE;

                MQMessage message = new MQMessage();

                int queueCurrentDepth = queue.CurrentDepth;

                string returnValue = null;

                if (queueCurrentDepth > 0)

                {

                    try

                    {

                        queue.Get(message, gmo);

                        returnValue = message.ReadString(message.MessageLength);

                    }

                    catch (MQException ex)

                    {

                        throw new Exception("访问队列停止" + ex.InnerException.ToString());

                    }

                }

                return returnValue;

            }

            /// <summary>

            /// 发送消息

            /// </summary>

            /// <param name="msg">消息内容</param>

            public void Send(string msg)

            {

                MQMessage mqMessage = new MQMessage();

                mqMessage.WriteString(msg);

                try

                {

                    queue.Put(mqMessage);

                }

                catch (MQException e)

                {

                    throw new Exception("发送异常终止:" + e.Message);

                }

            }

            /// <summary>

            /// 开始监听消息队列

            /// </summary>

            public void Listen()

            {

                if (isListen)//已经在监听

                    return;

                isListen = true;

                MQGetMessageOptions gmo = new MQGetMessageOptions();

                gmo.Options = MQC.MQGMO_WAIT;

                gmo.WaitInterval = MQC.MQWI_UNLIMITED;

                gmo.MatchOptions = MQC.MQMO_NONE;

                MQMessage message = new MQMessage();

                string returnValue = null;

                while (isListen)

                {

                    System.Threading.Thread.Sleep(20);

                    try

                    {

                        queue.Get(message, gmo);

                        returnValue = message.ReadString(message.MessageLength);

                        if (MessageArrivalEvent != null)

                            MessageArrivalEvent(returnValue);

                    }

                    catch (MQException e)

                    {

                        throw new Exception("访问队列停止" + e.ToString());

                    }

                }

            }

            /// <summary>

            /// 收到消息后的处理事件

            /// </summary>

            public event MessageArrivaledHandler MessageArrivalEvent;

        }

    }

   

0
相关文章