深入Redis技术内幕:一条命令是如何执行的

2023年 8月 13日 93.0k 0

Redis 是一个高性能的、功能丰富且灵活的开源键值存储系统,其源码采用了许多优秀的编码实践和设计模式。通过学习 Redis 源码,可以深入了解 Redis 的内部工作原理和实现细节。从中学习到很多优秀的编程技巧、设计原则和架构思想,提升自己的编码能力,帮助我们更好地理解 Redis 的各种功能、性能特点和设计思想。

上一篇我们已经完成了一个自定义模块的开发、编译工作,接下来将从源码的角度分析这条自定义命令在Redis中是如何执行的,下文代码部分来自Redis6.0。

服务器启动主流程

每个Redis实例在内存中表现为一个redisServer结构,Redis的所有核心功能都和这个结构息息相关

// server.h
struct redisServer {
    ...
    redisDb *db;
    dict *commands;             /* Command table */
    dict *orig_commands;        /* Command table before command renaming. */
    aeEventLoop *el;
    _Atomic unsigned int lruclock; /* Clock for LRU eviction */
    ...
    dict *moduleapi;            /* Exported core APIs dictionary for modules. */
    dict *sharedapi;            /* Like moduleapi but containing the APIs that
                                   modules share with each other. */
    list *loadmodule_queue;     /* List of modules to load at startup. */
    ...
};

整个main函数的启动流程可以看做是对redisServer的构建过程

