首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

JavaNIO处置长连接

2012-12-21 
JavaNIO处理长连接之前在IBM的网站上看到过一篇介绍NIO的文章,收获很大。但文中的代码只适合短连接的情况,

JavaNIO处理长连接

之前在IBM的网站上看到过一篇介绍NIO的文章,收获很大。但文中的代码只适合短连接的情况,长连接时就不适用了。

最近恰好要写一个处理长连接的服务,接收日志包,然后打包成syslog形式再转发,所以在它的基础上改了一下。

主要改了两个类,一个是Server,因为我们只关注read事件,所以write事件我们暂不处理。另外,在处理完ON_READ事件后,不能执行key.cancel()。

?另一个改动的类是Reader,改变对-1的处理,这里不是break,而是抛出异常。在读取完buffer的数据后,将数据包传递给另外两个线程进行syslog的发送和入库操作。

?

package nioserver;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.SocketChannel;import java.util.LinkedList;import java.util.List;/** * <p>Title: 读线程</p> * <p>Description: 该线程用于读取客户端数据</p> * @author sxl * @version 1.0 */public class Reader extends Thread {    private static List pool = new LinkedList();    private static Notifier notifier = Notifier.getNotifier();    public void run() {        while (true) {            try {                SelectionKey key;                synchronized (pool) {                    while (pool.isEmpty()) {                        pool.wait();                    }                    key = (SelectionKey) pool.remove(0);                }                // 读取数据                read(key);            }            catch (Exception e) {                continue;            }        }    }    /**     * 读取客户端发出请求数据     * @param sc 套接通道     */    private static int BUFFER_SIZE = 1024;    public static byte[] readRequest(SocketChannel sc) throws IOException {    ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);        int off = 0;        int r = 0;        byte[] data = new byte[BUFFER_SIZE * 10];        while ( true ) {            buffer.clear();            r = sc.read(buffer);            if(r == 0) break;            if(r == -1)//如果是流的末尾,则抛出异常            throw new IOException();            if ((off + r) > data.length) {//数组扩容                data = grow(data, BUFFER_SIZE * 10);            }            byte[] buf = buffer.array();            System.arraycopy(buf, 0, data, off, r);            off += r;        }        byte[] req = new byte[off];        System.arraycopy(data, 0, req, 0, off);        return req;    }    /**     * 处理连接数据读取     * @param key SelectionKey     */    public void read(SelectionKey key) {    SocketChannel sc = null;        try {            // 读取客户端数据            sc = (SocketChannel) key.channel();            byte[] clientData =  readRequest(sc);            if(clientData.length > 0){//有数据才处理                Request request = (Request)key.attachment();                request.setDataInput(clientData);                // 提交到数据库写入线程                Writer.processRequest(request);                // 提交到Syslog线程,发送syslog                Syslog.processRequest(request);            }        }        catch (Exception e) {        if(sc != null)try {sc.socket().close();sc.close();} catch (IOException e1) {e1.printStackTrace();}        }    }    /**     * 处理客户请求,管理用户的联结池,并唤醒队列中的线程进行处理     */    public static void processRequest(SelectionKey key) {        synchronized (pool) {            pool.add(pool.size(), key);            pool.notifyAll();        }    }    /**     * 数组扩容     * @param src byte[] 源数组数据     * @param size int 扩容的增加量     * @return byte[] 扩容后的数组     */    public static byte[] grow(byte[] src, int size) {        byte[] tmp = new byte[src.length + size];        System.arraycopy(src, 0, tmp, 0, src.length);        return tmp;    }}
?

?

?

1 楼 48150084 2012-09-13   做过压力测试吗?非常怀疑会死锁。
传的数据前几位最好给出长度,不要做啥数组扩容,很不专业。
不要用wait,notify。LinkedList换成LinkedBlockingQueue。 2 楼 378629846 2012-09-13   48150084 写道做过压力测试吗?非常怀疑会死锁。
传的数据前几位最好给出长度,不要做啥数组扩容,很不专业。
不要用wait,notify。LinkedList换成LinkedBlockingQueue。

谢谢!是应该换成LinkedBlockingQueue,代码简洁多了。

热点排行