使用 ActiveMQ 示范
使用 ActiveMQ 示例企业中各项目中相互协作的时候可能用得到消息通知机制。比如有东西更新了,可以通知做索
使用 ActiveMQ 示例
企业中各项目中相互协作的时候可能用得到消息通知机制。比如有东西更新了,可以通知做索引。
在 Java 里有 JMS 的多个实现。其中 apache 下的 ActiveMQ 就是不错的选择。还有一个比较热的是 RabbitMQ (是 erlang 语言实现的)。这里示例下使用 ActiveMQ
用 ActiveMQ 最好还是了解下 JMS
JMS 公共点对点域发布/订阅域ConnectionFactoryQueueConnectionFactoryTopicConnectionFactoryConnectionQueueConnectionTopicConnectionDestinationQueueTopicSessionQueueSessionTopicSessionMessageProducerQueueSenderTopicPublisherMessageConsumerQueueReceiverTopicSubscriber
JMS 定义了两种方式:Quere(点对点);Topic(发布/订阅)。
ConnectionFactory 是连接工厂,负责创建Connection。
Connection 负责创建 Session。
Session 创建 MessageProducer(用来发消息) 和 MessageConsumer(用来接收消息)。
Destination 是消息的目的地。
详细的可以网上找些 JMS 规范(有中文版)。
下载 apache-activemq-5.3.0。http://activemq.apache.org/download.html, 解压,然后双击 bin/activemq.bat。运行后,可以在?http://localhost:8161/admin?观察。也有 demo,http://localhost:8161/demo。 把 activemq-all-5.3.0.jar 加入 classpath。
Jms 发送 代码:
- public?static?void?main(String[]?args)?throws?Exception?{??
- ????ConnectionFactory?connectionFactory?=?new?ActiveMQConnectionFactory();??
- ??
- ????Connection?connection?=?connectionFactory.createConnection();??
- ????connection.start();??
- ??
- ????Session?session?=?connection.createSession(Boolean.TRUE,?Session.AUTO_ACKNOWLEDGE);??
- ????Destination?destination?=?session.createQueue("my-queue");??
- ??
- ????MessageProducer?producer?=?session.createProducer(destination);??
- ????for(int?i=0;?i<3;?i++)?{??
- ????????MapMessage?message?=?session.createMapMessage();??
- ????????message.setLong("count",?new?Date().getTime());??
- ????????Thread.sleep(1000);??
- ????????//通过消息生产者发出消息??
- ????????producer.send(message);??
- ????}??
- ????session.commit();??
- ????session.close();??
- ????connection.close();??
- }??
Jms 接收代码:
- public?static?void?main(String[]?args)?throws?Exception?{??
- ????ConnectionFactory?connectionFactory?=?new?ActiveMQConnectionFactory();??
- ??
- ????Connection?connection?=?connectionFactory.createConnection();??
- ????connection.start();??
- ??
- ????final?Session?session?=?connection.createSession(Boolean.TRUE,?Session.AUTO_ACKNOWLEDGE);??
- ????Destination?destination?=?session.createQueue("my-queue");??
- ??
- ????MessageConsumer?consumer?=?session.createConsumer(destination);??
- ????/*//listener?方式?
- ????consumer.setMessageListener(new?MessageListener()?{?
- ?
- ????????public?void?onMessage(Message?msg)?{?
- ????????????MapMessage?message?=?(MapMessage)?msg;?
- ????????????//TODO?something....?
- ????????????System.out.println(" 收到消息:"?+?new?Date(message.getLong("count")));?
- ????????????session.commit();?
- ????????}?
- ?
- ????});?
- ????Thread.sleep(30000);?
- ????*/??
- ????int?i=0;??
- ????while(i<3)?{??
- ????????i++;??
- ????????MapMessage?message?=?(MapMessage)?consumer.receive();??
- ????????session.commit();??
- ??
- ????????//TODO?something....??
- ????????System.out.println("收到消 息:"?+?new?Date(message.getLong("count")));??
- ????}??
- ??
- ????session.close();??
- ????connection.close();??
- }??
启动 JmsReceiver 和 JmsSender 可以在看输出三条时间信息。当然 Jms 还指定有其它格式的数据,如 TextMessage
结合 Spring 的 JmsTemplate 方便用:
xml:
- <?xml?version="1.0"?encoding="UTF-8"?>??
- <beans?xmlns="http://www.springframework.org/schema/beans"?xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"??
- ????????xsi:schemaLocation="http://www.springframework.org/schema/beans?http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">??
- ??
- <!--?在非?web?/?ejb?容器中使用?pool?时,要手动?stop,spring?不会为 你执行?destroy-method?的方法??
- ????<bean?id="jmsFactory"?class="org.apache.activemq.pool.PooledConnectionFactory"?destroy-method="stop">??
- ????????<property?name="connectionFactory">??
- ????????????<bean?class="org.apache.activemq.ActiveMQConnectionFactory">??
- ????????????????<property?name="brokerURL"?value="tcp://localhost:61616"?/>??
- ????????????</bean>??
- ????????</property>??
- ????</bean>??
- -->??
- ????<bean?id="jmsFactory"?class="org.apache.activemq.ActiveMQConnectionFactory">??
- ????????<property?name="brokerURL"?value="tcp://localhost:61616"?/>??
- ????</bean>??
- ????<bean?id="jmsTemplate"?class="org.springframework.jms.core.JmsTemplate">??
- ????????<property?name="connectionFactory"?ref="jmsFactory"?/>??
- ????????<property?name="defaultDestination"?ref="destination"?/>??
- ????????<property?name="messageConverter">??
- ????????????<bean?class="org.springframework.jms.support.converter.SimpleMessageConverter"?/>??
- ????????</property>??
- ????</bean>??
- ??
- ????<bean?id="destination"?class="org.apache.activemq.command.ActiveMQQueue">??
- ????????<constructor-arg?index="0"?value="my-queue"?/>??
- ????</bean>??
- ??
- </beans>??
sender:
- public?static?void?main(String[]?args)?{??
- ????ApplicationContext?ctx?=?new?FileSystemXmlApplicationContext("classpath:app*.xml");??
- ??
- ????JmsTemplate?jmsTemplate?=?(JmsTemplate)?ctx.getBean("jmsTemplate");??
- ??
- ????jmsTemplate.send(new?MessageCreator()?{??
- ??
- ????????public?Message?createMessage(Session?session)?throws?JMSException?{??
- ????????????MapMessage?mm?=?session.createMapMessage();??
- ????????????mm.setLong("count",?new?Date().getTime());??
- ????????????return?mm;??
- ????????}??
- ??
- ????});??
- }??
receiver:
- public?static?void?main(String[]?args)?{??
- ????ApplicationContext?ctx?=?new?FileSystemXmlApplicationContext("classpath:app*.xml");??
- ??
- ????JmsTemplate?jmsTemplate?=?(JmsTemplate)?ctx.getBean("jmsTemplate");??
- ????while(true)?{??
- ????????Map<String,?Object>?mm?=??(Map<String,?Object>)?jmsTemplate.receiveAndConvert();??
- ????????System.out.println("收到消 息:"?+?new?Date((Long)mm.get("count")));??
- ????}??
- }??
注意:直接用 Jms 接口时接收了消息后要提交一下,否则下次启动接收者时还可以收到旧数据。有了 JmsTemplate 就不用自己提交 session.commit() 了。如果使用了 PooledConnectionFactory 要把 apache-activemq-5.3.0\lib\optional\activemq-pool-5.3.0.jar 加到 classpath