Redis 4.0 Communication Module: Source Code Analysis (Part 1)

2025/2/6 11:03:18

       

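[Redis primer] Redis is a high-performance in-memory database and an unavoidable topic in server-side development interviews. If you want to improve your network programming skills, Redis is an excellent open-source project that is well worth analyzing and studying.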

    

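      I have wanted to analyze the Redis source code ever since I graduated from university, but for various reasons I never got around to it. Today I finally took the time to do an in-depth analysis of the Redis 4.0 source.

      Redis is an efficient in-memory database that supports high concurrency. Many people assume it must use a very complex network communication architecture, but that is not the case. Redis performs well for two main reasons:

      - Redis 4.0 handles network I/O and command execution on a single thread, which avoids the overhead of thread switching and synchronization. (Redis 6.0 added optional multithreaded network I/O, although commands are still executed on the main thread.)
      - All key-value pairs are kept in memory, and Redis implements its own efficient memory-management mechanism. Reads and writes access memory directly, with no disk I/O on the request path.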

1. Preparation

    Run the following in a CentOS terminal:

wget http://download.redis.io/releases/redis-4.0.11.tar.gz
tar -zxvf redis-4.0.11.tar.gz
cd redis-4.0.11
make -j 5

     Building the Redis source:

[Screenshot]

      Debug redis-server with gdb:

gdb redis-server
(gdb) r

[Screenshot]

   In the Redis build directory, open another terminal and run the following commands to start redis-cli:

gdb redis-cli
(gdb) r
set hello redis

[Screenshot]

    That completes the preparation, and we can now write key-value pairs to redis-server. Next, let's dissect the server-side source code of redis-server.

2. Debugging the Source

      redis-server runs as a standalone process, so it must have an entry point: the main function. A global search of the Redis source shows that main is defined in server.c.

[Screenshot]

int main(int argc, char **argv) {
    ......
    // Initialize the server
    initServer();
    // Register the callbacks run before and after the event loop sleeps
    aeSetBeforeSleepProc(server.el, beforeSleep);
    aeSetAfterSleepProc(server.el, afterSleep);
    // aeMain starts the event loop
    aeMain(server.el);
    ......
    aeDeleteEventLoop(server.el);
    return 0;
}

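   Above is the main execution flow of main in server.c. A single main thread does three things:

   - initializes the server;
   - registers the callbacks;
   - starts the event loop.

   Let's take it apart step by step, beginning with the execution flow of initServer().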

    Note: initServer() contains many details that are worth studying. These are also details that are easy to overlook when writing server programs.

/* Global vars */
struct redisServer server; /* Server global state */

void setupSignalHandlers(void) {
    struct sigaction act;

    /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used.
     * Otherwise, sa_handler is used. */
    sigemptyset(&act.sa_mask);
    act.sa_flags = 0;
    act.sa_handler = sigShutdownHandler;
    sigaction(SIGTERM, &act, NULL);
    sigaction(SIGINT, &act, NULL);
    ......
    return;
}

void initServer(void)
{
    int j;

    /* Ignore SIGHUP and SIGPIPE; otherwise either signal can easily kill the redis process */
    signal(SIGHUP, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
    // Install the handlers for specific signals (SIGTERM, SIGINT, ...)
    setupSignalHandlers();
    ......
    /* The global redisServer object lives as long as the whole process;
       it holds member variables such as the event loop and the client lists */
    server.pid = getpid();
    server.current_client = NULL;
    server.clients = listCreate();
    server.clients_to_close = listCreate();
    server.slaves = listCreate();
    server.monitors = listCreate();
    // clients_pending_write: clients that have output to send but have not registered a write event yet
    server.clients_pending_write = listCreate();
    server.slaveseldb = -1;
    server.unblocked_clients = listCreate();
    server.ready_keys = listCreate();
    // Clients blocked in the WAIT command, waiting for replica acknowledgements
    server.clients_waiting_acks = listCreate();
    server.get_ack_from_slaves = 0;
    server.clients_paused = 0;
    server.system_memory_size = zmalloc_get_memory_size();

    createSharedObjects();
    adjustOpenFilesLimit();
    /* Allocate the main eventLoop's member arrays at the configured size,
       e.g. the aeFileEvent array that holds the read/write callbacks:
       typedef struct aeFileEvent {
           int mask; // AE_READABLE, AE_WRITABLE or AE_BARRIER
           aeFileProc *rfileProc;
           aeFileProc *wfileProc;
           void *clientData;
       } aeFileEvent;
    */
    // There is exactly one global redisServer, and it owns one eventLoop
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    if (server.el == NULL) {
        serverLog(LL_WARNING,
            "Failed creating the event loop. Error message: '%s'",
            strerror(errno));
        exit(1);
    }
    server.db = zmalloc(sizeof(redisDb)*server.dbnum);

    // Start listening on the TCP port
    if (server.port != 0 &&
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
        exit(1);

    if (server.unixsocket != NULL) {
        unlink(server.unixsocket); /* don't care if this fails */
        server.sofd = anetUnixServer(server.neterr,server.unixsocket,
            server.unixsocketperm, server.tcp_backlog);
        if (server.sofd == ANET_ERR) {
            serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
            exit(1);
        }
        // Put the socket in non-blocking mode
        anetNonBlock(NULL,server.sofd);
    }
    ......
    // Create the Redis timer event that runs periodic tasks (serverCron)
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }

    /* 1. Register acceptTcpHandler as the connection callback of each listening
          socket. Only readable events matter: when a listening socket becomes
          readable, a new connection has been established.
       2. Bind the listening sockets to the I/O multiplexing model. */
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
        {
            serverPanic("Unrecoverable error creating server.ipfd file event.");
        }
    }
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR)
        serverPanic("Unrecoverable error creating server.sofd file event.");

    /* Watch the read end of a pipe that is used to wake up the eventLoop
       while it is blocked in epoll_wait (module blocked-clients subsystem) */
    if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
        moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
        serverPanic("Error registering the readable event for the module "
                    "blocked clients subsystem.");
    }
    ......
}
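The signal handling at the top of initServer() follows a common POSIX pattern, sketched below in isolation. The names `install_signal_handlers`, `on_shutdown_signal` and `shutdown_requested` are hypothetical, not Redis APIs. The handler here only sets a flag, on the assumption that an event loop would poll that flag and perform the real shutdown outside signal context.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <signal.h>
#include <string.h>

/* Sketch only: hypothetical names, not Redis APIs. */

/* Flag an event loop could poll; sig_atomic_t is safe to write from a handler. */
static volatile sig_atomic_t shutdown_requested = 0;

static void on_shutdown_signal(int sig) {
    (void)sig;
    shutdown_requested = 1; /* defer the real shutdown to the main loop */
}

/* Same shape as initServer() + setupSignalHandlers(): ignore SIGHUP and
 * SIGPIPE, then route SIGTERM and SIGINT to a single handler. */
int install_signal_handlers(void) {
    struct sigaction act;

    if (signal(SIGHUP, SIG_IGN) == SIG_ERR) return -1;
    /* Without this, write() to a peer-closed socket raises SIGPIPE and kills
     * the process; with it, write() fails with errno == EPIPE instead. */
    if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) return -1;

    memset(&act, 0, sizeof(act));
    sigemptyset(&act.sa_mask);
    act.sa_flags = 0;                  /* plain sa_handler, no SA_SIGINFO */
    act.sa_handler = on_shutdown_signal;
    if (sigaction(SIGTERM, &act, NULL) == -1) return -1;
    if (sigaction(SIGINT, &act, NULL) == -1) return -1;
    return 0;
}
```

Ignoring SIGPIPE matters for a network server. Once a client disconnects, writing to its socket fails with `EPIPE`, and the write path can handle that like any other error instead of the whole process dying.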

     Building on the main flow above, how does a listening socket get bound to the I/O multiplexing model? Let's dig into aeCreateFileEvent.

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0};
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    // On Linux Redis is built with the epoll backend: here the fd is bound to epfd
    if (epoll_ctl(state->epfd, op, fd, &ee) == -1) return -1;
    return 0;
}
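The ADD-versus-MOD logic in aeApiAddEvent can be tried out on its own. The sketch below assumes Linux (epoll). It uses hypothetical names (`ev_loop`, `ev_add`, `EV_READABLE`, ...) as stand-ins for aeEventLoop, aeCreateFileEvent and the AE_* flags. It keeps only the per-fd mask table and the epoll registration, not the callback pointers.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Sketch only: hypothetical stand-ins for AE_NONE / AE_READABLE / AE_WRITABLE. */
#define EV_NONE     0
#define EV_READABLE 1
#define EV_WRITABLE 2
#define EV_SETSIZE  1024

typedef struct {
    int epfd;
    int mask[EV_SETSIZE]; /* registered mask per fd, like eventLoop->events[fd].mask */
} ev_loop;

int ev_loop_init(ev_loop *l) {
    memset(l->mask, 0, sizeof(l->mask));
    l->epfd = epoll_create1(0);
    return l->epfd == -1 ? -1 : 0;
}

/* First registration of an fd uses EPOLL_CTL_ADD; later ones use
 * EPOLL_CTL_MOD with the old and new masks merged, so adding WRITABLE
 * never drops an existing READABLE. */
int ev_add(ev_loop *l, int fd, int mask) {
    struct epoll_event ee;
    int op;

    if (fd < 0 || fd >= EV_SETSIZE) return -1; /* aeCreateFileEvent sets ERANGE here */
    op = (l->mask[fd] == EV_NONE) ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    mask |= l->mask[fd];                        /* merge old events */

    memset(&ee, 0, sizeof(ee));
    if (mask & EV_READABLE) ee.events |= EPOLLIN;
    if (mask & EV_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(l->epfd, op, fd, &ee) == -1) return -1;

    l->mask[fd] = mask; /* record only after the kernel accepted it */
    return 0;
}
```

Because the mask is recorded per fd, a later call that adds EV_WRITABLE to a socket turns into EPOLL_CTL_MOD with EPOLLIN|EPOLLOUT. That is how a write handler can be added to a client socket that already has readQueryFromClient registered.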


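      Suppose epoll detects a readable event on a listening socket. The main loop then returns from epoll_wait and, based on the event type, calls the callback registered earlier: acceptTcpHandler.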

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        // Call accept() to obtain a client socket
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        // Hand the new socket over to the common accept logic
        acceptCommonHandler(cfd,0,cip);
    }
}

#define MAX_ACCEPTS_PER_CALL 1000
static void acceptCommonHandler(int fd, int flags, char *ip) {
    client *c;
    // Create the client object c
    if ((c = createClient(fd)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
    ......
}

// Create a client object for the client socket
client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));

    if (fd != -1) {
        // Put the client socket in non-blocking mode
        anetNonBlock(NULL,fd);
        // Disable Nagle's algorithm (TCP_NODELAY)
        anetEnableTcpNoDelay(NULL,fd);
        // Enable TCP keepalive on the connection
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        /* Bind the client socket to epfd and register
           readQueryFromClient as its readable-event callback */
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
    ......
}
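acceptTcpHandler drains the listen backlog in a bounded loop. Below is a standalone sketch of that idea with plain POSIX sockets. `accept_batch`, `set_nonblock`, `set_nodelay` and `wait_readable` are hypothetical helpers, roughly in the roles of anetTcpAccept, anetNonBlock and anetEnableTcpNoDelay. `poll()` stands in for epoll_wait to keep the example short.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

/* Sketch only: hypothetical helpers, not Redis APIs. */
#define MAX_ACCEPTS_PER_CALL 1000 /* same cap as in the Redis excerpt above */

/* Like anetNonBlock(): set O_NONBLOCK on fd. */
int set_nonblock(int fd) {
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* Like anetEnableTcpNoDelay(): disable Nagle's algorithm. */
int set_nodelay(int fd) {
    int yes = 1;
    return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
}

/* Stand-in for epoll_wait in this sketch: wait until fd is readable. */
int wait_readable(int fd, int timeout_ms) {
    struct pollfd p = { .fd = fd, .events = POLLIN, .revents = 0 };
    return poll(&p, 1, timeout_ms);
}

/* Readable-event handler for a non-blocking listening socket: accept up to
 * MAX_ACCEPTS_PER_CALL connections, stop once accept() reports
 * EAGAIN/EWOULDBLOCK, and configure every new client fd. */
int accept_batch(int listen_fd, int *out, int cap) {
    int n = 0, max = MAX_ACCEPTS_PER_CALL;

    assert(cap > 0);
    while (max-- && n < cap) {
        int cfd = accept(listen_fd, NULL, NULL);
        if (cfd == -1) {
            if (errno == EINTR) continue;
            break; /* EAGAIN/EWOULDBLOCK: backlog drained (or a real error) */
        }
        if (set_nonblock(cfd) == -1 || set_nodelay(cfd) == -1) {
            close(cfd);
            continue;
        }
        out[n++] = cfd;
    }
    return n;
}
```

The cap on accepts per event keeps one burst of new connections from monopolizing the single thread. Stopping at EAGAIN avoids blocking, because the listening socket is in non-blocking mode.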
 

    Next, when a client socket becomes readable, the main loop calls the callback associated with that socket. Let's look at the source of readQueryFromClient.

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask)
{
    client *c = (client*) privdata;
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);

    readlen = PROTO_IOBUF_LEN;
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);

        if (remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            // No data available yet on the non-blocking socket; wait for the next readable event
            return;
        } else {
            // A genuine read error
            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }
    ......
    if (!(c->flags & CLIENT_MASTER)) {
        processInputBuffer(c);
    } else {
        size_t prev_offset = c->reploff;
        processInputBuffer(c);
        size_t applied = c->reploff - prev_offset;
        if (applied) {
            replicationFeedSlavesFromMasterStream(server.slaves,
                    c->pending_querybuf, applied);
            sdsrange(c->pending_querybuf,applied,-1);
        }
    }
}
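The three-way outcome of read() in readQueryFromClient is the heart of any non-blocking read handler: data arrived, EAGAIN, or EOF. Below is a minimal sketch that assumes a plain growable byte buffer in place of sds. `querybuf`, `read_from_client`, `set_nonblock` and `IOBUF_LEN` are hypothetical names; `IOBUF_LEN` is set to 16 KB to match PROTO_IOBUF_LEN.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Sketch only: hypothetical names; IOBUF_LEN mirrors PROTO_IOBUF_LEN (16 KB). */
#define IOBUF_LEN (16 * 1024)

typedef enum { READ_OK, READ_AGAIN, READ_CLOSED, READ_ERROR } read_status;

typedef struct {
    char  *buf; /* bytes received but not yet parsed, like c->querybuf */
    size_t len;
    size_t cap;
} querybuf;

/* Client sockets are non-blocking (see createClient above). */
int set_nonblock(int fd) {
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* Same decision tree as readQueryFromClient: make room, do one read(),
 * then classify the result. */
read_status read_from_client(int fd, querybuf *q) {
    ssize_t nread;

    assert(q->len <= q->cap);
    if (q->cap - q->len < IOBUF_LEN) {          /* like sdsMakeRoomFor() */
        size_t newcap = q->len + IOBUF_LEN;
        char *nb = realloc(q->buf, newcap);
        if (nb == NULL) return READ_ERROR;
        q->buf = nb;
        q->cap = newcap;
    }
    nread = read(fd, q->buf + q->len, IOBUF_LEN);
    if (nread == -1) {
        /* Nothing to read right now on a non-blocking fd: not an error. */
        if (errno == EAGAIN || errno == EWOULDBLOCK) return READ_AGAIN;
        return READ_ERROR;                      /* Redis frees the client here */
    }
    if (nread == 0) return READ_CLOSED;         /* peer closed the connection */
    q->len += (size_t)nread;
    return READ_OK;
}
```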
 

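    processInputBuffer checks whether the received data starts with an asterisk (*). If it does, it sets the client object's reqtype field to PROTO_REQ_MULTIBULK and calls processMultibulkBuffer to parse the rest of the data. Otherwise, the request is treated as an inline command (PROTO_REQ_INLINE) and parsed by processInlineBuffer. Parsing turns the raw bytes into a Redis command and its arguments, which Redis then executes according to that command's rules.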
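To make the multibulk format concrete: the `set hello redis` typed earlier in redis-cli reaches the server as `*3\r\n$3\r\nset\r\n$5\r\nhello\r\n$5\r\nredis\r\n`. That is an argument count followed by length-prefixed arguments. The parser below is a simplified sketch, not Redis's processMultibulkBuffer. It re-parses a complete buffer from the start instead of keeping incremental state (multibulklen, bulklen) across reads. `parse_multibulk`, `arg` and `MAX_ARGS` are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Sketch only: hypothetical names; argv must have room for MAX_ARGS entries. */
#define MAX_ARGS 16

typedef struct {
    const char *ptr; /* points into the request buffer, not copied */
    size_t      len;
} arg;

/* Parse "<prefix><digits>\r\n" at buf[*pos].
 * Returns 1 on success, 0 if more data is needed, -1 on a protocol error. */
static int parse_len_line(const char *buf, size_t n, size_t *pos, char prefix, long *out) {
    const char *nl;
    long v = 0;
    size_t i;

    if (*pos >= n) return 0;
    if (buf[*pos] != prefix) return -1;
    nl = memchr(buf + *pos, '\r', n - *pos);
    if (nl == NULL || (size_t)(nl - buf) + 1 >= n) return 0; /* no full "\r\n" yet */
    if (nl[1] != '\n') return -1;
    for (i = *pos + 1; buf + i < nl; i++) {
        if (buf[i] < '0' || buf[i] > '9' || v > 100000000L) return -1; /* bad or absurd length */
        v = v * 10 + (buf[i] - '0');
    }
    if (i == *pos + 1) return -1;                            /* no digits at all */
    *out = v;
    *pos = (size_t)(nl - buf) + 2;
    return 1;
}

/* Parse one complete multibulk request into argv/argc.
 * Returns bytes consumed (> 0), 0 if incomplete, -1 on a protocol error. */
long parse_multibulk(const char *buf, size_t n, arg *argv, int *argc) {
    size_t pos = 0;
    long count, blen;
    int r, i;

    if ((r = parse_len_line(buf, n, &pos, '*', &count)) != 1) return r;
    if (count > MAX_ARGS) return -1;
    for (i = 0; i < count; i++) {
        if ((r = parse_len_line(buf, n, &pos, '$', &blen)) != 1) return r;
        if (n - pos < (size_t)blen + 2) return 0;            /* payload not all here yet */
        if (buf[pos + blen] != '\r' || buf[pos + blen + 1] != '\n') return -1;
        argv[i].ptr = buf + pos;
        argv[i].len = (size_t)blen;
        pos += (size_t)blen + 2;
    }
    *argc = (int)count;
    return (long)pos;
}
```

Returning 0 for "incomplete" mirrors what a real server must do: keep the bytes in the query buffer and wait for the next readable event, since one read() may deliver half a command.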

    Once a complete command has been parsed, processInputBuffer hands it to the command-handling code, processCommand:

int processCommand(client *c) {
    // For the quit command, reply OK to the client and close the connection after the reply
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }

    // Look up the command; if it is unknown or has the wrong number of arguments, reply with an error
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        flagTransaction(c);
        sds args = sdsempty();
        int i;
        for (i=1; i < c->argc && sdslen(args) < 128; i++)
            args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
        addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
            (char*)c->argv[0]->ptr, args);
        sdsfree(args);
        return C_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return C_OK;
    }
    ......
}
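The two checks in processCommand (unknown command, wrong arity) rely on the sign convention of the arity field:

- a positive arity means exactly that many arguments, including the command name;
- a negative arity means at least |arity| arguments.

Below is a sketch with a hypothetical three-entry table. The arities are meant to mirror GET, SET and DEL in Redis's command table, but the table itself and `check_command` are illustrative. Redis looks commands up in a hash table rather than with a linear scan.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stddef.h>
#include <strings.h>

/* Sketch only: hypothetical mini command table, not Redis's redisCommandTable. */
typedef struct {
    const char *name;
    int arity; /* > 0: exactly arity args; < 0: at least -arity args */
} command;

static const command table[] = {
    { "get",  2 },
    { "set", -3 },
    { "del", -2 },
};

typedef enum { CMD_OK, CMD_UNKNOWN, CMD_WRONG_ARITY } cmd_check;

/* The same two checks processCommand() performs before executing anything. */
cmd_check check_command(const char *name, int argc) {
    const command *cmd = NULL;
    size_t i;

    assert(name != NULL && argc >= 1);
    for (i = 0; i < sizeof(table) / sizeof(table[0]); i++) {
        if (strcasecmp(table[i].name, name) == 0) { /* command names are case-insensitive */
            cmd = &table[i];
            break;
        }
    }
    if (cmd == NULL) return CMD_UNKNOWN;             /* "unknown command" */
    if ((cmd->arity > 0 && cmd->arity != argc) || argc < -cmd->arity)
        return CMD_WRONG_ARITY;                      /* "wrong number of arguments" */
    return CMD_OK;
}
```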
 

  Next, let's look at addReply:
 

void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyObjectToList(c,obj);
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        ......
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}
 

    Then prepareClientToWrite:

int prepareClientToWrite(client *c) {
    if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;
    if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
    if ((c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;
    if (c->fd <= 0) return C_ERR;

    if (!clientHasPendingReplies(c) &&
        !(c->flags & CLIENT_PENDING_WRITE) &&
        (c->replstate == REPL_STATE_NONE ||
         (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
    {
        /* If the client has no CLIENT_PENDING_WRITE flag and no buffered output
           yet, set CLIENT_PENDING_WRITE and add the client to the redisServer's
           clients_pending_write list */
        c->flags |= CLIENT_PENDING_WRITE;
        listAddNodeHead(server.clients_pending_write,c);
    }
    return C_OK;
}
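The CLIENT_PENDING_WRITE flag in prepareClientToWrite ensures that a client is put on clients_pending_write at most once, no matter how many replies it accumulates before the loop flushes them. Below is a reduced sketch. It assumes a singly linked list in place of Redis's adlist and leaves out the Lua, module and replication conditions. All names, including the flag value, are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

/* Sketch only: hypothetical names and flag value. */
#define CLIENT_PENDING_WRITE (1 << 0)

typedef struct client {
    int flags;
    size_t bufpos;               /* bytes waiting in the output buffer */
    struct client *next_pending; /* link in the pending-write list */
} client;

/* Stand-in for server.clients_pending_write. */
typedef struct {
    client *head;
    size_t  len;
} pending_list;

/* Core of prepareClientToWrite(): enqueue the client only when it has no
 * buffered output yet and is not already flagged. */
void prepare_client_to_write(pending_list *pl, client *c) {
    assert(pl != NULL && c != NULL);
    if (c->bufpos == 0 && !(c->flags & CLIENT_PENDING_WRITE)) {
        c->flags |= CLIENT_PENDING_WRITE;
        c->next_pending = pl->head; /* push at head, like listAddNodeHead */
        pl->head = c;
        pl->len++;
    }
}

/* Same order as addReply(): prepare first, then append the bytes. */
void add_reply(pending_list *pl, client *c, size_t nbytes) {
    prepare_client_to_write(pl, c);
    c->bufpos += nbytes; /* the real code copies the bytes into c->buf here */
}
```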

      And finally _addReplyToBuffer:

/* The most important step: append the result of executing the client's
   command to that client's buf buffer. */
int _addReplyToBuffer(client *c, const char *s, size_t len)
{
    size_t available = sizeof(c->buf)-c->bufpos;

    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;

    /* If the client's reply list is not empty, return C_ERR so that the
       caller appends this reply to the reply list instead (keeps replies in order) */
    if (listLength(c->reply) > 0) return C_ERR;

    if (len > available) return C_ERR;

    memcpy(c->buf+c->bufpos,s,len);
    c->bufpos+=len;
    return C_OK;
}

// If _addReplyToBuffer returns C_ERR, the reply is appended to the reply list
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
    _addReplyObjectToList(c,obj);
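The split between the fixed buffer and the reply list can be modeled with a small struct. The sketch below shrinks the buffer to 16 bytes so that the overflow path is easy to trigger; Redis sizes client->buf with PROTO_REPLY_CHUNK_BYTES (16 KB). `client_out`, `add_reply` and the helpers are hypothetical.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Sketch only: hypothetical names; buffer deliberately tiny for illustration. */
#define REPLY_BUF_SIZE 16

typedef struct reply_node {
    struct reply_node *next;
    size_t len;
    char data[];               /* one reply, stored inline */
} reply_node;

typedef struct {
    char buf[REPLY_BUF_SIZE];  /* fixed buffer, like client->buf */
    size_t bufpos;
    reply_node *head, *tail;   /* overflow list, like client->reply */
    size_t list_len;
} client_out;

/* Mirrors _addReplyToBuffer: refuse (-1) when the list already holds data
 * (so replies are never reordered) or when the bytes do not fit. */
static int add_to_buffer(client_out *c, const char *s, size_t len) {
    if (c->list_len > 0) return -1;
    if (len > sizeof(c->buf) - c->bufpos) return -1;
    memcpy(c->buf + c->bufpos, s, len);
    c->bufpos += len;
    return 0;
}

/* Fallback path, in the spirit of _addReplyObjectToList: append a node. */
static int add_to_list(client_out *c, const char *s, size_t len) {
    reply_node *n = malloc(sizeof(*n) + len);
    if (n == NULL) return -1;
    n->next = NULL;
    n->len = len;
    memcpy(n->data, s, len);
    if (c->tail) c->tail->next = n; else c->head = n;
    c->tail = n;
    c->list_len++;
    return 0;
}

int add_reply(client_out *c, const char *s, size_t len) {
    assert(c != NULL);
    if (add_to_buffer(c, s, len) == 0) return 0;
    return add_to_list(c, s, len);
}
```

The `list_len > 0` check is what preserves ordering. Once any reply has spilled into the list, a later small reply must follow it there even if it would fit in the buffer; otherwise the client would receive replies out of order.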
 

   The core of Redis 4.0 is this main event loop:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
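aeMain itself is only a loop around aeProcessEvents. The interesting part is the dispatch from ready fds to the rfileProc/wfileProc callbacks. Below is a toy model that assumes a scripted `wait` function in place of epoll_wait, so it runs without sockets. Every name is hypothetical. Redis's real aeProcessEvents also handles timers, AE_BARRIER ordering and afterSleep.

```c
#include <assert.h>
#include <stddef.h>

/* Sketch only: hypothetical names, not the ae API. */
#define EV_READABLE 1
#define EV_WRITABLE 2
#define MAX_FDS     16
#define MAX_FIRED   64

typedef struct ev_loop ev_loop;
typedef void file_proc(ev_loop *l, int fd, void *data, int mask);

typedef struct {
    int mask;                  /* registered events: EV_READABLE | EV_WRITABLE */
    file_proc *rproc, *wproc;  /* like aeFileEvent.rfileProc / wfileProc */
    void *data;                /* like aeFileEvent.clientData */
} file_event;

typedef struct { int fd, mask; } fired_event;

struct ev_loop {
    int stop;
    void (*beforesleep)(ev_loop *l);
    /* Stand-in for epoll_wait(): fills `fired`, returns the number of ready fds. */
    int (*wait)(ev_loop *l, fired_event *fired, int max);
    file_event events[MAX_FDS];
};

/* Dispatch step in the spirit of aeProcessEvents: for each ready fd, call
 * the read callback, then the write callback, if that event is registered. */
static void process_events(ev_loop *l) {
    fired_event fired[MAX_FIRED];
    int i, n = l->wait(l, fired, MAX_FIRED);

    for (i = 0; i < n; i++) {
        int fd = fired[i].fd, m = fired[i].mask;
        file_event *fe;

        assert(fd >= 0 && fd < MAX_FDS);
        fe = &l->events[fd];
        if ((fe->mask & m & EV_READABLE) && fe->rproc) fe->rproc(l, fd, fe->data, m);
        if ((fe->mask & m & EV_WRITABLE) && fe->wproc) fe->wproc(l, fd, fe->data, m);
    }
}

/* Same shape as aeMain(): beforesleep, then wait for and dispatch events. */
void ev_main(ev_loop *l) {
    l->stop = 0;
    while (!l->stop) {
        if (l->beforesleep != NULL) l->beforesleep(l);
        process_events(l);
    }
}

/* ---- Demo helpers: a scripted poller that reports fd 5 readable twice ---- */
static int wait_calls, before_calls;

static int scripted_wait(ev_loop *l, fired_event *fired, int max) {
    (void)max;
    if (++wait_calls > 2) { /* third call: nothing left, stop the loop */
        l->stop = 1;
        return 0;
    }
    fired[0].fd = 5;
    fired[0].mask = EV_READABLE;
    return 1;
}

static void count_before(ev_loop *l) { (void)l; before_calls++; }

static void on_read(ev_loop *l, int fd, void *data, int mask) {
    (void)l; (void)fd; (void)mask;
    ++*(int *)data; /* count dispatched read events */
}
```

Because beforesleep runs just before the loop may block in the poller, it is the natural place to flush replies produced during the previous iteration. That is what Redis's beforeSleep does with handleClientsWithPendingWrites.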
 

     Each iteration first calls beforesleep. What does beforesleep do? Let's look at its implementation:

void beforeSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);
    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();
    ......
}

int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);

    // Walk clients_pending_write, the list of clients that have data to send
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        // Clear the CLIENT_PENDING_WRITE flag
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write,ln);

        // Write directly to the socket
        if (writeToClient(c->fd,c,0) == C_ERR) continue;

        // If the client still has replies left to send
        if (clientHasPendingReplies(c)) {
            int ae_flags = AE_WRITABLE;
            if (server.aof_state == AOF_ON &&
                server.aof_fsync == AOF_FSYNC_ALWAYS)
            {
                ae_flags |= AE_BARRIER;
            }
            /* If the socket could not take all the data (e.g. the TCP window
               is too small), register the client's fd for writable events
               with epoll, with sendReplyToClient as the writable callback */
            if (aeCreateFileEvent(server.el, c->fd, ae_flags,
                sendReplyToClient, c) == AE_ERR)
            {
                freeClientAsync(c);
            }
        }
    }
    return processed;
}

// sendReplyToClient simply calls writeToClient as well
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    UNUSED(el);
    UNUSED(mask);
    writeToClient(fd,privdata,1);
}
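handleClientsWithPendingWrites first tries to write directly and registers a writable event only for clients with leftover output. Below is a sketch of that strategy on a real non-blocking socket. `client`, `write_to_client`, `flush_pending` and the `want_writable` flag are hypothetical; `want_writable` stands in for registering AE_WRITABLE with sendReplyToClient. Redis's per-call write limit is omitted.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stddef.h>
#include <sys/socket.h>
#include <unistd.h>

/* Sketch only: hypothetical names, not Redis APIs. */
typedef struct {
    int fd;
    const char *out;   /* reply bytes waiting to be sent */
    size_t len, sent;
    int want_writable; /* stands in for "AE_WRITABLE + sendReplyToClient registered" */
} client;

int set_nonblock(int fd) {
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* In the spirit of writeToClient(): write as much as the kernel accepts now.
 * Returns 0 on success (possibly partial), -1 on a real error. */
int write_to_client(client *c) {
    assert(c->sent <= c->len);
    while (c->sent < c->len) {
        ssize_t n = write(c->fd, c->out + c->sent, c->len - c->sent);
        if (n == -1) {
            if (errno == EINTR) continue;
            if (errno == EAGAIN || errno == EWOULDBLOCK) break; /* socket buffer full */
            return -1;                                          /* e.g. EPIPE */
        }
        c->sent += (size_t)n;
    }
    return 0;
}

/* Core of handleClientsWithPendingWrites(): direct write first; ask for a
 * writable notification only when bytes are left over. Returns how many
 * clients needed one. */
int flush_pending(client **pending, int n) {
    int i, registered = 0;

    for (i = 0; i < n; i++) {
        client *c = pending[i];
        if (write_to_client(c) == -1) continue; /* Redis would free the client */
        if (c->sent < c->len) {                 /* leftovers: wait for EPOLLOUT */
            c->want_writable = 1;
            registered++;
        }
    }
    return registered;
}
```

In the common case, a short reply fits in the socket buffer and no epoll_ctl call is needed at all. The writable event is installed only when the kernel pushes back.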
 

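     After all this analysis, Redis's communication model turns out to be a single thread plus one main event loop.

     - A single global redisServer object holds several lists that manage the client objects: all connected clients, clients that need a reply, and clients with data waiting to be sent.
     - epoll watches the listening sockets and the accepted client sockets for readable events.
     - When a client sends a command, the server parses it, executes it, and replies with the result.
     - If the reply cannot be sent in full (for example because the TCP window is too small), a writable event is registered on that client's fd with sendReplyToClient as the callback. sendReplyToClient sends the rest once the socket can accept more data.
     - On every iteration, the main loop (via beforeSleep) also checks clients_pending_write and sends each client's buffered replies (its buf and reply list), one client at a time.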

3. Hands-on Verification

     Let's test on CentOS 7. Open two redis-cli sessions at the same time and send commands to redis-server from them one after the other, for example:

set hello world

   Now look at the call stack and the main thread of redis-server:

[Screenshot]

    Redis does not process client requests concurrently on multiple threads. Instead, it loops over the pending clients and replies to them one at a time.

[Screenshot]

     


http://www.ppmy.cn/embedded/160006.html
