首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > .NET > C# >

[分享] .Net兑现 WebSphere MQ与Oracle数据库的XA事务管理

2012-09-19 
[分享] .Net实现 WebSphere MQ与Oracle数据库的XA事务管理WebSphere MQ以下简称为WMQ.在通讯项目中,有这样

[分享] .Net实现 WebSphere MQ与Oracle数据库的XA事务管理
WebSphere MQ以下简称为WMQ.
在通讯项目中,有这样的一个应用场景,简单描述如下:
1. 程序A需定时从MQ中取出消息(XML)
2. 将XML还原为DataSet
3. 将DataSet持久化到数据库

该场景在总线型的消息传输框架中较为常见, 在一切正常的情况下,程序工作正常,数据不会发生错误或丢失. 但程序A介于WMQ, 与数据库之间, 程序两端的网络因素, 或者任意一端服务停止,均有可能会导致消息丢失. 因此比较稳妥的做法是将以上步骤采用XA事务进行全局托管.

实现一个MQGet类:

C# code
using System;using System.Collections.Generic;using System.Text;using System.Collections;using System.Transactions;using IBM.WMQ;namespace WMQClient_WithXA{    public class MqGet    {        private String _host = "127.0.0.1";        private int _port;        private String _channelName = "SYSTEM.DEF.SVRCONN";        private String _queueManagerName = null;        private String _queueName = null;        private int _charSet;        private WMQTransactionType _transactionType;        private bool isTopic = false;        private String _transportMode = "managed";        private bool commit = true;        private MQQueueManager queueManager;        private MQQueue queue;        private MyMqObject myMqObj;        private Hashtable properties;        private MQMessage message;        private MQGetMessageOptions getMessageOptions;        public MqGet(string sMqQmgrName, string sQueueName, string sChannelName, string sHost, int iPort, int iCharacterSet, WMQTransactionType TransactionType, string sTransportMode = "managed")        {            getMessageOptions = new MQGetMessageOptions();            _host = sHost;            _port = iPort;            _channelName = sChannelName;            _queueManagerName = sMqQmgrName;            _queueName = sQueueName;            _transportMode = sTransportMode;            _charSet = iCharacterSet;            _transactionType = TransactionType;            getMessageOptions.Options += MQC.MQGMO_WAIT;            getMessageOptions.WaitInterval = 20000;  // 20 seconds wait            properties = new Hashtable();            properties.Add(MQC.HOST_NAME_PROPERTY, _host);            properties.Add(MQC.PORT_PROPERTY, _port);            properties.Add(MQC.CHANNEL_PROPERTY, _channelName);            switch (TransactionType)            {                case WMQTransactionType.NORMAL_TRANSACTION:                    getMessageOptions.Options += MQC.MQGMO_SYNCPOINT;                    break;                case WMQTransactionType.XA_TRANSACTION:                    if (_transportMode == "managed")                    {                        properties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_MANAGED);                                         // for managed mode                                            }                    else                    {                        properties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_XACLIENT);                    }                    getMessageOptions.Options += MQC.MQGMO_SYNCPOINT;                    break;            }        }        private MQQueueManager getMqManager(MQQueueManager qmg)        {            if (qmg == null)            {                try                {                    qmg = new MQQueueManager(_queueManagerName, properties);                    return qmg;                }                catch(Exception err)                {                    Console.WriteLine(err.Message);                    return null;                }            }            else                return qmg;        }        private MQQueue getMqQ(MQQueue q)        {            if (q == null)            {                try                {                    q = myMqObj._qMg.AccessQueue(_queueName, MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING);                    return q;                }                catch (Exception err)                {                    Console.WriteLine(err.Message);                    return null;                }            }            else                return q;        }        private MyMqObject GetMqObj(ref MyMqObject mqObj)        {            if (mqObj._qMg == null)            {                mqObj._qMg = getMqManager(mqObj._qMg);                mqObj._q = null;            }            if ( mqObj._qMg !=null )                mqObj._q = getMqQ(mqObj._q);            if ( mqObj._q == null )            {                CloseMQConn(ref mqObj);            }            return mqObj;        }        public void CloseMQConn(ref MyMqObject mqObj)        {            if (mqObj._q != null)            {                try                {                    mqObj._q.Close();                }                catch{}                mqObj._q = null;            }            if (mqObj._qMg != null)            {                try                {                    mqObj._qMg.Disconnect();                }                catch{}                mqObj._qMg = null;            }        }        public MyMessage GetMessage()        {            myMqObj = GetMqObj(ref myMqObj);            if (myMqObj._qMg != null)            {                message = new MQMessage();                MyMessage msg = new MyMessage();                try                {                    myMqObj._q.Get(message, getMessageOptions);                    byte[] buff = message.ReadBytes(message.MessageLength);                    msg.MsgBody = System.Text.Encoding.GetEncoding(CodePageTrans.getMsCodePage(_charSet)).GetString(buff);                    msg.PutTime = message.PutDateTime.AddHours(8);                    message.ClearMessage();                    return msg;                }                catch (MQException mqe)                {                    Console.WriteLine("获取消息失败. 原因: " + mqe.ToString());                    CloseMQConn(ref myMqObj);                }            }            return null;        }        public bool Commit()        {            try            {                switch (_transactionType)                {                    case WMQTransactionType.NORMAL_TRANSACTION:                        myMqObj._qMg.Commit();                        break;                }            }            catch { return false; }            return true;        }        public bool Rollback()        {            try            {                switch (_transactionType)                {                    case WMQTransactionType.NORMAL_TRANSACTION:                        myMqObj._qMg.Backout();                        break;                }            }            catch { return false; }            return true;        }    }} 



Oracle使用 ODP.NET 驱动, 测试代码使用显示事务方式, 代码如下:
C# code
private void btnGetMessage_Click(object sender, EventArgs e){    DbProviderFactory dbFactory = DbProviderFactories.GetFactory("Oracle.DataAccess.Client");    DbConnection dbConn = dbFactory.CreateConnection();    dbConn.ConnectionString = sConnStr;    dbConn.Open();    using (CommittableTransaction transScope = new CommittableTransaction())    {        CommittableTransaction.Current = transScope;        dbConn.EnlistTransaction(transScope);        WMQClient_WithXA.MyMessage ss = mqGet.GetMessage();  // 从消息队列1中取出消息        if (ss != null)        {            MessageBox.Show(ss.MsgBody);            DbCommand ocmd = dbFactory.CreateCommand();            ocmd.CommandText = string.Format("insert into HY (NAME) VALUES  ('{0}')", Guid.NewGuid().ToString());            ocmd.Connection = dbConn;            ocmd.ExecuteNonQuery();            mqPut.PutMessage(ss.MsgBody);     // 将消息放入到队列2            transScope.Commit();              // 或者回滚事务        }        CommittableTransaction.Current = null;        dbConn.Close();    }}


完整DEMO, 免积分下载地址: http://download.csdn.net/detail/hyblusea/4529167


[解决办法]
查水表了

[解决办法]
查水表了
 
[解决办法]
全部是高手?
[解决办法]
真的是高手啊
[解决办法]
高手啊
[解决办法]
好东西呀
[解决办法]
好东东
[解决办法]
不错,楼主辛苦了
[解决办法]
高手啊
[解决办法]

你的命名看的很爽
[解决办法]
高手高手啊
[解决办法]
高手高手
[解决办法]
学习共享
[解决办法]
好一个实现
[解决办法]
楼主高手

[解决办法]
高点积分下东西,谢谢

热点排行