ActiveMQ简单实例发送消息
??????? 在半年多的时间里,天天闲着,没有写blog也没有学习,现在要学习一些工作要使用的东西咯,公司使用的TongGTP和TongLinkQ最近一直不稳定,老大准备开发一个文件传输中间件,需要借鉴一些开源的mq框架学习和架构。
因为ActiveMQ是一个Apache的一个组件,比较活跃,并且有以下优点:
?
?
目前优点:
1.支持jms1.1和jms1.2等jms API。
2.容易跟当前的Spring框架整合。
3.容易JMX等整合,便于管理。
4.提供了高可靠性,可以采用主从服务的方式,Master broker只有在消息成功被复制到slave broker之后才会响应客户,保证消息的完整性。
5.高容错性提供了failover机制。
6.支持多语言种类,java,c#等。
7.支持消息的持久化为文件或者数据库信息。
8安全可靠,可以和jaas整合。
9.在消息发送方面效率高于jbossmq.
?
?
如下为一个简单的发送消息的方法:
需要的jar为:
<classpathentry kind="lib" path="src/activemq-all-5.5.0.jar"/>
?<classpathentry kind="lib" path="src/slf4j-api-1.5.2-sources.jar"/>
?<classpathentry kind="lib" path="src/slf4j-api-1.5.2.jar"/>
?<classpathentry kind="lib" path="src/slf4j-simple-1.5.2.jar"/>
?<classpathentry kind="lib" path="src/log4j-1.2.8.jar"/>
?<classpathentry kind="lib" path="src/commons-dbcp-1.4.jar"/>
?<classpathentry kind="lib" path="src/commons-pool-1.5.4.jar"/>
?<classpathentry kind="lib" path="src/commons-collections-3.2.1.jar"/>
?
1.首先使用activemq.bat启动ActiveMQ的broker进程,监听默认的61616端口可以使用如下命令查看端口:
netstat -a
检查显示信息有无61616端口信息。
?
2.开发消息生产者代码如下:
package easyway.app.activemq.demo2;import java.io.File;import java.io.FileInputStream;import java.io.IOException;import java.io.InputStream;import javax.jms.Connection;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.StreamMessage;import org.apache.activemq.ActiveMQConnectionFactory;/** * 消息的创建者 * @author longgangbai * */public class StreamMsgProducer {public void send(File file) {ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();Connection conn = null;try {conn = factory.createConnection();conn.start();Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);Destination queue = session.createQueue("streamMsg");MessageProducer producer = session.createProducer(queue);InputStream in = new FileInputStream(file);byte[] buffer = new byte[2048];int c = -1;while ((c = in.read(buffer)) > 0) {StreamMessage smsg = session.createStreamMessage();smsg.writeBytes(buffer, 0, c);producer.send(smsg);System.out.println("send: " + c);}in.close();} catch (JMSException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();} finally {if (conn != null) {try {conn.close();} catch (JMSException e) {e.printStackTrace();}}}}public static void main(String[] args) {File file = new File("c:\\send.txt");new StreamMsgProducer().send(file);}}?
3.开发消息消费者
package easyway.app.activemq.demo2;import java.io.FileOutputStream;import java.io.IOException;import java.io.OutputStream;import javax.jms.Connection;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.Session;import javax.jms.StreamMessage;import org.apache.activemq.ActiveMQConnectionFactory;/** * 消息的消费者 * @author longgangbai * */public class StreamMsgConsumer {public void receive() {ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();Connection conn = null;try {conn = factory.createConnection();conn.start();Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);Destination queue = session.createQueue("streamMsg");MessageConsumer consumer = session.createConsumer(queue);OutputStream out = new FileOutputStream("c:\\receive.txt");byte[] buffer = new byte[2048];while (true) {Message msg = consumer.receive(5000);if (msg == null) {break;}if (msg instanceof StreamMessage) {StreamMessage smsg = (StreamMessage) msg;int c = smsg.readBytes(buffer);out.write(buffer, 0, c);System.out.println("Receive: " + c);}}out.close();} catch (JMSException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();} finally {if (conn != null) {try {conn.close();} catch (JMSException e) {e.printStackTrace();}}}}public static void main(String[] args) {new StreamMsgConsumer().receive();}}?
4.分别启动消息消费者和生产者即可。
?
?
1 楼 yuandaf 2011-11-07 有个成熟的文件传输中间件产品,可以参考 hulft(中文版叫 海度),不知有没有听说过?