首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

JAVA NIO异步通信框架MINA选型跟使用的几个细节(概述入门,UDP, 心跳)

2012-06-26 
JAVA NIO异步通信框架MINA选型和使用的几个细节(概述入门,UDP, 心跳)???? Apache MINA 2 是一个开发高性能

JAVA NIO异步通信框架MINA选型和使用的几个细节(概述入门,UDP, 心跳)

???? Apache MINA 2 是一个开发高性能和高可伸缩性网络应用程序的网络应用框架。它提供了一个抽象的事件驱动的异步 API,可以使用 TCP/IP、UDP/IP、串口和虚拟机内部的管道等传输方式。Apache MINA 2 可以作为开发网络应用程序的一个良好基础。

??? Apache MINA是非常著名的基于java nio的通信框架,以前都是自己直接使用udp编程,新项目选型中考虑到网络通信可能会用到多种通信方式,因此使用了MINA。

???? 本文结构:

???? (1)客户端和服务器代码;虽然是udp的,但是mina的优美的设计使得所有的通信方式能够以统一的形式使用,perfect。当然注意的是,不同的通信方式,背后的机理和有效的变量、状态是有区别的,所以要精通,那还是需要经验积累和学习的。

???? (2)超时和Session的几个实际问题

???? (3)心跳,纠正几个错误

?

???? 既然是使用,废话少说,直接整个可用的例子。当然了,这些代码也不是直接可用的,我们应用的逻辑有点复杂,不会这么简单使用的。

请参考mina的example包和文档http://mina.apache.org/udp-tutorial.html。

?

版本2.0 RC1

1.1 服务器端

view plain
  1. ????????????????NioDatagramAcceptor?acceptor?=?new?NioDatagramAcceptor();??????????????????acceptor.setHandler(new?MyIoHandlerAdapter());//你的业务处理,最简单的,可以extends?IoHandlerAdapter??
  2. ??DefaultIoFilterChainBuilder?chain?=?acceptor.getFilterChain();??
  3. chain.addLast("keep-alive",?new?HachiKeepAliveFilterInMina());?//心跳??chain.addLast("toMessageTyep",?new?MyMessageEn_Decoder());???
  4. ??????????????//将传输的数据转换成你的业务数据格式。比如下面的是将数据转换成一行行的文本??????????????????//acceptor.getFilterChain().addLast("codec",new?ProtocolCodecFilter(new?TextLineCodecFactory(Charset.forName("UTF-8"))));???
  5. ??chain.addLast("logger",?new?LoggingFilter());??
  6. DatagramSessionConfig?dcfg?=?acceptor.getSessionConfig();??dcfg.setReuseAddress(true);??
  7. acceptor.bind(new?InetSocketAddress(ClusterContext.getHeartBeatPort()));??

?

?

1.2 客户端

view plain
  1. ??????????????NioDatagramConnector?connector?=?new?NioDatagramConnector();??connector.setConnectTimeoutMillis(60000L);??
  2. connector.setConnectTimeoutCheckInterval(10000);??connector.setHandler(handler);??
  3. ??DefaultIoFilterChainBuilder?chain?=?connector.getFilterChain();??
  4. chain.addLast("keep-alive",?new?HachiKeepAliveFilterInMina());//心跳??chain.addLast("toMessageTyep",?new?MyMessageEn_Decoder());??
  5. chain.addLast("logger",?new?LoggingFilter());??ConnectFuture?connFuture?=?connector.connect(new?InetSocketAddress("10.1.1.1",8001));??
  6. connFuture.awaitUninterruptibly();??IoSession?session?=?connFuture.getSession();??
  7. ????????????????//发送消息长整型?1000????????????????IoBuffer?buffer?=?IoBuffer.allocate(8);??
  8. ??????????????buffer.putLong(1000);????????????????buffer.flip();??
  9. ??????????????session.write(buffer);???????????????????//关闭连接??
  10. ?????????????????session.getCloseFuture().awaitUninterruptibly();???connector.dispose();??

?

2. 超时的几个经验总结:

??? udp session默认是60秒钟超时,此时状态为closing,数据就发不出去了。

