ActiveMQ的一个简单示例
最近由于公司项目需要,开始学习JMS,用的是ActiveMQ。由于这方面网上的例子不是很多,而且有的也不完整。于是经过几天的摸索学习,写了一个简单的小例子,现在贴出来与大家分享。
ProducerTool.java用于发送消息:
?
package?homework;
import?javax.jms.Connection;
import?javax.jms.DeliveryMode;
import?javax.jms.Destination;
import?javax.jms.JMSException;
import?javax.jms.MessageProducer;
import?javax.jms.Session;
import?javax.jms.TextMessage;
import?org.apache.activemq.ActiveMQConnection;
import?org.apache.activemq.ActiveMQConnectionFactory;
public?class?ProducerTool?...{
?private?String?user?=?ActiveMQConnection.DEFAULT_USER;
?private?String?password?=?ActiveMQConnection.DEFAULT_PASSWORD;
?private?String?url?=?ActiveMQConnection.DEFAULT_BROKER_URL;
?private?String?subject?=?"TOOL.DEFAULT";
?private?Destination?destination?=?null;
?private?Connection?connection?=?null;
?private?Session?session?=?null;
?private?MessageProducer?producer?=?null;
?//?初始化
?private?void?initialize()?throws?JMSException,?Exception?...{
??ActiveMQConnectionFactory?connectionFactory?=?new?ActiveMQConnectionFactory(
????user,?password,?url);
??connection?=?connectionFactory.createConnection();
??session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);
??destination?=?session.createQueue(subject);
??producer?=?session.createProducer(destination);
??producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
?}
?//?发送消息
?public?void?produceMessage(String?message)?throws?JMSException,?Exception?...{
??initialize();
??TextMessage?msg?=?session.createTextMessage(message);
??connection.start();
??System.out.println("Producer:->Sending?message:?"?+?message);
??producer.send(msg);
??System.out.println("Producer:->Message?sent?complete!");
?}
?//?关闭连接
?public?void?close()?throws?JMSException?...{
??System.out.println("Producer:->Closing?connection");
??if?(producer?!=?null)
???producer.close();
??if?(session?!=?null)
???session.close();
??if?(connection?!=?null)
???connection.close();
?}
}

?
ConsumerTool.java用于接受消息,我用的是基于消息监听的机制,需要实现MessageListener接口,这个接口有个onMessage方法,当接受到消息的时候会自动调用这个函数对消息进行处理。
package?homework;
import?javax.jms.Connection;
import?javax.jms.Destination;
import?javax.jms.JMSException;
import?javax.jms.MessageConsumer;
import?javax.jms.Session;
import?javax.jms.MessageListener;
import?javax.jms.Message;
import?javax.jms.TextMessage;
import?org.apache.activemq.ActiveMQConnection;
import?org.apache.activemq.ActiveMQConnectionFactory;
public?class?ConsumerTool?implements?MessageListener?...{
?private?String?user?=?ActiveMQConnection.DEFAULT_USER;
?private?String?password?=?ActiveMQConnection.DEFAULT_PASSWORD;
?private?String?url?=?ActiveMQConnection.DEFAULT_BROKER_URL;
?private?String?subject?=?"TOOL.DEFAULT";
?private?Destination?destination?=?null;
?private?Connection?connection?=?null;
?private?Session?session?=?null;
?private?MessageConsumer?consumer?=?null;
?//?初始化
?private?void?initialize()?throws?JMSException,?Exception?...{
??ActiveMQConnectionFactory?connectionFactory?=?new?ActiveMQConnectionFactory(
????user,?password,?url);
??connection?=?connectionFactory.createConnection();
??session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);
??destination?=?session.createQueue(subject);
??consumer?=?session.createConsumer(destination);
??
?}
?//?消费消息
?public?void?consumeMessage()?throws?JMSException,?Exception?...{
??initialize();
??connection.start();
??
??System.out.println("Consumer:->Begin?listening...");
??//?开始监听
??consumer.setMessageListener(this);
??//?Message?message?=?consumer.receive();
?}
?//?关闭连接
?public?void?close()?throws?JMSException?...{
??System.out.println("Consumer:->Closing?connection");
??if?(consumer?!=?null)
???consumer.close();
??if?(session?!=?null)
???session.close();
??if?(connection?!=?null)
???connection.close();
?}
?//?消息处理函数
?public?void?onMessage(Message?message)?...{
??try?...{
???if?(message?instanceof?TextMessage)?...{
????TextMessage?txtMsg?=?(TextMessage)?message;
????String?msg?=?txtMsg.getText();
????System.out.println("Consumer:->Received:?"?+?msg);
???}?else?...{
????System.out.println("Consumer:->Received:?"?+?message);
???}
??}?catch?(JMSException?e)?...{
???//?TODO?Auto-generated?catch?block
???e.printStackTrace();
??}
?}
}

下面是测试类Test.java:
?
package?homework;
import?javax.jms.JMSException;
public?class?Test?...{
?/**?*//**
??*?@param?args
??*/
?public?static?void?main(String[]?args)?throws?JMSException,?Exception?...{
??//?TODO?Auto-generated?method?stub
??ConsumerTool?consumer?=?new?ConsumerTool();
??ProducerTool?producer?=?new?ProducerTool();
??//?开始监听
??consumer.consumeMessage();
??
??//?延时500毫秒之后发送消息
??Thread.sleep(500);
??producer.produceMessage("Hello,?world!");
??producer.close();
??
??//?延时500毫秒之后停止接受消息
??Thread.sleep(500);
??consumer.close();
?}
}
?
以上就是我学习ActiveMQ之后写的一个简单的例子,希望对你有帮助,如果有什么错误还请指正。?