首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 软件管理 > 软件架构设计 >

【入门】基于ActiveMQ 的发布/订阅(Pub/Sub) Chat 示例,下传了源码

2012-10-24 
【入门】基于ActiveMQ 的发布/订阅(Pub/Sub) Chat 示例,上传了源码环境需求:1. JDK 1.5 或者以上2. Apache A

【入门】基于ActiveMQ 的发布/订阅(Pub/Sub) Chat 示例,上传了源码
环境需求:

1. JDK 1.5 或者以上

2. Apache Ant, 在写本文时,用的是 Ant 1.7.1

3. ActiveMQ, 在写本文时,用的是 Apache ActiveMQ 5.4.1

技术需求:

1. JMS(Java Message Service)?

2. JNDI(Java Naming and Directory Interface)

?

在JMS的“发布/订阅(pub/sub)”模型中,消息的发布者(Publisher)通过主题(Topic)发布消息,订阅者(Subscriber)通过订阅主题获取消息。 一个主题可以同时有多个订阅者. 通过这种方式我们可以实现广播式(broadcast)消息。

为了更好的理解"发布/订阅(Pub/Sub)"模式,我在《Java消息服务器 第二版》上找到了一个很好的例子来说明他的使用。不过书上只提供了相关代码供我们理解,没有讲述整个创建过程,在这里打算记录下整个构建实例的过程:

?

1. ?创建项目目录入下图所示,并将activemq-all-*.jar 复制到项目的classpath中:

?

【入门】基于ActiveMQ 的发布/订阅(Pub/Sub) Chat 示例,下传了源码

?

2. 编写Chat代码:

?

?