// server.c
int main(int argc, char **argv) {
    ...
    initServerConfig(); // 1
    ...
    moduleInitModulesSystem(); // 2
    ...
    if (argc >= 2) {
        ...
        loadServerConfig(server.configfile, config_from_stdin, options); // 3
    }
    ...
    initServer(); // 4
    ...
    if (!server.sentinel_mode) {
        ...
        moduleLoadFromQueue(); // 5
        ...
        InitServerLast(); // 6
        loadDataFromDisk(); // 7
        ...
    }
    aeMain(server.el); // 8
    aeDeleteEventLoop(server.el);
    return 0;
}
  • initServerConfig初始化服务器配置相关数据结构,包括命令字典、默认配置项等
  • moduleInitModulesSystem初始化模块相关数据结构,比如模块字典、模块API字典等
  • loadServerConfig如果启动参数中指定了配置文件,将解析、加载这些配置,覆盖默认配置项
  • initServer初始化一些关键数据结构,比如redisServer维护的一些链表结构、共享对象、网络模块、redisDb等
  • moduleLoadFromQueue加载配置文件中配置的模块动态链接库、执行模块初始化
  • InitServerLast主要工作是初始化Redis6.0增加的IO线程功能
  • loadDataFromDisk从AOF或者RDB文件中加载数据到内存
  • aeMain启动EventLoop,开始处理网络事件和周期性任务
  • 网络与EventLoop

    Redis使用一个单线程的事件循环来处理网络事件和周期性任务。EventLoop负责监听socket上的事件,如连接、读取和写入,并调用相应的回调函数进行处理。它基于操作系统提供的I/O多路复用机制(如selectepoll)来实现高效的事件监听。

    // ae.h
    typedef struct aeEventLoop {
        int maxfd;   /* highest file descriptor currently registered */
        int setsize; /* max number of file descriptors tracked */
        long long timeEventNextId;
        aeFileEvent *events; /* Registered events */
        aeFiredEvent *fired; /* Fired events */
        aeTimeEvent *timeEventHead;
        int stop;
        void *apidata; /* This is used for polling API specific data */
        aeBeforeSleepProc *beforesleep;
        aeBeforeSleepProc *aftersleep;
        int flags;
    } aeEventLoop;
    

    文件事件(aeFileEvent)

    文件事件是Redis对底层I/O事件的抽象表示。当一个socket上发生读写事件时,EventLoop会将该事件包装成文件事件,文件事件包含了socket描述符(数组下标)、事件类型(mask)和相应的回调函数(读或写)。

    // ae.h
    typedef struct aeFileEvent {
        int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
        aeFileProc *rfileProc;
        aeFileProc *wfileProc;
        void *clientData;
    } aeFileEvent;
    

    多路复用API

    Redis将IO多路复用抽象为以下一组接口函数,在EventLoop中通过这些API管理文件事件

    int aeApiCreate(aeEventLoop *eventLoop);
    int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);
    void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask);
    int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp);
    

    Redis实现了多种IO多路复用,根据编译环境选择其中性能最好一种;

    // ae.c
    /* Include the best multiplexing layer supported by this system.
     * The following should be ordered by performances, descending. */
    #ifdef HAVE_EVPORT
    #include "ae_evport.c"
    #else
        #ifdef HAVE_EPOLL
        #include "ae_epoll.c"
        #else
            #ifdef HAVE_KQUEUE
            #include "ae_kqueue.c"
            #else
            #include "ae_select.c"
            #endif
        #endif
    #endif
    

    处理文件事件

    服务器启动后,EventLoop开始循环处理文件事件和周期性任务,代码简化后如下所示:

    while (1) {
        shortest = aeSearchNearestTimer(eventLoop);
        tvp = ...
        eventLoop->beforesleep(eventLoop); // 1
        numevents = aeApiPoll(eventLoop, tvp); // 2
        eventLoop->aftersleep(eventLoop);
        for (j = 0; j events[eventLoop->fired[j].fd];
            if (fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask); // 3
            }
            if (fe->mask & mask & AE_WRITABLE) {
                fe->wfileProc(eventLoop,fd,fe->clientData,mask); // 4
            }
        }
        processTimeEvents(eventLoop); // 处理时间事件:serverCron
    }
    
  • beforeSleep 从redis6.0开始,增加了IO线程辅助数据读写,这里主要是将读写操作分派到IO线程和一些周期性工作;
  • 调用IO多路复用的实现函数,等待事件就绪;比如selectepoll_wait等;
  • 处理读事件,分两种情况,服务端socket:就是我们启动时监听在6379端口的socket,它的rfileProcacceptTcpHandler函数;客户端socket:连接到Redis服务器的客户端rfileProcreadQueryFromClient,这个函数从客户端socket接收数据,将数据读入客户端读缓冲区,根据Redis的请求协议,解析命令名称、参数和选项,解析完成后,将命令请求转换为一个命令对象,调用相应的命令处理函数来执行实际的命令逻辑
  • 处理写事件:一般情况下会在beforeSleep中直接调用writeToClient写数据到客户端,当writeToClient无法写出更多数据时(比如socket写缓冲区满了),才会安装写处理器为sendReplyToClient函数(对writeToClient的简单包装)
  • 网络功能初始化

    网络大部分功能在服务器启动时的initServer函数中完成初始化:

    // server.c
    void initServer(void) {
        server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); // 1
        if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR) exit(1); // 2
        if (server.unixsocket != NULL) {
            server.sofd = anetUnixServer(server.neterr,server.unixsocket,server.unixsocketperm, server.tcp_backlog); // 2
        }
        aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL); // 创建时间事件:周期性调度serverCron
        for (j = 0; j < server.ipfd_count; j++) {
            aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL); // 3
        }
        aeSetBeforeSleepProc(server.el,beforeSleep); // 4
        aeSetAfterSleepProc(server.el,afterSleep); // 4
    }
    
  • 创建EventLoop,分配内存,初始化数据结构;
  • listenToPort分别调用socketbindlisten三个系统调用,创建一个服务端socket,监听在6379端口,等待接收客户端发送的连接请求;如果我们的Redis和应用部署在同一台机器上,可以启用Unix域协议(一种进程间通信方式,比TCP/IP协议栈更高效,有兴趣的同学可以参考《Unix网络编程》第15章关于Unix域协议的内容),则调用到anetUnixServer,其代码和listenToPort类似;
  • 将2中创建的socket注册到EventLoop,并安装读处理器为acceptTcpHandler,当新连接就绪时,会触发读事件,调用到acceptTcpHandler函数;
  • 设置EventLoop的beforesleepaftersleep处理函数;
  • 处理客户端连接

    服务端socket变为可读时,表示有新连接接入,触发对读处理器acceptTcpHandler的调用

    void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
        while(max--) {
            cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); // 1
            conn = connCreateAcceptedSocket(cfd); // 2
            acceptCommonHandler(conn,0,cip); // 3
        }
    }
    
  • 调用accept系统调用,从就绪队列(已完成TCP三次握手)中获取一个连接文件描述符cfd
  • cfd封装为一个connection结构,其实就是为cfd绑定一组操作函数(读、写等), 这些函数在CT_Socket结构中通过函数指针指定具体实现,这是C语言中一种常见的实现面向对象编程的方法;
  • // connection.c
    ConnectionType CT_Socket = {
        .ae_handler = connSocketEventHandler,
        .close = connSocketClose,
        .write = connSocketWrite,
        .read = connSocketRead,
        .accept = connSocketAccept,
        .connect = connSocketConnect,
        .set_write_handler = connSocketSetWriteHandler,
        .set_read_handler = connSocketSetReadHandler,
        .get_last_error = connSocketGetLastError,
        .blocking_connect = connSocketBlockingConnect,
        .sync_write = connSocketSyncWrite,
        .sync_read = connSocketSyncRead,
        .sync_readline = connSocketSyncReadLine,
        .get_type = connSocketGetType
    };
    

    这个函数会调用createClient为客户端连接创建一个client结构,作为这个连接在服务器的表示;createClient中会调用connSetReadHandler,然后调用conn->type->set_read_handler也就是CT_Socket.set_read_handler;最终调用到connSocketSetReadHandler,将cfd注册到EventLoop,设置读处理器为readQueryFromClient,之后接收到该客户端发送的数据时,会触发读事件,从而执行readQueryFromClient函数;

    // server.h
    typedef struct client {
        ...
        connection *conn;
        int resp;               /* RESP protocol version. Can be 2 or 3. */
        redisDb *db;            /* Pointer to currently SELECTed DB. */
        robj *name;             /* As set by CLIENT SETNAME. */
        sds querybuf;           /* Buffer we use to accumulate client queries. */
        ...
        int argc;               /* Num of arguments of current command. */
        robj **argv;            /* Arguments of current command. */
        list *reply;            /* List of reply objects to send to the client. */
        ...
        char buf[PROTO_REPLY_CHUNK_BYTES];
    } client;
    
    // networking.c
    client *createClient(connection *conn) {
        client *c = zmalloc(sizeof(client));
        if (conn) {
            connNonBlock(conn);
            connEnableTcpNoDelay(conn);
            if (server.tcpkeepalive)
                connKeepAlive(conn,server.tcpkeepalive);
            connSetReadHandler(conn, readQueryFromClient); // 设置读处理器为readQueryFromClient
            connSetPrivateData(conn, c);
        }
        ...
    }
    
    // connection.c 
    // CT_Socket.set_read_handler
    static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
        if (func == conn->read_handler) return C_OK;
    
        conn->read_handler = func;
        if (!conn->read_handler)
            aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
        else
            if (aeCreateFileEvent(server.el,conn->fd,
                        AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
        return C_OK;
    }
    

    命令执行

    Redis命令

    Redis命令在内存中表示为一个redisCommand结构,命令的执行其实就是执行proc函数指针指向的函数

    // sercer.h
    typedef void redisCommandProc(client *c);
    struct redisCommand {
        char *name;
        redisCommandProc *proc;
        int arity;
        char *sflags;   /* Flags as string representation, one char per flag. */
        uint64_t flags; /* The actual flags, obtained from the 'sflags' field. */
        ...
    };
    

    命令字典初始化

    Redis内置的命令定义在一个全局数组redisCommandTable

    // server.c
    struct redisCommand redisCommandTable[] = {
        {"module",moduleCommand,-2,
         "admin no-script",
         0,NULL,0,0,0,0,0,0},
    
        {"get",getCommand,2,
         "read-only fast @string",
         0,NULL,1,1,1,0,0,0},
        ...
    }
    

    服务器启动过程中,在initServerConfig函数中将静态命令表中的命令填充到server.commands命令字典中,执行客户端请求的命令时,会通过命令名称从字典中查找对应的命令;除此之外,自定义模块中也可以通过模块API创建自定义命令,这些命令会在模块加载时注册到server.commands命令字典中

    // server.c
    void initServerConfig(void) {
        ...
    		server.commands = dictCreate(&commandTableDictType,NULL);
        server.orig_commands = dictCreate(&commandTableDictType,NULL);
        populateCommandTable();
        ...
    }
    void populateCommandTable(void) {
        ...
        for (j = 0; j name), c);
            retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
        }
    }
    

    命令读取、解析、执行

  • 客户端通过网络发送命令,触发可读事件,此时读处理器为readQueryFromClient
  • readQueryFromClient将网络数据读取到客户端查询缓冲区client->querybuf
  • 然后调用processInputBuffer将查询缓存区中的字节数据根据RESP协议(Redis序列化协议是一个简单的文本协议,可以参考官网相关内容)解码为Redis命令参数client->argcclient->argv;
  • 最终调用processCommandAndResetClient->processCommand从命令字典中查找到目标命令,执行命令的proc指向的函数;
  • // server.c
    int processCommand(client *c) {
        c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); // 根据命令名称,查找命令字典(包括模块命令)
        ... 长达200多行的各种校验逻辑
        /* Exec the command */
        if (c->flags & CLIENT_MULTI &&
            c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
            c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
        {
            queueMultiCommand(c); // multi之后的命令只入队不执行,等收到exec命令后才一起执行
            addReply(c,shared.queued);
        } else {
            call(c,CMD_CALL_FULL); // 执行命令,调用 c->cmd->proc(c);
            ...
        }
        return C_OK;
    }
    

    响应客户端

    客户端输出缓冲区

    Redis种有一系列addReplyXXX的函数,用于向客户端返回命令执行结果、错误信息等,这些命令会将数据写入一个16kb的客户端输出缓冲区client->buf,如果输出缓冲区空间不足,则写入一个输出缓冲队列client->reply,这些缓冲的数据会在合适的时机发送到客户端

    void addReply(client *c, robj *obj) {
        if (prepareClientToWrite(c) != C_OK) return;
    
        if (sdsEncodedObject(obj)) {
            if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK) // 先尝试写输出缓冲区
                _addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr)); // 失败则写入输出缓冲队列
        } else if (obj->encoding == OBJ_ENCODING_INT) {
            char buf[32];
            size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
            if (_addReplyToBuffer(c,buf,len) != C_OK)
                _addReplyProtoToList(c,buf,len);
        } else {
            serverPanic("Wrong obj->encoding in addReply()");
        }
    }
    

    发送数据到客户端

    入口函数为writeToClient,这里的connWrite会调用到CT_Socket.write,通过write系统调用将数据发送到客户端

    // networking.c
    int writeToClient(client *c, int handler_installed) {
        ...
        while(clientHasPendingReplies(c)) {
            if (c->bufpos > 0) {
                nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen); // 写输出缓冲区
                ...
            } else {
                o = listNodeValue(listFirst(c->reply));
                ...
                // 写应答队列:如果响应数据很多,会将响应数据分为多块,放到一个队列中
                nwritten = connWrite(c->conn, o->buf + c->sentlen, objlen - c->sentlen); 
                ...
            }
            ...
        }
        ...
        if (!clientHasPendingReplies(c)) {
            c->sentlen = 0;
            if (handler_installed) connSetWriteHandler(c->conn, NULL);
            if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
                freeClientAsync(c);
                return C_ERR;
            }
        }
        return C_OK;
    }
    

    Redis自定义模块

    Redis可以通过自定义模块来扩展其功能,在Redis服务器中添加新的命令、数据类型、钩子函数和其他功能。模块相关的功能可以参考Redis官网文档,这里分析一下模块是如何被加载生效的。

    模块加载

    模块加载的核心函数是 moduleLoad:

  • 通过dlopen库函数加载动态链接库;
  • 通过dlsym库函数查找动态链接库符号为RedisModule_OnLoad的函数,使用onload保存该函数地址;
  • 调用自定义的RedisModule_OnLoad函数,初始化模块;
  • // module.c
    int moduleLoad(const char *path, void **module_argv, int module_argc) {
        int (*onload)(void *, void **, int);
        void *handle;
        RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
        ...
        handle = dlopen(path,RTLD_NOW|RTLD_LOCAL); // 1
        if (handle == NULL) {
            serverLog(LL_WARNING, "Module %s failed to load: %s", path, dlerror());
            return C_ERR;
        }
        onload = (int (*)(void *, void **, int))(unsigned long) dlsym(handle,"RedisModule_OnLoad"); // 2
        ...
        if (onload((void*)&ctx,module_argv,module_argc) == REDISMODULE_ERR) { // 3
            ...
        }
        /* Redis module loaded! Register it. */
        dictAdd(modules,ctx.module->name,ctx.module);
        ...
        return C_OK;
    }
    

    模块API初始化

    在上面自定义模块的代码中,我们首先调用了RedisModule_Init函数,执行模块相关API的初始化,这个步骤很有意思,它的API加载过程是这样的:RedisModule_GetApi -> RedisModuleCtx.getapifuncptr -> RM_GetApi,在函数RM_GetApi中将自定义模块中相关API的函数指针指向Redis中的具体实现函数,这样在自定义模块中就可以调用这些API了,这些实现函数在服务器启动时通过 main -> moduleInitModulesSystem -> moduleRegisterCoreAPI 注册到server.moduleapi字典,API函数实现在module.c文件中,命名方式为RM_XXX,比如RM_AllocRM_Free等等。

    // redismodule.h
    #define REDISMODULE_GET_API(name) \
        RedisModule_GetApi("RedisModule_" #name, ((void **)&RedisModule_ ## name))
    ...
    REDISMODULE_API void * (*RedisModule_Alloc)(size_t bytes) REDISMODULE_ATTR;
    REDISMODULE_API void * (*RedisModule_Realloc)(void *ptr, size_t bytes) REDISMODULE_ATTR;
    ...
    static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) {
        void *getapifuncptr = ((void**)ctx)[0];
        RedisModule_GetApi = (int (*)(const char *, void *)) (unsigned long)getapifuncptr;
        REDISMODULE_GET_API(Alloc); 
      	// RedisModule_GetApi("RedisModule_Alloc", ((void **)&RedisModule_Alloc); 宏展开后
        REDISMODULE_GET_API(Calloc);
        ...
    }
    
    // module.c
    #define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, NULL, NULL, {0}}
    
    struct RedisModuleCtx {
        void *getapifuncptr; // ((void**)ctx)[0]
        ...
    }
    
    // 这里注意**targetPtrPtr是指向函数指针RedisModule_Alloc的指针
    int RM_GetApi(const char *funcname, void **targetPtrPtr) {
        dictEntry *he = dictFind(server.moduleapi, funcname); // 查找模块API字典
        if (!he) return REDISMODULE_ERR;
        *targetPtrPtr = dictGetVal(he); // 将函数指针RedisModule_Alloc指向内部实现的函数
        return REDISMODULE_OK;
    }
    

    自定义命令

    模块初始化后我们通过调用RedisModule_CreateCommand创建了一个自定义命令FakeGPT.chat,这个API的实现函数为 RM_CreateCommand,如下所示,这里将向命令字典中增加一个执行cmdfunc函数的新命令。

    // module.c
    int RM_CreateCommand(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc cmdfunc, const char *strflags, int firstkey, int lastkey, int keystep) {
        ...
        struct redisCommand *rediscmd;
        RedisModuleCommandProxy *cp;
        sds cmdname = sdsnew(name);
        if (lookupCommand(cmdname) != NULL) {
            sdsfree(cmdname);
            return REDISMODULE_ERR;
        }
        cp = zmalloc(sizeof(*cp));
        ...
        cp->func = cmdfunc;
        cp->rediscmd = zmalloc(sizeof(*rediscmd));
        cp->rediscmd->name = cmdname;
        cp->rediscmd->proc = RedisModuleCommandDispatcher; // 代理函数,里面还是调用到cp->func,传入RedisModuleCtx
        ...
        dictAdd(server.commands,sdsdup(cmdname),cp->rediscmd); // 注册新命令
        ...
        return REDISMODULE_OK;
    }
    

    IO线程

    Redis是单线程还是多线程?你是否被问过这个问题。

    Redis在6.0版本新增了可选的IO线程功能,辅助读、写网络数据,但命令任然在主线程中执行;可以这么说:Redis6.0改多线程了,但又没有完全”多“。

    其实准确的说,Redis在6.0版本之前也是多线程的,类似bgsavebgrewriteaof之类的命令都会通过fork系统调用启用子进程处理IO任务(每个进程至少有个主线程,自然也算是多线程了)。

    IO线程初始化

    main函数的最后几行会调用到 InitServerLast函数,这个函数会调用initThreadedIO初始化IO线程,请注意io_threads_pending前面那么大的几个字_Atomic

    _Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
    void initThreadedIO(void) {
        server.io_threads_active = 0; /* We start with threads not active. */
        /* Don't spawn any thread if the user selected a single thread:
         * we'll handle I/O directly from the main thread. */
        if (server.io_threads_num == 1) return;
        /* Spawn and initialize the I/O threads. */
        for (int i = 0; i < server.io_threads_num; i++) {
            /* Things we do for all the threads including the main thread. */
            io_threads_list[i] = listCreate();
            if (i == 0) continue; /* Thread 0 is the main thread. */
    
            /* Things we do only for the additional threads. */
            pthread_t tid;
            pthread_mutex_init(&io_threads_mutex[i],NULL);
            io_threads_pending[i] = 0;
            pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
            if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) { 
                serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
                exit(1);
            }
            io_threads[i] = tid;
        }
    }
    

    IO线程处理读、写任务

    IO线程的”run“方法是 IOThreadMain函数,每个线程都有一个自己的任务队列io_threads_list[id]和一个原子的计数器io_threads_pending[id](这里有兴趣的小伙伴可以想一想为什么只需要声明io_threads_pending_Atomic,参考cppreference)。

    每个线程通过自旋检测io_threads_pending[id]等待主线程向自己的任务队列分配任务,这种实现方式很!粗!暴!

    void *IOThreadMain(void *myid) {
        /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
         * used by the thread to just manipulate a single sub-array of clients. */
        long id = (unsigned long)myid;
        ...
        while(1) {
            /* Wait for start */
            for (int j = 0; j conn); // 核心读入口
                } else {
                    serverPanic("io_threads_op value is unknown");
                }
            }
            listEmpty(io_threads_list[id]);
            io_threads_pending[id] = 0;
        }
    }
    

    主线程任务分派

    主线程通过EventLoop的beforesleep将读、写事件分派到IO线程,其中读事件分派逻辑如下,写处理同理:

  • server.clients_pending_read待处理列表的读任务平均分配到所有处理线程(包括主线程);
  • 更新IO线程的待处理任务数,这个操作结果对IO线程是可见的,会触发IO线程执行任务(参考IO线程处理读、写任务);
  • 主线程也会处理一部分任务;
  • 主线程自旋等待所有IO线程执行完毕;
  • 此时所有的网络数据已经读取解析完毕,主线程执行命令,然后从待处理列表中删除;
  • // networking.c
    int handleClientsWithPendingReadsUsingThreads(void) {
        if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
        int processed = listLength(server.clients_pending_read);
        if (processed == 0) return 0;
        ...
        listRewind(server.clients_pending_read,&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            int target_id = item_id % server.io_threads_num;
            listAddNodeTail(io_threads_list[target_id],c); // 1
            item_id++;
        }
    
        /* Give the start condition to the waiting threads, by setting the
         * start condition atomic var. */
        io_threads_op = IO_THREADS_OP_READ;
        for (int j = 1; j conn); // 3
        }
        listEmpty(io_threads_list[0]);
    
        /* Wait for all the other threads to end their work. */
        while(1) {
            unsigned long pending = 0;
            for (int j = 1; j flags &= ~CLIENT_PENDING_READ;
            listDelNode(server.clients_pending_read,ln);
            ...
            if (processPendingCommandsAndResetClient(c) == C_ERR) { // 5
                continue;
            }
            ...
        }
    
        /* Update processed count on server */
        server.stat_io_reads_processed += processed;
    
        return processed;
    }
    

    开启IO线程后的读-解析-执行

    开启IO线程后,readQueryFromClient不会立即执行读-解析流程,而是将这个待客户端添加到server.clients_pending_read队列,等待后续主线程分派执行读-解析流程,同时标记客户端为CLIENT_PENDING_READ

    // networking.c
    void readQueryFromClient(connection *conn) {
        client *c = connGetPrivateData(conn);
        int nread, readlen;
        size_t qblen;
    
        /* Check if we want to read from the client later when exiting from
         * the event loop. This is the case if threaded I/O is enabled. */
        if (postponeClientRead(c)) return;
        ...
    }
    
    int postponeClientRead(client *c) {
        if (server.io_threads_active &&
            server.io_threads_do_reads &&
            !clientsArePaused() &&
            !ProcessingEventsWhileBlocked &&
            !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
        {
            c->flags |= CLIENT_PENDING_READ;
            listAddNodeHead(server.clients_pending_read,c);
            return 1;
        } else {
            return 0;
        }
    }
    

    同理,在解析完命令后也不会正在的执行命令,此时c->flags & CLIENT_PENDING_READ为true:

    // networking.c
    void processInputBuffer(client *c) {
        while(c->qb_pos querybuf)) {
            ... 解析命令
            /* Multibulk processing could see a argc == 0) {
                resetClient(c);
            } else {
                /* If we are in the context of an I/O thread, we can't really
                 * execute the command here. All we can do is to flag the client
                 * as one that needs to process the command. */
                if (c->flags & CLIENT_PENDING_READ) {
                    c->flags |= CLIENT_PENDING_COMMAND;
                    break;
                }
    
                /* We are finally ready to execute the command. */
                if (processCommandAndResetClient(c) == C_ERR) {
                    return;
                }
            }
        }
    		...
    }
    

    总结时刻

    命令执行

    吧啦吧啦说了这么一大堆,那么一条命令到底是怎么执行的呢?总结一下命令的整个过程如下:

  • 客户端连接到Redis服务器,Redis将这个连接注册到EventLoop,监听读事件,读处理器为readQueryFromClient
  • 客户端随后发送一条命令FakeGPT.chat 在吗?到Redis服务器,EventLoop触发读事件,调用readQueryFromClient
  • readQueryFromClient读取、解析网络数据,然后从server.commands字典中查找名为FakeGPT.chat的命令(这个命令在模块加载时被添加到server.commands字典中),最终执行到我们自定义模块的FakeGPT_chat函数
  • FakeGPT_chat函数中通过模块API向client输出缓冲区client->buf写入响应结果
  • 随后EventLoop在beforesleep中调用writeToClient将client输出缓冲区中的数据发送到远程客户端
  • 至此,整个命令的执行流程就结束了。

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论