NIO+reactor模式的网路服务器设计方案
NIO+reactor模式的网路服务器设计方案
?
服务端代码如下(单线程版本)import java.io.IOException;import java.net.InetAddress;import java.net.InetSocketAddress;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.channels.spi.SelectorProvider;import java.util.Iterator;/** * @author jason * */public class NioServer implements Runnable {private InetAddress hostAddress;private int port;private ServerSocketChannel serverChannel;private Selector selector;public NioServer(InetAddress hostAddress, int port) throws IOException {this.hostAddress = hostAddress;this.port = port;// 初始化selector,绑定服务端监听套接字、感兴趣事件及对应的handlerthis.selector = initSelector();}public static void main(String[] args) {try {// 启动服务器new Thread(new NioServer(null, 9090)).start();} catch (IOException e) {e.printStackTrace();}}@Overridepublic void run() {while (true) {try {// 选择事件已经ready的selectionKey,该方法是阻塞的,只有当至少存在selectionKey,或者wakeup方法被调用,或者当前线程被中断,才会返回selector.select();// 循环处理每一个事件Iterator<SelectionKey> items = selector.selectedKeys().iterator();while (items.hasNext()) {SelectionKey key = (SelectionKey) items.next();items.remove();if (!key.isValid()) {continue;}// 事件处理分发dispatch(key);}} catch (Exception e) {e.printStackTrace();}}}/** * 事件处理分发 * * @param sk * 已经ready的selectionKey */private void dispatch(SelectionKey sk) {// 获取绑定的handlerRunnable r = (Runnable) sk.attachment();if (r != null) {r.run();}}/** * 初始化selector,绑定服务端监听套接字、感兴趣事件及对应的handler * * @return * @throws IOException */private Selector initSelector() throws IOException {// 创建一个selectorSelector socketSelector = SelectorProvider.provider().openSelector();// 创建并打开ServerSocketChannelserverChannel = ServerSocketChannel.open();// 设置为非阻塞serverChannel.configureBlocking(false);// 绑定端口serverChannel.socket().bind(new InetSocketAddress(hostAddress, port));// 用selector注册套接字,并返回对应的SelectionKey,同时设置Key的interest set为监听客户端连接事件SelectionKey selectionKey = serverChannel.register(socketSelector,SelectionKey.OP_ACCEPT);// 绑定handlerselectionKey.attach(new Acceptor());return socketSelector;}/** * 处理OP_ACCEPT事件的handler * */class Acceptor implements Runnable {@Overridepublic void run() {try {accept();} catch (IOException e) {e.printStackTrace();}}private void accept() throws IOException {System.out.println("connect");// 建立连接SocketChannel socketChannel = serverChannel.accept();System.out.println("connected");// 设置为非阻塞socketChannel.configureBlocking(false);// 创建Handler,专门处理该连接后续发生的OP_READ和OP_WRITE事件new Handler(selector, socketChannel);}}}?
handler代码如下import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;/** * @author jason * */final class Handler implements Runnable {final SocketChannel socketChannel;final SelectionKey key;static final int MAXIN = 8192, MAXOUT = 11240 * 1024;ByteBuffer readBuffer = ByteBuffer.allocate(MAXIN);ByteBuffer outBuffer = ByteBuffer.allocate(MAXOUT);static final int READING = 0;static final int SENDING = 1;int state = READING;Handler(Selector selector, SocketChannel socketChannel) throws IOException {this.socketChannel = socketChannel;// 用selector注册套接字,并返回对应的SelectionKey,同时设置Key的interest set为监听该连接上得read事件this.key = socketChannel.register(selector, SelectionKey.OP_READ);// 绑定handlerthis.key.attach(this);}/** * 处理write * * @throws IOException */private void write() throws IOException {socketChannel.write(outBuffer);if (outBuffer.remaining() > 0) {return;}state = READING;key.interestOps(SelectionKey.OP_READ);}/** * 处理read * * @throws IOException */private void read() throws IOException {readBuffer.clear();int numRead;try {// 读取数据numRead = socketChannel.read(readBuffer);} catch (Exception e) {key.cancel();socketChannel.close();return;}if (numRead == -1) {socketChannel.close();key.cancel();return;}// 处理数据process(numRead);}/** * 处理数据 * * @param numRead */private void process(int numRead) {byte[] dataCopy = new byte[numRead];System.arraycopy(readBuffer.array(), 0, dataCopy, 0, numRead);System.out.println(new String(dataCopy));outBuffer = ByteBuffer.wrap(dataCopy);state = SENDING;// 设置Key的interest set为监听该连接上的write事件key.interestOps(SelectionKey.OP_WRITE);}@Overridepublic void run() {try {if (state == READING) {read();} else if (state == SENDING) {write();}} catch (IOException e) {e.printStackTrace();}}}?客户端代码如下:package sampleNio;import java.io.IOException;import java.net.InetAddress;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.nio.channels.spi.SelectorProvider;import java.util.Iterator;/** * @author jason * */public class NioClient implements Runnable {private InetAddress hostAddress;private int port;private Selector selector;private ByteBuffer readBuffer = ByteBuffer.allocate(8192);private ByteBuffer outBuffer = ByteBuffer.wrap("nice to meet you".getBytes());public NioClient(InetAddress hostAddress, int port) throws IOException {this.hostAddress = hostAddress;this.port = port;initSelector();}public static void main(String[] args) {try {NioClient client = new NioClient(InetAddress.getByName("localhost"), 9090);new Thread(client).start();} catch (IOException e) {e.printStackTrace();}}@Overridepublic void run() {while (true) {try {selector.select();Iterator<?> selectedKeys = selector.selectedKeys().iterator();while (selectedKeys.hasNext()) {SelectionKey key = (SelectionKey) selectedKeys.next();selectedKeys.remove();if (!key.isValid()) {continue;}if (key.isConnectable()) {finishConnection(key);} else if (key.isReadable()) {read(key);} else if (key.isWritable()) {write(key);}}} catch (Exception e) {e.printStackTrace();}}}private void initSelector() throws IOException {// 创建一个selectorselector = SelectorProvider.provider().openSelector();// 打开SocketChannelSocketChannel socketChannel = SocketChannel.open();// 设置为非阻塞socketChannel.configureBlocking(false);// 连接指定IP和端口的地址socketChannel.connect(new InetSocketAddress(this.hostAddress, this.port));// 用selector注册套接字,并返回对应的SelectionKey,同时设置Key的interest set为监听服务端已建立连接的事件socketChannel.register(selector, SelectionKey.OP_CONNECT);}private void finishConnection(SelectionKey key) throws IOException {SocketChannel socketChannel = (SocketChannel) key.channel();try {// 判断连接是否建立成功,不成功会抛异常socketChannel.finishConnect();} catch (IOException e) {key.cancel();return;}// 设置Key的interest set为OP_WRITE事件key.interestOps(SelectionKey.OP_WRITE);}/** * 处理read * * @param key * @throws IOException */private void read(SelectionKey key) throws IOException {SocketChannel socketChannel = (SocketChannel) key.channel();readBuffer.clear();int numRead;try {numRead = socketChannel.read(readBuffer);} catch (Exception e) {key.cancel();socketChannel.close();return;}if (numRead == 1) {System.out.println("close connection");socketChannel.close();key.cancel();return;}// 处理响应handleResponse(socketChannel, readBuffer.array(), numRead);}/** * 处理响应 * * @param socketChannel * @param data * @param numRead * @throws IOException */private void handleResponse(SocketChannel socketChannel, byte[] data,int numRead) throws IOException {byte[] rspData = new byte[numRead];System.arraycopy(data, 0, rspData, 0, numRead);System.out.println(new String(rspData));socketChannel.close();socketChannel.keyFor(selector).cancel();}/** * 处理write * * @param key * @throws IOException */private void write(SelectionKey key) throws IOException {SocketChannel socketChannel = (SocketChannel) key.channel();socketChannel.write(outBuffer);if (outBuffer.remaining() > 0) {return;}// 设置Key的interest set为OP_READ事件key.interestOps(SelectionKey.OP_READ);}}?
4、 Reactor的其他实现方式
?
5、总结
本文分析了基于NIO和Reactor模式的网络服务器设计方案,在后续的blog中将结合Netty进一步分析高性能网络服务器的设计。
?
?