Redis Code Reading 3 -- Redis Network Listening (2)
This post continues from the previous one and walks through each step of the Redis network listening flow.
    /* test for polling API */
    #ifdef __linux__
    #define HAVE_EPOLL 1
    #endif

    #if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
    #define HAVE_KQUEUE 1
    #endif

aeCreateTimeEvent: creates a time event and registers serverCron as its handler; what serverCron does is covered later.
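As a rough illustration of how feature macros like these are consumed, the sketch below picks a polling backend at compile time. In the Redis source, ae.c includes ae_epoll.c, ae_kqueue.c, or ae_select.c depending on which macro is defined. The function polling_backend_name is hypothetical and only reports which branch was taken.

```c
/* Same feature tests as in the snippet above. */
#ifdef __linux__
#define HAVE_EPOLL 1
#endif

#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || \
    defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__)
#define HAVE_KQUEUE 1
#endif

/* Hypothetical helper (not part of Redis): names the backend that the
 * preprocessor selected for this build. */
const char *polling_backend_name(void) {
#if defined(HAVE_EPOLL)
    return "epoll";
#elif defined(HAVE_KQUEUE)
    return "kqueue";
#else
    return "select";   /* portable fallback when neither macro is set */
#endif
}
```

Calling polling_backend_name() on a Linux build should take the HAVE_EPOLL branch. On a platform that matches neither test it falls back to "select".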
    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);
            aeProcessEvents(eventLoop, AE_ALL_EVENTS);
        }
    }

Before each round of event processing, the beforesleep callback set on the event loop (server.el) is executed, so beforeSleep is described next.
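To make the ordering concrete (hook first, then event processing, repeated until stop is set), here is a minimal sketch of the same loop shape. The miniLoop type and the miniMain, countHook, and stopAfterThree functions are all hypothetical stand-ins, not Redis APIs. The process callback merely plays the role of aeProcessEvents.

```c
#include <stddef.h>

/* Hypothetical miniature of an event loop; not the Redis aeEventLoop. */
typedef struct miniLoop {
    int stop;                                 /* set to 1 to leave the loop */
    void (*beforesleep)(struct miniLoop *);   /* optional hook, may be NULL */
    int (*process)(struct miniLoop *);        /* stands in for aeProcessEvents */
    int hook_calls;                           /* demo bookkeeping */
    int iterations;                           /* demo bookkeeping */
} miniLoop;

/* Same shape as aeMain: run the hook, then process events, until stopped. */
void miniMain(miniLoop *loop) {
    loop->stop = 0;
    while (!loop->stop) {
        if (loop->beforesleep != NULL)
            loop->beforesleep(loop);
        loop->process(loop);
    }
}

/* Demo hook: counts how many times it runs. */
void countHook(miniLoop *loop) {
    loop->hook_calls++;
}

/* Demo processor: pretends to handle events and stops after three rounds. */
int stopAfterThree(miniLoop *loop) {
    loop->iterations++;
    if (loop->iterations == 3) loop->stop = 1;
    return 0;
}
```

Running miniMain on a loop set up with countHook and stopAfterThree calls the hook once before each of the three processing rounds. beforeSleep gets the same guarantee from aeMain.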
    void beforeSleep(struct aeEventLoop *eventLoop) {
        REDIS_NOTUSED(eventLoop);
        listNode *ln;
        redisClient *c;

        /* Awake clients that got all the swapped keys they requested */
        if (server.vm_enabled && listLength(server.io_ready_clients)) {
            listIter li;

            listRewind(server.io_ready_clients,&li);
            while((ln = listNext(&li))) {
                c = ln->value;
                struct redisCommand *cmd;

                /* Resume the client. */
                listDelNode(server.io_ready_clients,ln);
                c->flags &= (~REDIS_IO_WAIT);
                server.vm_blocked_clients--;
                aeCreateFileEvent(server.el, c->fd, AE_READABLE,
                    readQueryFromClient, c);
                cmd = lookupCommand(c->argv[0]->ptr);
                redisAssert(cmd != NULL);
                call(c,cmd);
                resetClient(c);
                /* There may be more data to process in the input buffer. */
                if (c->querybuf && sdslen(c->querybuf) > 0)
                    processInputBuffer(c);
            }
        }

        /* Try to process pending commands for clients that were just unblocked. */
        while (listLength(server.unblocked_clients)) {
            ln = listFirst(server.unblocked_clients);
            redisAssert(ln != NULL);
            c = ln->value;
            listDelNode(server.unblocked_clients,ln);
            c->flags &= ~REDIS_UNBLOCKED;

            /* Process remaining data in the input buffer. */
            if (c->querybuf && sdslen(c->querybuf) > 0)
                processInputBuffer(c);
        }

        /* Write the AOF buffer on disk */
        flushAppendOnlyFile();
    }

This function does three things:
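I.   If Redis has virtual memory enabled, some keys requested by clients may have been swapped out, so those clients are blocked. Once the requested keys have been loaded back into memory, the blocked clients should be unblocked and served. io_ready_clients is the list that tracks these clients; to respond to them as quickly as possible, they are handled before every sleep.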
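II.  Some Redis operations are blocking, such as BLPOP, so the clients that issue them may be blocked. The unblocked_clients list tracks clients that have just been unblocked; if it is not empty, these clients must also be served as soon as possible.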
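III. flushAppendOnlyFile. Redis has to write the AOF before replying to a client, and a client socket is only written to from inside the event loop. Since flushAppendOnlyFile is called before every sleep, all AOF writes made inside the event loop are first accumulated in an in-memory buffer, and flushAppendOnlyFile flushes that buffer to disk.

After beforesleep has run, aeProcessEvents is called. It handles the file read/write events that were detected and the time events that have come due. The function is fairly long and its logic is simple, so the code is not reproduced here; in brief, the process is:

a)   First, traverse the timeEvents registered in the eventLoop to find the one nearest to the current time (shortest).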
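b)   Call epoll_wait() to wait for I/O events. So that time events are handled promptly, the timeout given to epoll_wait() is the difference between shortest and the current time; if that difference is negative, epoll_wait() waits until an I/O event occurs.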
c)   Handle the aeFileEvents that fired in the eventLoop; the handler invoked here is the file event handler registered earlier, acceptTcpHandler.
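d)   After the I/O events have been handled, traverse the timeEvents starting from timeEventHead and invoke each one's timeProc, which here is serverCron. A few points deserve attention when handling time events: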
    static int processTimeEvents(aeEventLoop *eventLoop) {
        int processed = 0;
        aeTimeEvent *te;
        long long maxId;

        te = eventLoop->timeEventHead;
        maxId = eventLoop->timeEventNextId-1;
        while(te) {
            long now_sec, now_ms;
            long long id;

            if (te->id > maxId) {
                te = te->next;
                continue;
            }
            aeGetTime(&now_sec, &now_ms);
            if (now_sec > te->when_sec ||
                (now_sec == te->when_sec && now_ms >= te->when_ms))
            {
                int retval;

                id = te->id;
                retval = te->timeProc(eventLoop, id, te->clientData);
                processed++;
                /* After an event is processed our time event list may
                 * no longer be the same, so we restart from head.
                 * Still we make sure to don't process events registered
                 * by event handlers itself in order to don't loop forever.
                 * To do so we saved the max ID we want to handle.
                 *
                 * FUTURE OPTIMIZATIONS:
                 * Note that this is NOT great algorithmically. Redis uses
                 * a single time event so it's not a problem but the right
                 * way to do this is to add the new elements on head, and
                 * to flag deleted elements in a special way for later
                 * deletion (putting references to the nodes to delete into
                 * another linked list). */
                if (retval != AE_NOMORE) {
                    aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
                } else {
                    aeDeleteTimeEvent(eventLoop, id);
                }
                te = eventLoop->timeEventHead;
            } else {
                te = te->next;
            }
        }
        return processed;
    }

After a time event has been handled, the time-event list in the eventLoop may have changed, so the traversal restarts from the head node. Because every restart begins at the head, the loop has to avoid handling events in an endless cycle: if the timeProc of time event a registers a new time event b, and b is then handled right after a, the handlers could keep triggering one another forever. To prevent this, Redis keeps timeEventNextId in the eventLoop, which is the id the next time event will receive. For example, suppose the eventLoop holds a single timeEvent a with a->id = 1, so timeEventNextId = 2. When a's timeProc registers timeEvent b, b->id = 2 and timeEventNextId = 3. At the start of the traversal Redis saves maxId = timeEventNextId-1, and during the traversal any timeEvent whose id is greater than maxId is skipped. The Redis author also recognizes that restarting from the head every time is not a good algorithm, but since Redis currently has only one time event it is not a problem in practice; the source comment lists an improvement under future optimizations.

acceptTcpHandler: this function mainly deals with the listening network port:

i.   Call anetTcpAccept to obtain a client connection on the listening port;

    redisClient *createClient(int fd) {
        redisClient *c = zmalloc(sizeof(redisClient));
        c->bufpos = 0;

        anetNonBlock(NULL,fd);
        anetTcpNoDelay(NULL,fd);
        if (!c) return NULL;
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
        .........
    }

iv.  readQueryFromClient: reads the data sent by the client from the given socket, parses it according to the Redis protocol (to be covered separately later) into Redis commands, then looks each command up in the commandTable and executes it.
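The last step, looking a parsed command up in a table and running it, can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the Redis implementation. The miniCommand type, miniCommandTable, miniLookupCommand, miniDispatch, and the two demo handlers are hypothetical, and the linear, case-insensitive search is a choice made for the sketch. The arguments are assumed to be parsed already, with argv[0] holding the command name.

```c
#include <ctype.h>
#include <stddef.h>

/* Hypothetical command entry; the real struct redisCommand has more fields. */
typedef struct miniCommand {
    const char *name;
    int (*proc)(int argc, const char **argv);
} miniCommand;

/* Demo handlers: ping returns 1, echo returns its argument count. */
int pingCommand(int argc, const char **argv) { (void)argc; (void)argv; return 1; }
int echoCommand(int argc, const char **argv) { (void)argv; return argc; }

miniCommand miniCommandTable[] = {
    {"ping", pingCommand},
    {"echo", echoCommand},
};

/* Case-insensitive string equality using only standard C. */
static int equalsIgnoreCase(const char *a, const char *b) {
    while (*a && *b) {
        if (tolower((unsigned char)*a) != tolower((unsigned char)*b)) return 0;
        a++;
        b++;
    }
    return *a == *b;   /* equal only if both strings ended together */
}

/* Linear lookup by command name; returns NULL for an unknown command. */
miniCommand *miniLookupCommand(const char *name) {
    size_t n = sizeof(miniCommandTable) / sizeof(miniCommandTable[0]);
    for (size_t i = 0; i < n; i++) {
        if (equalsIgnoreCase(name, miniCommandTable[i].name))
            return &miniCommandTable[i];
    }
    return NULL;
}

/* Dispatch parsed arguments; returns -1 if the command is unknown. */
int miniDispatch(int argc, const char **argv) {
    if (argc < 1) return -1;
    miniCommand *cmd = miniLookupCommand(argv[0]);
    if (cmd == NULL) return -1;
    return cmd->proc(argc, argv);
}
```

With this table, dispatching {"PING"} reaches pingCommand, and an unknown name yields -1 without calling any handler.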