首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

Active MQ (2) Java Application编程

2012-12-21 
Active MQ (二)Java Application编程1.准备工作? 1)启动ActiveMQ? 2)将%ActiveMQ_HOME%\activemq-all-5.1.

Active MQ (二) Java Application编程

1.准备工作

? 1)启动ActiveMQ

? 2)将%ActiveMQ_HOME%\activemq-all-5.1.0.jar导入对应的JAVA Project

?

2.对于MQ编程通常使用JMS编程,但是由于纯JAVA Application Project中不存在JNDI容器,所以还是在

? 创建ConnectionFactory时还是不得不与ActiveMQ API耦合

?

3.JMS体系结构

??
Active MQ (2)  Java Application编程

?

JMS有两种消息模式:

1)一对一/点到点 模式

2)预订/发布 模式


JMS编程主要的接口有:?

ConnectionFactory

Connection : 一般一个aplication中只有一个,线程安全,可复用。

Session

Queue

Topic

MessageProducer

MessageConsumer

MessageListener

?

4.实例

1)一对一/点到点

?

QueueReceiverListener.java?

?

package com.siyuan.jms;

import java.util.Date;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

public class QueueReceiverListener implements MessageListener {
?
?private Connection connection;
?
?private String queueNameSendTo;
?
?/**
? * @return the connection
? */
?public Connection getConnection() {
??return connection;
?}

?/**
? * @param connection the connection to set
? */
?public void setConnection(Connection connection) {
??this.connection = connection;
?}

?/**
? * @return the queueNameSendTo
? */
?public String getQueueNameSendTo() {
??return queueNameSendTo;
?}

?/**
? * @param queueNameSendTo the queueNameSendTo to set
? */
?public void setQueueNameSendTo(String queueNameSendTo) {
??this.queueNameSendTo = queueNameSendTo;
?}

?public void onMessage(Message message) {
??if (message instanceof TextMessage) {
???try {
????System.out.println("Current time : " + new Date());
????System.out.println("Receive : " + ((TextMessage) message).getText());
????System.out.println("Receive JMSCorrelationID : " +? message.getJMSCorrelationID());
????
????Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
????Queue queueSendTo = session.createQueue(queueNameSendTo);
????MessageProducer sender = session.createProducer(queueSendTo);
????TextMessage messageToSend = session.createTextMessage();
????messageToSend.setText("Nice to meet u, too.");
????messageToSend.setJMSCorrelationID(message.getJMSCorrelationID());
????sender.send(messageToSend);
????System.out.println("Send : " + messageToSend.getText());
????System.out.println("Send JMSCorrelationID : " +? messageToSend.getJMSCorrelationID());
????System.out.println("--------------------------------------------");
????sender.close();
????session.close();
???} catch (JMSException e) {
????// TODO Auto-generated catch block
????e.printStackTrace();
???}
??}
?}

}

====================================================================

?

JMSQueueTest.java

?

package com.siyuan.jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSQueueTest {
?
?public static String QUEUE_RECEIVE_FROM_NAME = "MY.TEST.SENDER";

?public static String QUEUE_SEND_TO_NAME = "MY.TEST.RECEIVER";
?
?/**
? * @param args
? * @throws JMSException
? */
?public static void main(String[] args) throws JMSException {
??
??ConnectionFactory connFactory
???= new ActiveMQConnectionFactory("tcp://localhost:61616");
??Connection connection = connFactory.createConnection();
??connection.start();
??
??Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
??
??Queue queue = session.createQueue(QUEUE_RECEIVE_FROM_NAME);
??MessageConsumer messageProcessor = session.createConsumer(queue);
??QueueReceiverListener processor = new QueueReceiverListener();
??processor.setConnection(connection);
??processor.setQueueNameSendTo(QUEUE_SEND_TO_NAME);
??messageProcessor.setMessageListener(processor);
??
?}

}

?

说明:本例为模拟一消息接收处理器,即从一Queue中获取Req消息,处理后将

???????? Resp消息发送至另一Queue中,其中Req和Resp通过JMSCorrelationID

???????? 进行关联。相关的Req消息发送器将在ActiveMQ和Tomcat集成中在JSP中实现。

编程中需注意:(1) connection.start();
???????????????????? (2) 异步方式获取消息,不能connection.close();

?

2)预订/发布

?

JMSTopicTest.java

?

package com.siyuan.jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSTopicTest {

?/**
? * @param args
? * @throws JMSException
? */
?public static void main(String[] args) throws JMSException {
??// TODO Auto-generated method stub
??ConnectionFactory connFactory
???= new ActiveMQConnectionFactory("tcp://localhost:61616");
??Connection connection = connFactory.createConnection();
??connection.start();
??
??Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
??
??Topic topic = session.createTopic("MY.TEST.TOPIC");

? MessageConsumer subscriber = session.createConsumer(topic);
??MessageProducer publisher = session.createProducer(topic);
??publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);???
??TextMessage messageToPublish = session.createTextMessage();
??messageToPublish.setText("Hello all!");
??publisher.send(messageToPublish);
??
????Message messageFromSubscribe = subscriber.receive(3000);
??System.out.println(messageFromSubscribe);
??if (messageFromSubscribe instanceof TextMessage) {
???System.out.println(((TextMessage) messageFromSubscribe).getText());
??}
??publisher.close();
??subscriber.close();
??session.close();
??connection.close();
?}

}

?

编程中需注意:(1) subscriber定义必须在publisher定义之前

?????????????????????????????? 只能预订后才能收到发布的消息

?

?5.参考资料:

http://orange5458.iteye.com/admin/blogs/991311

JMS学习文档 PDF格式

?

热点排行