首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

I/O的学问储备

2012-12-20 
I/O的知识储备背景前一段时间一直在关注一些nio的相关技术,也和公司的架构师交流了一下,学到了一些相关原

I/O的知识储备
背景

前一段时间一直在关注一些nio的相关技术,也和公司的架构师交流了一下,学到了一些相关原理,比如nio的优势和劣势。以及一些排查nio bug问题的方式,受益量多。为自己做一下技术储备,以后可以多玩玩nio的相关技术

知识点

unix网络编程第6章: 几种unix下的I/O模型。

?

阻塞I/O模型 ? ?(java io)非阻塞I/O模型I/O复用 ? ? ? ? ?(java 5的nio)信号驱动I/O异步I/O ? ? ? ? ?(java7的aio)几点说明:?1. ?阻塞和非阻塞的概念?? 两者的区别侧重点在于,当前的线程是否会处于挂起,阻塞的状态。2. ?同步和异步的概念?? 两者的区别侧重点在于,是当前业务的处理方式是否是一个串行的过程,异步的操作也可能是阻塞的动作。
几种模型的比较:?I/O的学问储备
阻塞I/O模型: 
这个在java中平时使用比较多,不用多做介绍。 注意下stream & ?reader的区别,自己面试别人也问的比较多。
I/O复用模型: 
介绍java nio之前,先了解一下unix中几种i/o复用的支持: select / poll 模型。?
linux早期有select / pselect : 
select 函数:?
#include<sys/select.h>int select(int maxfdp1, fd_set *restrict readfds, fd_set *restrict writefds,fd_set *restrict exceptfds, struct timeval* restrict tvptr);
参数说明: 1. ?最大描述符大小,一般是最大值+1。2. ?中间三个参数readfds、writefds和exceptfds是指向描述符集的指针。这三个描述符集说明了我们关心的可读、可写或出于异常条件的各个描述符。就是一个位表,每次check一下当前描述符在对应位上的值是否为13. ?第4个参数,就是对应的超时参数,精确到微妙,细节我也不关注。
返回值: 0超时,-1出错, 正数代表准备好的描述符
pselect函数: 
#include <sys/select.h>int pselect(int maxfdp1, fd_set *restrict readfds, fd_set *restrict writefds,fd_set *restrict exceptfds, const struct timespec *restrict tsptr,const sigset_t *restrict sigmask);
?

?

它与select的区别在于:??pselect使用timespec结构指定超时值。timespec结构以秒和纳秒表示时间,而非秒和微秒。??pselect的超时值被声明为const,这保证了调用pselect不会改变timespec结构。??pselect可使用一个可选择的信号屏蔽字。在调用pselect时,以原子操作的方式安装该信号屏蔽字,在返回时恢复以前的信号屏蔽字。 多了参数5
poll函数: 
#include <poll.h>int poll(struct pollfd fdarray[], nfds_t nfds, int timeout);
参数说明: 1. pollfd是一个结构体,我们需要关注的描述符

?

struct pollfd {          int fd; /* file descriptor to check, or <0 to ignore */     short events; /* events of interest on fd */     short revents; /* events that occurred on fd */};

2. nfds代表pollfd的长度

?

返回值:0超时,-1出错, 正数代表准备好的描述符

?

同样变种的有ppoll函数,具体可以见man ppoll,两者的区别和select/pselect区别一样,多了时间精度的支持+信号屏蔽字

?

和select系列的区别点,poll不再受限于select中位数组的长度限制,我们可以将关心的描述符添加到poolfd中。

?

再看epoll函数,是对select/poll的一个增强版:

int epoll_create(int size);int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);1. 第一个参数是epoll_create()的返回值.2. 第二个参数表示动作,用三个宏来表示: EPOLL_CTL_ADD:注册新的fd到epfd中; EPOLL_CTL_MOD:修改已经注册的fd的监听事件; EPOLL_CTL_DEL:从epfd中删除一个fd;3. 第三个参数是需要监听的fd4. 第四个参数是告诉内核需要监听什么事events可以是以下几个宏的集合: EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭); EPOLLOUT:表示对应的文件描述符可以写; EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来); EPOLLERR:表示对应的文件描述符发生错误; EPOLLHUP:表示对应的文件描述符被挂断; EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);1. 等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,2. 参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有 说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

?

?

再看一下jdk中对nio的使用:

?

