【入门】基于ActiveMQ 的发布/订阅(Pub/Sub) Chat 示例,上传了源码
环境需求:
1. JDK 1.5 或者以上
2. Apache Ant, 在写本文时,用的是 Ant 1.7.1
3. ActiveMQ, 在写本文时,用的是 Apache ActiveMQ 5.4.1
技术需求:1. JMS(Java Message Service)?
2. JNDI(Java Naming and Directory Interface)
?
在JMS的“发布/订阅(pub/sub)”模型中,消息的发布者(Publisher)通过主题(Topic)发布消息,订阅者(Subscriber)通过订阅主题获取消息。 一个主题可以同时有多个订阅者. 通过这种方式我们可以实现广播式(broadcast)消息。
为了更好的理解"发布/订阅(Pub/Sub)"模式,我在《Java消息服务器 第二版》上找到了一个很好的例子来说明他的使用。不过书上只提供了相关代码供我们理解,没有讲述整个创建过程,在这里打算记录下整个构建实例的过程:
?
1. ?创建项目目录入下图所示,并将activemq-all-*.jar 复制到项目的classpath中:
?

?
2. 编写Chat代码:
?
?
public class Chat implements MessageListener {private TopicSession pubSession;private TopicPublisher pub;private TopicConnection conn;private String username;public Chat(String topicFactory, String topicName, String username)throws NamingException, JMSException {// 创建 JNDI contextInitialContext ctx = new InitialContext();//1. 创建 TopicConnectionFacotryTopicConnectionFactory factory = (TopicConnectionFactory) ctx.lookup(topicFactory);//2. 创建 TopicConnectionTopicConnection connection = factory.createTopicConnection();//3. 根据 Connection 创建 JMS 会话TopicSession pubSession = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);TopicSession subSession = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//4. 创建 TopicTopic topic = (Topic) ctx.lookup(topicName);//5. 创建 发布者 和 订阅者TopicPublisher pub = pubSession.createPublisher(topic);TopicSubscriber sub = subSession.createSubscriber(topic, null, true);//6. 为发布者设置消息监听sub.setMessageListener(this);this.conn = connection;this.pub = pub;this.pubSession = pubSession;this.username = username;//7. 开启JMS连接connection.start();}protected void writeMessage(String txt) {try {TextMessage message = pubSession.createTextMessage();message.setText(username + ": " + txt);pub.publish(message);} catch (JMSException e) {e.printStackTrace();}}public void onMessage(Message msg) {TextMessage txtMsg = (TextMessage) msg;try {System.out.println(txtMsg.getText());} catch (JMSException e) {e.printStackTrace();}}public void close() throws JMSException {this.conn.close();}public static void main(String[] args) throws NamingException,JMSException, IOException {if (args.length != 3) {System.out.println("Factory, Topic, or username missing");}Chat chat = new Chat(args[0], args[1], args[2]);BufferedReader cmd = new BufferedReader(new InputStreamReader(System.in));while (true) {String s = cmd.readLine();if (s.equalsIgnoreCase("exit")) {chat.close();System.exit(0);} else {chat.writeMessage(s);}}}}?
?3.由于里我们使用了JNDI, 所以我们需要编辑jndi.properties。内容如下:
?
?
# START SNIPPET: jndijava.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory# use the following property to configure the default connectorjava.naming.provider.url = tcp://localhost:61616java.naming.security.principal=systemjava.naming.security.credentials=manager# use the following property to specify the JNDI name the connection factory# should appear as. #connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactryconnectionFactoryNames = topicConnectionFactry# register some queues in JNDI using the form# queue.[jndiName] = [physicalName]#queue.MyQueue = example.ChatQuetopic.chat = example.chat# register some topics in JNDI using the form# topic.[jndiName] = [physicalName]#topic.MyTopic = example.ChatTop# END SNIPPET: jndi
?
4. 到这里已经基本完成Chat 编码工作,使用如下指令即可运行这个示例:
?
<?xml version="1.0" encoding="utf-8" ?><project name="chat" default="run" basedir="."><property name="src.dir" value="src" /><property name="build.dir" value="build" /><property name="classes.dir" value="${build.dir}/classes" /><property name="jar.dir" value="${build.dir}/jar" /><property name="lib.dir" value="libs"/><!-- 设置main函数所在类 --><property name="main-class" value="com.dayang.jms.chat.Chat" /><!-- 定义classpath --><path id="classpath"><fileset dir="${lib.dir}" includes="**/*.jar" /></path><!-- 创建构建目录,用于存放构建生成的文件 --><target name="init"><mkdir dir="${build.dir}"/></target><!-- 编译 --><target name="compile" depends="init"><mkdir dir="${classes.dir}"/><javac srcdir="${src.dir}" destdir="${classes.dir}" classpathref="classpath"/><!-- copy properties file to classpath --><copy todir="${classes.dir}"><fileset dir="${src.dir}" excludes="**.*.jar" /></copy></target><!-- 打包 --><target name="jar" depends="compile"><mkdir dir="${jar.dir}"/><jar destfile="${jar.dir}/${ant.project.name}.jar" basedir="${classes.dir}"><manifest><attribute name="Main-Class" value="${main-class}" /></manifest></jar></target><!-- 运行client1 --><target name="run1" depends="jar"><java fork="true" classname="${main-class}"><arg value="topicConnectionFactry"/><arg value="chat"/><arg value="client1"/><classpath><path refid="classpath"/><path location="${jar.dir}/${ant.project.name}.jar"/></classpath></java></target><!-- 运行client2 --><target name="run2" depends="jar"><java fork="true" classname="${main-class}"><arg value="topicConnectionFactry"/><arg value="chat"/><arg value="client2"/><classpath><path refid="classpath"/><path location="${jar.dir}/${ant.project.name}.jar"/></classpath></java></target><target name="clean"><delete dir="${build.dir}"/></target><target name="clean-build" depends="clean,jar"/></project>??
6. 打开两个控制台窗口,分别使用ant run1 和 ant run2 指令来运行程序, 如果成功我们将看到如下结果:

?
写在最后:这个示例仅仅简单的说了JMS 发布/订阅 API的基本使用,更多特性需要在以后的使用中进行摸索。
发布/订阅 除了能够提供“1对多”的消息专递方式之外,还提供了消息持久化的特性。他允许订阅者在上线后接收离线时的消息,关于这部分特性,以及“发布/订阅”的应用场景打算在以后的文章中慢慢阐述。
参考资料:1. JMS:http://baike.baidu.com/view/157103.htm
2. ActiveMQ: http://baike.baidu.com/view/433374.htm
3. JNDIhttp://baike.baidu.com/view/209575.htm
4. 《Java消息服务器 第二版》
?
5. Ant Manualhttp://ant.apache.org/manual/index.html
?
2011-05-18: 新增加了Demo的源代码, 需要的可以下载附件JSMDemo.rar
JMSDemo工程目录结构如下:
?
?
1 楼 androidleader 2010-12-08 好贴,就是有点简单。