首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 软件管理 > 软件架构设计 >

ActiveMQ5.0实战3:使用Spring发送,消费topic和queue消息

2012-10-24 
ActiveMQ5.0实战三:使用Spring发送,消费topic和queue消息ActiveMQ5.0实战一: 安装配置ActiveMQ5.0ActiveMQ

ActiveMQ5.0实战三:使用Spring发送,消费topic和queue消息
ActiveMQ5.0实战一: 安装配置ActiveMQ5.0ActiveMQ5.0实战二: 基本配置简介

实战一, 实战二介绍了ActiveMQ的基本概念和配置方式.

本篇将通过一个实例介绍使用spring发送,消费topic, queue类型消息的方法. 不懂topic和queue的google 之.

?

ActiveMQ5.0实战3:使用Spring发送,消费topic和queue消息

如图示, TOPIC和QUEUE分别代表一个topic和一个queue消息通道.

    TopicMessageProducer向topic发送消息, TopicConsumerA和TopicConsumerB则从topic消费消息.QueueMessageProducer向Queue发送消息, QueueConsumer从Queue中消费消息
Spring整合JMS

就像对orm, web的支持一样, spring同样支持jms, 为整合jms到已有的项目提供了很多便利的方法. 本篇主要讲实战, 是所以先从配置开始, spring配置jms基本上需要8个部分.

    ConnectionFactory. 和jms服务器的连接, 可以是外部的jms server, 也可以使用embedded ActiveMQ Broker.Destination. 有topic和queue两种方式.JmsTemplate. spring提供的jms模板.MessageConverter. 消息转换器.MessageProducer. 消息生产者.MessageConsumer. 消息消费者.MessageListener. 消息监听器MessageListenerContainer. 消息监听容器

下面以实例的方式介绍上面8个部分.

1. ConnectionFactory

?brokerURL是指要连接的activeMQ server的地址, activeMQ提供了多种brokerURL, 集体可参见文档.一般我们使用嵌套的ActiveMQ server. 配置如下, 这个配置使用消息的存储机制, 服务器重启也不会丢失消息.

?2. Destination

?在实例中我们使用了两种destination

?3. JmsTemplate
?4. MessageConverter

?? MessageConverter实现的是org.springframework.jms.support.converter.MessageConverter接口, 提供消息的转换功能. DefaultMessageConverter的实现见附件.

? 5. MessageProducer

?? 实例拥有两个消息生产者, 消息生产者都是POJO, 实现见附件.

?6. MessageConsumer

?TOPIC通道有两个消息消费者, QUEUE有一个消息消费者

?7. MessageListener

每一个消息消费者都对应一个MessageListener

?8. MessageListenerContainer

?有几个MessageListener既有几个MessageListenerContainer

?Summary

写spring配置文件的时候, 要把MessageProducer, MessageConsumer,MessageListener,MessageListenerContainer几个地方弄清楚:

    可以有一个或者多个消息生产者向同一个destination发送消息.queue类型的只能有一个消息消费者.topic类型的可以有多个消息消费者.每个消费者对应一个MessageListener和一个MessageListenerContainer.

?

?

?

?


<amq:persistenceAdapter>
<amq:amqPersistenceAdapter directory="d:/amq"/>
</amq:persistenceAdapter>

<amq:transportConnectors>
<amq:transportConnector uri="tcp://localhost:0" />
</amq:transportConnectors>
</amq:broker>

  
<amq:connectionFactory id="jmsConnectionFactory" brokerURL="vm://localhost" />

    <!--  使用Queue方式-->
    <amq:queue name="QUEUE" physicalName="JMS-TEST-QUEUE" />

<!--  Spring JmsTemplate config -->
<bean id="jmsTemplate" ref="jmsConnectionFactory" />
</bean>
</property>
<!-- custom MessageConverter -->
<property name="messageConverter" ref="defaultMessageConverter" />
</bean>

<!-- converter  -->
<bean id="defaultMessageConverter" />

<bean id="queueMessageProducer" ref="jmsTemplate" />
<property name="destination" ref="QUEUE" />
</bean>
B应用
<amq:connectionFactory id="jmsConnectionFactory" brokerURL="vm://localhost" />

<amq:queue name="QUEUE" physicalName="JMS-TEST-QUEUE" />
 
 
<bean id="queueConsumer" />
    <bean id="queueListener" />
