JMS 异步消息的开发(一)
??? JMS的简单应用:
点对点模型(Point-to-Point)
????????? 点对点消息传送模型允许JMS客户端通过队列(queue)这个虚拟通道来同步和异步发送、接收消息。在点对点模型中,消息生产者称为发送者(Sender),而消息消费者则称为接收者(receiver)。传统上,点对点模型是一个基于拉取(Pull)或基于轮询(polling)的消息传送模型,这种模型从队列中请求消息,而不是自动地将消息推送到客户端。点对点消息传送模型的一个突出特点就是:发送到队列的消息被一个而且仅仅一个接收者所接收,即使可能有多个接收者在一个队列中侦听同一消息时,也是如此。
<?xml version="1.0" encoding="UTF-8"?><!-- $Id: mail-service.xml 62350 2007-04-15 16:50:12Z dimitris@jboss.org $ --><server> <!-- ==================================================================== --> <!-- Mail Connection Factory --> <!-- ==================================================================== --> <mbean code="org.jboss.mq.server.jmx.Queue" name="jboss.mq.destination:service=Queue,name=longgangbai"> <attribute name="JNDIName">queue/longgangbai</attribute> <depends optional-attribute-name="DestinationManager" >jboss.mq:service=DestinationManager</depends> </mbean></server>
?
在客户端:
备注下面为:JMS1.1 TopicConnectionFactory 和QueueConnectionFactory合并为ConnectionFactory
package com.easyway.jboss.jms.ptp.service;import java.util.Properties;import javax.jms.BytesMessage;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MapMessage;import javax.jms.MessageProducer;import javax.jms.Queue;import javax.jms.QueueConnection;import javax.jms.QueueConnectionFactory;import javax.jms.QueueSession;import javax.jms.StreamMessage;import javax.jms.TextMessage;import javax.naming.Context;import javax.naming.InitialContext;/** * 队列消息的发送者 * 采用JMS点对点,一对一的(PTP消息传递模型) * @author longgangbai * */public class QueueSender {/** * * @param args */public static void main(String[] args) {QueueConnection conn=null;QueueSession session=null;try {//得到一个JNDI初始化上下文Properties props=new Properties();//设置props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");props.setProperty(Context.PROVIDER_URL, "localhost:1099");props.setProperty(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");InitialContext ctx=new InitialContext(props);//根据上下文查找一个连接工厂TopicConnectionFactory/QueueConnectionFactory (有了两种连接工厂,//根绝topic、queue来是哟偶那个相应的类型),该连接工厂是有JMS提供的,不需要我们自己创建,每一个//厂商为他绑定一个全局的JNDI,我们功过JNDI便可获取它。QueueConnectionFactory factory=(QueueConnectionFactory)ctx.lookup("ConnectionFactory");//从连接工厂 中得到一个连接(Connect 的类型有两种,TopicConnection、QueueConnection)conn=factory.createQueueConnection();//通过连接来建立一个会话(Session)//建立一个不需要事物的并且能自动确认消息已接收的会话session=conn.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);//查找目的地(目的地的类型有两种:Topic、Queue)Destination destination =(Queue)ctx.lookup("queue/longgangbai");//根绝会话以及目的地建立消息生产者MessageProducter(QueueSender 和TopicPublisher都扩展自MessageProducer接口)MessageProducer producer=session.createProducer(destination);TextMessage msg=session.createTextMessage("hi ,longgangbai ,this is jboss jms MDB message");//发送文本消息producer.send(msg);//发送对象消息producer.send(session.createObjectMessage(new SMS("wangnabaobao","this is my girl friend")));MapMessage mapmsg=session.createMapMessage();mapmsg.setObject("name", "baobao");producer.send(mapmsg);BytesMessage bmsg=session.createBytesMessage();bmsg.writeBytes("I am a good boy !".getBytes());producer.send(bmsg);StreamMessage smsg=session.createStreamMessage();smsg.writeString("阿里巴巴,http://www.alibaba.com");producer.send(smsg);} catch (Exception e) {e.printStackTrace();}finally{try {session.close();conn.close();} catch (JMSException e2) {e2.printStackTrace();}}}}?
运行JBoss在:
在jboss.mq.destination中的选择相关的消息服务信息:
?
?
选择listMesages:
?
?
查询
?
?
?接受消息的代码如下:
package com.easyway.jboss.jms.ptp.service;import java.io.ByteArrayOutputStream;import javax.ejb.ActivationConfigProperty;import javax.ejb.MessageDriven;import javax.jms.BytesMessage;import javax.jms.MapMessage;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.ObjectMessage;import javax.jms.StreamMessage;import javax.jms.TextMessage;/** * Queue消息的接受方的MDB * @author longgangbai * * * 在JBoss 的%JAVA_HOME%\server\default\deploy目录下添加相关配置 * @author longgangbai */@MessageDriven(activationConfig={@ActivationConfigProperty(propertyName = "destinationType", //消息的目标的类型: 取值可以为javax.jms.Queue或者javax.jms.TopicpropertyValue = "javax.jms.Queue"), @ActivationConfigProperty(propertyName = "destination", //消息的目标的地址,取值是目标地址的JDNI名称:propertyValue = "queue/longgangbai"), @ActivationConfigProperty(propertyName = "acknowledgeMode", //消息的确认模式,却只可以为:Auto-acknowledge或者Dups_ok_acknowledge //消息确认:是指JMS客户端通知JMS provider确认消息已经到达的一种机制,在EJB中,收到消息后发送确认是EJB容器的职责, //确认消息就是JMS Provider ,EJB容器已经收到并处理了消息,如没有确认,JMS provider 就不知道容器是否收到了,消息,进而重发消息propertyValue = "Auto-acknowledgeMode")})public class DisplayMessage implements MessageListener{@Overridepublic void onMessage(Message msg) {try {if(msg instanceof TextMessage){TextMessage tmsg=(TextMessage)msg;String content=tmsg.getText();System.out.println(content);}else if(msg instanceof ObjectMessage){ObjectMessage tmsg=(ObjectMessage)msg;SMS sms=(SMS)tmsg.getObject();String content=sms.getUsername()+":"+sms.getMessage();System.out.println(content);}else if(msg instanceof MapMessage){MapMessage mmsg=(MapMessage)msg;String content=mmsg.getString("name");System.out.println(content);}else if(msg instanceof BytesMessage){BytesMessage bytesmsg=(BytesMessage)msg;ByteArrayOutputStream byteStream=new ByteArrayOutputStream();byte[] buffer=new byte[256];int length=0;while((length=bytesmsg.readBytes(buffer))!=-1){byteStream.write(buffer,0,length);}String context=new String(byteStream.toByteArray());System.out.println(context);}else if(msg instanceof ObjectMessage){ObjectMessage tmsg=(ObjectMessage)msg;SMS sms=(SMS)tmsg.getObject();String content=sms.getUsername()+":"+sms.getMessage();System.out.println(content);}else if(msg instanceof StreamMessage){StreamMessage tmsg=(StreamMessage)msg;String content=tmsg.readString();System.out.println(content);}} catch (Exception e) {e.printStackTrace();}}}?
package com.easyway.jboss.jms.ptp.service;import java.util.Properties;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageConsumer;import javax.jms.Queue;import javax.jms.QueueConnection;import javax.jms.QueueConnectionFactory;import javax.jms.QueueSession;import javax.jms.TopicSession;import javax.naming.Context;import javax.naming.InitialContext;/** * 队列消息的接受者 * 采用JMS点对点的模式 * @author longgangbai * */public class QueueReceive {/** * * @param args */public static void main(String[] args) {QueueConnection conn=null;QueueSession session=null;try {//得到一个JNDI初始化上下文Properties props=new Properties();//设置props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");props.setProperty(Context.PROVIDER_URL, "localhost:1099");props.setProperty(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");InitialContext ctx=new InitialContext(props);//根据上下文查找一个连接工厂TopicConnectionFactory/QueueConnectionFactory (有了两种连接工厂,//根绝topic、queue来是哟偶那个相应的类型),该连接工厂是有JMS提供的,不需要我们自己创建,每一个//厂商为他绑定一个全局的JNDI,我们功过JNDI便可获取它。QueueConnectionFactory factory=(QueueConnectionFactory)ctx.lookup("ConnectionFactory");//从连接工厂 中得到一个连接(Connect 的类型有两种,TopicConnection、QueueConnection)conn=factory.createQueueConnection();//通过连接来建立一个会话(Session)//建立一个不需要事物的并且能自动确认消息已接收的会话session=conn.createQueueSession(false,TopicSession.AUTO_ACKNOWLEDGE);//查找目的地(目的地的类型有两种:Topic、Queue)Destination destination =(Queue)ctx.lookup("queue/longgangbai");//根绝会话以及目的地建立消息生产者MessageProducter(QueueSender 和TopicPublisher都扩展自MessageProducer接口)MessageConsumer consumer=session.createConsumer(destination);DisplayMessage ml = new DisplayMessage(); consumer.setMessageListener(ml); System.out.println("开始处理休息..");conn.start();System.out.println("消息处理完毕...");} catch (Exception e) {e.printStackTrace();}finally{try {session.close();conn.close();} catch (JMSException e2) {e2.printStackTrace();}}}}?