JavaNIO处理长连接
之前在IBM的网站上看到过一篇介绍NIO的文章,收获很大。但文中的代码只适合短连接的情况,长连接时就不适用了。
最近恰好要写一个处理长连接的服务,接收日志包,然后打包成syslog形式再转发,所以在它的基础上改了一下。
主要改了两个类,一个是Server,因为我们只关注read事件,所以write事件我们暂不处理。另外,在处理完ON_READ事件后,不能执行key.cancel()。
?另一个改动的类是Reader,改变对-1的处理,这里不是break,而是抛出异常。在读取完buffer的数据后,将数据包传递给另外两个线程进行syslog的发送和入库操作。
?
package nioserver;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.SocketChannel;import java.util.LinkedList;import java.util.List;/** * <p>Title: 读线程</p> * <p>Description: 该线程用于读取客户端数据</p> * @author sxl * @version 1.0 */public class Reader extends Thread { private static List pool = new LinkedList(); private static Notifier notifier = Notifier.getNotifier(); public void run() { while (true) { try { SelectionKey key; synchronized (pool) { while (pool.isEmpty()) { pool.wait(); } key = (SelectionKey) pool.remove(0); } // 读取数据 read(key); } catch (Exception e) { continue; } } } /** * 读取客户端发出请求数据 * @param sc 套接通道 */ private static int BUFFER_SIZE = 1024; public static byte[] readRequest(SocketChannel sc) throws IOException { ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE); int off = 0; int r = 0; byte[] data = new byte[BUFFER_SIZE * 10]; while ( true ) { buffer.clear(); r = sc.read(buffer); if(r == 0) break; if(r == -1)//如果是流的末尾,则抛出异常 throw new IOException(); if ((off + r) > data.length) {//数组扩容 data = grow(data, BUFFER_SIZE * 10); } byte[] buf = buffer.array(); System.arraycopy(buf, 0, data, off, r); off += r; } byte[] req = new byte[off]; System.arraycopy(data, 0, req, 0, off); return req; } /** * 处理连接数据读取 * @param key SelectionKey */ public void read(SelectionKey key) { SocketChannel sc = null; try { // 读取客户端数据 sc = (SocketChannel) key.channel(); byte[] clientData = readRequest(sc); if(clientData.length > 0){//有数据才处理 Request request = (Request)key.attachment(); request.setDataInput(clientData); // 提交到数据库写入线程 Writer.processRequest(request); // 提交到Syslog线程,发送syslog Syslog.processRequest(request); } } catch (Exception e) { if(sc != null)try {sc.socket().close();sc.close();} catch (IOException e1) {e1.printStackTrace();} } } /** * 处理客户请求,管理用户的联结池,并唤醒队列中的线程进行处理 */ public static void processRequest(SelectionKey key) { synchronized (pool) { pool.add(pool.size(), key); pool.notifyAll(); } } /** * 数组扩容 * @param src byte[] 源数组数据 * @param size int 扩容的增加量 * @return byte[] 扩容后的数组 */ public static byte[] grow(byte[] src, int size) { byte[] tmp = new byte[src.length + size]; System.arraycopy(src, 0, tmp, 0, src.length); return tmp; }}??
?
1 楼 48150084 2012-09-13 做过压力测试吗?非常怀疑会死锁。
传的数据前几位最好给出长度,不要做啥数组扩容,很不专业。
不要用wait,notify。LinkedList换成LinkedBlockingQueue。 2 楼 378629846 2012-09-13 48150084 写道做过压力测试吗?非常怀疑会死锁。
传的数据前几位最好给出长度,不要做啥数组扩容,很不专业。
不要用wait,notify。LinkedList换成LinkedBlockingQueue。
谢谢!是应该换成LinkedBlockingQueue,代码简洁多了。