ActiveMQ Part 3 - Using ActiveMQ to Transfer Files

2012-11-01 
This approach still needs more investigation. For now, a few questions remain open:
1. ActiveMQ reports the following message:

INFO | Usage Manager memory limit (1048576) reached for topic://EXCHANGE.FILE. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info

2. With data delivered asynchronously like this, is the client guaranteed to receive the file segments in the correct order?
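
On the ordering question: as far as the JMS specification goes, messages sent by one session to one destination are meant to be received in the order they were sent, so a single Publisher and a single Consumer would normally see the segments in order. If that cannot be relied on (for example with several producers, or after redelivery), one option is to number the segments and reorder them on the client. The sketch below uses plain Java with no JMS calls; the `ChunkReassembler` class and the `seq` counter are hypothetical and are not part of the original code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Writes chunks to an output stream in sequence order, buffering any that arrive early. */
class ChunkReassembler {
    private final OutputStream out;
    private final Map<Integer, byte[]> pending = new HashMap<>();
    private int nextSeq = 0; // sequence number of the next chunk to write

    ChunkReassembler(OutputStream out) {
        this.out = out;
    }

    /** Accepts one chunk; seq is a hypothetical 0-based counter assigned by the sender. */
    void accept(int seq, byte[] data) throws IOException {
        if (seq < nextSeq || pending.containsKey(seq)) {
            return; // duplicate: already written or already buffered
        }
        pending.put(seq, data.clone());
        // Flush every buffered chunk that is now contiguous with what was written.
        byte[] next;
        while ((next = pending.remove(nextSeq)) != null) {
            out.write(next);
            nextSeq++;
        }
    }

    /** True once totalChunks chunks have been written and nothing is left buffered. */
    boolean isComplete(int totalChunks) {
        return nextSeq == totalChunks && pending.isEmpty();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        ChunkReassembler reassembler = new ChunkReassembler(sink);
        // Chunk 1 arrives before chunk 0 and is held back until chunk 0 shows up.
        reassembler.accept(1, "world".getBytes(StandardCharsets.UTF_8));
        reassembler.accept(0, "hello ".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(sink.toByteArray(), StandardCharsets.UTF_8)); // prints "hello world"
    }
}
```

In the Consumer, the sequence number could travel as an int property on each "sending" message (for example via `setIntProperty`); that would be an addition to the protocol described in this article, not something the original code does.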

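To transfer a file with ActiveMQ, the sender must split the file into segments, wrap each segment in its own Message, and send them to the client one at a time. In the example below, the Producer sends commands that mark the start of a transfer, a segment in transit, and the end of the transfer. Once the client receives these commands, it knows how to handle the incoming data.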

On receiving each segment, the client uses the command to merge the content into a single file:
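
The start / sending / end protocol described above can be sketched without a broker. The `FileFramer` and `Frame` names below are hypothetical; the sketch only shows how a payload maps onto the three commands, and that the last segment should be trimmed to its real length rather than sent as a full buffer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Splits a payload into a "start", "sending"..., "end" sequence of frames. */
class FileFramer {

    /** One frame: a command plus a body (empty for "start" and "end"). */
    static final class Frame {
        final String command;
        final byte[] body;

        Frame(String command, byte[] body) {
            this.command = command;
            this.body = body;
        }
    }

    static List<Frame> frame(byte[] payload, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<Frame> frames = new ArrayList<>();
        frames.add(new Frame("start", new byte[0]));
        for (int offset = 0; offset < payload.length; offset += chunkSize) {
            int end = Math.min(offset + chunkSize, payload.length);
            // copyOfRange trims the final chunk, so no stale buffer bytes are included
            frames.add(new Frame("sending", Arrays.copyOfRange(payload, offset, end)));
        }
        frames.add(new Frame("end", new byte[0]));
        return frames;
    }

    public static void main(String[] args) {
        // 10000 bytes in 4096-byte chunks: two full chunks and one of 1808 bytes
        for (Frame f : frame(new byte[10000], 4096)) {
            System.out.println(f.command + " " + f.body.length);
        }
    }
}
```

Each `Frame` would correspond to one JMS message, with `command` carried in the "COMMAND" property and `body` written into the message body.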

package org.apache.activemq.exchange.file;

import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.StreamMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {

    public static void main(String[] args) throws JMSException, IOException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic("EXCHANGE.FILE");
        MessageConsumer consumer = session.createConsumer(destination);
        // false: the next segment starts a new file; true: append to the existing file
        boolean appended = false;
        try {
            while (true) {
                Message message = consumer.receive(5000);
                if (!(message instanceof StreamMessage)) {
                    continue; // receive timed out (null) or the message is of another type
                }
                StreamMessage streamMessage = (StreamMessage) message;
                String command = streamMessage.getStringProperty("COMMAND");

                if ("start".equals(command) || "end".equals(command)) {
                    appended = false;
                } else if ("sending".equals(command)) {
                    String fileName = message.getStringProperty("FILE_NAME");
                    byte[] content = new byte[4096];
                    BufferedOutputStream bos =
                            new BufferedOutputStream(new FileOutputStream("c:/" + fileName, appended));
                    try {
                        int length;
                        // Write only the bytes actually read, not the whole buffer
                        while ((length = streamMessage.readBytes(content)) > 0) {
                            bos.write(content, 0, length);
                        }
                    } finally {
                        bos.close();
                    }
                    appended = true;
                }
            }
        } finally {
            connection.close();
        }
    }
}

The sender splits the file into packets and sends them to the client one at a time:

package org.apache.activemq.exchange.file;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.StreamMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Publisher {

    public static String FILE_NAME = "01.mp3";

    public static void main(String[] args) throws JMSException, IOException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic("EXCHANGE.FILE");

        MessageProducer producer = session.createProducer(destination);
        long time = System.currentTimeMillis();

        // Tell the client that a file transfer is starting
        StreamMessage message = session.createStreamMessage();
        message.setStringProperty("COMMAND", "start");
        producer.send(message);

        // Send the file in segments of up to 4096 bytes
        byte[] content = new byte[4096];
        InputStream ins = Publisher.class.getResourceAsStream(FILE_NAME);
        BufferedInputStream bins = new BufferedInputStream(ins);
        int length;
        while ((length = bins.read(content)) > 0) {
            // The same message object is reused; clearBody() empties it for the next segment
            message.setStringProperty("FILE_NAME", FILE_NAME);
            message.setStringProperty("COMMAND", "sending");
            message.clearBody();
            // Write only the bytes actually read, so the last segment is not padded
            message.writeBytes(content, 0, length);
            producer.send(message);
        }
        bins.close();
        ins.close();

        // Tell the client that the transfer is complete
        message = session.createStreamMessage();
        message.setStringProperty("COMMAND", "end");
        producer.send(message);

        connection.close();

        System.out.println("Total time: " + (System.currentTimeMillis() - time) + " milliseconds");
    }
}