首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

关于ActiveMQ 讯息接收和转发

2012-11-06 
关于ActiveMQ 消息接收和转发关于ActiveMQ的东西如果不是这次出差,我还真没怎么用过,以前只是用过openJMS。

关于ActiveMQ 消息接收和转发
    关于ActiveMQ的东西如果不是这次出差,我还真没怎么用过,以前只是用过openJMS。这次用ActiveMQ 主要是用来解决一个问题:linux C++写的消息客户端向 window 2003Server Java服务器端发送消息。而服务器端要求所有其他客户端服务器以ISO8859-1的编码格式发送,由于消息中涉及到中文,而由C++写的消息客户端发送的内容,服务器一直接收的是乱码(这个问题知道怎么回事的给我留个言,3Q)。然后没办法,公司要求我在现场自己用java 写的转发的程序,主要功能就是接收C++客户端以UTF-8的编码方式发送过来的消息,然后再用JAVA转码成ISO8859-1的方式发送给第三方消息服务器。
    要实现的功能很简单,消息客户端是Linux 系统用C++实现的ActiveMQ Client,转发服务器是Linux系统用JAVA实现消息的接收并将消息转码后发送,消息服务器是Windows 2003 Server用JAVA实现ActiveMQ Server.转发代码如下:

package com.mq;import java.io.FileInputStream;import java.util.Enumeration;import java.util.HashMap;import java.util.Properties;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.DeliveryMode;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.log4j.Logger;public class MQForward {private static Logger log = Logger.getLogger(MQForward.class);private final String filePath = "config/mq.properties";private FileInputStream in = null;private Properties props = new Properties();private static HashMap<String, String> propMap = new HashMap<String, String>();//activeMQ 相关定义-------接收端定义private ConnectionFactory receive_ConnFactory;private Connection receive_Conn;private Session receive_Session;private Topic receive_Topic;private MessageConsumer receive_MsgConsume;//activeMQ 相关定义-------发送端定义private ConnectionFactory sender_ConnFactory;private Connection sender_Conn;private Session sender_Session;private Topic sender_Topic;private MessageProducer sender_MsgProducer;public MQForward(){//初始化数据boolean isOK1 = init_Properties();if(!isOK1){log.debug("MQ properties parameter init is error :-( ");return ;}boolean isOK2 = init_MqObject();if(!isOK2){log.debug("MQ Object parameter init is error :-( ");return ;}new Thread("<<<Receive MQ Message Thread.>>>"){public void run(){try{log.debug("Begin start forward ....");receive_MsgConsume.setMessageListener(new JieShouMessage());}catch(Exception ex){ex.printStackTrace();log.error("[ERROR]----"+ex);}}}.start();}/** * 从properties 文件中读取配置参数 * @return */public boolean init_Properties(){try{in = new FileInputStream(filePath);props.load(in);Enumeration en = props.propertyNames();while (en.hasMoreElements()) {String key = (String)en.nextElement();String property = props.getProperty(key);propMap.put(key, property);}}catch(Exception ex){ex.printStackTrace();log.error(ex);return false;}finally{try{if(in!=null){in.close();}}catch(Exception ex){ex.printStackTrace();log.error("[ERROR]----init_Properties() : "+ex);return false;}}return true;}public boolean init_MqObject(){try{//接收receive_ConnFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, propMap.get("senderBrokerURL"));receive_Conn = receive_ConnFactory.createConnection();receive_Conn.start();receive_Session = receive_Conn.createSession(false, Session.AUTO_ACKNOWLEDGE);receive_Topic = receive_Session.createTopic(propMap.get("senderTopicName"));receive_MsgConsume = receive_Session.createConsumer(receive_Topic);//发送sender_ConnFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, propMap.get("receiveBrokerURL"));sender_Conn = sender_ConnFactory.createConnection();sender_Conn.start();sender_Session = sender_Conn.createSession(false, Session.AUTO_ACKNOWLEDGE);sender_Topic = sender_Session.createTopic(propMap.get("receiveTopicName"));sender_MsgProducer = sender_Session.createProducer(sender_Topic);sender_MsgProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);log.debug("MQ parameter object init success!");return true;}catch(Exception ex){ex.printStackTrace();log.error("[ERROR]----init_MqObject() : "+ex);return false;}}public static void main(String[] args) {new MQForward();}private class JieShouMessage implements MessageListener {public void onMessage(Message message) {TextMessage txtMsg = (TextMessage)message;try {//最开始接收到信息String s = txtMsg.getText();log.debug("接收消息  >>>  "+s);//将接收到的消息转发sendMsg(s);} catch (Exception ex) {ex.printStackTrace();log.error("[ERROR]----"+ex);}}public void sendMsg(String messageTxt){try{String s = new String(messageTxt.getBytes("UTF-8"), "iso8859-1");TextMessage msg = sender_Session.createTextMessage(s);log.debug("发送出的消息  >>>  "+s);sender_MsgProducer.send(msg);}catch(Exception ex){ex.printStackTrace();log.error("[ERROR]----"+ex);}}}}

中间涉及到从properties 文件中读取的参数是Topic名称和Broker
-----自己学习,仅作记录-----

热点排行