首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 移动开发 > 移动开发 >

初涉 JMS 值得学习旁人的

2012-09-14 
初涉 JMS 值得学习别人的1. JMS基本概念 ??? JMS(Java Message Service)?即Java消息服务。它提供标准的产生

初涉 JMS 值得学习别人的

1. JMS基本概念
??? JMS(Java Message Service)?即Java消息服务。它提供标准的产生、发送、接收消息的接口简化企业应用的开发。它支持两种消息通信模型:点到点(point-to-point)(P2P)模型和发布/订阅(Pub/Sub)模型。P2P模型规定了一个消息只能有一个接收者;Pub/Sub 模型允许一个消息可以有多个接收者。
??? 对于点到点模型,消息生产者产生一个消息后,把这个消息发送到一个Queue(队列)中,然后消息接收者再从这个Queue中读取,一旦这个消息被一个接收者读取之后,它就在这个Queue中消失了,所以一个消息只能被一个接收者消费。与点到点模型不同,发布/订阅模型中,消息生产者产生一个消息后,把这个消息发送到一个Topic中,这个Topic可以同时有多个接收者在监听,当一个消息到达这个Topic之后,所有消息接收者都会收到这个消息。?

几个重要概念
??? Destination:消息发送的目的地,也就是前面说的Queue和Topic。创建好一个消息之后,只需要把这个消息发送到目的地,消息的发送者就可以继续做自己的事情,而不用等待消息被处理完成。至于这个消息什么时候,会被哪个消费者消费,完全取决于消息的接受者。

Message:从字面上就可以看出是被发送的消息。它有下面几种类型:?

??? StreamMessage:Java 数据流消息,用标准流操作来顺序的填充和读取。?
????MapMessage:一个Map类型的消息;名称为 string 类型,而值为 Java 的基本类型。?
????TextMessage:普通字符串消息,包含一个String。?
????ObjectMessage:对象消息,包含一个可序列化的Java 对象?
????BytesMessage:二进制数组消息,包含一个byte[]。?
????XMLMessage:? 一个XML类型的消息。

最常用的是TextMessage和ObjectMessage。
????????????? Session:与JMS提供者所建立的会话,通过Session我们才可以创建一个Message。
????????????? Connection:与JMS提供者建立的一个连接。可以从这个连接创建一个会话,即Session。
?????????? ConnectionFactory:那如何创建一个Connection呢?这就需要下面讲到的ConnectionFactory了。通过这个工厂类就可以得到一个与JMS提供者的连接,即Conection。


????????????? Producer:消息的生产者,要发送一个消息,必须通过这个生产者来发送。
????????????? MessageConsumer:与生产者相对应,这是消息的消费者或接收者,通过它来接收一个消息。

?????????? 前面多次提到JMS提供者,因为JMS给我们提供的只是一系列接口,当我们使用一个JMS的时候,还是需要一个第三方的提供者,它的作用就是真正管理这些Connection,Session,Topic和Queue等。

?????????????? 通过下面这个简图可以看出上面这些概念的关系。

???????????? ConnectionFactory---->Connection--->Session--->Message
???????????? Destination + Session------------------------------------>Producer
???????????? Destination +?Session------------------------------------>MessageConsumer?

?

那么可能有人会问: ConnectionFactory和Destination 从哪儿得到??这就和JMS提供者有关了. 如果在一个JavaEE环境中, 可以通过JNDI查找得到, 如果在一个非JavaEE环境中,?那只能通过JMS提供者提供给我们的接口得到了.

?

?2.?一个JMS的例子
首先需要做的是选择一个JMS提供者, 如果在JavaEE环境中可以不用考虑这些. 我们选择ActiveMQ, 官方地址:
http://activemq.apache.org/.

?

ActiveMQConnectionFactory?connectionFactory?=??初涉 JMS 值得学习旁人的?初涉 JMS 值得学习旁人的?初涉 JMS 值得学习旁人的?初涉 JMS 值得学习旁人的?初涉 JMS 值得学习旁人的?初涉 JMS 值得学习旁人的?初涉 JMS 值得学习旁人的

?

?运行这个例子会得到下面的输出结果:

??????????? Consumer1 get Message:0
??????????? Consumer2 get Message:1
??????????? Consumer1 get Message:2
??????????? Consumer2 get Message:3
??????????? Consumer1 get Message:4
??????????? Consumer2 get Message:5
??????????? Consumer1 get Message:6
??????????? Consumer2 get Message:7
??????????? Consumer1 get Message:8
??????????? Consumer2 get Message:9

