ActiveMQ在C#中的使用
ActiveMQ在C#中的应用ActiveMQ是个好东东,不必多说。ActiveMQ提供多种语言支持,如Java, C, C++, C#, Ruby,
ActiveMQ在C#中的应用
ActiveMQ是个好东东,不必多说。ActiveMQ提供多种语言支持,如Java, C, C++, C#, Ruby, Perl, Python, PHP等。由于我在windows下开发GUI,比较关心C++和C#,其中C#的ActiveMQ很简单,Apache提供NMS(.Net Messaging Service)支持.Net开发,只需如下几个步骤即能建立简单的实现。C++的应用相对麻烦些,稍后写文章介绍。
1、去ActiveMQ官方网站下载最新版的ActiveMQ,网址:http://activemq.apache.org/download.html。我之前下的是5.3.1,5.3.2现在也已经出来了。
2、去ActiveMQ官方网站下载最新版的Apache.NMS,网址:http://activemq.apache.org/nms/download.html,需要下载Apache.NMS和Apache.NMS.ActiveMQ两个bin包,如果对源码感兴趣,也可下载src包。这里要提醒一下,如果下载1.2.0版本的NMS.ActiveMQ,Apache.NMS.ActiveMQ.dll在实际使用中有个bug,即停止ActiveMQ应用时会抛WaitOne函数异常,查看src包中的源码发现是由于Apache.NMS.ActiveMQ-1.2.0-src\src\main\csharp\Transport\InactivityMonitor.cs中的如下代码造成的,修改一下源码重新编译即可。看了一下最新版1.3.0已经修复了这个bug,因此下载最新版即可。
view plaincopy to clipboardprint?
- private?void?StopMonitorThreads()??
- ????????{??
- ????????????lock(monitor)??
- ????????????{??
- ????????????????if(monitorStarted.CompareAndSet(true,?false))??
- ????????????????{??
- ????????????????????AutoResetEvent?shutdownEvent?=?new?AutoResetEvent(false);??
- ??
- ????????????????????//?Attempt?to?wait?for?the?Timers?to?shutdown,?but?don't?wait??
- ????????????????????//?forever,?if?they?don't?shutdown?after?two?seconds,?just?quit.??
- ????????????????????this.readCheckTimer.Dispose(shutdownEvent);??
- ????????????????????shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));??
- ????????????????????this.writeCheckTimer.Dispose(shutdownEvent);??
- ????????????????????shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));??
- ??????????????????????
- ????????????????????????????????????????????????????//WaitOne的定义:public?virtual?bool?WaitOne(TimeSpan?timeout,bool?exitContext)??
- ??????????????????????
- ????????????????????this.asyncTasks.Shutdown();??
- ????????????????????this.asyncTasks?=?null;??
- ????????????????????this.asyncWriteTask?=?null;??
- ????????????????????this.asyncErrorTask?=?null;??
- ????????????????}??
- ????????????}??
- ????????}??
3、运行ActiveMQ,找到ActiveMQ解压后的bin文件夹:...\apache-activemq-5.3.1\bin,执行activemq.bat批处理文件即可启动ActiveMQ服务器,默认端口为61616,这可在配置文件中修改。
4、写C#程序实现ActiveMQ的简单应用。新建C#工程(一个Producter项目和一个Consumer项目),WinForm或Console程序均可,这里建的是Console工程,添加对Apache.NMS.dll和Apache.NMS.ActiveMQ.dll的引用,然后即可编写实现代码了,简单的Producer和Consumer实现代码如下:
producer:
view plaincopy to clipboardprint?
- using?System;??
- using?System.Collections.Generic;??
- using?System.Text;??
- using?Apache.NMS;??
- using?Apache.NMS.ActiveMQ;??
- using?System.IO;??
- using?System.Xml.Serialization;??
- using?System.Runtime.Serialization.Formatters.Binary;??
- ??
- namespace?Publish??
- {??
- ????class?Program??
- ????{??
- ????????static?void?Main(string[]?args)??
- ????????{??
- ????????????try??
- ????????????{??
- ????????????????//Create?the?Connection?Factory??
- ????????????????IConnectionFactory?factory?=?new?ConnectionFactory("tcp://localhost:61616/");??
- ????????????????using?(IConnection?connection?=?factory.CreateConnection())??
- ????????????????{??
- ????????????????????//Create?the?Session??
- ????????????????????using?(ISession?session?=?connection.CreateSession())??
- ????????????????????{??
- ????????????????????????//Create?the?Producer?for?the?topic/queue??
- ????????????????????????IMessageProducer?prod?=?session.CreateProducer(??
- ????????????????????????????new?Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"));??
- ??????????????????????????
- ????????????????????????//Send?Messages??
- ????????????????????????int?i?=?0;??
- ??
- ????????????????????????while?(!Console.KeyAvailable)??
- ????????????????????????{??
- ????????????????????????????ITextMessage?msg?=?prod.CreateTextMessage();??
- ????????????????????????????msg.Text?=?i.ToString();??
- ????????????????????????????Console.WriteLine("Sending:?"?+?i.ToString());??
- ????????????????????????????prod.Send(msg,?Apache.NMS.MsgDeliveryMode.NonPersistent,?Apache.NMS.MsgPriority.Normal,?TimeSpan.MinValue);??
- ??
- ????????????????????????????System.Threading.Thread.Sleep(5000);??
- ????????????????????????????i++;??
- ????????????????????????}??
- ????????????????????}??
- ????????????????}??
- ??
- ????????????????Console.ReadLine();??
- ???????????}??
- ????????????catch?(System.Exception?e)??
- ????????????{??
- ????????????????Console.WriteLine("{0}",e.Message);??
- ????????????????Console.ReadLine();??
- ????????????}??
- ????????}??
- ????}??
- }??
consumer:
view plaincopy to clipboardprint?
- using?System;??
- using?System.Collections.Generic;??
- using?System.Text;??
- using?Apache.NMS;??
- using?Apache.NMS.ActiveMQ;??
- using?System.IO;??
- using?System.Xml.Serialization;??
- using?System.Runtime.Serialization.Formatters.Binary;??
- ??
- namespace?Subscribe??
- {??
- ????class?Program??
- ????{??
- ????????static?void?Main(string[]?args)??
- ????????{??
- ????????????try??
- ????????????{??
- ????????????????//Create?the?Connection?factory??
- ????????????????IConnectionFactory?factory?=?new?ConnectionFactory("tcp://localhost:61616/");??
- ??????????????????
- ????????????????//Create?the?connection??
- ????????????????using?(IConnection?connection?=?factory.CreateConnection())??
- ????????????????{??
- ????????????????????connection.ClientId?=?"testing?listener";??
- ????????????????????connection.Start();??
- ??
- ????????????????????//Create?the?Session??
- ????????????????????using?(ISession?session?=?connection.CreateSession())??
- ????????????????????{??
- ????????????????????????//Create?the?Consumer??
- ????????????????????????IMessageConsumer?consumer?=?session.CreateDurableConsumer(new?Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"),