JAVA NIO异步通信框架MINA选型跟使用的几个细节(概述入门,UDP, 心跳)
JAVA NIO异步通信框架MINA选型和使用的几个细节(概述入门,UDP, 心跳)???? Apache MINA 2 是一个开发高性能
JAVA NIO异步通信框架MINA选型和使用的几个细节(概述入门,UDP, 心跳)
???? Apache MINA 2 是一个开发高性能和高可伸缩性网络应用程序的网络应用框架。它提供了一个抽象的事件驱动的异步 API,可以使用 TCP/IP、UDP/IP、串口和虚拟机内部的管道等传输方式。Apache MINA 2 可以作为开发网络应用程序的一个良好基础。
??? Apache MINA是非常著名的基于java nio的通信框架,以前都是自己直接使用udp编程,新项目选型中考虑到网络通信可能会用到多种通信方式,因此使用了MINA。
???? 本文结构:
???? (1)客户端和服务器代码;虽然是udp的,但是mina的优美的设计使得所有的通信方式能够以统一的形式使用,perfect。当然注意的是,不同的通信方式,背后的机理和有效的变量、状态是有区别的,所以要精通,那还是需要经验积累和学习的。
???? (2)超时和Session的几个实际问题
???? (3)心跳,纠正几个错误
?
???? 既然是使用,废话少说,直接整个可用的例子。当然了,这些代码也不是直接可用的,我们应用的逻辑有点复杂,不会这么简单使用的。
请参考mina的example包和文档http://mina.apache.org/udp-tutorial.html。
?
版本2.0 RC1
1.1 服务器端
view plain
- ????????????????NioDatagramAcceptor?acceptor?=?new?NioDatagramAcceptor();??????????????????acceptor.setHandler(new?MyIoHandlerAdapter());//你的业务处理,最简单的,可以extends?IoHandlerAdapter??
- ??DefaultIoFilterChainBuilder?chain?=?acceptor.getFilterChain();??
- chain.addLast("keep-alive",?new?HachiKeepAliveFilterInMina());?//心跳??chain.addLast("toMessageTyep",?new?MyMessageEn_Decoder());???
- ??????????????//将传输的数据转换成你的业务数据格式。比如下面的是将数据转换成一行行的文本??????????????????//acceptor.getFilterChain().addLast("codec",new?ProtocolCodecFilter(new?TextLineCodecFactory(Charset.forName("UTF-8"))));???
- ??chain.addLast("logger",?new?LoggingFilter());??
- DatagramSessionConfig?dcfg?=?acceptor.getSessionConfig();??dcfg.setReuseAddress(true);??
- acceptor.bind(new?InetSocketAddress(ClusterContext.getHeartBeatPort()));??
?
?
1.2 客户端
view plain
- ??????????????NioDatagramConnector?connector?=?new?NioDatagramConnector();??connector.setConnectTimeoutMillis(60000L);??
- connector.setConnectTimeoutCheckInterval(10000);??connector.setHandler(handler);??
- ??DefaultIoFilterChainBuilder?chain?=?connector.getFilterChain();??
- chain.addLast("keep-alive",?new?HachiKeepAliveFilterInMina());//心跳??chain.addLast("toMessageTyep",?new?MyMessageEn_Decoder());??
- chain.addLast("logger",?new?LoggingFilter());??ConnectFuture?connFuture?=?connector.connect(new?InetSocketAddress("10.1.1.1",8001));??
- connFuture.awaitUninterruptibly();??IoSession?session?=?connFuture.getSession();??
- ????????????????//发送消息长整型?1000????????????????IoBuffer?buffer?=?IoBuffer.allocate(8);??
- ??????????????buffer.putLong(1000);????????????????buffer.flip();??
- ??????????????session.write(buffer);???????????????????//关闭连接??
- ?????????????????session.getCloseFuture().awaitUninterruptibly();???connector.dispose();??
?
2. 超时的几个经验总结:
??? udp session默认是60秒钟超时,此时状态为closing,数据就发不出去了。
Session的接口是IoSession,udp的最终实现是NioSession。如果交互在60秒内不能处理完成,就需要使用Keep-alive机制,即心跳机制。
?
3. 心跳机制
??? 在代码中已经使用了心跳机制,是通过mina的filter实现的,mina自身带的心跳机制好处在于,它附加了处理,让心跳消息不会传到业务层,在底层就完成了。
??? 在上面代码实现中的HachiKeepAliveFilterInMina如下:
?
view plain
- public?class?HachiKeepAliveFilterInMina?extends?KeepAliveFilter?{??????private?static?final?int?INTERVAL?=?30;//in?seconds??
- ????private?static?final?int?TIMEOUT?=?10;?//in?seconds????????
- ????public?HachiKeepAliveFilterInMina(KeepAliveMessageFactory?messageFactory)?{??????????super(messageFactory,?IdleStatus.BOTH_IDLE,?new?ExceptionHandler(),?INTERVAL,?TIMEOUT);??
- ????}????????
- ????public?HachiKeepAliveFilterInMina()?{??????????super(new?KeepAliveMessageFactoryImpl(),?IdleStatus.BOTH_IDLE,?new?ExceptionHandler(),?INTERVAL,?TIMEOUT);??
- ????????this.setForwardEvent(false);?//此消息不会继续传递,不会被业务层看见??????}??
- }????
- class?ExceptionHandler?implements?KeepAliveRequestTimeoutHandler?{?????????public?void?keepAliveRequestTimedOut(KeepAliveFilter?filter,?IoSession?session)?throws?Exception?{?????
- ????????System.out.println("Connection?lost,?session?will?be?closed");?????????????session.close(true);???
- ????}?????}??
- ??/**?
- ?*?继承于KeepAliveMessageFactory,当心跳机制启动的时候,需要该工厂类来判断和定制心跳消息??*?@author?Liu?Liu?
- ?*??*/??
- class?KeepAliveMessageFactoryImpl?implements?KeepAliveMessageFactory?{??????private?static?final?byte?int_req?=?-1;??
- ????private?static?final?byte?int_rep?=?-2;???????private?static?final?IoBuffer?KAMSG_REQ?=?IoBuffer.wrap(new?byte[]{int_req});?????
- ????private?static?final?IoBuffer?KAMSG_REP?=?IoBuffer.wrap(new?byte[]{int_rep});?????????????
- ????public?Object?getRequest(IoSession?session)?{?????????????return?KAMSG_REQ.duplicate();?????
- ????}???????
- ????public?Object?getResponse(IoSession?session,?Object?request)?{?????????????return?KAMSG_REP.duplicate();?????
- ????}???????
- ????public?boolean?isRequest(IoSession?session,?Object?message)?{????????????if(!(message?instanceof?IoBuffer))??
- ????????????return?false;??????????IoBuffer?realMessage?=?(IoBuffer)message;??
- ????????if(realMessage.limit()?!=?1)??????????????return?false;??
- ??????????????????boolean?result?=?(realMessage.get()?==?int_req);??
- ????????realMessage.rewind();??????????return?result;??
- ????}???????
- ????public?boolean?isResponse(IoSession?session,?Object?message)?{??????????????if(!(message?instanceof?IoBuffer))??
- ????????????return?false;??????????IoBuffer?realMessage?=?(IoBuffer)message;??
- ????????if(realMessage.limit()?!=?1)??????????????return?false;??
- ??????????????????boolean?result?=?(realMessage.get()?==?int_rep);?????
- ????????realMessage.rewind();??????????return?result;??
- ????}?????}??
?
? 有人说:心跳机制的filter只需要服务器端具有即可——这是错误的,拍着脑袋想一想,看看factory,你就知道了。心跳需要通信两端的实现。
? 另外,版本2.0 RC1中,经过测试,当心跳的时间间隔INTERVAL设置为60s(Session的存活时间)的时候心跳会失效,所以最好需要小于60s的间隔。
?
更多可参考:
http://www.ibm.com/developerworks/cn/java/j-lo-mina2