Active MQ (二) Java Application编程
1.准备工作
? 1)启动ActiveMQ
? 2)将%ActiveMQ_HOME%\activemq-all-5.1.0.jar导入对应的JAVA Project
?
2.对于MQ编程通常使用JMS编程,但是由于纯JAVA Application Project中不存在JNDI容器,所以还是在
? 创建ConnectionFactory时还是不得不与ActiveMQ API耦合
?
3.JMS体系结构
?? 
?
JMS有两种消息模式:
1)一对一/点到点 模式
2)预订/发布 模式
JMS编程主要的接口有:?
ConnectionFactory
Connection : 一般一个aplication中只有一个,线程安全,可复用。
Session
Queue
Topic
MessageProducer
MessageConsumer
MessageListener
?
4.实例
1)一对一/点到点
?
QueueReceiverListener.java?
?
package com.siyuan.jms;
import java.util.Date;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
public class QueueReceiverListener implements MessageListener {
?
?private Connection connection;
?
?private String queueNameSendTo;
?
?/**
? * @return the connection
? */
?public Connection getConnection() {
??return connection;
?}
?/**
? * @param connection the connection to set
? */
?public void setConnection(Connection connection) {
??this.connection = connection;
?}
?/**
? * @return the queueNameSendTo
? */
?public String getQueueNameSendTo() {
??return queueNameSendTo;
?}
?/**
? * @param queueNameSendTo the queueNameSendTo to set
? */
?public void setQueueNameSendTo(String queueNameSendTo) {
??this.queueNameSendTo = queueNameSendTo;
?}
?public void onMessage(Message message) {
??if (message instanceof TextMessage) {
???try {
????System.out.println("Current time : " + new Date());
????System.out.println("Receive : " + ((TextMessage) message).getText());
????System.out.println("Receive JMSCorrelationID : " +? message.getJMSCorrelationID());
????
????Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
????Queue queueSendTo = session.createQueue(queueNameSendTo);
????MessageProducer sender = session.createProducer(queueSendTo);
????TextMessage messageToSend = session.createTextMessage();
????messageToSend.setText("Nice to meet u, too.");
????messageToSend.setJMSCorrelationID(message.getJMSCorrelationID());
????sender.send(messageToSend);
????System.out.println("Send : " + messageToSend.getText());
????System.out.println("Send JMSCorrelationID : " +? messageToSend.getJMSCorrelationID());
????System.out.println("--------------------------------------------");
????sender.close();
????session.close();
???} catch (JMSException e) {
????// TODO Auto-generated catch block
????e.printStackTrace();
???}
??}
?}
}
====================================================================
?
JMSQueueTest.java
?
package com.siyuan.jms;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JMSQueueTest {
?
?public static String QUEUE_RECEIVE_FROM_NAME = "MY.TEST.SENDER";
?public static String QUEUE_SEND_TO_NAME = "MY.TEST.RECEIVER";
?
?/**
? * @param args
? * @throws JMSException
? */
?public static void main(String[] args) throws JMSException {
??
??ConnectionFactory connFactory
???= new ActiveMQConnectionFactory("tcp://localhost:61616");
??Connection connection = connFactory.createConnection();
??connection.start();
??
??Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
??
??Queue queue = session.createQueue(QUEUE_RECEIVE_FROM_NAME);
??MessageConsumer messageProcessor = session.createConsumer(queue);
??QueueReceiverListener processor = new QueueReceiverListener();
??processor.setConnection(connection);
??processor.setQueueNameSendTo(QUEUE_SEND_TO_NAME);
??messageProcessor.setMessageListener(processor);
??
?}
}
?
说明:本例为模拟一消息接收处理器,即从一Queue中获取Req消息,处理后将
???????? Resp消息发送至另一Queue中,其中Req和Resp通过JMSCorrelationID
???????? 进行关联。相关的Req消息发送器将在ActiveMQ和Tomcat集成中在JSP中实现。
编程中需注意:(1) connection.start();
???????????????????? (2) 异步方式获取消息,不能connection.close();
?
2)预订/发布
?
JMSTopicTest.java
?
package com.siyuan.jms;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JMSTopicTest {
?/**
? * @param args
? * @throws JMSException
? */
?public static void main(String[] args) throws JMSException {
??// TODO Auto-generated method stub
??ConnectionFactory connFactory
???= new ActiveMQConnectionFactory("tcp://localhost:61616");
??Connection connection = connFactory.createConnection();
??connection.start();
??
??Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
??
??Topic topic = session.createTopic("MY.TEST.TOPIC");
? MessageConsumer subscriber = session.createConsumer(topic);
??MessageProducer publisher = session.createProducer(topic);
??publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);???
??TextMessage messageToPublish = session.createTextMessage();
??messageToPublish.setText("Hello all!");
??publisher.send(messageToPublish);
??
????Message messageFromSubscribe = subscriber.receive(3000);
??System.out.println(messageFromSubscribe);
??if (messageFromSubscribe instanceof TextMessage) {
???System.out.println(((TextMessage) messageFromSubscribe).getText());
??}
??publisher.close();
??subscriber.close();
??session.close();
??connection.close();
?}
}
?
编程中需注意:(1) subscriber定义必须在publisher定义之前
?????????????????????????????? 只能预订后才能收到发布的消息
?
?5.参考资料:
http://orange5458.iteye.com/admin/blogs/991311
JMS学习文档 PDF格式
?