首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

自定义的RPC的Java兑现

2012-10-29 
自定义的RPC的Java实现在看hadoop的源代码的时候,看到hadoop实现了一个自定义的RPC,于是有了自己写代码实

自定义的RPC的Java实现

在看hadoop的源代码的时候,看到hadoop实现了一个自定义的RPC,于是有了自己写代码实现RPC的想法。

RPC的全名Remote Process Call,即远程过程调用。使用RPC,可以像使用本地的程序一样使用远程服务器上的程序。下面是一个简单的RPC 调用实例,从中可以看到RPC如何使用以及好处:

?

public class MainClient {public static void main(String[] args) {Echo echo = RPC.getProxy(Echo.class, "127.0.0.1", 20382);System.out.println(echo.echo("hello,hello"));}}

?

public interface Echo {public String echo(String string);}
?

?

?

?

使用RPC.getProxy生成接口Echo的代理实现类。然后就可以像使用本地的程序一样来调用Echo中的echo方法。

使用RPC的好处是简化了远程服务访问。提高了开发效率。在分发代码时,只需要将接口分发给客户端使用,在客户端看来只有接口,没有具体类实现。这样保证了代码的可扩展性和安全性。

?

在看了RPCClient如何使用,我们再来定义一个RPC服务器的接口,看看服务器都提供什么操作:

?

?

public interface Server {public void stop();public void start();public void register(Class interfaceDefiner,Class impl);public void call(Invocation invo);public boolean isRunning();public int getPort();}

?

?服务器提供了start和stop方法。使用register注册一个接口和对应的实现类。call方法用于执行Invocation指定的接口的方法名。isRunning返回了服务器的状态,getPort()则返回了服务器使用的端口。

?

来看看Invocation的定义:

?

?

public class Invocation implements Serializable{/** *  */private static final long serialVersionUID = 1L;private Class interfaces;private Method method;private Object[] params;private Object result;/** * @return the result */public Object getResult() {return result;}/** * @param result the result to set */public void setResult(Object result) {this.result = result;}/** * @return the interfaces */public Class getInterfaces() {return interfaces;}/** * @param interfaces the interfaces to set */public void setInterfaces(Class interfaces) {this.interfaces = interfaces;}/** * @return the method */public Method getMethod() {return method;}/** * @param method the method to set */public void setMethod(Method method) {this.method = method;}/** * @return the params */public Object[] getParams() {return params;}/** * @param params the params to set */public void setParams(Object[] params) {this.params = params;}@Overridepublic String toString() {return interfaces.getName()+"."+method.getMethodName()+"("+Arrays.toString(params)+")";}}
?

?

?

?? ? 具体服务器实现类中的call方法是这样使用Invocation的:

?

?

?

