首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 软件管理 > 软件架构设计 >

JMS、AMQP范例讲解

2012-07-01 
JMS、AMQP实例讲解?使用Git从GitHub上将samples代码拷贝到本机,然后导入到IDE中?RabbitMQ默认有一个exchang

JMS、AMQP实例讲解

?

使用Git从GitHub上将samples代码拷贝到本机,然后导入到IDE中
?RabbitMQ默认有一个exchange,叫default exchange,它用一个空字符串表示,它是direct exchange类型,任何发往这个exchange的消息都会被路由到routing key的名字对应的队列上,如果没有对应的队列,则消息会被丢弃。这就是为什么代码中channel执行basicPulish方法时,第二个参数本应该为routing key,却被写上了QUEUE_NAME。Recv.java
channel.queueDeclare:第一个参数:队列名字,第二个参数:队列是否可持久化即重启后该队列是否依然存在,第三个参数:该队列是否时独占的即连接上来时它占用整个网络连接,第四个参数:是否自动销毁即当这个队列不再被使用的时候即没有消费者对接上来时自动删除,第五个参数:其他参数如TTL(队列存活时间)等。channel.basicConsume:第一个参数:队列名字,第二个参数:是否自动应答,如果为真,消息一旦被消费者收到,服务端就知道该消息已经投递,从而从队列中将消息剔除,否则,需要在消费者端手工调用channel.basicAck()方法通知服务端,如果没有调用,消息将会进入unacknowledged状态,并且当消费者连接断开后变成ready状态重新进入队列,第三个参数,具体消费者类。work queuesWorker.java
在本代码中,channel执行basicConsume方法时autoAck为false,这就意味着接受者在收到消息后需要主动通知RabbitMQ才能将该消息从队列中删除,否则该在接收者跟MQ连接没断的情况下,消息将会变为untracked状态,一旦接收者断开连接,消息重新变为ready状态。通知MQ需要调用channel.basicAck(int, boolean),如果不调用,消息永远不会从队列中消失。该方法第一个参数为一个标志,一般是delivery.getEnvelope().getDeliveryTag(),其实就是一个递增的数字,它表示这个这个队列中第几个消息。以下解释错误!第二个参数为true表示通知所有untracked的消息,false标志只通知第一个参数对应的那个消息。不管是true还是false,只要执行了channel.basicAck方法,消息都会从队列中删除。第二个参数
?publish subscribeEmitLog.java
?ReceiveLogs.java
?发布订阅,本代码演示的是fanout exchange,这种类型的exchange将它收到的所有消息直接发送给所有跟它绑定的队列,这里说了直接,是因为rouring key对于fanout exchange来说没有任何意义!不管一个队列以怎样的routing key和fanout exhange绑定,只要他们绑定了,消息就会送到队列。代码中发送端将消息发到logs名字的fanout exchange,routing key为空字符串,你可以将它改成任何其他值或者null试试看。另外,接收端代码使用channel声明了一个临时队列,并将这个队列通过空字符串的routing key绑定到fanout exchange。这个临时队列的名字的随机取的,如:amq.gen-U0srCoW8TsaXjNh73pnVAw==,临时队列在后面的请求响应模式中有用到。routingEmitLogDirect.java
?ReceiveLogsDirect.java
?本代码演示了另外一种exchange,direct exchange,该exchange根据routing key将消息发往使用该routing key和exchange绑定的一个或者多个队列里,如果没找到,则消息丢弃。本代码中可以启动3个接收端,分别使用info,warning,error作为routing key,代表3种级别的日志。只要将不同级别的日志发往不同接收端只需将日志级别当作routing key。topicsEmitLogTopic.java
?ReceiveLogsTopic.java
?本代码演示了最后一种类型的exchange,topic exchange,topic exchange和direct exchange最大的不同就是它绑定的routing key是一种模式,而不是简单的一个字符串。为什么要有模式(Patten)这个概念?模式可以理解为对事物描述的一种抽象。以代码种的日志系统为例,使用direct exchange只能区别info,error,debug等等不同级别的日志,但是实际上不光有不同级别的日志,还有不同来源的日志,如操作系统内核的日志,定时脚本等, 使用模式就可以用<level>.<source>表示,更强大的是,模式允许使用通配符,*代表一个单词,#代表一个多个单词。RPCRPCClient.java
?RPCServer.java
?本代码实现了一个简单的RPC,英文全称Remote Procedure Call,中文一般翻译远程方法调用。RPC需要使用一个唯一标志代表请求,Java中使用java.util.UUID实现,发送端在发送消息前通过channel生成一个临时队列,并监听该队列,BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();这句代码生成的就是发送消息的基本属性,可以看到corrId就是UUID,replyQueueName就是临时队列名,这样当接收端收到消息后就知道返回的消息应该发回哪个队列了。
samples-amqp-springpom.xml
??spring-rabbitmq.xml
我们着重讲解以下xml配置文件,第一行就给我们创建了一个mq的连接工厂,第二行创建了一个RabbitTemplate,这是一个模板类,定义了amqp中绝大多数的发送,接收方法。第三行是一个管理器,该bean在创建的时候,会在Spring Context中扫描所有已经注册的queue,exchange,binding并将他们初始化好。第四行声明了一个队列,所见即所得,可以发现使用xml节省了好多代码量。work queuesMyWorker.java
?NewTask.java
?Worker.java
?spring-rabbit-sender.xml
?具体区别可以通过与前面RabbitMQ 原生API写的代码做对照看出来。publish subscribeEmitLog.java
?ReceiveLogs.java
?spring-rabbitmq.xml
?routingEmitLogDirect.java
?ReceiveLogsDirect.java
?spring-rabbitmq.xml
实际上exchange,binding的声明完全可以放在xml中,只是为了展示封装的代码底层到底是如何运行的,才在程序中手工调用方法。topicsEmitLogsTopic.java
?ReceiveLogsTopic.java
?spring-rabbitmq.xml
?rpcRPCClient.java
?spring-rabbitmq-client.xml
spring-rabbitmq-server.xml
??本代码演示了监听器的用法,RabbitTemplate提供的所有方法都是同步的,所有当使用RabbitTemplate的receive方法时,它马上连接到队列,查看是否由消息,有就收下来,并关闭连接,没有也不抛出异常,只返回一个null值。这就解释了为什么我上面代码中多次使用sleep10秒,因为如果先运行接收端,它不能不停循环地收消息,所以在发送端还没发消息时,它就已经结束了。而监听器(Listener)不一样,底层代码中会使用org.springframework.amqp.rabbit.listener.SimepleMessageListenerContainer中的内部类AsyncMessageProcessingConsumer实现,该类为一个线程类,在线程的run方法中执行了while的一段代码。RabbitTemplate提供了一个sendAndReceive()方法,它实现了一个简单的RPC模型。这里还有一个prefetch的含义,该含义同原生API中的Qos一样。spring-amqp-spring-remoting

