首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 软件管理 > 软件架构设计 >

rabbitmq 学习-7- 官方rabbitmq+spring进行远程接口调用

2013-02-24 
rabbitmq 学习-7- 官方rabbitmq+spring进行远程接口调用到http://github.com/momania/spring-rabbitmq下载

rabbitmq 学习-7- 官方rabbitmq+spring进行远程接口调用
到http://github.com/momania/spring-rabbitmq下载其示例程序

实现远程接口调用的代码,主要是com.rabbitmq.spring.remoting包下的几个类:
发布服务端(Server):RabbitInvokerServiceExporter.java
接口调用客户端(Client):RabbitInvokerProxyFactoryBean.java,RabbitInvokerClientInterceptor.java,
RabbitRpcClient.java(对RpcClient的简单封装,添加了发送消息时的选项:
mandatory--是否强制发送,immediate--是否立即发送,timeOutMs--超时时间)


发布服务端(Server)——RabbitInvokerServiceExporter.java说明:
package com.rabbitmq.spring.remoting;

import static java.lang.String.format;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.SerializationUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.remoting.support.RemoteInvocation;
import org.springframework.remoting.support.RemoteInvocationBasedExporter;
import org.springframework.remoting.support.RemoteInvocationResult;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.RpcServer;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.spring.ExchangeType;
import com.rabbitmq.spring.InvalidRoutingKeyException;
import com.rabbitmq.spring.channel.RabbitChannelFactory;

public class RabbitInvokerServiceExporter extends RemoteInvocationBasedExporter
        implements InitializingBean, DisposableBean, ShutdownListener {

    private final Log log = LogFactory
            .getLog(RabbitInvokerServiceExporter.class);

    private RabbitChannelFactory channelFactory;
    private String exchange;
    private ExchangeType exchangeType;
    private String queueName;
    private String routingKey;

    private Object proxy;
    private List<RpcServer> rpcServerPool;
    private int poolsize = 1;

    public void afterPropertiesSet() {
        // 检查exchange type类型不能为fanout
        if (exchangeType.equals(ExchangeType.FANOUT)) {
            throw new InvalidRoutingKeyException(String.format(
                    "Exchange type %s not allowed for service exporter",
                    exchangeType));
        }
        exchangeType.validateRoutingKey(routingKey);

        // 调用org.springframework.remoting.support.RemoteExporter的getProxyForService(),得到代理对象
        proxy = getProxyForService();

        // 初始化rpcServer池
        rpcServerPool = new ArrayList<RpcServer>(poolsize);

        // 初始化RpcServer,并开始接收请求
        startRpcServer();
    }

    // 初始化RpcServer,并开始接收请求
    private void startRpcServer() {
        try {
            log.info("Creating channel and rpc server");

            // 创建临时的channel,用来定义queue,exchange,并进行bind
            // 这里有两个用处:
            // 1:在服务端也定义queue,避免因为先开服务端而出现queue没被定义的错误
            // 2:这里先用一个channel定义一下qeueue,后面的for循环里面就不用每个都去定义了
            Channel tmpChannel = channelFactory.createChannel();
            tmpChannel.getConnection().addShutdownListener(this);
            tmpChannel.queueDeclare(queueName, false, false, false, true, null);
            if (exchange != null) {
                tmpChannel.exchangeDeclare(exchange, exchangeType.toString());
                tmpChannel.queueBind(queueName, exchange, routingKey);
            }

            // 创建poolsize个RpcServer,每个RpcServer使用一个单独的channel,并且分别使用单独的线程去接收请求,提升接收速度
            for (int i = 1; i <= poolsize; i++) {
                try {
                    // 每次都创建一个新的channel,因为一个channel在多个线程中使用是会有问题的(官方文档和channel的JavaDoc上是这样说的)
                    Channel channel = channelFactory.createChannel();
                    String format = "Starting rpc server %d on exchange [%s(%s)] - queue [%s] - routingKey [%s]";
                    log.info(String.format(format, i, exchange, exchangeType,
                            queueName, routingKey));

                    // 使用当前的channel创建一个RpcServer去处理请求
                    final RpcServer rpcServer = createRpcServer(channel);
                    rpcServerPool.add(rpcServer);

                    // 创建一个线程让当前的RpcServer去处理请求
                    Runnable main = new Runnable() {
                        @Override
                        public void run() {
                            try {
                                // rpcServer开始处理请求
                                throw rpcServer.mainloop();
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    };
                    // 线程开始
                    new Thread(main).start();
                } catch (IOException e) {
                    log.warn("Unable to create rpc server", e);
                }
            }
        } catch (Exception e) {
            log.error("Unexpected error trying to start rpc servers", e);
        }
    }

    // 创建RpcServer对象
    private RpcServer createRpcServer(Channel channel) throws IOException {
        return new RpcServer(channel, queueName) {

            // 重写处理接收到的消息的方法
            public byte[] handleCall(byte[] requestBody,
                    AMQP.BasicProperties replyProperties) {
                // 因为在客户端调用方法的时候,是将客户端调用的方法的信息封装成一个RemoteInvocation对象,然后序列化成一个byte数据再使用RpcClient发送到服务端的
                // 所以在这里(服务端接收消息),将消息(requestBody)反序列化成RemoteInvocation对象
                RemoteInvocation invocation = (RemoteInvocation) SerializationUtils
                        .deserialize(requestBody);

                // 根据RemoteInvocation的信息,服务端使用代理对象执行相应的方法,并得到执行结果
                RemoteInvocationResult result = invokeAndCreateResult(
                        invocation, proxy);

                // 将执行结果序列化为byte数据,然后返回给客户端
                return SerializationUtils.serialize(result);

            }
        };
    }

    public void setChannelFactory(RabbitChannelFactory channelFactory) {
        this.channelFactory = channelFactory;
    }

    @Required
    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    public Object getProxy() {
        return proxy;
    }

    @Override
    public void destroy() throws Exception {
        clearRpcServers();
    }

    // 清除所有的RpcServer
    private void clearRpcServers() {
        if (log.isInfoEnabled()) {
            log.info(format("Closing %d rpc servers", rpcServerPool.size()));
        }

        for (RpcServer rpcServer : rpcServerPool) {
            try {
                // 中止处理请求
                rpcServer.terminateMainloop();
                rpcServer.close();
            } catch (Exception e) {
                log.warn("Error termination rpcserver loop", e);
            }
        }
        rpcServerPool.clear();
        if (log.isInfoEnabled()) {
            log.info("Rpc servers closed");
        }

    }

    @Override
    public void shutdownCompleted(ShutdownSignalException cause) {
        if (log.isInfoEnabled()) {
            log.info(String.format("Channel connection lost for reason [%s]",
                    cause.getReason()));
            log.info(String.format("Reference [%s]", cause.getReference()));
        }

        if (cause.isInitiatedByApplication()) {
            if (log.isInfoEnabled()) {
                log.info("Sutdown initiated by application");
            }
        } else if (cause.isHardError()) {
            log
                    .error("Shutdown is a hard error, trying to restart the RPC server...");
            startRpcServer();
        }
    }

    public void setExchange(String exchange) {
        this.exchange = exchange;
    }

    @Required
    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }

    public void setPoolsize(int poolsize) {
        this.poolsize = poolsize;
    }

    @Required
    public void setExchangeType(ExchangeType exchangeType) {
        this.exchangeType = exchangeType;
    }
}
----------------------------------
服务端,发布接口
/**
* 获取连接的工厂类
*/
package com.sun.study.spring.rabbitmq;

import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.springframework.beans.factory.InitializingBean;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConnectionParameters;
import com.rabbitmq.client.ShutdownSignalException;

/**
* @author sunjun 2010-5-9 下午01:49:40
*/
public class RabbitConnectionFactory implements InitializingBean {

    private final static Log logger = org.apache.commons.logging.LogFactory
            .getLog(RabbitConnectionFactory.class);

    private Map<String, Connection> CONNECTION_POOL = new HashMap<String, Connection>();
    private Address[] address;
    private ConnectionFactory connectionFactory;

    private String hosts;
    private String username = "guest";
    private String password = "guest";
    private String vhost = "/";

    /**
     * init address
     *
     * @return
     */
    private void initAddress() {
        String[] hostArray = hosts.split(";");
        List<Address> addressList = new LinkedList<Address>();
        for (int i = 0; i < hostArray.length; i++) {
            if (!StringUtils.isBlank(hostArray[i]))
                addressList.add(new Address(hostArray[i].trim(),
                        AMQP.PROTOCOL.PORT));
        }
        address = addressList.toArray(new Address[] {});
    }

    /**
     * init connectionFactory
     */
    private void initConnectionFactory() {
        ConnectionParameters connectionParameters = new ConnectionParameters();
        connectionParameters.setUsername(username);
        connectionParameters.setPassword(password);
        connectionParameters.setVirtualHost(vhost);
        connectionFactory = new ConnectionFactory(connectionParameters);
    }

    public void afterPropertiesSet() throws Exception {
        initAddress();
        initConnectionFactory();
    }

    /**
     * get connection
     *
     * @param name
     * @return
     * @throws Exception
     */
    public Connection getConnection(String name) throws Exception {
        synchronized (CONNECTION_POOL) {
            Connection connection = CONNECTION_POOL.get(name);
            if (connection == null || !connection.isOpen()) {
                connection = connectionFactory.newConnection(address);
                if (logger.isInfoEnabled()) {
                    logger.info("new rabbitmq connection sucess with host: "
                            + connection.getHost() + " at time " + new Date());
                }
                CONNECTION_POOL.put(name, connection);
            }
            return connection;
        }
    }

    public void shutdownCompleted(ShutdownSignalException cause) {
        if (logger.isInfoEnabled()) {
            logger.info(String.format(
                    "Channel connection lost for reason [%s]", cause
                            .getReason()));
            logger.info(String.format("Reference [%s]", cause.getReference()));
        }
        if (cause.isInitiatedByApplication()) {
            if (logger.isInfoEnabled()) {
                logger.info("Sutdown initiated by application");
            }
        } else if (cause.isHardError()) {
            logger
                    .error("Shutdown is a hard error, you can trying to reconnect...");
        }
    }

    public String getHosts() {
        return hosts;
    }

    public void setHosts(String hosts) {
        this.hosts = hosts;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getVhost() {
        return vhost;
    }

    public void setVhost(String vhost) {
        this.vhost = vhost;
    }
}
/**
* 处理客户端的请求,服务端核心处理类
*/
package com.sun.study.spring.rabbitmq;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.SerializationUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.remoting.support.RemoteExporter;
import org.springframework.remoting.support.RemoteInvocation;
import org.springframework.remoting.support.RemoteInvocationResult;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.RpcServer;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.AMQP.BasicProperties;

/**
* @author sunjun
* @create 2010-5-9 下午03:11:07
*/
public class RabbitmqInvokerExporter extends RemoteExporter implements
        InitializingBean, DisposableBean, ShutdownListener {

    private final static String PREFIX_QUEUE = "queue_";

    private RabbitConnectionFactory rabbitConnectionFactory;
    private int poolsize = 1;

    private List<RpcServer> rpcServerPool;
    private Object proxy;
    private String queueName;

    /**
     * create a channel
     *
     * @return
     * @throws Exception
     */
    private Channel createChannel() throws Exception {
        Connection connection = rabbitConnectionFactory
                .getConnection(getServiceInterface().getName());
        connection.addShutdownListener(this);
        Channel channel = connection.createChannel();
        channel.addShutdownListener(this);
        return channel;
    }

    /**
     * create a RpcServer
     *
     * @param channel
     * @return
     * @throws IOException
     */
    private RpcServer createRpcServer(Channel channel) throws IOException {
        return new RpcServer(channel, queueName) {

            public byte[] handleCall(byte[] requestBody,
                    BasicProperties replyProperties) {
                RemoteInvocation remoteInvocation = (RemoteInvocation) SerializationUtils
                        .deserialize(requestBody);
                RemoteInvocationResult result = null;
                try {
                    result = new RemoteInvocationResult(remoteInvocation
                            .invoke(proxy));
                } catch (Exception e) {
                    logger.error("handle request error...", e);
                    result = new RemoteInvocationResult(e);
                }
                return SerializationUtils.serialize(result);
            }

        };
    }

    /**
     * init a RpcServer
     *
     * @throws Exception
     * @throws Exception
     */
    private void initRpcServer(boolean queueDeclare) throws Exception {
        Channel channel = createChannel();
        if (queueDeclare)
            channel.queueDeclare(queueName);
        final RpcServer rpcServer = createRpcServer(channel);
        rpcServerPool.add(rpcServer);
        Runnable thread = new Runnable() {

            @Override
            public void run() {
                try {
                    throw rpcServer.mainloop();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

        };
        new Thread(thread).start();
    }

    /**
     * init RpcServers
     *
     * @throws IOException
     */
    private void initRpcServers() {
        try {
            Channel channel = createChannel();
            // queueDeclare(queue, false, false, false, false, null);
            channel.queueDeclare(queueName);
            for (int i = 0; i < poolsize; i++)
                initRpcServer(false);
            if (logger.isInfoEnabled())
                logger.info("init " + rpcServerPool.size()
                        + " RpcServer success for " + queueName);
        } catch (Exception e) {
            logger.error("init RpcServers error.", e);
        }
    }

    /**
     * check RpcServer
     */
    private void checkRpcServers() {
        Thread checkThread = new Thread() {
            public void run() {
                long checkTime = 2000;
                while (true) {
                    try {
                        Thread.sleep(checkTime);
                    } catch (InterruptedException e) {
                    }
                    if (rpcServerPool.isEmpty()) {
                        initRpcServers();
                        continue;
                    }
                    boolean error = false;
                    try {
                        for (RpcServer rpcServer : rpcServerPool) {
                            Channel channel = rpcServer.getChannel();
                            if (channel == null || !channel.isOpen()) {
                                try {
                                    rpcServer.terminateMainloop();
                                    rpcServer.close();
                                } catch (Exception e) {
                                    error = true;
                                }
                                rpcServerPool.remove(rpcServer);
                                initRpcServer(true);
                            }
                        }
                        for (int i = 0; i < poolsize - rpcServerPool.size(); i++)
                            initRpcServer(true);
                    } catch (Exception e) {
                        error = true;
                    }
                    checkTime = error ? 2000 : 5000;
                }
            }
        };
        checkThread.setDaemon(true);
        checkThread.start();
    }

    public void afterPropertiesSet() {
        proxy = getProxyForService();
        rpcServerPool = new ArrayList<RpcServer>(poolsize);
        queueName = PREFIX_QUEUE + getServiceInterface().getName();
        initRpcServers();
        checkRpcServers();
    }

    public void shutdownCompleted(ShutdownSignalException cause) {
        if (logger.isInfoEnabled()) {
            logger.info(String.format(
                    "Channel connection lost for reason [%s]", cause
                            .getReason()));
            logger.info(String.format("Reference [%s]", cause.getReference()));
        }
        if (cause.isInitiatedByApplication()) {
            if (logger.isInfoEnabled()) {
                logger.info("Sutdown initiated by application");
            }
        } else if (cause.isHardError()) {
            logger
                    .error("Shutdown is a hard error, you can trying to reconnect...");
        }
    }

    public void destroy() throws Exception {
        for (RpcServer rpcServer : rpcServerPool) {
            rpcServer.terminateMainloop();
            rpcServer.close();
        }
        rpcServerPool.clear();
    }

    public void setPoolsize(int poolsize) {
        this.poolsize = poolsize;
    }

    public void setRabbitConnectionFactory(
            RabbitConnectionFactory rabbitConnectionFactory) {
        this.rabbitConnectionFactory = rabbitConnectionFactory;
    }

}
-----------------------------
客户端,获得接口代理对象,进行远程调用
/**
* 包装RpcClient,进行远程调用后,需要同步等待结果
*/
package com.sun.study.spring.rabbitmq;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.impl.MethodArgumentReader;
import com.rabbitmq.client.impl.MethodArgumentWriter;
import com.rabbitmq.utility.BlockingCell;

/**
* @author sunjun
* @create 2010-5-16 下午09:56:55
*/
class RabbitRpcClient {

    private final Channel channel;
    private final String exchange;
    private final String routingKey;

    private final Map<String, BlockingCell<Object>> continuationMap = new HashMap<String, BlockingCell<Object>>();
    private int correlationId;

    private final String replyQueue;
    private DefaultConsumer consumer;

    private final boolean mandatory;
    private final boolean immediate;
    private final int timeOutMs;

    public RabbitRpcClient(Channel channel, String exchange, String routingKey,
            int timeOutMs) throws IOException {
        this(channel, exchange, routingKey, timeOutMs, false, false);
    }

    @SuppressWarnings( { "ConstructorWithTooManyParameters" })
    public RabbitRpcClient(Channel channel, String exchange, String routingKey,
            int timeOutMs, boolean mandatory, boolean immediate)
            throws IOException {
        this.channel = channel;
        this.exchange = exchange;
        this.routingKey = routingKey;
        this.timeOutMs = timeOutMs;
        this.mandatory = mandatory;
        this.immediate = immediate;
        correlationId = 0;

        replyQueue = setupReplyQueue();
        consumer = setupConsumer();
    }

    void checkConsumer() throws IOException {
        if (consumer == null) {
            throw new EOFException("RpcClient is closed");
        }
    }

    public void close() throws IOException {
        if (consumer != null) {
            channel.basicCancel(consumer.getConsumerTag());
            consumer = null;
        }
    }

    private String setupReplyQueue() throws IOException {
        return channel.queueDeclare("", false, false, true, true, null)
                .getQueue();
    }

    private DefaultConsumer setupConsumer() throws IOException {
        DefaultConsumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleShutdownSignal(String consumerTag,
                    ShutdownSignalException signal) {

                synchronized (continuationMap) {
                    for (Map.Entry<String, BlockingCell<Object>> entry : continuationMap
                            .entrySet()) {
                        entry.getValue().set(signal);
                    }
                    RabbitRpcClient.this.consumer = null;
                }
            }

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                synchronized (continuationMap) {
                    String replyId = properties.getCorrelationId();
                    BlockingCell<Object> blocker = continuationMap.get(replyId);
                    continuationMap.remove(replyId);
                    blocker.set(body);
                }
            }
        };
        channel.basicConsume(replyQueue, true, consumer);
        return consumer;
    }

    void publish(AMQP.BasicProperties props, byte[] message) throws IOException {
        channel.basicPublish(exchange, routingKey, mandatory, immediate, props,
                message);

    }

    public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message)
            throws IOException, ShutdownSignalException, TimeoutException {
        AMQP.BasicProperties localProps = props;
        checkConsumer();
        BlockingCell<Object> k = new BlockingCell<Object>();
        synchronized (continuationMap) {
            correlationId++;
            String replyId = "" + correlationId;
            if (localProps != null) {
                localProps.setCorrelationId(replyId);
                localProps.setReplyTo(replyQueue);
            } else {
                localProps = new AMQP.BasicProperties(null, null, null, null,
                        null, replyId, replyQueue, null, null, null, null,
                        null, null, null);
            }
            continuationMap.put(replyId, k);
        }
        publish(localProps, message);
        Object reply = k.uninterruptibleGet(timeOutMs);
        if (reply instanceof ShutdownSignalException) {
            ShutdownSignalException sig = (ShutdownSignalException) reply;
            ShutdownSignalException wrapper = new ShutdownSignalException(sig
                    .isHardError(), sig.isInitiatedByApplication(), sig
                    .getReason(), sig.getReference());
            wrapper.initCause(sig);
            throw wrapper;
        } else {
            return (byte[]) reply;
        }
    }

    public byte[] primitiveCall(byte[] message) throws IOException,
            ShutdownSignalException, TimeoutException {
        return primitiveCall(null, message);
    }

    public String stringCall(String message) throws IOException,
            ShutdownSignalException, TimeoutException {
        return new String(primitiveCall(message.getBytes()));
    }

    @SuppressWarnings( { "IOResourceOpenedButNotSafelyClosed" })
    public Map<String, Object> mapCall(Map<String, Object> message)
            throws IOException, ShutdownSignalException, TimeoutException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        MethodArgumentWriter writer = new MethodArgumentWriter(
                new DataOutputStream(buffer));
        writer.writeTable(message);
        writer.flush();
        byte[] reply = primitiveCall(buffer.toByteArray());
        MethodArgumentReader reader = new MethodArgumentReader(
                new DataInputStream(new ByteArrayInputStream(reply)));
        return reader.readTable();
    }

    public Map<String, Object> mapCall(Object[] keyValuePairs)
            throws IOException, ShutdownSignalException, TimeoutException {
        Map<String, Object> message = new HashMap<String, Object>();
        for (int i = 0; i < keyValuePairs.length; i += 2) {
            message.put((String) keyValuePairs[i], keyValuePairs[i + 1]);
        }
        return mapCall(message);
    }

    public Channel getChannel() {
        return channel;
    }

    public String getExchange() {
        return exchange;
    }

    public String getRoutingKey() {
        return routingKey;
    }

    public Map<String, BlockingCell<Object>> getContinuationMap() {
        return Collections.unmodifiableMap(continuationMap);
    }

    public int getCorrelationId() {
        return correlationId;
    }

    public String getReplyQueue() {
        return replyQueue;
    }

    public Consumer getConsumer() {
        return consumer;
    }
}
/**
* 生成接口代理对象,进行远程调用
*/
package com.sun.study.spring.rabbitmq;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.lang.SerializationUtils;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.remoting.support.RemoteAccessor;
import org.springframework.remoting.support.RemoteInvocation;
import org.springframework.remoting.support.RemoteInvocationResult;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ReturnListener;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;

/**
* @author sunjun
* @create 2010-5-9 下午04:49:58
*/
public class RabbitmqProxyFactoryBean extends RemoteAccessor implements
        FactoryBean, MethodInterceptor, InitializingBean, DisposableBean,
        ShutdownListener {

    private final static String PREFIX_QUEUE = "queue_";
    private final static String PREFIX_EXCHANGE = "exchange_";
    private final static String PREFIX_ROUTINGKEY = "routingkey_";
    private final static String EXCHANGE_TYPE = "topic";

    private String exchange;
    private String routingKey;
    private String queueName;

    private RabbitConnectionFactory rabbitConnectionFactory;
    private String exchangeType = EXCHANGE_TYPE;
    private int poolsize = 1;
    private int timeoutMs = 0;
    private boolean mandatory;
    private boolean immediate;

    private Object serviceProxy;
    private final BlockingQueue<RabbitRpcClient> rpcClientPool = new LinkedBlockingQueue<RabbitRpcClient>();

    /**
     * create a channel
     *
     * @return
     * @throws Exception
     */
    private Channel createChannel() throws Exception {
        Connection connection = rabbitConnectionFactory
                .getConnection(getServiceInterface().getName());
        connection.addShutdownListener(this);
        Channel channel = connection.createChannel();
        channel.addShutdownListener(this);
        return channel;
    }

    /**
     * init many RpcClient
     */
    private void initRpcClients() {
        try {
            Channel temChannel = createChannel();

            temChannel.queueDeclare(queueName);
            temChannel.exchangeDeclare(exchange, EXCHANGE_TYPE);
            temChannel.queueBind(queueName, exchange, routingKey);

            for (int i = 0; i < poolsize; i++) {
                RabbitRpcClient rpcClient = createRpcClient(exchange,
                        routingKey, false);
                rpcClientPool.add(rpcClient);
            }
        } catch (Exception e) {
            logger.error("create many RpcClient error.", e);
            for (int i = 0; i < poolsize - rpcClientPool.size(); i++)
                rpcClientPool.add(null);
        }
    }

    /**
     * create a RpcClient
     *
     * @throws Exception
     */
    private RabbitRpcClient createRpcClient(String exchange, String routingKey,
            boolean declareAndBind) throws Exception {
        Channel channel = createChannel();
        if (declareAndBind) {
            channel.queueDeclare(queueName);
            channel.exchangeDeclare(exchange, EXCHANGE_TYPE);
            channel.queueBind(queueName, exchange, routingKey);
        }
        final RabbitRpcClient rpcClient = new RabbitRpcClient(channel,
                exchange, routingKey, timeoutMs, mandatory, immediate);
        channel.setReturnListener(new ReturnListener() {
            public void handleBasicReturn(int replyCode, String replyText,
                    String exchange, String routingKey,
                    AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                // call handle result here, so uninterruptable cal will
                // be interrupted
                Throwable resultException;
                String msg;
                switch (replyCode) {
                case AMQP.NO_CONSUMERS:
                    msg = "No consumers for message [%s] - [%s] - [%s]";
                    resultException = new RuntimeException(String.format(msg,
                            SerializationUtils.deserialize(body), exchange,
                            routingKey));
                    break;
                case AMQP.NO_ROUTE:
                    msg = "Unroutable message [%s] - [%s] - [%s]";
                    resultException = new RuntimeException(String.format(msg,
                            SerializationUtils.deserialize(body), exchange,
                            routingKey));
                    break;
                default:
                    msg = "Message returned [%s] - [%s] - [%s] - [%d] - [%s]";
                    resultException = new RuntimeException(String.format(msg,
                            SerializationUtils.deserialize(body), exchange,
                            routingKey, replyCode, replyText));
                }
                RemoteInvocationResult remoteInvocationResult = new RemoteInvocationResult(
                        resultException);
                rpcClient.getConsumer().handleDelivery(null, null, properties,
                        SerializationUtils.serialize(remoteInvocationResult));
            }
        });
        if (logger.isInfoEnabled()) {
            String str = "Started rpc client on exchange [%s(%s)] - routingKey [%s]";
            logger.info(String.format(str, exchange, exchangeType, routingKey));
        }
        return rpcClient;
    }

    /**
     * check RpcClient
     *
     * @param rpcClient
     * @return
     * @throws Exception
     */
    private RabbitRpcClient checkRpcClient(RabbitRpcClient rpcClient)
            throws Exception {
        boolean create = false;
        if (rpcClient == null)
            create = true;
        else {
            Channel channel = rpcClient.getChannel();
            if (channel == null || !channel.isOpen())
                create = true;
        }
        if (!create)
            return rpcClient;
        return createRpcClient(exchange, routingKey, true);
    }

    public void afterPropertiesSet() {
        String serviceInterfaceName = getServiceInterface().getName();
        exchange = PREFIX_EXCHANGE + serviceInterfaceName;
        routingKey = PREFIX_ROUTINGKEY + serviceInterfaceName;
        queueName = PREFIX_QUEUE + serviceInterfaceName;

        initRpcClients();

        serviceProxy = new ProxyFactory(getServiceInterface(), this)
                .getProxy(getBeanClassLoader());
    }

    public Object invoke(MethodInvocation methodInvocation) throws Throwable {
        RemoteInvocation remoteInvocation = new RemoteInvocation(
                methodInvocation);
        RabbitRpcClient rpcClient = rpcClientPool.poll(timeoutMs,
                TimeUnit.MILLISECONDS);
        rpcClient = checkRpcClient(rpcClient);
        if (rpcClient != null) {
            byte[] response;
            try {
                byte[] message = SerializationUtils.serialize(remoteInvocation);
                response = rpcClient.primitiveCall(message);
            } finally {
                rpcClientPool.put(rpcClient);
            }
            RemoteInvocationResult remoteInvocationResult = (RemoteInvocationResult) SerializationUtils
                    .deserialize(response);
            return remoteInvocationResult.recreate();
        }
        throw new TimeoutException(
                "Timed out while waiting for available rpc client");
    }

    public void destroy() throws Exception {

    }

    public void shutdownCompleted(ShutdownSignalException cause) {

    }

    public Object getObject() throws Exception {
        return serviceProxy;
    }

    public Class getObjectType() {
        return getServiceInterface();
    }

    public boolean isSingleton() {
        return true;
    }

    public void setRabbitConnectionFactory(
            RabbitConnectionFactory rabbitConnectionFactory) {
        this.rabbitConnectionFactory = rabbitConnectionFactory;
    }

    public void setPoolsize(int poolsize) {
        this.poolsize = poolsize;
    }

    public void setTimeoutMs(int timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    public void setMandatory(boolean mandatory) {
        this.mandatory = mandatory;
    }

    public void setImmediate(boolean immediate) {
        this.immediate = immediate;
    }

    public void setExchangeType(String exchangeType) {
        this.exchangeType = exchangeType;
    }

}
---------------------------------
前面两步后,进行Spring的配置
src/main/resources/rabbitmq/spring-rabbitmq-base.xml
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
                http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
                http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd">

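    <!-- NOTE: the class attribute and the name of the first property were lost when this listing was extracted; "host" is a reconstruction, verify it against RabbitConnectionFactory -->
    <bean id="rabbitConnectionFactory" class="com.sun.study.spring.rabbitmq.RabbitConnectionFactory">
        <property name="host" value="localhost" />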
        <property name="username" value="guest" />
        <property name="password" value="guest" />
        <property name="vhost" value="/" />
    </bean>

</beans>
src/main/resources/rabbitmq/spring-rabbitmq-server.xml
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
                http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
                http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd">

    <import resource="spring-rabbitmq-base.xml" />

    <bean id="userService" />

    <bean ref="userService" />
        <property name="serviceInterface" value="com.sun.study.service.UserService" />
        <property name="poolsize" value="10" />
        <property name="rabbitConnectionFactory" ref="rabbitConnectionFactory" />
    </bean>

</beans>
src/main/resources/rabbitmq/spring-rabbitmq-client.xml
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
                http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
                http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd">

    <import resource="spring-rabbitmq-base.xml" />

    <bean id="userService" class="com.sun.study.spring.rabbitmq.RabbitmqProxyFactoryBean">
        <property name="serviceInterface" value="com.sun.study.service.UserService" />
        <property name="rabbitConnectionFactory" ref="rabbitConnectionFactory" />
        <property name="timeoutMs" value="5000" />
        <property name="mandatory" value="true" />
        <property name="immediate" value="false" />
    </bean>

</beans>
--------------------------------
测试:
Server:
/**
* 开启服务端
*/
package rabbitmq;

import org.testng.annotations.Test;
import org.unitils.UnitilsTestNG;
import org.unitils.spring.annotation.SpringApplicationContext;

/**
* @author Administrator
*
*/
@Test
@SpringApplicationContext(value = { "classpath:/rabbitmq/spring-rabbitmq-server.xml" })
public class RpcServerTest extends UnitilsTestNG {

    public void test() {
        while (true) {
        }
    }

}
Client:
/**
* Client调用,测试
*/
package rabbitmq;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;

import com.sun.study.service.UserService;

/**
* @author sunjun
* @create 2010-5-15 下午08:01:05
*/
public class Test {

    /**
     * @param args
     */
    public static void main(String[] args) {
        ApplicationContext context = new FileSystemXmlApplicationContext(
                "classpath:/rabbitmq/spring-rabbitmq-client.xml");
        UserService userService = (UserService) context.getBean("userService");
        System.out.println(userService.save(11));
    }
}

热点排行