首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 操作系统 >

ActiveMq事例代码

2012-06-27 
ActiveMq例子代码import javax.jms.Connectionimport javax.jms.DeliveryModeimport javax.jms.Destinat

ActiveMq例子代码

import javax.jms.Connection;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.ObjectMessage;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.ActiveMQPrefetchPolicy;import org.apache.camel.component.jms.JmsMessage;import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;//发送TextMessagepublic class SendMessage {        private static final String url = "tcp://localhost:61616";;    private static final String QUEUE_NAME = "choice.queue";    protected String expectedBody = "<hello>world!</hello>";        public void sendMessage() throws JMSException{        Connection connection = null;                try{                        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);            connection = connectionFactory.createConnection();                        connection.start();            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);                      Destination destination = session.createQueue(QUEUE_NAME);            MessageProducer producer = session.createProducer(destination);            TextMessage message = session.createTextMessage(expectedBody);            message.setStringProperty("headname", "remoteB");                producer.send(message);        }catch(Exception e){            e.printStackTrace();        }finally{            connection.close();        }    }***************************************************************************************import java.io.File;import java.io.FileInputStream;import java.io.IOException;import javax.jms.BytesMessage;import javax.jms.Connection;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Session;import org.apache.activemq.ActiveMQConnectionFactory;//发送BytesMessagepublic class SendMessage {        private String url = "tcp://localhost:61616";            public void sendMessage() throws JMSException{        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);        Connection connection = connectionFactory.createConnection();        connection.start();        Session  session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        Destination destination = session.createQueue("test.queue");        MessageProducer producer = session.createProducer(destination);        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);        BytesMessage message = session.createBytesMessage();        byte[] content = getFileByte("d://test.jar");        message.writeBytes(content);        try{            producer.send(message);            System.out.println("successful send message");        }catch(Exception e){            e.printStackTrace();            e.getMessage();        }finally{            session.close();            connection.close();        }                    }        private byte[] getFileByte(String filename){        byte[] buffer = null;        FileInputStream fin = null;        try {            File file = new File(filename);            fin = new FileInputStream(file);             buffer = new byte[fin.available()];            fin.read(buffer);        } catch (Exception e) {            e.printStackTrace();        } finally {            try {                fin.close();            } catch (IOException e) {                e.printStackTrace();            }        }        return buffer;    }
发送完消息后可以访问 http://localhost:8161/admin/queues.jsp 看到相应的queue中是否有消息 

?

?

?

?

 适用收取TextMessage消息 
import javax.jms.Connection;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class ReceiveMessage {        private static final String url = "tcp://172.16.168.167:61616";    private static final String QUEUE_NAME = "szf.queue";        public void receiveMessage(){        Connection connection = null;        try{            try{                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);                connection = connectionFactory.createConnection();            }catch(Exception e){//                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);//                connection = connectionFactory.createConnection();            }                        connection.start();            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);            Destination destination = session.createQueue(QUEUE_NAME);            MessageConsumer consumer = session.createConsumer(destination);            consumeMessagesAndClose(connection,session,consumer);        }catch(Exception e){                    }    }        protected void consumeMessagesAndClose(Connection connection,            Session session, MessageConsumer consumer) throws JMSException {        for (int i = 0; i < 1;) {            Message message = consumer.receive(1000);            if (message != null) {                i++;                onMessage(message);            }        }        System.out.println("Closing connection");        consumer.close();        session.close();        connection.close();    }        public void onMessage(Message message){        try{            if (message instanceof TextMessage) {                TextMessage txtMsg = (TextMessage)message;                String msg = txtMsg.getText();                System.out.println("Received: " + msg);            }        }catch(Exception e){            e.printStackTrace();        }    }            public static void main(String args[]){        ReceiveMessage rm = new ReceiveMessage();        rm.receiveMessage();    }}

热点排行