随后会讲到Spring远程调用框架,在此先把代码列出来

Main.java

?

?

?MyService.java

?

?

?amqp-remoting-sender.xml

?

?

?amqp-remoting.xml

?

?

?关键的几个类有:

?

??MyInvokerClientInterceptor.java

?

?

?MyInvokerProxyFactoryBean.java

?

?

?从输出的结果可以看出,Spring将接口的参数,调用方法,类名字封装到RemoteInvocation类中,这个类是序列的,意味着它可以自由地以字节形式在网络上传输,jms,http,amqp都支持字节形式地消息传输,所以我们能基于接口远程方法调用,无论你采用那种网络传输协议。

samples-jms-plain

pom.xml

?

?

?Sender.java

?

?

?Sender.java

?

??Sender.java

?

?

?publish-subscribe

Receiver1.java

?

?

?Sender.java

?

?

?samples-jms-spring-remoting

pom.xml

?

?

MyService.java

?? ?jms-remoting-sender.xml

?

?

?jms-remoting.xml

?

?

?JMS跟AMQP有很大的区别,JMS有两种类型的队列,一个是点对点的,一种是主题订阅的,发送者直接将消息发送至队列,接受者从队列收消息,对于发布订阅模式,每个消费者都从队列中得到了相同的消息拷贝。





?

热点排行