public class Chat implements MessageListener {private TopicSession pubSession;private TopicPublisher pub;private TopicConnection conn;private String username;public Chat(String topicFactory, String topicName, String username)throws NamingException, JMSException {// 创建 JNDI contextInitialContext ctx = new InitialContext();//1. 创建 TopicConnectionFacotryTopicConnectionFactory factory = (TopicConnectionFactory) ctx.lookup(topicFactory);//2. 创建 TopicConnectionTopicConnection connection = factory.createTopicConnection();//3. 根据 Connection 创建 JMS 会话TopicSession pubSession = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);TopicSession subSession = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//4. 创建 TopicTopic topic = (Topic) ctx.lookup(topicName);//5. 创建 发布者 和 订阅者TopicPublisher pub = pubSession.createPublisher(topic);TopicSubscriber sub = subSession.createSubscriber(topic, null, true);//6. 为发布者设置消息监听sub.setMessageListener(this);this.conn = connection;this.pub = pub;this.pubSession = pubSession;this.username = username;//7. 开启JMS连接connection.start();}protected void writeMessage(String txt) {try {TextMessage message = pubSession.createTextMessage();message.setText(username + ": " + txt);pub.publish(message);} catch (JMSException e) {e.printStackTrace();}}public void onMessage(Message msg) {TextMessage txtMsg = (TextMessage) msg;try {System.out.println(txtMsg.getText());} catch (JMSException e) {e.printStackTrace();}}public void close() throws JMSException {this.conn.close();}public static void main(String[] args) throws NamingException,JMSException, IOException {if (args.length != 3) {System.out.println("Factory, Topic, or username missing");}Chat chat = new Chat(args[0], args[1], args[2]);BufferedReader cmd = new BufferedReader(new InputStreamReader(System.in));while (true) {String s = cmd.readLine();if (s.equalsIgnoreCase("exit")) {chat.close();System.exit(0);} else {chat.writeMessage(s);}}}}

?

?3.由于里我们使用了JNDI, 所以我们需要编辑jndi.properties。内容如下:

?

?

# START SNIPPET: jndijava.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory# use the following property to configure the default connectorjava.naming.provider.url = tcp://localhost:61616java.naming.security.principal=systemjava.naming.security.credentials=manager# use the following property to specify the JNDI name the connection factory# should appear as. #connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactryconnectionFactoryNames = topicConnectionFactry# register some queues in JNDI using the form# queue.[jndiName] = [physicalName]#queue.MyQueue = example.ChatQuetopic.chat = example.chat# register some topics in JNDI using the form# topic.[jndiName] = [physicalName]#topic.MyTopic = example.ChatTop# END SNIPPET: jndi

?

4. 到这里已经基本完成Chat 编码工作,使用如下指令即可运行这个示例:

?

<?xml version="1.0" encoding="utf-8" ?><project name="chat" default="run" basedir="."><property name="src.dir" value="src" /><property name="build.dir" value="build" /><property name="classes.dir" value="${build.dir}/classes" /><property name="jar.dir" value="${build.dir}/jar" /><property name="lib.dir" value="libs"/><!-- 设置main函数所在类 --><property name="main-class" value="com.dayang.jms.chat.Chat" /><!-- 定义classpath --><path id="classpath"><fileset dir="${lib.dir}" includes="**/*.jar" /></path><!-- 创建构建目录,用于存放构建生成的文件 --><target name="init"><mkdir dir="${build.dir}"/></target><!-- 编译 --><target name="compile" depends="init"><mkdir dir="${classes.dir}"/><javac srcdir="${src.dir}" destdir="${classes.dir}" classpathref="classpath"/><!-- copy properties file to classpath --><copy todir="${classes.dir}"><fileset dir="${src.dir}" excludes="**.*.jar" /></copy></target><!-- 打包 --><target name="jar" depends="compile"><mkdir dir="${jar.dir}"/><jar destfile="${jar.dir}/${ant.project.name}.jar" basedir="${classes.dir}"><manifest><attribute name="Main-Class" value="${main-class}" /></manifest></jar></target><!-- 运行client1 --><target name="run1" depends="jar"><java fork="true" classname="${main-class}"><arg value="topicConnectionFactry"/><arg value="chat"/><arg value="client1"/><classpath><path refid="classpath"/><path location="${jar.dir}/${ant.project.name}.jar"/></classpath></java></target><!-- 运行client2 --><target name="run2" depends="jar"><java fork="true" classname="${main-class}"><arg value="topicConnectionFactry"/><arg value="chat"/><arg value="client2"/><classpath><path refid="classpath"/><path location="${jar.dir}/${ant.project.name}.jar"/></classpath></java></target><target name="clean"><delete dir="${build.dir}"/></target><target name="clean-build" depends="clean,jar"/></project>?

?

6. 打开两个控制台窗口,分别使用ant run1 和 ant run2 指令来运行程序, 如果成功我们将看到如下结果:


【入门】基于ActiveMQ 的发布/订阅(Pub/Sub) Chat 示例,下传了源码

?

写在最后:

这个示例仅仅简单的说了JMS 发布/订阅 API的基本使用,更多特性需要在以后的使用中进行摸索。

发布/订阅 除了能够提供“1对多”的消息专递方式之外,还提供了消息持久化的特性。他允许订阅者在上线后接收离线时的消息,关于这部分特性,以及“发布/订阅”的应用场景打算在以后的文章中慢慢阐述。

参考资料:

1. JMS:http://baike.baidu.com/view/157103.htm

2. ActiveMQ: http://baike.baidu.com/view/433374.htm

3. JNDIhttp://baike.baidu.com/view/209575.htm

4. 《Java消息服务器 第二版》

?

5. Ant Manualhttp://ant.apache.org/manual/index.html

?

2011-05-18: 新增加了Demo的源代码, 需要的可以下载附件JSMDemo.rar

JMSDemo工程目录结构如下:


【入门】基于ActiveMQ 的发布/订阅(Pub/Sub) Chat 示例,下传了源码

?

?

1 楼 androidleader 2010-12-08   好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡 2 楼 GRDJE 2010-12-09   androidleader 写道好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡
人家就是写个helloworld的demo,
nc啊问这些问题..... 3 楼 liubey 2010-12-09   GRDJE 写道androidleader 写道好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡
人家就是写个helloworld的demo,
nc啊问这些问题.....
+1 4 楼 androidleader 2010-12-09   liubey 写道GRDJE 写道androidleader 写道好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡
人家就是写个helloworld的demo,
nc啊问这些问题.....
+1
我只是希望有路过的高手出来指点一二

nc的几位,激动个啥。。。 5 楼 witcheryne 2010-12-09   androidleader 写道好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡


如果你对这些感兴趣你可以在文中提到的参考资料里找找·· ActiveMQ 文档提供了各种测试方法....

并发,稳定,吞吐量基本因需求而定, 消息传递有很多协议, 每种协议对应的应用场景都有所不同,这里有篇文章对MOM选型以及性能说的很全面:
http://wiki.secondlife.com/wiki/Message_Queue_Evaluation_Notes#Zero_MQ


6 楼 androidleader 2010-12-09   witcheryne 写道androidleader 写道好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡


如果你对这些感兴趣你可以在文中提到的参考资料里找找·· ActiveMQ 文档提供了各种测试方法....

并发,稳定,吞吐量基本因需求而定, 消息传递有很多协议, 每种协议对应的应用场景都有所不同,这里有篇文章对MOM选型以及性能说的很全面:
http://wiki.secondlife.com/wiki/Message_Queue_Evaluation_Notes#Zero_MQ



1、发现现有的几个版本,5.3 5.4都不是很稳定,5.2稳定些
   1) 有丢消息或消息重复问题
   2) 连续启停,failover机制有问题,存在幽灵队列
