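A Simple HornetQ Example
I have recently become very interested in MQ (message queues) and plan to use one in a project. Since I do Java development, I looked for an MQ that implements JMS. I started with ActiveMQ and HornetQ and chose HornetQ: judging from some performance tests I read, this MQ from JBoss is quite impressive. I found that there is very little material about HornetQ in China. There is some abroad, but it covers very old versions.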
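I wrote an example of my own. The environment is as follows:
1. HornetQ version: hornetq-2.2.5.Final
2. JDK 1.6.0_30-b12
3. The configuration files in config/stand-alone/non-clustered from the examples bundled with HornetQ
4. HornetQ runs as a standalone server on a Dell 1950 (2 CPUs, 16 GB RAM)
5. I wrote two small, very simple programs: a Producer and a Consumer
6. They send instances of a Person class (which must implement the Serializable interface)
7. Note: the Person class used by the Producer and the Consumer must be in the same package in each project and have the same serialVersionUID
8. A jndi.properties file on the classpath holds the configuration needed to find the JNDI server on the server machine
9. A log4j configuration file on the classpath is used for printing logs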
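To make item 8 more concrete, here is a minimal sketch of the same settings built in code instead of in jndi.properties. It assumes the JNP naming provider (the `org.jnp` classes that appear in stack traces from this setup) and its usual port 1099; the class name `JndiSettings`, the host, and the port are illustrative assumptions, not values taken from my configuration.

```java
import java.util.Properties;
import javax.naming.Context;

public class JndiSettings {

    /**
     * Builds the JNDI environment for a standalone server that exposes
     * a JNP naming service. Host and port are supplied by the caller.
     */
    public static Properties forServer(String host, int port) {
        Properties env = new Properties();
        // Same keys a jndi.properties file would contain.
        env.setProperty(Context.INITIAL_CONTEXT_FACTORY,
                "org.jnp.interfaces.NamingContextFactory");
        env.setProperty(Context.PROVIDER_URL, "jnp://" + host + ":" + port);
        env.setProperty(Context.URL_PKG_PREFIXES,
                "org.jboss.naming:org.jnp.interfaces");
        return env;
    }

    public static void main(String[] args) {
        // Assumed values: adjust host and port to your own server.
        Properties env = forServer("localhost", 1099);
        env.list(System.out);
    }
}
```

Passing the result to `new InitialContext(env)` should behave like the properties file, provided the JNP client jar is on the classpath. Keeping the values in jndi.properties has the advantage that the server address can change without recompiling.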
The code is as follows:
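package org.hornetq.jms.example;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.log4j.Logger;

public class Producer {
    private static final Logger logger = Logger.getLogger(Producer.class);

    public static void main(String[] args) {
        try {
            runExample();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void runExample() throws NamingException, JMSException {
        // The JNDI settings come from jndi.properties on the classpath.
        InitialContext ic = new InitialContext();
        ConnectionFactory cf = (ConnectionFactory) ic.lookup("/ConnectionFactory");
        Queue orderQueue = (Queue) ic.lookup("/queue/ExpiryQueue");
        Connection connection = cf.createConnection();
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(orderQueue);
            connection.start();
            int count = 0;
            // Keeps sending until the process is stopped.
            while (true) {
                // Use the same counter value for the id and the name suffix.
                Person one = new Person(count, "xuepeng_" + count);
                count++;
                ObjectMessage msg = session.createObjectMessage(one);
                producer.send(msg);
                logger.info(Producer.class.getName() + " sent message: " + one);
            }
        } finally {
            // Closing the connection also closes its sessions and producers.
            connection.close();
        }
    }
}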
package org.hornetq.jms.example;

import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;

public class Person implements Serializable {
    private static final long serialVersionUID = 2670718766927459001L;

    private Integer id;
    private String name;
    // Creation time. The formatter is not kept as a field, so it is not
    // serialized along with every message.
    private String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());

    public Person(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Person [id=" + id + ", name=" + name + ", time=" + time + "]";
    }

    @Override
    public int hashCode() {
        final int prime = 37;
        int result = 17;
        result = prime * result + ((id == null) ? 0 : id.hashCode());
        result = prime * result + ((name == null) ? 0 : name.hashCode());
        return result;
    }

    // Two Person objects are equal when both id and name match.
    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        Person other = (Person) obj;
        if (id == null) {
            if (other.id != null)
                return false;
        } else if (!id.equals(other.id))
            return false;
        if (name == null) {
            if (other.name != null)
                return false;
        } else if (!name.equals(other.name))
            return false;
        return true;
    }
}
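package org.hornetq.jms.example;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.log4j.Logger;

public class Consumer {
    private static final Logger logger = Logger.getLogger(Consumer.class);

    public static void main(String[] args) {
        try {
            runExample();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void runExample() throws NamingException, JMSException {
        // The JNDI settings come from jndi.properties on the classpath.
        InitialContext ic = new InitialContext();
        ConnectionFactory cf = (ConnectionFactory) ic.lookup("/ConnectionFactory");
        Queue orderQueue = (Queue) ic.lookup("/queue/ExpiryQueue");
        Connection connection = cf.createConnection();
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(orderQueue);
            // Delivery to consumers begins only after the connection is started.
            connection.start();
            while (true) {
                // Blocks until the next message arrives.
                ObjectMessage messageReceived = (ObjectMessage) consumer.receive();
                Person one = (Person) messageReceived.getObject();
                logger.info(Consumer.class.getName() + " received message: " + one);
            }
        } finally {
            // Closing the connection also closes its sessions and consumers.
            connection.close();
        }
    }
}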
After starting the HornetQ service on Linux, run the Producer and the Consumer above and the two will communicate with each other.
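The requirement that Person be Serializable, sit in the same package on both sides, and carry the same serialVersionUID follows from ObjectMessage relying on standard Java serialization. The sketch below round-trips an object through a byte array, which is roughly what happens between Producer and Consumer. The `SerializationCheck` and `Payload` names are hypothetical stand-ins for illustration, and no JMS is involved.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerializationCheck {

    /** Hypothetical stand-in for the Person class of the example. */
    public static class Payload implements Serializable {
        private static final long serialVersionUID = 2670718766927459001L;
        final int id;
        final String name;

        public Payload(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    /** Serializes a value the way the sending side would. */
    public static byte[] toBytes(Serializable value) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(buffer);
            out.writeObject(value);
            out.close();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException("serialization failed", e);
        }
    }

    /** Deserializes the bytes the way the receiving side would. */
    public static Object fromBytes(byte[] bytes) {
        try {
            ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
            try {
                return in.readObject();
            } finally {
                in.close();
            }
        } catch (IOException e) {
            // Includes InvalidClassException, thrown on a serialVersionUID mismatch.
            throw new IllegalStateException("deserialization failed", e);
        } catch (ClassNotFoundException e) {
            // The class name, package included, was not found on the receiving side.
            throw new IllegalStateException("class missing on receiver", e);
        }
    }

    /** Returns the serialVersionUID the stream format will use for a class. */
    public static long declaredUid(Class<?> type) {
        return ObjectStreamClass.lookup(type).getSerialVersionUID();
    }

    public static void main(String[] args) {
        Payload copy = (Payload) fromBytes(toBytes(new Payload(1, "xuepeng_1")));
        System.out.println(copy.id + " " + copy.name + " uid=" + declaredUid(Payload.class));
    }
}
```

The stream records the fully qualified class name and the serialVersionUID. A receiver whose class lives in another package fails with ClassNotFoundException, and one with a different serialVersionUID fails with InvalidClassException.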
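#1 sjxinrui 2012-05-28
I'm using 2.2.14 + Spring MVC... it's miserable... I don't understand this thing at all... I hear you can just use JBoss 7.0 directly?

#2 kanpiaoxue 2012-05-29
sjxinrui wrote: I'm using 2.2.14 + Spring MVC... it's miserable... I don't understand this thing at all... I hear you can just use JBoss 7.0 directly?
When JBoss 7.0 uses JMS, it is also HornetQ configured on top of JBoss. Not sure how to use HornetQ? Read the JMS specification, then read HornetQ's manual.pdf, and you should have no real problems.

#3 sjxinrui 2012-05-29
kanpiaoxue wrote: When JBoss 7.0 uses JMS, it is also HornetQ configured on top of JBoss. Not sure how to use HornetQ? Read the JMS specification, then read HornetQ's manual.pdf, and you should have no real problems.
OK, thanks for the advice! I'll walk through the OP's approach first. If the OP has more experience to share, please do. Thanks again.

#4 sjxinrui 2012-05-29
Which jar files does HornetQ need?

#5 sjxinrui 2012-05-29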
javax.naming.NameNotFoundException: queue not bound
at org.jnp.server.NamingServer.getBinding(NamingServer.java:771)
at org.jnp.server.NamingServer.getBinding(NamingServer.java:779)
at org.jnp.server.NamingServer.getObject(NamingServer.java:785)
at org.jnp.server.NamingServer.lookup(NamingServer.java:396)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:305)
at sun.rmi.transport.Transport$1.run(Transport.java:159)
at java.security.AccessController.doPrivileged(Native Method)
at sun.rmi.transport.Transport.serviceCall(Transport.java:155)
at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:535)
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:790)
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:649)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:619)
at sun.rmi.transport.StreamRemoteCall.exceptionReceivedFromServer(StreamRemoteCall.java:255)
at sun.rmi.transport.StreamRemoteCall.executeCall(StreamRemoteCall.java:233)
at sun.rmi.server.UnicastRef.invoke(UnicastRef.java:142)
at org.jnp.server.NamingServer_Stub.lookup(Unknown Source)
at org.jnp.interfaces.NamingContext.lookup(NamingContext.java:726)
at org.jnp.interfaces.NamingContext.lookup(NamingContext.java:686)
at javax.naming.InitialContext.lookup(InitialContext.java:392)
at org.hornetq.jms.example.Producer.runExample(Producer.java:35)
at org.hornetq.jms.example.Producer.main(Producer.java:24)
Please explain...

#6 kanpiaoxue 2012-05-30
sjxinrui wrote: Which jar files does HornetQ need?
Please read the HornetQ manual carefully. After you download and unpack the HornetQ archive, the manual is under docs\user-manual. It describes the jars HornetQ depends on.

#7 kanpiaoxue 2012-05-30
sjxinrui wrote: javax.naming.NameNotFoundException: queue not bound [...] Please explain...
Judging from your error, you are using JNDI, right? The lookup did not find a queue bound under that name. Please read the manual; this is very basic.
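A small check like the following can help tell "the name is not bound on the server" apart from other JNDI problems such as a missing jndi.properties or an unreachable server. It is only a sketch: the class name `JndiLookupCheck` is made up for illustration, and the name passed in must match whatever your server actually binds (in a standalone HornetQ 2.2 setup the queue entries are normally declared in hornetq-jms.xml).

```java
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NameNotFoundException;
import javax.naming.NamingException;

public class JndiLookupCheck {

    /** Tries one JNDI lookup and reports the outcome as a short message. */
    public static String check(String jndiName) {
        Context ctx = null;
        try {
            // Uses jndi.properties from the classpath, like Producer and Consumer.
            ctx = new InitialContext();
            Object bound = ctx.lookup(jndiName);
            return "OK: " + jndiName + " -> " + String.valueOf(bound);
        } catch (NameNotFoundException e) {
            // The naming server answered, but nothing is bound under this name.
            return "NOT BOUND: " + jndiName + " (" + e.getMessage() + ")";
        } catch (NamingException e) {
            // Anything else: no provider configured, server unreachable, etc.
            return "JNDI ERROR: " + e;
        } finally {
            if (ctx != null) {
                try {
                    ctx.close();
                } catch (NamingException ignored) {
                    // Nothing useful to do if closing fails.
                }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(check("/ConnectionFactory"));
        System.out.println(check("/queue/ExpiryQueue"));
    }
}
```

If the connection factory resolves but the queue reports NOT BOUND, the client side is most likely fine and the queue name in the code does not match an entry in the server's JMS configuration.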