@Overridepublic void call(Invocation invo) {Object obj = serviceEngine.get(invo.getInterfaces().getName()); //根据接口名,找到对应的处理类if(obj!=null) {try {Method m = obj.getClass().getMethod(invo.getMethod().getMethodName(), invo.getMethod().getParams());Object result = m.invoke(obj, invo.getParams());invo.setResult(result);} catch (Throwable th) {th.printStackTrace();}} else {throw new IllegalArgumentException("has no these class");}}
?

?

??下面来看服务器接收连接并处理连接请求的核心代码:

?

?

?

public class Listener extends Thread {private ServerSocket socket;private Server server;public Listener(Server server) {this.server = server;}@Overridepublic void run() {System.out.println("启动服务器中,打开端口" + server.getPort());try {socket = new ServerSocket(server.getPort());} catch (IOException e1) {e1.printStackTrace();return;}while (server.isRunning()) {try {Socket client = socket.accept();ObjectInputStream ois = new ObjectInputStream(client.getInputStream());Invocation invo = (Invocation) ois.readObject();server.call(invo);ObjectOutputStream oos = new ObjectOutputStream(client.getOutputStream());oos.writeObject(invo);oos.flush();oos.close();ois.close();} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();}}try {if (socket != null && !socket.isClosed())socket.close();} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}
?

?

RPC具体的Server类是这样来使用Listener的:

?

public static class RPCServer implements Server{private int port = 20382;private Listener listener; private boolean isRuning = true;/** * @param isRuning the isRuning to set */public void setRuning(boolean isRuning) {this.isRuning = isRuning;}/** * @return the port */public int getPort() {return port;}/** * @param port the port to set */public void setPort(int port) {this.port = port;}private Map<String ,Object> serviceEngine = new HashMap<String, Object>();@Overridepublic void call(Invocation invo) {System.out.println(invo.getClass().getName());Object obj = serviceEngine.get(invo.getInterfaces().getName());if(obj!=null) {try {Method m = obj.getClass().getMethod(invo.getMethod().getMethodName(), invo.getMethod().getParams());Object result = m.invoke(obj, invo.getParams());invo.setResult(result);} catch (Throwable th) {th.printStackTrace();}} else {throw new IllegalArgumentException("has no these class");}}@Overridepublic void register(Class interfaceDefiner, Class impl) {try {this.serviceEngine.put(interfaceDefiner.getName(), impl.newInstance());System.out.println(serviceEngine);} catch (Throwable e) {// TODO Auto-generated catch blocke.printStackTrace();} }@Overridepublic void start() {System.out.println("启动服务器");listener = new Listener(this);this.isRuning = true;listener.start();}@Overridepublic void stop() {this.setRuning(false);}@Overridepublic boolean isRunning() {return isRuning;}}
?

?? ?服务器端代码搞定后,来看看客户端的代码,先看看我们刚开始使用RPC.getProxy方法:

?

public static <T> T getProxy(final Class<T> clazz,String host,int port) {final Client client = new Client(host,port);InvocationHandler handler = new InvocationHandler() {@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {Invocation invo = new Invocation();invo.setInterfaces(clazz);invo.setMethod(new org.jy.rpc.protocal.Method(method.getName(),method.getParameterTypes()));invo.setParams(args);client.invoke(invo);return invo.getResult();}};T t = (T) Proxy.newProxyInstance(RPC.class.getClassLoader(), new Class[] {clazz}, handler);return t;}
?

Client类的代码如下:

?

public class Client {private String host;private int port;private Socket socket;private ObjectOutputStream oos;private ObjectInputStream ois;public String getHost() {return host;}public void setHost(String host) {this.host = host;}public int getPort() {return port;}public void setPort(int port) {this.port = port;}public Client(String host, int port) {this.host = host;this.port = port;}public void init() throws UnknownHostException, IOException {socket = new Socket(host, port);oos = new ObjectOutputStream(socket.getOutputStream());}public void invoke(Invocation invo) throws UnknownHostException, IOException, ClassNotFoundException {init();System.out.println("写入数据");oos.writeObject(invo);oos.flush();ois = new ObjectInputStream(socket.getInputStream());Invocation result = (Invocation) ois.readObject();invo.setResult(result.getResult());}}
?

?? ?至此,RPC的客户端和服务器端代码完成,启动服务器的代码如下:

?

public class Main {public static void main(String[] args) {Server server = new RPC.RPCServer();server.register(Echo.class, RemoteEcho.class);server.start();}}
?

?? 现在先运行服务器端代码,再运行客户端代码,就可以成功运行。

?? 详细的代码,参考附件的源代码。

?

?? ?在写这个RPC时,没有想太多。在数据串行化上,使用了java的标准io序列化机制,虽然不能跨平台,但是做DEMO还是不错的;另外在处理客户端请求上,使用了ServerSocket,而没有使用ServerSocketChannel这个java nio中的新特性;在动态生成接口的实现类上,使用了java.lang.reflet中的Proxy类。他可以动态创建接口的实现类。

?

?

?

1 楼 kimmking 2011-06-14   赞
我几年前也做过一个,考虑了跨平台。

http://code.google.com/p/rpcfx/ 2 楼 xin_jmail 2012-09-10   很好的教程,谢谢啦~我qq530422429,希望能与楼主交流…

热点排行