首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 软件管理 > 软件架构设计 >

activeMQ跟spring集成

2012-11-01 
activeMQ和spring集成activeMQ和spring集成activeMQ的服务端还是像以前那样配置,注册成为WINDOWS的服务,自

activeMQ和spring集成
activeMQ和spring集成

activeMQ的服务端还是像以前那样配置,注册成为WINDOWS的服务,自己启动。
和SPRING集成的,其实是消息发送和消息消费的CLIENT端

使用到了MDP,这样可以把一些发送邮件、发送短信的耗时操作都变成异步的。调用发送邮件、发送短信等,SERVICE马上
返回了,其实耗时操作没有完成,而是把这个任务放置到了队列里面,由MDP去调用真正的处理程序和方法,来处理队列里面
的数据。

貌似加入了这个JAR包:
apache-activemq-4.1.0-incubator.jar

在CONF里面增加了配置文件applicationContext-activemq.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">

<beans>
<bean id="jmsConnectionFactory"
   />
    </bean>
   </property>
   <!-- custom MessageConverter -->
   <property name="messageConverter" ref="userMessageConverter" />
</bean>

<!-- Queue模式 -->
<bean id="destinationQueue"
   />

<!-- POJO which send Message uses Spring JmsTemplate -->
<bean id="userMessageProducer"
   ref="jmsTemplate" />
   <property name="destination" ref="destinationQueue" />
</bean>

<!-- Message Driven POJO (MDP) -->
<bean id="messageListener"
   value="printUser" />
   <property name="messageConverter" ref="userMessageConverter" />
</bean>

<!-- listener container,MDP无需实现接口 -->
<bean id="listenerContainer"
   ref="jmsConnectionFactory" />
   <property name="destination" ref="destinationQueue" />
   <property name="messageListener" ref="messageListener" />
</bean>
</beans>

其中配置引用了三个BEAN,其中一个比较重要的工具BEAN是转化MESSAGE的UserMessageConverter.java:

package com.sillycat.plugin.activemq;


import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.springframework.jms.support.converter.MessageConverter;

import com.sillycat.core.model.User;


public class UserMessageConverter implements MessageConverter {


/*
* (non-Javadoc)
*
* @see org.springframework.jms.support.converter.MessageConverter#toMessage(java.lang.Object,
*      javax.jms.Session)
*/
public Message toMessage(Object obj, Session session) throws JMSException {
   //check Type
   if (obj instanceof User) {
    ActiveMQObjectMessage objMsg = (ActiveMQObjectMessage) session.createObjectMessage();
    HashMap map = new HashMap();
    try {
     //Order,Order,Product must implements Seralizable
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
     ObjectOutputStream oos = new ObjectOutputStream(bos);
     oos.writeObject(obj);
     bos.close();
     map.put("User", bos.toByteArray());
     objMsg.setObjectProperty("Map", map);
 
    } catch (IOException e) {
     e.printStackTrace();
    }

    return objMsg;
   } else {
    throw new JMSException("Object:[" + obj + "] is not User");
   }

}

/*
* (non-Javadoc)
*
* @see org.springframework.jms.support.converter.MessageConverter#fromMessage(javax.jms.Message)
*/
public Object fromMessage(Message msg) throws JMSException {
   if (msg instanceof ObjectMessage) {
    HashMap map= (HashMap) ((ObjectMessage) msg)
    .getObjectProperty("Map");
             try {
            // Order,Order,Product must implements Seralizable
     ByteArrayInputStream bis=new ByteArrayInputStream((byte[]) map.get("User"));
                 ObjectInputStream ois=new ObjectInputStream(bis);
                 return ois.readObject();
             } catch (IOException e) {
     e.printStackTrace();
    } catch (ClassNotFoundException e) {
     e.printStackTrace();
    }
    return null;
   } else {
    throw new JMSException("Msg:[" + msg + "] is not Map");
   }
}

}

注意传递的对象要序列化

一个发送消息的BEAN是UserMessageProducer.java:

package com.sillycat.plugin.activemq;

import javax.jms.Queue;

import org.springframework.jms.core.JmsTemplate;

import com.sillycat.core.model.User;


public class UserMessageProducer {
private JmsTemplate template;

private Queue destination;

public void setTemplate(JmsTemplate template) {
   this.template = template;
}

public void setDestination(Queue destination) {
   this.destination = destination;
}

public void send(User order) {
   template.convertAndSend(this.destination, order);
}

}

一个接受消息的BEAN是UserMessageConsumer.java:
package com.sillycat.plugin.activemq;

import com.sillycat.core.model.User;


public class UserMessageConsumer {


public void printUser(User user) {
   user.getId();
   user.getName();
   System.out.println(user);
}

}

这里只是一个DEMO,所以打印出来对象信息就结束了,其实真正的可以实现一些异步的调用,比如说耗时比较常的发送邮件,发送短信啊。这些就可以写到这里

当然老习惯,配一个单元测试UserMessageTest.java:

package com.sillycat.plugin.activemq;

import com.sillycat.core.model.User;
import com.sillycat.plugin.commons.base.ServiceTestBase;

public class UserMessageTest extends ServiceTestBase{

private UserMessageProducer userMessageProducer;

private User user;

protected void setUp() throws Exception {
   super.setUp();
   userMessageProducer = (UserMessageProducer) appContext.getBean("userMessageProducer");
   user = getUser();
}

protected void tearDown() throws Exception {
   super.tearDown();
}

public void testDumy(){
   assertTrue(true);
}

public void testSendUserBean(){
   for(int i = 0;i<10;i++){
    userMessageProducer.send(user);
   }
   System.out.println("test end");
}



private User getUser() {
   User user = new User();
   user.setName("luohua");
   user.setId(Integer.valueOf("2"));
   return user;
}

}



热点排行