ActiveMQ BlobMessage发送消息的测试
?????? 最近项目中将发送文件,但是测试采用字节流ByteMessage和BlobMessage ,StreamMessage的学习应用,并测试。
下面是BlobMessage测试的情况。但是在测试的时候发送word和pdf文档传送之后,不能打开。不知道为什么要,具体怎么处理之后可以。但是发送文本类文件没有任何问题。
消息的发送者:
package easyway.app.activemq.demo.fileserver;import java.io.File;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Session;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.ActiveMQSession;import org.apache.activemq.BlobMessage;/** * 消息的发送者 * 备注在测试Word文档和pdf文档时不能打开,但是发送文本类文件可以,没有任何问题。 * @author longgangbai * */public class ActiveMQFileServerSender {private String user = ActiveMQConnection.DEFAULT_USER;private String password = ActiveMQConnection.DEFAULT_PASSWORD;private String url = "tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/";private String subject = "Blob Queue";private Destination destination = null;private ActiveMQConnection connection = null;private ActiveMQSession session = null;private MessageProducer producer = null;// 初始化private void initialize() throws JMSException, Exception {ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);connection = (ActiveMQConnection) connectionFactory.createConnection();connection.setCopyMessageOnSend(false);session = (ActiveMQSession) connection.createSession(false,Session.AUTO_ACKNOWLEDGE);destination = session.createQueue(subject);producer = session.createProducer(destination);producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);}// 发送消息public void produceMessage(File file) throws JMSException, Exception {initialize();BlobMessage msg = session.createBlobMessage(file);connection.start();System.out.println("Producer:->Sending message: " + file.getName());producer.send(msg);System.out.println("Producer:->Message sent complete!");}// 关闭连接public void close() throws JMSException {System.out.println("Producer:->Closing connection");if (producer != null)producer.close();if (session != null)session.close();if (connection != null)connection.close();}}?
?
消息的接受者:
package easyway.app.activemq.demo.fileserver;import java.io.BufferedWriter;import java.io.File;import java.io.FileWriter;import java.io.InputStream;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.Session;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.ActiveMQSession;import org.apache.activemq.BlobMessage;import org.apache.activemq.command.ActiveMQBlobMessage;/** * 消息的接受者 * @author longgangbai * */public class ActiveMQFileServerReceiver {private String user = ActiveMQConnection.DEFAULT_USER;private String password = ActiveMQConnection.DEFAULT_PASSWORD;private String url = "tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/";private String subject = "Blob Queue";private Destination destination = null;private String filepath="D://photo_album_app_guide.bak.pdf";private ActiveMQConnection connection = null;private ActiveMQSession session = null;private MessageConsumer consumer = null;// 初始化private void initialize() throws JMSException, Exception {ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);connection = (ActiveMQConnection) connectionFactory.createConnection();connection.setCopyMessageOnSend(false);session = (ActiveMQSession) connection.createSession(false,Session.AUTO_ACKNOWLEDGE);destination = session.createQueue(subject);consumer = session.createConsumer(destination);}//接受 消息public void receive(String filename) throws JMSException, Exception {initialize();this.filepath=filename;connection.start();Message msg =(BlobMessage)consumer.receive(); if(msg instanceof ActiveMQBlobMessage){} InputStream input = ((ActiveMQBlobMessage) msg).getInputStream(); StringBuilder b = new StringBuilder(); int i = input.read(); while (i != -1) { b.append((char) i); i = input.read(); } input.close(); File uploaded = new File(filepath); BufferedWriter bw=new BufferedWriter(new FileWriter(uploaded)); bw.write(b.toString()); System.out.println("consumer receiver sucessful ....."); bw.flush(); bw.close();}// 关闭连接public void close() throws JMSException {System.out.println("Producer:->Closing connection");if (consumer != null)consumer.close();if (session != null)session.close();if (connection != null)connection.close();}}?
?
测试代码:
package easyway.app.activemq.demo.fileserver;import java.io.File;/** * BlobMessage的使用 * @author longgangbai * */public class ActiveMQFileServerTest {/** * * @param args */public static void main(String[] args) throws Exception {ActiveMQFileServerSender producer = new ActiveMQFileServerSender();ActiveMQFileServerReceiver consumer = new ActiveMQFileServerReceiver();String fileName = "D://BlockingClient.java";String receiverFilename="D://BlockingClient_bak.java";File file = new File(fileName);producer.produceMessage(file); consumer.receive(receiverFilename);// 延时5000毫秒之后停止接受消息Thread.sleep(5000);// 开始监听producer.close();// 延时500毫秒之后发送消息Thread.sleep(2000);consumer.close();}}?
1 楼 xuyangcn 2011-10-18 接收二进制文件时不应该用StringBuilder或者StringBuffer之类的,需要用byte数组直接写入FileOutputStream。看看字节和字符的区别。 2 楼 longgangbai 2011-10-23 xuyangcn 写道接收二进制文件时不应该用StringBuilder或者StringBuffer之类的,需要用byte数组直接写入FileOutputStream。看看字节和字符的区别。