解读NIO Socket非堵塞模式
解读NIO Socket非阻塞模式前言: ???? jdk供的无阻塞I/O(NIO)有效解决了多线程服务器存在的线程开销问题,但
解读NIO Socket非阻塞模式
前言:
???? jdk供的无阻塞I/O(NIO)有效解决了多线程服务器存在的线程开销问题,但在使用上略显得复杂一些。在NIO中使用多线程,主要目的已不是为了应对每个客户端请求而分配独立的服务线程,而是通过多线程充分使用用多个CPU的处理能力和处理中的等待时间,达到提高服务能力的目的。
??? 这段时间在研究NIO,写篇博客来记住学过的东西。还是从最简单的Hello World开始,
client多线程请求server端,server接收client的名字,并返回Hello! +名字的字符格式给client。当然实际应用并不这么简单,实际可能是访问文件或者数据库获取信息返回给client。非阻塞的NIO有何神秘之处?代码:
1)server端代码
Java代码
/** ? ?*? ? ?*?@author?Jeff ? ?* ? ?*/?? public?class?HelloWorldServer?{ ?? ?? ????static?int?BLOCK?=?1024; ?? ????static?String?name?=?""; ?? ????protected?Selector?selector; ?? ????protected?ByteBuffer?clientBuffer?=?ByteBuffer.allocate(BLOCK); ?? ????protected?CharsetDecoder?decoder; ?? ????static?CharsetEncoder?encoder?=?Charset.forName("GB2312").newEncoder(); ?? ?? ????public?HelloWorldServer(int?port)?throws?IOException?{ ?? ????????selector?=?this.getSelector(port); ?? ????????Charset?charset?=?Charset.forName("GB2312"); ?? ????????decoder?=?charset.newDecoder(); ?? ????} ?? ?? ????//?获取Selector ?? ????protected?Selector?getSelector(int?port)?throws?IOException?{ ?? ????????ServerSocketChannel?server?=?ServerSocketChannel.open(); ?? ????????Selector?sel?=?Selector.open(); ?? ????????server.socket().bind(new?InetSocketAddress(port)); ?? ????????server.configureBlocking(false); ?? ????????server.register(sel,?SelectionKey.OP_ACCEPT); ?? ????????return?sel; ?? ????} ?? ?? ????//?监听端口 ?? ????public?void?listen()?{ ?? ????????try?{ ?? ????????????for?(;;)?{ ?? ????????????????selector.select(); ?? ????????????????Iterator?iter?=?selector.selectedKeys().iterator(); ?? ????????????????while?(iter.hasNext())?{ ?? ????????????????????SelectionKey?key?=?(SelectionKey)?iter.next(); ?? ????????????????????iter.remove(); ?? ????????????????????process(key); ?? ????????????????} ?? ????????????} ?? ????????}?catch?(IOException?e)?{ ?? ????????????e.printStackTrace(); ?? ????????} ?? ????} ?? ?? ????//?处理事件 ?? ????protected?void?process(SelectionKey?key)?throws?IOException?{ ?? ????????if?(key.isAcceptable())?{?//?接收请求 ?? ????????????ServerSocketChannel?server?=?(ServerSocketChannel)?key.channel(); ?? ????????????SocketChannel?channel?=?server.accept(); ?? ????????????//设置非阻塞模式 ?? ????????????channel.configureBlocking(false); ?? ????????????channel.register(selector,?SelectionKey.OP_READ); ?? ????????}?else?if?(key.isReadable())?{?//?读信息 ?? ????????????SocketChannel?channel?=?(SocketChannel)?key.channel(); ?? ????????????int?count?=?channel.read(clientBuffer); ?? ????????????if?(count?>?0)?{ ?? ????????????????clientBuffer.flip(); ?? ????????????????CharBuffer?charBuffer?=?decoder.decode(clientBuffer); ?? ????????????????name?=?charBuffer.toString(); ?? ????????????????//?System.out.println(name); ?? ????????????????SelectionKey?sKey?=?channel.register(selector, ?? ????????????????????????SelectionKey.OP_WRITE); ?? ????????????????sKey.attach(name); ?? ????????????}?else?{ ?? ????????????????channel.close(); ?? ????????????} ?? ?? ????????????clientBuffer.clear(); ?? ????????}?else?if?(key.isWritable())?{?//?写事件 ?? ????????????SocketChannel?channel?=?(SocketChannel)?key.channel(); ?? ????????????String?name?=?(String)?key.attachment(); ?? ???????????? ?? ????????????ByteBuffer?block?=?encoder.encode(CharBuffer ?? ????????????????????.wrap("Hello?!"?+?name)); ?? ???????????? ?? ?? ????????????channel.write(block); ?? ?? ????????????//channel.close(); ?? ?? ????????} ?? ????} ?? ?? ????public?static?void?main(String[]?args)?{ ?? ????????int?port?=?8888; ?? ????????try?{ ?? ????????????HelloWorldServer?server?=?new?HelloWorldServer(port); ?? ????????????System.out.println("listening?on?"?+?port); ?? ???????????? ?? ????????????server.listen(); ?? ???????????? ?? ????????}?catch?(IOException?e)?{ ?? ????????????e.printStackTrace(); ?? ????????} ?? ????} ?? }??
server主要是读取client发过来的信息,并返回一条信息
2)client端代码
Java代码
?? /** ? ?*? ? ?*?@author?Jeff ? ?* ? ?*/?? public?class?HelloWorldClient?{ ?? ?? ????static?int?SIZE?=?10; ?? ????static?InetSocketAddress?ip?=?new?InetSocketAddress("localhost",?8888); ?? ????static?CharsetEncoder?encoder?=?Charset.forName("GB2312").newEncoder(); ?? ?? ????static?class?Message?implements?Runnable?{ ?? ????????protected?String?name; ?? ????????String?msg?=?""; ?? ?? ????????public?Message(String?index)?{ ?? ????????????this.name?=?index; ?? ????????} ?? ?? ????????public?void?run()?{ ?? ????????????try?{ ?? ????????????????long?start?=?System.currentTimeMillis(); ?? ????????????????//打开Socket通道 ?? ????????????????SocketChannel?client?=?SocketChannel.open(); ?? ????????????????//设置为非阻塞模式 ?? ????????????????client.configureBlocking(false); ?? ????????????????//打开选择器 ?? ????????????????Selector?selector?=?Selector.open(); ?? ????????????????//注册连接服务端socket动作 ?? ????????????????client.register(selector,?SelectionKey.OP_CONNECT); ?? ????????????????//连接 ?? ????????????????client.connect(ip); ?? ????????????????//分配内存 ?? ????????????????ByteBuffer?buffer?=?ByteBuffer.allocate(8?*?1024); ?? ????????????????int?total?=?0; ?? ?? ????????????????_FOR:?for?(;;)?{ ?? ????????????????????selector.select(); ?? ????????????????????Iterator?iter?=?selector.selectedKeys().iterator(); ?? ?? ????????????????????while?(iter.hasNext())?{ ?? ????????????????????????SelectionKey?key?=?(SelectionKey)?iter.next(); ?? ????????????????????????iter.remove(); ?? ????????????????????????if?(key.isConnectable())?{ ?? ????????????????????????????SocketChannel?channel?=?(SocketChannel)?key ?? ????????????????????????????????????.channel(); ?? ????????????????????????????if?(channel.isConnectionPending()) ?? ????????????????????????????????channel.finishConnect(); ?? ????????????????????????????channel ?? ????????????????????????????????????.write(encoder ?? ????????????????????????????????????????????.encode(CharBuffer.wrap(name))); ?? ?? ????????????????????????????channel.register(selector,?SelectionKey.OP_READ); ?? ????????????????????????}?else?if?(key.isReadable())?{ ?? ????????????????????????????SocketChannel?channel?=?(SocketChannel)?key ?? ????????????????????????????????????.channel(); ?? ????????????????????????????int?count?=?channel.read(buffer); ?? ????????????????????????????if?(count?>?0)?{ ?? ????????????????????????????????total?+=?count; ?? ????????????????????????????????buffer.flip(); ?? ?? ????????????????????????????????while?(buffer.remaining()?>?0)?{ ?? ????????????????????????????????????byte?b?=?buffer.get(); ?? ????????????????????????????????????msg?+=?(char)?b; ?? ???????????????????????????????????? ?? ????????????????????????????????} ?? ?? ????????????????????????????????buffer.clear(); ?? ????????????????????????????}?else?{ ?? ????????????????????????????????client.close(); ?? ????????????????????????????????break?_FOR; ?? ????????????????????????????} ?? ????????????????????????} ?? ????????????????????} ?? ????????????????} ?? ????????????????double?last?=?(System.currentTimeMillis()?-?start)?*?1.0?/?1000; ?? ????????????????System.out.println(msg?+?"used?time?:"?+?last?+?"s."); ?? ????????????????msg?=?""; ?? ????????????}?catch?(IOException?e)?{ ?? ????????????????e.printStackTrace(); ?? ????????????} ?? ????????} ?? ????} ?? ?? ????public?static?void?main(String[]?args)?throws?IOException?{ ?? ???? ?? ????????String?names[]?=?new?String[SIZE]; ?? ?? ????????for?(int?index?=?0;?index?<?SIZE;?index++)?{ ?? ????????????names[index]?=?"jeff["?+?index?+?"]"; ?? ????????????new?Thread(new?Message(names[index])).start(); ?? ????????} ?? ???? ?? ????} ?? }??