Session的接口是IoSession,udp的最终实现是NioSession。如果交互在60秒内不能处理完成,就需要使用Keep-alive机制,即心跳机制。

?

3. 心跳机制

??? 在代码中已经使用了心跳机制,是通过mina的filter实现的,mina自身带的心跳机制好处在于,它附加了处理,让心跳消息不会传到业务层,在底层就完成了。

??? 在上面代码实现中的HachiKeepAliveFilterInMina如下:

?

view plain
  1. public?class?HachiKeepAliveFilterInMina?extends?KeepAliveFilter?{??????private?static?final?int?INTERVAL?=?30;//in?seconds??
  2. ????private?static?final?int?TIMEOUT?=?10;?//in?seconds????????
  3. ????public?HachiKeepAliveFilterInMina(KeepAliveMessageFactory?messageFactory)?{??????????super(messageFactory,?IdleStatus.BOTH_IDLE,?new?ExceptionHandler(),?INTERVAL,?TIMEOUT);??
  4. ????}????????
  5. ????public?HachiKeepAliveFilterInMina()?{??????????super(new?KeepAliveMessageFactoryImpl(),?IdleStatus.BOTH_IDLE,?new?ExceptionHandler(),?INTERVAL,?TIMEOUT);??
  6. ????????this.setForwardEvent(false);?//此消息不会继续传递,不会被业务层看见??????}??
  7. }????
  8. class?ExceptionHandler?implements?KeepAliveRequestTimeoutHandler?{?????????public?void?keepAliveRequestTimedOut(KeepAliveFilter?filter,?IoSession?session)?throws?Exception?{?????
  9. ????????System.out.println("Connection?lost,?session?will?be?closed");?????????????session.close(true);???
  10. ????}?????}??
  11. ??/**?
  12. ?*?继承于KeepAliveMessageFactory,当心跳机制启动的时候,需要该工厂类来判断和定制心跳消息??*?@author?Liu?Liu?
  13. ?*??*/??
  14. class?KeepAliveMessageFactoryImpl?implements?KeepAliveMessageFactory?{??????private?static?final?byte?int_req?=?-1;??
  15. ????private?static?final?byte?int_rep?=?-2;???????private?static?final?IoBuffer?KAMSG_REQ?=?IoBuffer.wrap(new?byte[]{int_req});?????
  16. ????private?static?final?IoBuffer?KAMSG_REP?=?IoBuffer.wrap(new?byte[]{int_rep});?????????????
  17. ????public?Object?getRequest(IoSession?session)?{?????????????return?KAMSG_REQ.duplicate();?????
  18. ????}???????
  19. ????public?Object?getResponse(IoSession?session,?Object?request)?{?????????????return?KAMSG_REP.duplicate();?????
  20. ????}???????
  21. ????public?boolean?isRequest(IoSession?session,?Object?message)?{????????????if(!(message?instanceof?IoBuffer))??
  22. ????????????return?false;??????????IoBuffer?realMessage?=?(IoBuffer)message;??
  23. ????????if(realMessage.limit()?!=?1)??????????????return?false;??
  24. ??????????????????boolean?result?=?(realMessage.get()?==?int_req);??
  25. ????????realMessage.rewind();??????????return?result;??
  26. ????}???????
  27. ????public?boolean?isResponse(IoSession?session,?Object?message)?{??????????????if(!(message?instanceof?IoBuffer))??
  28. ????????????return?false;??????????IoBuffer?realMessage?=?(IoBuffer)message;??
  29. ????????if(realMessage.limit()?!=?1)??????????????return?false;??
  30. ??????????????????boolean?result?=?(realMessage.get()?==?int_rep);?????
  31. ????????realMessage.rewind();??????????return?result;??
  32. ????}?????}??

?

? 有人说:心跳机制的filter只需要服务器端具有即可——这是错误的,拍着脑袋想一想,看看factory,你就知道了。心跳需要通信两端的实现

? 另外,版本2.0 RC1中,经过测试,当心跳的时间间隔INTERVAL设置为60s(Session的存活时间)的时候心跳会失效,所以最好需要小于60s的间隔。

?

更多可参考:

http://www.ibm.com/developerworks/cn/java/j-lo-mina2

热点排行