
Learning RabbitMQ, Part 6: RPC

2012-09-09 

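In "Learning RabbitMQ, Part 2: Work Queues" we saw how to distribute time-consuming tasks among multiple workers. But what if we need to run a function on a remote machine and wait for its result? That calls for RPC (Remote Procedure Call).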

For an introduction to RPC, see the Baidu Baike entry: http://baike.baidu.com/view/32726.htm#sub32726

Now let's look at RPC in RabbitMQ. The workflow is shown in the following diagram:

[Figure: RabbitMQ RPC workflow]

In the diagram above, C denotes the client and S denotes the server. The RPC flow in RabbitMQ is as follows:

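1. The client sends a request that carries the reply_to and correlation_id properties and publishes it to the RPC queue.

2. The server processes the request and publishes the result to a callback queue. The name of this queue must match the value of reply_to.

3. The client reads from the callback queue and picks out the result whose correlation_id matches the value it set earlier. A message with a different correlation_id is ignored rather than raising an exception.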
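The three steps above can be sketched without a broker. The following is only an in-memory illustration, not the RabbitMQ client API: the `Message` class, the `queue` helper, the `callback_queue` name, and the `"processed:"` placeholder result are all assumptions made for this example, and the server step runs inline rather than in a separate process.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;

// In-memory stand-in for a broker; no RabbitMQ classes are involved.
class RpcFlowSketch {

    // Hypothetical message: a body plus the two properties the steps rely on.
    static final class Message {
        final String replyTo;
        final String correlationId;
        final String body;

        Message(String replyTo, String correlationId, String body) {
            this.replyTo = replyTo;
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    static final String RPC_QUEUE = "rpc_queue";
    static final String CALLBACK_QUEUE = "callback_queue"; // assumed name

    private static final Map<String, Queue<Message>> QUEUES = new HashMap<>();

    // Look up a named queue, creating it on first use.
    static Queue<Message> queue(String name) {
        return QUEUES.computeIfAbsent(name, k -> new ArrayDeque<>());
    }

    // Step 2: handle one request and reply to the queue named by reply_to.
    static void serveOne() {
        Message request = queue(RPC_QUEUE).poll();
        if (request == null) {
            return;
        }
        String result = "processed:" + request.body; // placeholder for real work
        queue(request.replyTo).add(new Message(null, request.correlationId, result));
    }

    // Steps 1 and 3: publish a request, then scan the callback queue
    // for the reply that carries the same correlation_id.
    static String call(String body) {
        String corrId = UUID.randomUUID().toString();
        queue(RPC_QUEUE).add(new Message(CALLBACK_QUEUE, corrId, body));

        serveOne(); // inline here; a real server would run elsewhere

        Message reply;
        while ((reply = queue(CALLBACK_QUEUE).poll()) != null) {
            if (corrId.equals(reply.correlationId)) {
                return reply.body;
            }
            // Unknown correlation_id: skip the message instead of throwing.
        }
        return null; // no matching reply arrived
    }

    public static void main(String[] args) {
        // A leftover reply from some earlier request, to show that it is ignored.
        queue(CALLBACK_QUEUE).add(new Message(null, "stale-id", "old result"));
        System.out.println(call("hello")); // prints "processed:hello"
    }
}
```

The stale reply is read first, fails the correlation_id comparison, and is dropped silently, which mirrors the behavior described in step 3.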

Consuming from the callback queue described above relies on message properties. The AMQP protocol predefines a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:

delivery_mode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.

content_type: Describes the MIME type of the encoding. For example, for the commonly used JSON encoding it is good practice to set this property to application/json.

reply_to: Commonly used to name a callback queue.

correlation_id: Useful for correlating RPC responses with requests.

The client code is as follows:

package com.abin.rabbitmq;

import java.util.UUID;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

// Uses the older RabbitMQ Java client API (QueueingConsumer was deprecated in later releases).
public class RPCClient {

    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;
    private QueueingConsumer consumer;

    public RPCClient() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();

        // Server-named queue that receives the replies (the callback queue).
        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        // autoAck = true: replies are acknowledged as soon as they are delivered.
        channel.basicConsume(replyQueueName, true, consumer);
    }

    public String call(String message) throws Exception {
        String response = null;
        String corrId = UUID.randomUUID().toString();

        // Attach reply_to and correlation_id to the request.
        BasicProperties props = new BasicProperties();
        props.setReplyTo(replyQueueName);
        props.setCorrelationId(corrId);

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        // Block until the reply with our correlation_id arrives; ignore any other message.
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response = new String(delivery.getBody(), "UTF-8");
                break;
            }
        }
        return response;
    }

    public void close() throws Exception {
        connection.close();
    }

    public static void main(String[] argv) {
        RPCClient fibonacciRpc = null;
        String response = null;
        try {
            fibonacciRpc = new RPCClient();

            System.out.println(" [x] Requesting fib(30)");
            response = fibonacciRpc.call("30");
            System.out.println(" [.] Got '" + response + "'");

            System.out.println(" [x] Requesting fib(-1)");
            response = fibonacciRpc.call("-1");
            System.out.println(" [.] Got '" + response + "'");

            System.out.println(" [x] Requesting fib(a)");
            response = fibonacciRpc.call("a");
            System.out.println(" [.] Got '" + response + "'");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (fibonacciRpc != null) {
                try {
                    fibonacciRpc.close();
                } catch (Exception ignore) {
                }
            }
        }
    }
}

The server code is as follows:

package com.abin.rabbitmq;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    // Naive recursive Fibonacci; any n <= 1 (including negatives) is returned unchanged.
    private static int fib(int n) {
        if (n > 1)
            return fib(n - 1) + fib(n - 2);
        else
            return n;
    }

    public static void main(String[] argv) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();

            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            // Deliver at most one unacknowledged request to this server at a time.
            channel.basicQos(1);

            QueueingConsumer consumer = new QueueingConsumer(channel);
            // autoAck = false: each request is acknowledged manually after the reply is sent.
            channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

            System.out.println(" [x] Awaiting RPC requests");

            while (true) {
                String response = null;
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();

                // Copy the request's correlation_id into the reply.
                BasicProperties props = delivery.getProperties();
                BasicProperties replyProps = new BasicProperties();
                replyProps.setCorrelationId(props.getCorrelationId());

                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);
                    System.out.println(" [.] fib(" + message + ")");
                    response = "" + fib(n);
                } catch (Exception e) {
                    // Invalid input: log the error and reply with an empty string.
                    System.out.println(" [.] " + e.toString());
                    response = "";
                } finally {
                    // Publish the result to the queue named by reply_to, then ack the request.
                    channel.basicPublish("", props.getReplyTo(), replyProps,
                            response.getBytes("UTF-8"));
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ignore) {
                }
            }
        }
    }
}

Run the server first. It prints:

 [x] Awaiting RPC requests

Then run the client. It prints:

 [x] Requesting fib(30)
 [.] Got '832040'
 [x] Requesting fib(-1)
 [.] Got '-1'
 [x] Requesting fib(a)
 [.] Got ''

On the server side, the following output also appears:

 [.] fib(30)
 [.] fib(-1)
 [.] java.lang.NumberFormatException: For input string: "a"
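The three replies follow directly from how the server handles each request body. Here is a minimal broker-free sketch of that logic: the class name `FibHandlerSketch` and the `handle` method are hypothetical names for this example, and the catch is narrowed to `NumberFormatException`, whereas the server above catches `Exception`.

```java
// Stand-alone sketch of the server's per-request logic; no RabbitMQ classes are used.
class FibHandlerSketch {

    // Same recursion as the server: any n <= 1 (including negatives) is returned unchanged.
    static int fib(int n) {
        return n > 1 ? fib(n - 1) + fib(n - 2) : n;
    }

    // Parse the request body and compute the reply; invalid input yields an empty reply.
    static String handle(String message) {
        try {
            int n = Integer.parseInt(message);
            return "" + fib(n);
        } catch (NumberFormatException e) {
            return "";
        }
    }

    public static void main(String[] args) {
        for (String request : new String[] {"30", "-1", "a"}) {
            System.out.println("fib(" + request + ") -> '" + handle(request) + "'");
        }
    }
}
```

Because `fib` returns any `n <= 1` as-is, the request "-1" produces "-1" rather than an error, and only the non-numeric request "a" takes the exception path and produces the empty reply.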


#1 linxisll 2012-05-15   Line 24 of the client code,
replyQueueName = channel.queueDeclare().getQueue();
should be replyQueueName = channel.queueDeclare(requestQueueName, false, false, false, null).getQueue();   shouldn't it?

#2 linxisll 2012-05-15   My mistake, this is the reply queue. With publish/subscribe, the producer (P) side does not need to call queueDeclare.
