epoll的两种模式
linux异步IO浅析
http://hi.baidu.com/_kouu/blog/item/e225f67b337841f42f73b341.html
epoll有两种模式,Edge Triggered(简称ET) 和 Level Triggered(简称LT).在采用这两种模式时要注意的是,如果采用ET模式,那么仅当状态发生变化时才会通知,而采用LT模式类似于原来的select/poll操作,只要还有没有处理的事件就会一直通知.
以代码来说明问题:
首先给出server的代码,需要说明的是每次accept的连接,加入可读集的时候采用的都是ET模式,而且接收缓冲区是5字节的,也就是每次只接收5字节的数据:
#include <iostream>#include <sys/socket.h>#include <sys/epoll.h>#include <netinet/in.h>#include <arpa/inet.h>#include <fcntl.h>#include <unistd.h>#include <stdio.h>#include <errno.h>using namespace std;#define MAXLINE 5#define OPEN_MAX 100#define LISTENQ 20#define SERV_PORT 5000#define INFTIM 1000void setnonblocking(int sock){ int opts; opts=fcntl(sock,F_GETFL); if(opts<0) { perror("fcntl(sock,GETFL)"); exit(1); } opts = opts|O_NONBLOCK; if(fcntl(sock,F_SETFL,opts)<0) { perror("fcntl(sock,SETFL,opts)"); exit(1); } }int main(){ int i, maxi, listenfd, connfd, sockfd,epfd,nfds; ssize_t n; char line[MAXLINE]; socklen_t clilen; //声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件 struct epoll_event ev,events[20]; //生成用于处理accept的epoll专用的文件描述符 epfd=epoll_create(256); struct sockaddr_in clientaddr; struct sockaddr_in serveraddr; listenfd = socket(AF_INET, SOCK_STREAM, 0); //把socket设置为非阻塞方式 //setnonblocking(listenfd); //设置与要处理的事件相关的文件描述符 ev.data.fd=listenfd; //设置要处理的事件类型 ev.events=EPOLLIN|EPOLLET; //ev.events=EPOLLIN; //注册epoll事件 epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev); bzero(&serveraddr, sizeof(serveraddr)); serveraddr.sin_family = AF_INET; char *local_addr="127.0.0.1"; inet_aton(local_addr,&(serveraddr.sin_addr));//htons(SERV_PORT); serveraddr.sin_port=htons(SERV_PORT); bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr)); listen(listenfd, LISTENQ); maxi = 0; for ( ; ; ) { //等待epoll事件的发生 nfds=epoll_wait(epfd,events,20,500); //处理所发生的所有事件 for(i=0;i<nfds;++i) { if(events[i].data.fd==listenfd) { connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); if(connfd<0){ perror("connfd<0"); exit(1); } //setnonblocking(connfd); char *str = inet_ntoa(clientaddr.sin_addr); cout << "accapt a connection from " << str << endl; //设置用于读操作的文件描述符 ev.data.fd=connfd; //设置用于注测的读操作事件 ev.events=EPOLLIN|EPOLLET; //ev.events=EPOLLIN; //注册ev epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); } else if(events[i].events&EPOLLIN) { cout << "EPOLLIN" << endl; if ( (sockfd = events[i].data.fd) < 0) continue; if ( (n = read(sockfd, line, MAXLINE)) < 0) { if (errno == ECONNRESET) { close(sockfd); events[i].data.fd = -1; } else std::cout<<"readline error"<<std::endl; } else if (n == 0) { close(sockfd); events[i].data.fd = -1; } line[n] = '\0'; cout << "read " << line << endl; //设置用于写操作的文件描述符 ev.data.fd=sockfd; //设置用于注测的写操作事件 ev.events=EPOLLOUT|EPOLLET; //修改sockfd上要处理的事件为EPOLLOUT //epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); } else if(events[i].events&EPOLLOUT) { sockfd = events[i].data.fd; write(sockfd, line, n); //设置用于读操作的文件描述符 ev.data.fd=sockfd; //设置用于注测的读操作事件 ev.events=EPOLLIN|EPOLLET; //修改sockfd上要处理的事件为EPOLIN epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); } } } return 0;}#!/usr/bin/perluse IO::Socket;my $host = "127.0.0.1";my $port = 5000;my $socket = IO::Socket::INET->new("$host:$port") or die "create socket error $@";my $msg_out = "1234567890";print $socket $msg_out;print "now send over, go to sleep \n";while (1){ sleep(1);}#!/usr/bin/perluse IO::Socket;my $host = "127.0.0.1";my $port = 5000;my $socket = IO::Socket::INET->new("$host:$port") or die "create socket error $@";my $msg_out = "1234567890";print $socket $msg_out;print "now send over, go to sleep \n";sleep(5);print "5 second gone send another line\n";print $socket $msg_out;while (1){ sleep(1);}struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */};while(rs){ buflen = recv(activeevents[i].data.fd, buf, sizeof(buf), 0); if(buflen < 0) { // 由于是非阻塞的模式,所以当errno为EAGAIN时,表示当前缓冲区已无数据可读 // 在这里就当作是该次事件已处理处. if(errno == EAGAIN) break; else return; } else if(buflen == 0) { // 这里表示对端的socket已正常关闭. } if(buflen == sizeof(buf) rs = 1; // 需要再次读取 else rs = 0;}ssize_t socket_send(int sockfd, const char* buffer, size_t buflen){ ssize_t tmp; size_t total = buflen; const char *p = buffer; while(1) { tmp = send(sockfd, p, total, 0); if(tmp < 0) { // 当send收到信号时,可以继续写,但这里返回-1. if(errno == EINTR) return -1; // 当socket是非阻塞时,如返回此错误,表示写缓冲队列已满, // 在这里做延时后再重试. if(errno == EAGAIN) { usleep(1000); continue; } return -1; } if((size_t)tmp == total) return buflen; total -= tmp; p += tmp; } return tmp;}struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ }; while(rs) { buflen = recv(activeevents[i].data.fd, buf, sizeof(buf), 0); if(buflen < 0) { // 由于是非阻塞的模式,所以当errno为EAGAIN时,表示当前缓冲区已无数据可读 // 在这里就当作是该次事件已处理处. if(errno == EAGAIN) break; else return; } else if(buflen == 0) { // 这里表示对端的socket已正常关闭. } if(buflen == sizeof(buf) rs = 1; // 需要再次读取 else rs = 0; } ssize_t socket_send(int sockfd, const char* buffer, size_t buflen) { ssize_t tmp; size_t total = buflen; const char *p = buffer; while(1) { tmp = send(sockfd, p, total, 0); if(tmp < 0) { // 当send收到信号时,可以继续写,但这里返回-1. if(errno == EINTR) return -1; // 当socket是非阻塞时,如返回此错误,表示写缓冲队列已满, // 在这里做延时后再重试. if(errno == EAGAIN) { usleep(1000); continue; } return -1; } if((size_t)tmp == total) return buflen; total -= tmp; p += tmp; } return tmp; } #include <iostream>#include <sys/socket.h>#include <sys/epoll.h>#include <netinet/in.h>#include <arpa/inet.h>#include <fcntl.h>#include <unistd.h>#include <stdio.h>#include <errno.h>using namespace std;#define MAXLINE 5#define OPEN_MAX 100#define LISTENQ 20#define SERV_PORT 5000#define INFTIM 1000void setnonblocking(int sock){ int opts; opts=fcntl(sock,F_GETFL); if(opts<0) { perror("fcntl(sock,GETFL)"); exit(1); } opts = opts|O_NONBLOCK; if(fcntl(sock,F_SETFL,opts)<0) { perror("fcntl(sock,SETFL,opts)"); exit(1); } }int main(){ int i, maxi, listenfd, connfd, sockfd,epfd,nfds; ssize_t n; char line[MAXLINE]; socklen_t clilen; //声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件 struct epoll_event ev,events[20]; //生成用于处理accept的epoll专用的文件描述符 epfd=epoll_create(256); struct sockaddr_in clientaddr; struct sockaddr_in serveraddr; listenfd = socket(AF_INET, SOCK_STREAM, 0); //把socket设置为非阻塞方式 //setnonblocking(listenfd); //设置与要处理的事件相关的文件描述符 ev.data.fd=listenfd; //设置要处理的事件类型 ev.events=EPOLLIN|EPOLLET; //ev.events=EPOLLIN; //注册epoll事件 epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev); bzero(&serveraddr, sizeof(serveraddr)); serveraddr.sin_family = AF_INET; char *local_addr="127.0.0.1"; inet_aton(local_addr,&(serveraddr.sin_addr));//htons(SERV_PORT); serveraddr.sin_port=htons(SERV_PORT); bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr)); listen(listenfd, LISTENQ); maxi = 0; for ( ; ; ) { //等待epoll事件的发生 nfds=epoll_wait(epfd,events,20,500); //处理所发生的所有事件 for(i=0;i<nfds;++i) { if(events[i].data.fd==listenfd) { connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); if(connfd<0){ perror("connfd<0"); exit(1); } //setnonblocking(connfd); char *str = inet_ntoa(clientaddr.sin_addr); cout << "accapt a connection from " << str << endl; //设置用于读操作的文件描述符 ev.data.fd=connfd; //设置用于注测的读操作事件 ev.events=EPOLLIN|EPOLLET; //ev.events=EPOLLIN; //注册ev epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); } else if(events[i].events&EPOLLIN) { cout << "EPOLLIN" << endl; if ( (sockfd = events[i].data.fd) < 0) continue; if ( (n = read(sockfd, line, MAXLINE)) < 0) { if (errno == ECONNRESET) { close(sockfd); events[i].data.fd = -1; } else std::cout<<"readline error"<<std::endl; } else if (n == 0) { close(sockfd); events[i].data.fd = -1; } line[n] = '\0'; cout << "read " << line << endl; //设置用于写操作的文件描述符 ev.data.fd=sockfd; //设置用于注测的写操作事件 ev.events=EPOLLOUT|EPOLLET; //修改sockfd上要处理的事件为EPOLLOUT //epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); } else if(events[i].events&EPOLLOUT) { sockfd = events[i].data.fd; write(sockfd, line, n); //设置用于读操作的文件描述符 ev.data.fd=sockfd; //设置用于注测的读操作事件 ev.events=EPOLLIN|EPOLLET; //修改sockfd上要处理的事件为EPOLIN epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); } } } return 0;}#!/usr/bin/perl use IO::Socket; my $host = "127.0.0.1"; my $port = 5000; my $socket = IO::Socket::INET->new("$host:$port") or die "create socket error $@"; my $msg_out = "1234567890"; print $socket $msg_out; print "now send over, go to sleep\n"; while (1) { sleep(1); } #!/usr/bin/perl use IO::Socket; my $host = "127.0.0.1"; my $port = 5000; my $socket = IO::Socket::INET->new("$host:$port") or die "create socket error $@"; my $msg_out = "1234567890"; print $socket $msg_out; print "now send over, go to sleep\n"; sleep(5); print "5 second gonesend another line\n"; print $socket $msg_out; while (1) { sleep(1); } int n = select(&readset,NULL,NULL,100); for (int i = 0; n > 0; ++i) { if (FD_ISSET(fdarray[i], &readset)) { do_something(fdarray[i]); --n; }} n=epoll_wait(epfd,events,20,500); for(i=0;i<n;++i) { do_something(events[n]); } 在epoll中,关键的数据结构epoll_event定义如下: typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t; struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ };#include <iostream>#include <sys/socket.h>#include <sys/epoll.h>#include <netinet/in.h>#include <arpa/inet.h>#include <fcntl.h>#include <unistd.h>#include <stdio.h>#include <errno.h>#include <sys/types.h>#include <sys/wait.h>using namespace std;#define MAXLINE 5#define OPEN_MAX 100#define LISTENQ 20#define SERV_PORT 5000#define INFTIM 1000typedef struct task_t{ int fd; char buffer[100]; int n;}task_t;int CreateWorker(int nWorker){ if (0 < nWorker) { bool bIsChild; pid_t nPid; while (!bIsChild) { if (0 < nWorker) { nPid = ::fork(); if (nPid > 0) { bIsChild = false; --nWorker; } else if (0 == nPid) { bIsChild = true; printf("create worker %d success!\n", ::getpid()); } else { printf("fork error: %s\n", ::strerror(errno)); return -1; } } else { int nStatus; if (-1 == ::wait(&nStatus)) { ++nWorker; } } } } return 0;}void setnonblocking(int sock){ int opts; opts=fcntl(sock,F_GETFL); if(opts<0) { perror("fcntl(sock,GETFL)"); exit(1); } opts = opts|O_NONBLOCK; if(fcntl(sock,F_SETFL,opts)<0) { perror("fcntl(sock,SETFL,opts)"); exit(1); } }int main(){ int i, maxi, listenfd, connfd, sockfd,epfd,nfds; ssize_t n; char line[MAXLINE]; socklen_t clilen; struct epoll_event ev,events[20]; struct sockaddr_in clientaddr; struct sockaddr_in serveraddr; listenfd = socket(AF_INET, SOCK_STREAM, 0); bzero(&serveraddr, sizeof(serveraddr)); serveraddr.sin_family = AF_INET; char *local_addr="127.0.0.1"; inet_aton(local_addr,&(serveraddr.sin_addr));//htons(SERV_PORT); serveraddr.sin_port=htons(SERV_PORT); // 地址重用 int nOptVal = 1; socklen_t nOptLen = sizeof(int); if (-1 == ::setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &nOptVal, nOptLen)) { return -1; } setnonblocking(listenfd); bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr)); listen(listenfd, LISTENQ); CreateWorker(5); //把socket设置为非阻塞方式 //生成用于处理accept的epoll专用的文件描述符 epfd=epoll_create(256); //设置与要处理的事件相关的文件描述符 ev.data.fd=listenfd; //设置要处理的事件类型 ev.events=EPOLLIN|EPOLLET; //ev.events=EPOLLIN; //注册epoll事件 epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev); //CreateWorker(5); maxi = 0; task_t task; task_t *ptask; while(true) { //等待epoll事件的发生 nfds=epoll_wait(epfd,events,20,500); //处理所发生的所有事件 for(i=0;i<nfds;++i) { if(events[i].data.fd==listenfd) { connfd = accept(listenfd,NULL, NULL); if(connfd<0){ printf("connfd<0, listenfd = %d\n", listenfd); printf("error = %s\n", strerror(errno)); exit(1); } setnonblocking(connfd); //设置用于读操作的文件描述符 memset(&task, 0, sizeof(task)); task.fd = connfd; ev.data.ptr = &task; //设置用于注册的读操作事件 ev.events=EPOLLIN|EPOLLET; //ev.events=EPOLLIN; //注册ev epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); } else if(events[i].events&EPOLLIN) { cout << "EPOLLIN" << endl; ptask = (task_t*)events[i].data.ptr; sockfd = ptask->fd; if ( (ptask->n = read(sockfd, ptask->buffer, 100)) < 0) { if (errno == ECONNRESET) { close(sockfd); events[i].data.ptr = NULL; } else std::cout<<"readline error"<<std::endl; } else if (ptask->n == 0) { close(sockfd); events[i].data.ptr = NULL; } ptask->buffer[ptask->n] = '\0'; cout << "read " << ptask->buffer << endl; //设置用于写操作的文件描述符 ev.data.ptr = ptask; //设置用于注测的写操作事件 ev.events=EPOLLOUT|EPOLLET; //修改sockfd上要处理的事件为EPOLLOUT epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); } else if(events[i].events&EPOLLOUT) { cout << "EPOLLOUT" << endl; ptask = (task_t*)events[i].data.ptr; sockfd = ptask->fd; write(sockfd, ptask->buffer, ptask->n); //设置用于读操作的文件描述符 ev.data.ptr = ptask; //修改sockfd上要处理的事件为EPOLIN epoll_ctl(epfd,EPOLL_CTL_DEL,sockfd,&ev); cout << "write " << ptask->buffer; memset(ptask, 0, sizeof(*ptask)); close(sockfd); } } } return 0;}