2、发现没有什么好的测试方法
   1) 测试的benchmark,网上有一个,
   2) loadrunner不太好使,自己写程序测不太可靠; 7 楼 witcheryne 2010-12-09   androidleader 写道witcheryne 写道androidleader 写道好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡


如果你对这些感兴趣你可以在文中提到的参考资料里找找·· ActiveMQ 文档提供了各种测试方法....

并发,稳定,吞吐量基本因需求而定, 消息传递有很多协议, 每种协议对应的应用场景都有所不同,这里有篇文章对MOM选型以及性能说的很全面:
http://wiki.secondlife.com/wiki/Message_Queue_Evaluation_Notes#Zero_MQ



1、发现现有的几个版本,5.3 5.4都不是很稳定,5.2稳定些
   1) 有丢消息或消息重复问题
   2) 连续启停,failover机制有问题,存在幽灵队列
2、发现没有什么好的测试方法
   1) 测试的benchmark,网上有一个,
   2) loadrunner不太好使,自己写程序测不太可靠;


这个你了解的比我多,性能测试方面我不太清楚, 关于测试方法希望你能分享一下。

我用JMeter测ActiveMQ 5.4.1, 开500个线程,1秒间隔, 循环 10 次~ 没发现什么异常...
我们用ActiveMQ主要目的是代替原先的SocketServer,将消息传递独立出来,解决C/S和B/S应用集成的问题。

需要高并发·你试试 ZeroMQ : http://www.infoq.com/cn/news/2010/09/introduction-zero-mq,
基于AMQP协议,用Erlang写的RabbitMQ你也可以试试: http://www.infoq.com/cn/articles/AMQP-RabbitMQ



8 楼 grady 2010-12-09   jms--Java Message Service lz貌似写错了 9 楼 witcheryne 2010-12-09   grady 写道jms--Java Message Service lz貌似写错了
多谢~ 立刻纠正... 10 楼 潜心修炼 2010-12-09   witcheryne 写道androidleader 写道witcheryne 写道androidleader 写道好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡


如果你对这些感兴趣你可以在文中提到的参考资料里找找·· ActiveMQ 文档提供了各种测试方法....

