ActiveMQ实战之 Topic公布订阅消息【转】
ActiveMQ实战之 Topic发布订阅消息【转】对于此类消息,其实就是指使用JMS中的发布订阅消息模型的消息,下面是
ActiveMQ实战之 Topic发布订阅消息【转】
对于此类消息,其实就是指使用JMS中的发布订阅消息模型的消息,下面是一个简单的例子
消息发布者
view plaincopy to clipboardprint?
- package?com.googlecode.garbagecan.jmsstudy.activemq.topic;????
- import?org.apache.activemq.ActiveMQConnectionFactory;????
- import?javax.jms.*;????
- public?class?TopicPublisher?{??????public?static?void?main(String[]?args)?throws?JMSException?{??
- ????????ActiveMQConnectionFactory?factory?=?new?ActiveMQConnectionFactory("tcp://localhost:61616");??????????Connection?connection?=?factory.createConnection();??
- ????????connection.start();????????????
- ????????Session?session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);??????????Topic?topic?=?session.createTopic("myTopic.messages");??
- ??????????MessageProducer?producer?=?session.createProducer(topic);??
- ????????producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);????
- ????????while(true)?{??????????????TextMessage?message?=?session.createTextMessage();??
- ????????????message.setText("message_"?+?System.currentTimeMillis());??????????????producer.send(message);??
- ????????????System.out.println("Sent?message:?"?+?message.getText());????
- ????????????try?{??????????????????Thread.sleep(1000);??
- ????????????}?catch?(InterruptedException?e)?{??????????????????e.printStackTrace();??
- ????????????}??????????}??
- ??//??????session.close();??
- //??????connection.stop();??//??????connection.close();??
- ????}??}??
消息订阅者(消息消费者)
view plaincopy to clipboardprint?
- package?com.googlecode.garbagecan.jmsstudy.activemq.topic;????
- import?org.apache.activemq.ActiveMQConnectionFactory;????
- import?javax.jms.*;????
- public?class?TopicSubscriber?{??????public?static?void?main(String[]?args)?throws?JMSException?{??
- ????????ActiveMQConnectionFactory?factory?=?new?ActiveMQConnectionFactory("tcp://localhost:61616");??????????Connection?connection?=?factory.createConnection();??
- ????????connection.start();????????????
- ????????Session?session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);??????????Topic?topic?=?session.createTopic("myTopic.messages");??
- ??????????MessageConsumer?consumer?=?session.createConsumer(topic);??
- ????????consumer.setMessageListener(new?MessageListener()?{??????????????public?void?onMessage(Message?message)?{??
- ????????????????TextMessage?tm?=?(TextMessage)?message;??????????????????try?{??
- ????????????????????System.out.println("Received?message:?"?+?tm.getText());??????????????????}?catch?(JMSException?e)?{??
- ????????????????????e.printStackTrace();??????????????????}??
- ????????????}??????????});??
- //??????session.close();??//??????connection.stop();??
- //??????connection.close();??????}??
- }??
分别运行两个类,就可以看到Publisher发布的类,Subscriber都可以接受到。