首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

ActiveMQ实战之 Topic公布订阅消息【转】

2012-12-18 
ActiveMQ实战之 Topic发布订阅消息【转】对于此类消息,其实就是指使用JMS中的发布订阅消息模型的消息,下面是

ActiveMQ实战之 Topic发布订阅消息【转】

对于此类消息,其实就是指使用JMS中的发布订阅消息模型的消息,下面是一个简单的例子

消息发布者

view plaincopy to clipboardprint?
  1. package?com.googlecode.garbagecan.jmsstudy.activemq.topic;????
  2. import?org.apache.activemq.ActiveMQConnectionFactory;????
  3. import?javax.jms.*;????
  4. public?class?TopicPublisher?{??????public?static?void?main(String[]?args)?throws?JMSException?{??
  5. ????????ActiveMQConnectionFactory?factory?=?new?ActiveMQConnectionFactory("tcp://localhost:61616");??????????Connection?connection?=?factory.createConnection();??
  6. ????????connection.start();????????????
  7. ????????Session?session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);??????????Topic?topic?=?session.createTopic("myTopic.messages");??
  8. ??????????MessageProducer?producer?=?session.createProducer(topic);??
  9. ????????producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);????
  10. ????????while(true)?{??????????????TextMessage?message?=?session.createTextMessage();??
  11. ????????????message.setText("message_"?+?System.currentTimeMillis());??????????????producer.send(message);??
  12. ????????????System.out.println("Sent?message:?"?+?message.getText());????
  13. ????????????try?{??????????????????Thread.sleep(1000);??
  14. ????????????}?catch?(InterruptedException?e)?{??????????????????e.printStackTrace();??
  15. ????????????}??????????}??
  16. ??//??????session.close();??
  17. //??????connection.stop();??//??????connection.close();??
  18. ????}??}??

消息订阅者(消息消费者)

view plaincopy to clipboardprint?
  1. package?com.googlecode.garbagecan.jmsstudy.activemq.topic;????
  2. import?org.apache.activemq.ActiveMQConnectionFactory;????
  3. import?javax.jms.*;????
  4. public?class?TopicSubscriber?{??????public?static?void?main(String[]?args)?throws?JMSException?{??
  5. ????????ActiveMQConnectionFactory?factory?=?new?ActiveMQConnectionFactory("tcp://localhost:61616");??????????Connection?connection?=?factory.createConnection();??
  6. ????????connection.start();????????????
  7. ????????Session?session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);??????????Topic?topic?=?session.createTopic("myTopic.messages");??
  8. ??????????MessageConsumer?consumer?=?session.createConsumer(topic);??
  9. ????????consumer.setMessageListener(new?MessageListener()?{??????????????public?void?onMessage(Message?message)?{??
  10. ????????????????TextMessage?tm?=?(TextMessage)?message;??????????????????try?{??
  11. ????????????????????System.out.println("Received?message:?"?+?tm.getText());??????????????????}?catch?(JMSException?e)?{??
  12. ????????????????????e.printStackTrace();??????????????????}??
  13. ????????????}??????????});??
  14. //??????session.close();??//??????connection.stop();??
  15. //??????connection.close();??????}??
  16. }??

分别运行两个类,就可以看到Publisher发布的类,Subscriber都可以接受到。

热点排行