并发,稳定,吞吐量基本因需求而定, 消息传递有很多协议, 每种协议对应的应用场景都有所不同,这里有篇文章对MOM选型以及性能说的很全面:
http://wiki.secondlife.com/wiki/Message_Queue_Evaluation_Notes#Zero_MQ



1、发现现有的几个版本,5.3 5.4都不是很稳定,5.2稳定些
   1) 有丢消息或消息重复问题
   2) 连续启停,failover机制有问题,存在幽灵队列
2、发现没有什么好的测试方法
   1) 测试的benchmark,网上有一个,
   2) loadrunner不太好使,自己写程序测不太可靠;


这个你了解的比我多,性能测试方面我不太清楚, 关于测试方法希望你能分享一下。

我用JMeter测ActiveMQ 5.4.1, 开500个线程,1秒间隔, 循环 10 次~ 没发现什么异常...
我们用ActiveMQ主要目的是代替原先的SocketServer,将消息传递独立出来,解决C/S和B/S应用集成的问题。

需要高并发·你试试 ZeroMQ : http://www.infoq.com/cn/news/2010/09/introduction-zero-mq,
基于AMQP协议,用Erlang写的RabbitMQ你也可以试试: http://www.infoq.com/cn/articles/AMQP-RabbitMQ





可以参考一下HornetQ。
1.支持集群
2.支持主备,高可用
3.性能好,吞吐量能在5000+/秒。(跟环境相关),不过官方说能到几万
4.支持STOMP协议
还有其他优势,不过也有不足,主备的方式不是很让人满意,不过还是有别的办法来进行补足。
11 楼 witcheryne 2010-12-09   潜心修炼 写道witcheryne 写道androidleader 写道witcheryne 写道androidleader 写道好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡


如果你对这些感兴趣你可以在文中提到的参考资料里找找·· ActiveMQ 文档提供了各种测试方法....

并发,稳定,吞吐量基本因需求而定, 消息传递有很多协议, 每种协议对应的应用场景都有所不同,这里有篇文章对MOM选型以及性能说的很全面:
http://wiki.secondlife.com/wiki/Message_Queue_Evaluation_Notes#Zero_MQ



1、发现现有的几个版本,5.3 5.4都不是很稳定,5.2稳定些
   1) 有丢消息或消息重复问题
   2) 连续启停,failover机制有问题,存在幽灵队列
2、发现没有什么好的测试方法
   1) 测试的benchmark,网上有一个,
   2) loadrunner不太好使,自己写程序测不太可靠;


这个你了解的比我多,性能测试方面我不太清楚, 关于测试方法希望你能分享一下。

我用JMeter测ActiveMQ 5.4.1, 开500个线程,1秒间隔, 循环 10 次~ 没发现什么异常...
我们用ActiveMQ主要目的是代替原先的SocketServer,将消息传递独立出来,解决C/S和B/S应用集成的问题。

需要高并发·你试试 ZeroMQ : http://www.infoq.com/cn/news/2010/09/introduction-zero-mq,
基于AMQP协议,用Erlang写的RabbitMQ你也可以试试: http://www.infoq.com/cn/articles/AMQP-RabbitMQ





可以参考一下HornetQ。
1.支持集群
2.支持主备,高可用
3.性能好,吞吐量能在5000+/秒。(跟环境相关),不过官方说能到几万
4.支持STOMP协议
还有其他优势,不过也有不足,主备的方式不是很让人满意,不过还是有别的办法来进行补足。


支持集群就够了,还要主备?? 这个有点不理解....

ActiveMQ 对 C++ 和 Ajax 都提供了客户端实现,这个比较符合我们的需求...

如果纯java平台,HornetQ 感觉不错~ 文档很全面



12 楼 潜心修炼 2010-12-13   1.HornetQ也提供了对其他语言的支持,只不过需要通过其他的协议进行通信,比如STOMP
2.如果不是集群的话,主备是不是就有用了,呵呵

热点排行