aeEventLoop 结构体与初始化
这个结构体是在事件驱动框架代码ae.h中定义的,记录了框架循环运行过程中的信息,其中,就包含了记录两类事件的变量,分别是:
- aeFileEvent 类型的指针 *events,表示 IO 事件。之所以类型名称为 aeFileEvent,是因为所有的 IO 事件都会用文件描述符进行标识;
- aeTimeEvent 类型的指针 *timeEventHead,表示时间事件,即按一定时间周期触发的事件。
typedef struct aeEventLoop {
…
aeFileEvent *events; //IO事件数组
aeFiredEvent *fired; //已触发事件数组
aeTimeEvent *timeEventHead; //记录时间事件的链表头
…
void *apidata; //和API调用接口相关的数据
aeBeforeSleepProc *beforesleep; //进入事件循环流程前执行的函数
aeBeforeSleepProc *aftersleep; //退出事件循环流程后执行的函数
} aeEventLoop;
aeCreateEventLoop 函数的初始化操作
aeCreateEventLoop 函数执行的操作,大致可以分成以下三个步骤。
第一步,aeCreateEventLoop 函数会创建一个 aeEventLoop 结构体类型的变量 eventLoop。然后,该函数会给 eventLoop 的成员变量分配内存空间,比如,按照传入的参数 setsize,给 IO 事件数组和已触发事件数组分配相应的内存空间。此外,该函数还会给 eventLoop 的成员变量赋初始值。
第二步,aeCreateEventLoop 函数会调用 aeApiCreate 函数。aeApiCreate 函数封装了操作系统提供的 IO 多路复用函数,假设 Redis 运行在 Linux 操作系统上,并且 IO 多路复用机制是 epoll,那么此时,aeApiCreate 函数就会调用 epoll_create 创建 epoll 实例,同时会创建 epoll_event 结构的数组,数组大小等于参数 setsize。
aeApiCreate 函数是把创建的 epoll 实例描述符和 epoll_event 数组,保存在了 aeApiState 结构体类型的变量 state,如下所示:
typedef struct aeApiState { //aeApiState结构体定义
int epfd; //epoll实例的描述符
struct epoll_event *events; //epoll_event结构体数组,记录监听事件
} aeApiState;
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
...
//将epoll_event数组保存在aeApiState结构体变量state中
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
...
//将epoll实例描述符保存在aeApiState结构体变量state中
state->epfd = epoll_create(1024);
紧接着,aeApiCreate 函数把 state 变量赋值给 eventLoop 中的 apidata。这样一来,eventLoop 结构体中就有了 epoll 实例和 epoll_event 数组的信息,这样就可以用来基于 epoll 创建和处理事件了。
eventLoop->apidata = state;
第三步,aeCreateEventLoop 函数会把所有网络 IO 事件对应文件描述符的掩码,初始化为 AE_NONE,表示暂时不对任何事件进行监听。
IO 事件处理
Redis 的 IO 事件主要包括三类,分别是可读事件、可写事件和屏障事件。
对应于 Redis 实例,我们可以从客户端读取数据或是向客户端写入数据。而屏障事件的主要作用是用来反转事件的处理顺序。
IO 事件创建
aeCreateFileEvent 函数的原型定义:
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)
这个函数的参数有 5 个,分别是循环流程结构体 eventLoop、IO 事件对应的文件描述符 fd、事件类型掩码 mask、事件处理回调函数proc,以及事件私有数据*clientData。
读事件处理
当 Redis server 接收到客户端的连接请求时,就会使用注册好的 acceptTcpHandler 函数进行处理。
acceptTcpHandler 函数是在networking.c文件中,它会接受客户端连接,并创建已连接套接字 cfd。然后,acceptCommonHandler 函数(在 networking.c 文件中)会被调用,同时,刚刚创建的已连接套接字 cfd 会作为参数,传递给 acceptCommonHandler 函数。
acceptCommonHandler 函数会调用 createClient 函数(在 networking.c 文件中)创建客户端。而在 createClient 函数中,我们就会看到,aeCreateFileEvent 函数被再次调用了。
此时,aeCreateFileEvent 函数会针对已连接套接字上,创建监听事件,类型为 AE_READABLE,回调函数是 readQueryFromClient(在 networking.c 文件中)。
好了,到这里,事件驱动框架就增加了对一个客户端已连接套接字的监听。一旦客户端有请求发送到 server,框架就会回调 readQueryFromClient 函数处理请求。这样一来,客户端请求就能通过事件驱动框架进行处理了。
client *createClient(int fd) {
…
if (fd != -1) {
…
//调用aeCreateFileEvent,监听读事件,对应客户端读写请求,使用readQueryFromclient回调函数处理
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
} }
…
}
写事件处理
在 Redis 事件驱动框架每次循环进入事件处理函数前,也就是在框架主函数 aeMain 中调用 aeProcessEvents,来处理监听到的已触发事件或是到时的时间事件之前,都会调用 server.c 文件中的 beforeSleep 函数,进行一些任务处理,这其中就包括了调用 handleClientsWithPendingWrites 函数,它会将 Redis sever 客户端缓冲区中的数据写回客户端。
下面给出的代码是事件驱动框架的主函数 aeMain。在该函数每次调用 aeProcessEvents 函数前,就会调用 beforeSleep 函数。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
//如果beforeSleep函数不为空,则调用beforeSleep函数
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
//调用完beforeSleep函数,再处理事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
时间事件
定义
typedef struct aeTimeEvent {
long long id; //时间事件ID
long when_sec; //事件到达的秒级时间戳
long when_ms; //事件到达的毫秒级时间戳
aeTimeProc *timeProc; //时间事件触发后的处理函数
aeEventFinalizerProc *finalizerProc; //事件结束后的处理函数
void *clientData; //事件相关的私有数据
struct aeTimeEvent *prev; //时间事件链表的前向指针
struct aeTimeEvent *next; //时间事件链表的后向指针
} aeTimeEvent;
时间事件结构体中主要的变量,包括以秒记录和以毫秒记录的时间事件触发时的时间戳 when_sec 和 when_ms,以及时间事件触发后的处理函数timeProc。另外,在时间事件的结构体中,还包含了前向和后向指针prev和*next,这表明时间事件是以链表的形式组织起来的。
时间事件创建
时间事件的创建函数是 aeCreateTimeEvent 函数。这个函数的原型定义如下所示:
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc)
在它的参数中,有两个需要我们重点了解下,以便于我们理解时间事件的处理。一个是 milliseconds,这是所创建时间事件的触发时间距离当前时间的时长,是用毫秒表示的。另一个是 *proc,这是所创建时间事件触发后的回调函数。
aeCreateTimeEvent 函数的执行逻辑不复杂,主要就是创建一个时间事件的变量 te,对它进行初始化,并把它插入到框架循环流程结构体 eventLoop 中的时间事件链表中。在这个过程中,aeCreateTimeEvent 函数会调用 aeAddMillisecondsToNow 函数,根据传入的 milliseconds 参数,计算所创建时间事件具体的触发时间戳,并赋值给 te。
时间事件回调函数
serverCron 函数是在 server.c 文件中实现的。一方面,它会顺序调用一些函数,来实现时间事件被触发后,执行一些后台任务。比如,serverCron 函数会检查是否有进程结束信号,若有就执行 server 关闭操作。serverCron 会调用 databaseCron 函数,处理过期 key 或进行 rehash 等。
另一方面,serverCron 函数还会以不同的频率周期性执行一些任务,这是通过执行宏 run_with_period 来实现的。
比如,serverCron 函数中会以 1 秒 1 次的频率,检查 AOF 文件是否有写错误。如果有的话,serverCron 就会调用 flushAppendOnlyFile 函数,再次刷回 AOF 文件的缓存数据。下面的代码展示了这一周期性任务:
serverCron() {
…
//每1秒执行1次,检查AOF是否有写错误
run_with_period(1000) {
if (server.aof_last_write_status == C_ERR)
flushAppendOnlyFile(0);
}
…
}
时间事件的触发处理
时间事件的检测触发比较简单,事件驱动框架的 aeMain 函数会循环调用 aeProcessEvents 函数,来处理各种事件。而 aeProcessEvents 函数在执行流程的最后,会调用 processTimeEvents 函数处理相应到时的任务。
那么,具体到 proecessTimeEvent 函数来说,它的基本流程就是从时间事件链表上逐一取出每一个事件,然后根据当前时间判断该事件的触发时间戳是否已满足。如果已满足,那么就调用该事件对应的回调函数进行处理。这样一来,周期性任务就能在不断循环执行的 aeProcessEvents 函数中,得到执行了。
static int processTimeEvents(aeEventLoop *eventLoop) {
...
te = eventLoop->timeEventHead; //从时间事件链表中取出事件
while(te) {
...
aeGetTime(&now_sec, &now_ms); //获取当前时间
if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms)) //如果当前时间已经满足当前事件的触发时间戳
{
...
retval = te->timeProc(eventLoop, id, te->clientData); //调用注册的回调函数处理
...
}
te = te->next; //获取下一个时间事件
...
}
此文章为10月Day11学习笔记,内容来源于极客时间《Redis 源码剖析与实战》