<!--  may be other method -->
<property name="defaultListenerMethod" value="receive" />
<!-- custom MessageConverter define -->
<property name="messageConverter" ref="defaultMessageConverter" />
</bean>

   <!-- converter -->
<bean id="defaultMessageConverter" />
   
   
    <bean id="queueListenerContainer" ref="jmsConnectionFactory" />
<property name="destination" ref="QUEUE" />
<property name="messageListener" ref="queueListener" />
</bean>
启动tomcat时,出现如下信息
信息: ActiveMQ JMS Message Broker (localhost, ID:6b184e6c407c476-1632-1221635612890-0:0) started(此应是是A应用 brokerID)
2008-9-17 15:13:34 org.springframework.web.context.ContextLoader initWebApplicationContext
......
信息: ActiveMQ JMS Message Broker (localhost, ID:6b184e6c407c476-1642-1221635632312-0:0) started
2008-9-17 15:13:53 org.apache.activemq.broker.TransportConnector start
信息: Connector vm://localhost Started(此应是是B应用 brokerID)
.......
我在通过页面得提交给一个servlet一个信息,如123,在servlet调用如下代码
String text = request.getParameter("text");
FooMessage foo = new FooMessage();
int msg = Integer.parseInt(text);
foo.setId(msg);
System.out.println("********start send*********");
WebApplicationContext   ctx=WebApplicationContextUtils.getRequiredWebApplicationContext(this.getServletContext());
QueueMessageProducer sender   =   (QueueMessageProducer)ctx.getBean( "queueMessageProducer");
sender.send(foo);

System.out.println("******end send ***********");
结果显示
********start send*********
queue send start
2008-9-17 15:14:16 org.apache.activemq.broker.TransportConnector start
信息: Connector vm://localhost Started
2008-9-17 15:14:16 org.springframework.jms.connection.SingleConnectionFactory initConnection
信息: Established shared JMS Connection: ActiveMQConnection {id=ID:6b184e6c407c476-1632-1221635612890-2:0,clientId=null,started=false}
queue send end
******end send ***********
B应用没有接收到,这是为什么,B应用的配置对吗,对的话为什么broker是两个不同的id,又为什么收不到A的消息呢,恳请楼主指点


A,B没有链接同一个activeMq

A 中改为


B 中

改为



前面你也说到
transportConnector是定義鏈接activeMQ的brokerURL. 你可以為一個activeMQ server指定多個transportconnector, 比如vm, tcp, ssl, stomp, xmpp等.
那我A多定义几个,B是不是也有不同的brokerURL?那他们又是怎么链接同一个activeMq?
<amq:persistenceAdapter>
<amq:amqPersistenceAdapter directory="c:/amq"/>
</amq:persistenceAdapter>
<amq:transportConnectors>
<amq:transportConnector uri="tcp://localhost:0" />
</amq:transportConnectors>
</amq:broker>

<!-- 连接外部的activeMQ -->
<amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://localhost:61616" />

<!--  ActiveMQ destinations  -->
<!--  使用topic方式-->
<amq:topic name="TOPIC" physicalName="JMS-TEST-TOPIC" />

<!--  Spring JmsTemplate config -->
<bean id="jmsTemplate" ref="jmsConnectionFactory" />
</bean>
</property>
<!-- custom MessageConverter -->
<property name="messageConverter" ref="defaultMessageConverter" />
</bean>

<!-- converter  -->
<bean id="defaultMessageConverter" />

<!-- POJO which send Message uses  Spring JmsTemplate -->
<bean id="topicMessageProducer" ref="jmsTemplate" />
<property name="destination" ref="TOPIC" />
</bean>
A应用producer
public class TopicMessageProducer {
   
    private JmsTemplate template;

private Topic destination;

public void setTemplate(JmsTemplate template) {
this.template = template;
}

public void setDestination(Topic destination) {
this.destination = destination;
}

public void send(TestBean message) {
System.out.println("=====> message send! ");
template.convertAndSend(this.destination, message);
}
}
A应用中的Action
public class TestAction extends ActionSupport{

private TestBean tb;
private TopicMessageProducer topicMessageProducer;

public String test(){
System.out.println("===> username:" + tb.getUserName());
System.out.println("===> username:" + tb.getPassWord());
topicMessageProducer.send(tb);

return "success";
}
...geter(),seter();
}