??? 可以看出每个消息直被消费了一次,但是如果有多个消费者同时监听一个Queue的话,无法确定一个消息最终会被哪一个消费者消费。

?

5.?实战Topic?

?

?与Queue不同的是,Topic实现的是发布/订阅模型,在下面的例子中,启动2个消费者共同监听一个Topic,然后循环给这个Topic中发送多个消息。

?

?初涉 JMS 值得学习旁人的

?

运行后得到下面的输出结果:

Consumer1 get Message:0
Consumer2 get Message:0
Consumer1 get Message:1
Consumer2 get Message:1
Consumer1 get Message:2
Consumer2 get Message:2
Consumer1 get Message:3
Consumer2 get Message:3
Consumer1 get Message:4
Consumer2 get Message:4
Consumer1 get Message:5
Consumer2 get Message:5
Consumer1 get Message:6
Consumer2 get Message:6
Consumer1 get Message:7
Consumer2 get Message:7
Consumer1 get Message:8
Consumer2 get Message:8
Consumer1 get Message:9
?Consumer2 get Message:9

?

?

6.?消息头

?

一个消息对象分为三部分:消息头(Headers),属性(Properties)和消息体(Payload)。对于StreamMessage和MapMessage,消息本身就有特定的结构,而对于TextMessage,ObjectMessage和BytesMessage是无结构的。一个消息可以包含一些重要的数据或者仅仅是一个事件的通知。

??? 消息的Headers部分通常包含一些消息的描述信息,它们都是标准的描述信息。包含下面一些值:

  JMSDestination
?????? 消息的目的地,Topic或者是Queue。

  JMSDeliveryMode
??????? 消息的发送模式:persistent或nonpersistent。前者表示消息在被消费之前,如果JMS提供者DOWN了,重新启动后消息仍然存在。后者在这种情况下表示消息会被丢失。可以通过下面的方式设置:
?????? Producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

?????? JMSTimestamp
?????? 当调用send()方法的时候,JMSTimestamp会被自动设置为当前事件。可以通过下面方式得到这个值:
?????? long timestamp = message.getJMSTimestamp();

  JMSExpiration
?????? 表示一个消息的有效期。只有在这个有效期内,消息消费者才可以消费这个消息。默认值为0,表示消息永不过期。可以通过下面的方式设置:
?????? producer.setTimeToLive(3600000); //有效期1小时 (1000毫秒 * 60秒 * 60分)

  JMSPriority
?????? 消息的优先级。0-4为正常的优先级,5-9为高优先级。可以通过下面方式设置:
?????? producer.setPriority(9);

  JMSMessageID
?????? 一个字符串用来唯一标示一个消息。

  JMSReplyTo
?????? 有时消息生产者希望消费者回复一个消息,JMSReplyTo为一个Destination,表示需要回复的目的地。当然消费者可以不理会它。

  JMSCorrelationID
?????? 通常用来关联多个Message。例如需要回复一个消息,可以把JMSCorrelationID设置为所收到的消息的JMSMessageID。

  JMSType
?????? 表示消息体的结构,和JMS提供者有关。

  JMSRedelivered
?????? 如果这个值为true,表示消息是被重新发送了。因为有时消费者没有确认他已经收到消息或者JMS提供者不确定消费者是否已经收到。

??? 除了Header,消息发送者可以添加一些属性(Properties)。这些属性可以是应用自定义的属性,JMS定义的属性和JMS提供者定义的属性。我们通常只适用自定义的属性。

??? 后面会讲到这些Header和属性的用法。

?

?7.?DeliveryMode例子

?

在下面的例子中,分别发送一个Persistent和nonpersistent的消息,然后关闭退出JMS。

?初涉 JMS 值得学习旁人的?初涉 JMS 值得学习旁人的

?

运行上面的程序,可以得到下面的输出结果:
Consumer get A persistent Message

可以看出消息消费者只接收到一个消息,它是一个Persistent的消息。而刚才发送的non persistent消息已经丢失了。
另外, 如果发送一个non persistent消息, 而刚好这个时候没有消费者在监听, 这个消息也会丢失

?

?

8. JMSReplyTo

??? 在下面的例子中,首先创建两个Queue,发送者给一个Queue发送,接收者接收到消息之后给另一个Queue回复一个Message,然后再创建一个消费者来接受所回复的消息。

?初涉 JMS 值得学习旁人的

?