public static SelectorProvider create() {PrivilegedAction pa = new GetPropertyAction("os.name");String osname = (String) AccessController.doPrivileged(pa);        if ("SunOS".equals(osname)) {            return new sun.nio.ch.DevPollSelectorProvider();        }        // use EPollSelectorProvider for Linux kernels >= 2.6        if ("Linux".equals(osname)) {            pa = new GetPropertyAction("os.version");            String osversion = (String) AccessController.doPrivileged(pa);            String[] vers = osversion.split("\\.", 0);            if (vers.length >= 2) {                try {                    int major = Integer.parseInt(vers[0]);                    int minor = Integer.parseInt(vers[1]);                    if (major > 2 || (major == 2 && minor >= 6)) {                        return new sun.nio.ch.EPollSelectorProvider();                    }                } catch (NumberFormatException x) {                    // format not recognized                }            }        }        return new sun.nio.ch.PollSelectorProvider();    }

?

?

比较明显: 如果当前是sunos系统,直接使用DevPoll,在linux 2.6内核下,使用Epoll模型,否则使用Poll。

?

DevPoll估计是sunos自己整的一套poll模型,公司一般使用redhat系列,内核2.6.18,64位主机。所以就介绍下Epoll的实现

?

java实现类: EPollSelectorImpl

?

// wake up使用的两描述符protected int fd0;protected int fd1;// The poll object , native的实现EPollArrayWrapper pollWrapper;// Maps from file descriptors to keys , 文件描述符和SelectKey的关系private HashMap fdToKey;

?

?

看下select()的实现:

PollSelectorImpl类: ==============protected int doSelect(long timeout) //具体执行epoll调用        throws IOException    {        if (closed)            throw new ClosedSelectorException();        processDeregisterQueue();        try {            begin();            pollWrapper.poll(timeout);        } finally {            end();        }        processDeregisterQueue();        int numKeysUpdated = updateSelectedKeys();        if (pollWrapper.interrupted()) {            // Clear the wakeup pipe            pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);            synchronized (interruptLock) {                pollWrapper.clearInterrupted();                IOUtil.drain(fd0);                interruptTriggered = false;            }        }        return numKeysUpdated;    }继续看下EPollArrayWrappe :  ==========================  int poll(long timeout) throws IOException {        updateRegistrations();        updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);        for (int i=0; i<updated; i++) { // 这里判断下当前响应的描述符是否为fd0,后面再细说            if (getDescriptor(i) == incomingInterruptFD) {                interruptedIndex = i;                interrupted = true;                break;            }        }        return updated;    }继续看下EPollArrayWrapper 的native实现: epollWait(): ?====================
JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,                                            jlong address, jint numfds,                                            jlong timeout, jint epfd){    struct epoll_event *events = jlong_to_ptr(address);    int res;    if (timeout <= 0) {           /* Indefinite or no wait */        RESTARTABLE((*epoll_wait_func)(epfd, events, numfds, timeout), res);    } else {                      /* Bounded wait; bounded restarts */        res = iepoll(epfd, events, numfds, timeout);     }        if (res < 0) {        JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");    }    return res;}static int iepoll(int epfd, struct epoll_event *events, int numfds, jlong timeout) {    jlong start, now;    int remaining = timeout;    struct timeval t;    int diff;    gettimeofday(&t, NULL);    start = t.tv_sec * 1000 + t.tv_usec / 1000;  //转化为ns单位    for (;;) {        int res = (*epoll_wait_func)(epfd, events, numfds, timeout);        if (res < 0 && errno == EINTR) {  //处理异常            if (remaining >= 0) {                gettimeofday(&t, NULL);                now = t.tv_sec * 1000 + t.tv_usec / 1000;                diff = now - start;                remaining -= diff;                if (diff < 0 || remaining <= 0) {                    return 0;                }                start = now;            }        } else {            return res;        }    }}

?

看下wakeup的实现 :?

?

EPollSelectorImpl类:     EPollSelectorImpl(SelectorProvider sp) {super(sp);        int[] fdes = new int[2];        IOUtil.initPipe(fdes, false);        fd0 = fdes[0];        fd1 = fdes[1];        pollWrapper = new EPollArrayWrapper();        pollWrapper.initInterrupt(fd0, fd1);    // 设置中断的两个描述符        fdToKey = new HashMap();    }     public Selector wakeup() {        synchronized (interruptLock) {            if (!interruptTriggered) {                pollWrapper.interrupt();   //调用warpper进行中断                interruptTriggered = true;            }        }return this;    }继续看下EPollArrayWrapper :     void initInterrupt(int fd0, int fd1) {        outgoingInterruptFD = fd1;  //保存pipeline的描述符        incomingInterruptFD = fd0;        epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);   //注册到epoll上。    }    public void interrupt() {        interrupt(outgoingInterruptFD);  //调用native方法    } 继续看下EPollArrayWrapper.c native实现: JNIEXPORT void JNICALL Java_sun_nio_ch_EPollArrayWrapper_interrupt(JNIEnv *env, jobject this, jint fd){    int fakebuf[1];    fakebuf[0] = 1;    if (write(fd, fakebuf, 1) < 0) {  //发送一字节的内容,让epoll_wait()能得到及时响应        JNU_ThrowIOExceptionWithLastError(env,"write to interrupt fd failed");    }}

?实现方式也是挺简单的,弄了两个fd,一个往另一个写1byte的内容,促使epoll_wait能得到响应。

?

异步I/O模型: 

?

暂时还未真实用过,只是大致看过其api,有兴趣可以自己baidu。?

?

?

最后

?

后续会起一篇,单独针对nio在服务端和客户端使用上的注意点,主要是吸收了一些大牛门的经验,需要做个总结,消化一下。

热点排行