B应用applicationContext-activemq.xml
<amq:connectionFactory id="jmsConnectionFactory"
brokerURL="tcp://localhost:61616" />

<amq:topic name="TOPIC" physicalName="JMS-TEST-TOPIC" />

<!-- converter  -->
<bean id="defaultMessageConverter"
/>

<!--  Message Driven POJO (MDP) -->
    <!-- consumer1 for topic a -->
    <bean id="topicConsumerA" />

    <!-- consumer2 for topic a -->
    <bean id="topicConsumerB" />
   
    <!-- Message Listener for  -->
<bean id="topicListenerA" />
<!--  may be other method -->
<property name="defaultListenerMethod" value="receive" />
<!-- custom MessageConverter define -->
<property name="messageConverter" ref="defaultMessageConverter" />
</bean>

<bean id="topicListenerB" />
<!--  may be other method -->
<property name="defaultListenerMethod" value="receive" />
<!-- custom MessageConverter define -->
<property name="messageConverter" ref="defaultMessageConverter" />
</bean>

<!--  listener container,MDP无需实现接口 -->
<bean id="topicListenerContainerA" ref="jmsConnectionFactory" />
<property name="destination" ref="TOPIC" />
<property name="messageListener" ref="topicListenerA" />
</bean>

    <bean id="topicListenerContainerB" ref="jmsConnectionFactory" />
<property name="destination" ref="TOPIC" />
<property name="messageListener" ref="topicListenerB" />
</bean>
B应用的consumerA和consumerB都一样
public class TopicConsumerA {
public void receive(TestBean message) {
System.out.println("************************************** Topic A userName: " + message.getUserName());
System.out.println("************************************** Topic A passWord: " + message.getPassWord());
}
}
目前是在tomcat控制台和http://127.0.0.1:8161/admin/中可以看消息已发送了,但是在tomcat控制台中和http://127.0.0.1:8161/admin/上看不到是否已经接受到了消息,我该如何查看或修改,谢谢!!!        <amq:persistenceAdapter> 
            <amq:amqPersistenceAdapter directory="d:/amq"/> 
        </amq:persistenceAdapter> 
        <amq:transportConnectors> 
            <amq:transportConnector uri="tcp://localhost:61616" /> 
                       <amq:transportConnector uri="vm://localhost:0" /> 
        </amq:transportConnectors> 
    </amq:broker> 这一段中的<amq:transportConnectors> 
            <amq:transportConnector uri="tcp://localhost:61616" /> 
                       <amq:transportConnector uri="vm://localhost:0" /> 
        </amq:transportConnectors>注释掉,即使用外部的ActiveMQ服务
==================


package com.work.activemq;public interface OrderNotifyInterface {/** *  通知用户订单已经发送!生成者接口 * @param order */public void notifyOrder(Order order);/** * 通过topic的方式发送信息 * @param order */public void notifyTopic(Order order);}实现如下:package com.work.activemq;/** * 用来测试mq的。 * @author wangmingjie * */public class OrderNotifyImpl implements OrderNotifyInterface {private OrderMessageProducer orderMessageProducer;public void setOrderMessageProducer(OrderMessageProducer orderMessageProducer) {this.orderMessageProducer = orderMessageProducer;}private TopicMessageProducer  topicMessageProducer;public void setTopicMessageProducer(TopicMessageProducer topicMessageProducer) {this.topicMessageProducer = topicMessageProducer;}/** * 向顾客的邮箱发送订单通知,使用JMS发送. */public void notifyOrder(Order order) {orderMessageProducer.send(order);}public void notifyTopic(Order order){topicMessageProducer.send(order);}}
40 楼 bluethink 2011-11-20   TestMain里面的
org.springframework.beans.factory.generic.GenericBeanFactoryAccessor;

依赖的是哪个包,我现在用的是Spring 3.0.5,并且加载了spring的所有包,还是提示
The import org.springframework.beans.factory.generic cannot be resolved,我下载的是
xbean-spring-3.5.jar 仍然找不到这个类,麻烦楼主把所有需要的jar包给列出来,谢谢 41 楼 xiaxiaorui2003 2012-02-22   我的这个GenericBeanFactoryAccessor也是找不到,spring和activemq的所有jar都导入了,还是找不到 42 楼 zh286091487 2012-02-28   4年前的文章,2012才享受到,悲剧,写楼主了

热点排行