首先消息生产者发送一个消息,内容为“Andy”, 然后消费者收到这个消息之后根据消息的JMSReplyTo,回复一个消息,内容为“Hello Andy‘。 最后在回复的Queue上创建一个接收回复消息的消费者,它输出所回复的内容。

??? 运行上面的程序,可以得到下面的输出结果:
??????????Hello Andy

?

?

?9. Selector

?

????? 前面的例子中创建一个消息消费者使用的是:
????????? sesssion.createConsumer(destination)
? 另外,还提供了另一种方式:
????sesssion.createConsumer(destination, selector)
? 这里selector是一个字符串,用来过滤消息。也就是说,这种方式可以创建一个可以只接收特定消息的一个消费者。Selector的格式是类似于SQL-92的一种语法。可以用来比较消息头信息和属性。
? 下面的例子中,创建两个消费者,共同监听同一个Queue,但是它们的Selector不同,然后创建一个消息生产者,来发送多个消息。

?初涉 JMS 值得学习旁人的

?

?

?10. JMSCorrelationID与Selector

?

??? 前面讲过JMSCorrelationID主要是用来关联多个Message,例如需要回复一个消息的时候,通常把回复的消息的JMSCorrelationID设置为原来消息的ID。在下面这个例子中,创建了三个消息生产者A,B,C和三个消息消费者A,B,C。生产者A给消费者A发送一个消息,同时需要消费者A给它回复一个消息。B、C与A类似。
??? 简图如下:
???????生产者A-----发送----〉消费者A-----回复------〉生产者A
??????? 生产者B-----发送----〉消费者B-----回复------〉生产者B
??????? 生产者C-----发送----〉消费者C-----回复------〉生产者C
???
??? 需要注意的是,所有的发送和回复都使用同一个Queue,通过Selector区分。

?初涉 JMS 值得学习旁人的

?

运行结果为:
ConsumerA get:Message from ProducerA
ProducerA get reply:Reply from ConsumerA
ConsumerB get:Message from ProducerB
ProducerB get reply:Reply from ConsumerB
ConsumerC get:Message from ProducerC
ProducerC get reply:Reply from ConsumerC

?

?

11. TemporaryQueue和TemporaryTopic

?

?TemporaryQueue和TemporaryTopic,从字面上就可以看出它们是“临时”的目的地。可以通过Session来创建,例如:
??? TemporaryQueue replyQueue = session.createTemporaryQueue();
??? 虽然它们是由Session来创建的,但是它们的生命周期确实整个Connection。如果在一个Connection上创建了两个Session,则一个Session创建的TemporaryQueue或TemporaryTopic也可以被另一个Session访问。那如果这两个Session是由不同的Connection创建,则一个Session创建的TemporaryQueue不可以被另一个Session访问。
??? 另外,它们的主要作用就是用来指定回复目的地, 即作为JMSReplyTo。
??? 在下面的例子中,先创建一个Connection,然后创建两个Session,其中一个Session创建了一个TemporaryQueue,另一个Session在这个TemporaryQueue上读取消息。

?

?初涉 JMS 值得学习旁人的

?

运行结果为:
Get Message: SimpleMessage
Get reply: ReplyMessage
如果将:
Session session2 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
更改为:
Connection connection2 = factory.createConnection();
Session session2 = connection2.createSession(true, Session.AUTO_ACKNOWLEDGE);
就会得到类似于下面的异常:
Exception in thread "main" javax.jms.InvalidDestinationException: Cannot use a Temporary destination from another Connection。

?

12.?MDB??

?

在EJB3中,一个MDB(消息驱动Bean)就是一个实现了MessageListener接口的POJO。下面就是一个简单的MDB。

?初涉 JMS 值得学习旁人的?初涉 JMS 值得学习旁人的?初涉 JMS 值得学习旁人的?
    public?class?MessageSenderClient?{? ?????????????????public?static?void?main(String[]?args)?throws?Exception?{? ?????????????????????Properties?props?=?new?Properties();? ?????????????????????props.setProperty(Context.INITIAL_CONTEXT_FACTORY,? ?????????????"org.jnp.interfaces.NamingContextFactory");? ?????????????????????props.setProperty(Context.PROVIDER_URL,?"localhost:2099");? ?????????????????????Context?context?=?new?InitialContext(props);? ?????????????????????IMessageSender?messageSender?=?(IMessageSender)? ?????????????context.lookup("MessageSender/remote");? ?????????????????????messageSender.sendMessage("Hello");? ?????????????????}? ?????????????}??

热点排行