Memcached源码分析之请求处理(状态机)

6399℃

作者:Calix

一)上文

在上一篇线程模型的分析中,我们知道,worker线程和主线程都调用了同一个函数,conn_new进行事件监听,并返回conn结构体对象。最终有事件到达时,调用同一个函数event_handler最终来到执行drive_machine。

二)conn结构体

首先,很有必要地先分析一个结构体:conn

这个conn在memcached里面是这样一个角色,听名字也知道它代表一个“连接”,但这个“连接”不一定是已经连接上的连接,可以是监听中的连接,例如主线程在监听listen fd的时候,也通过conn_new创建了一个conn实例对象,而这个conn对象的conn_states值为conn_listening,代表“监听中的连接”。

而worker线程监听的client fd是已经连接上了,也为这个连接创建一个“conn”对象,而连接状态conn_states则不是conn_listening,最开始的时候为conn_cmd_new,听名字也知道,这个连接处于“新命令”状态。

每一个“连接”都有当前的状态,监听中,还是等待新命令中,还是后面会看到的“写数据”中,“关闭中”等等,所以这个conn结构体的定义是合理的。

所以最后总结出,无论是主线程监听listen fd还是worker线程监听client fd,只要是与客户端有关的fd的监听都以一个conn对象来表示。

下面大概分析一下conn的结构,(建议先大体看下各个字段的意义,具体到某个字段被使用时再详讲):

typedef struct conn conn;
struct conn {
 int sfd; //连接的socket fd
 sasl_conn_t *sasl_conn;
 bool authenticated;
 enum conn_states state; //当前的连接状态
 enum bin_substates substate;
 rel_time_t last_cmd_time;
 struct event event; // 监听的事件
 short ev_flags; //监听的事件 类型
 short which; /** which events were just triggered */ //刚触发的事件

 /**
 读buffer会涉及两个方向上的“读”:
 一个是从socket读进来到rbuf里面
 一个是从rbuf里面把数据读出去解析,读buffer相当于一个中介,从socket读进来最终还是得被别人读出去解析,而
 rcurr工作指针与rbytes就是在rbuf数据被读出去解析的时候用到,用来控制可以读出去被解析的数据还剩余多少。
 */
 char *rbuf; /** buffer to read commands into */ //读buffer
 char *rcurr; /** but if we parsed some already, this is where we stopped */ //读buffer的当前指针
 int rsize; /** total allocated size of rbuf */ //读buffer大小
 int rbytes; /** how much data, starting from rcur, do we have unparsed */ //剩余待解析的buffer字节数

 //下面4个属性和上面4个类似
 char *wbuf;
 char *wcurr;
 int wsize;
 int wbytes;

 /** which state to go into after finishing current write */
 enum conn_states write_and_go; //完成当前写操作后,连接状态将会置为此状态
 void *write_and_free; /** free this memory after finishing writing */

 char *ritem; /** when we read in an item's value, it goes here */ //这个指针指向item结构体中data中的value地址
 int rlbytes; //尚未读完item的data的value的字节数
 void *item; /* for commands set/add/replace */ //当执行set/add/replace 命令时,此指针用于指向分配的item空间

 /* data for the swallow state */
 int sbytes; /* how many bytes to swallow */

 //下面是往socket写出数据时用的字段
 struct iovec *iov;
 int iovsize; /* number of elements allocated in iov[] */
 int iovused; /* number of elements used in iov[] */
 struct msghdr *msglist;
 int msgsize; /* number of elements allocated in msglist[] */
 int msgused; /* number of elements used in msglist[] */
 int msgcurr; /* element in msglist[] being transmitted now */
 int msgbytes; /* number of bytes in current msg */
 item **ilist; /* list of items to write out */
 int isize;
 item **icurr;
 int ileft;
 char **suffixlist;
 int suffixsize;
 char **suffixcurr;
 int suffixleft;
 enum protocol protocol; /* which protocol this connection speaks */
 enum network_transport transport; /* what transport is used by this connection */

 //UDP相关的字段
 int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
 struct sockaddr_in6 request_addr; /* udp: Who sent the most recent request */
 socklen_t request_addr_size;
 unsigned char *hdrbuf; /* udp packet headers */
 int hdrsize; /* number of headers' worth of space is allocated */
 bool noreply; /* True if the reply should not be sent. */

 /* current stats command */
 struct {
  char *buffer;
  size_t size;
  size_t offset;
 } stats;

 // 二进制相关的字段
 protocol_binary_request_header binary_header;
 uint64_t cas; /* the cas to return */
 short cmd; /* current command being processed */
 int opaque;
 int keylen;
 conn *next; /* Used for generating a list of conn structures */
 LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
};




/* conn_states是一个枚举:*/

enum conn_states {
 conn_listening, /**< the socket which listens for connections */
 conn_new_cmd, /**< Prepare connection for next command */
 conn_waiting, /**< waiting for a readable socket */
 conn_read, /**< reading in a command line */
 conn_parse_cmd, /**< try to parse a command from the input buffer */
 conn_write, /**< writing out a simple response */
 conn_nread, /**< reading in a fixed number of bytes */
 conn_swallow, /**< swallowing unnecessary bytes w/o storing */
 conn_closing, /**< closing this connection */
 conn_mwrite, /**< writing out many items sequentially */
 conn_closed, /**< connection is closed */
 conn_max_state /**< Max state value (used for assertion) */
};

知道conn的意义之后,主线程和worker线程都调用conn_new监听fd并创建conn对象就合情合理了,大家都有conn对象,只是状态不一样,event_handler被触发,调用drive_machine,进入不一样的case完成不一样的操作。

这句话压缩来说:“根据状态不同去做不同的事情”,这种工作方式其实就是下面要讲的“状态机”。

三)状态机

状态机drive_machine函数是worker线程网络请求进行业务逻辑处理的核心。

它的实现方式是:

一个while循环里面有一个巨大的switch case,根据连接对象 conn当前的连接状态conn_state,进入不同的case,而每个case可能会改变conn的连接状态,也就是说在这个while+switch中,conn会不断的发生状态转移,最后被分发到合适的case上作处理。可以理解为,这里是一个有向图,每个case是一个顶点,有些case通过改变conn对象的连接状态让程序在下一次循环中进入另一个case,几次循环后程序最终进入到“无出度的顶点”然后结束状态机,这里的无出度的顶点就是带设置stop=true的case分支。

看下大概的代码结构:

static void drive_machine(conn *c) {
    while (!stop) {
        switch(c->state) {
           case conn_listening: 
                 //。。。。
           case conn_waiting:
                //。。。
                stop = true; break;
               //。。。
        }
   }
}

主线程状态机的行为我们已经知道了,永远只会是conn_listening状态,永远只会进入drive_machine的conn_listening分支,accept连接把client fd 通过dispatch_conn_new函数分发给worker线程。

下面我们来看一下worker线程执行状态机:

当主线程调用dispatch_conn_new的时候,worker线程创建conn对象,初始状态为conn_new_cmd。所以当有worker线程监听的client fd有请求过来时,例如客户端发了一行命令(set xxx\r\n)会进入conn_new_cmd分支:

         case conn_new_cmd:
            /*
             这里的reqs是请求的意思,其实叫“命令”更准确。一次event发生,有可能包含多个命令,
             从client fd里面read到的一次数据,不能保证这个数据只是包含一个命令,有可能是多个
             命令数据堆在一起的一次事件通知。这个nreqs是用来控制一次event最多能处理多少个命令。
            */
            --nreqs;
            if (nreqs >= 0) {
                /**
                准备执行命令。为什么叫reset cmd,reset_cmd_handler其实做了一些解析执行命令之前
                的初始化动下一个,都会重新进入这个case作。而像上面说的,一次event有可能有多个命令,每执行一个命令,如果还有
                 conn_new_cmd,reset一下再执行下一个命令。
                */
                reset_cmd_handler(c);
            } else {
//。。。
            }
            break;

上面的nreqs在这里暂不详细分析。当client fd第一次有请求过来的时候,会进入reset_cmd_handler函数:

 static void reset_cmd_handler(conn *c) {
    c->cmd = -1;
    c->substate = bin_no_state;
    if(c->item != NULL) {
        item_remove(c->item);
        c->item = NULL;
    }
    conn_shrink(c); 

 //第一次有请求过来触发到此函数时,c->rbytes为0
    if (c->rbytes > 0) {
        conn_set_state(c, conn_parse_cmd);
    } else {
        conn_set_state(c, c
            onn_waiting);  //第一次请求进入此分支
    }
}

我们在conn_new函数里面把c->rbytes被始化为0,而直至此我们也没有看到这个c->rbytes有被重新赋新值,所以其实第一次有请求过来,这个值还是0,所以进入else分支,即执行conn_set_state(c,conn_waiting);然后重新回到状态机执行下一次循环,进入conn_waiting分支:

case conn_waiting:
    if (!update_event(c, EV_READ | EV_PERSIST)) {
        //。。。
    }
    conn_set_state(c, conn_read);
    stop = true;
    break;

在conn_waiting分支你会发现,这里的代码仅仅是把状态改变conn_read然后就stop=true,结束状态机了!没错,退出while循环了!这次事件触发就此结束了!
你会觉得很奇怪,我客户端明明发了一个请求,(set xxx\r\n),你什么都没处理就只是把连接状态改成conn_read就完事了?!没错,至少这一次状态机的执行行为是这样!

到底是怎么回事?其实这里是利用了一点:libevent的epoll默认是“水平触发”!也就是说,客户端发来一个set xxx\r\n,我这边一天没有read,epoll还会有下一次通知,也就是说,这个请求有两次事件通知!第一次通知的作用仅是为了把连接状态改为conn_read! 当worker线程因为同一个client fd同一个请求收到第二次通知的时候,再次执行状态机,然后进入conn_read分支。

为了验证这一点,我在drive_machine函数代码执行的开头处打了一下log:

static void drive_machine(conn *c) {
    fprintf(stderr, "event arrive!\n");

然后重新编译memcached运行,测试一下是否worker线程事件通知发生了两次(左边是服务端,右边是客户端):

客户端telnet发起连接,event_base通知主线程,所以这里会有一次调用drive_machine的情况:

6619078690909560085

客户端输入“set testkey 0 0 4”的命令后:

6608929099073920313

可以看到当服务端收到命令后,先利用第一次事件通知(上面图中的第二个event arrive)把状态置为conn_read,然后等待第二次事件通知。非常快地,第二次事件通知就到达(上面图中的第三个event arrive),然后进入conn_read状态继续执行。

下面我们看一下收到第二次通知的时候进入conn_read分支后的代码:

         case conn_read:
            res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);
            switch (res) {
            case READ_NO_DATA_RECEIVED:
                conn_set_state(c, conn_waiting);
                break;
            case READ_DATA_RECEIVED:
                conn_set_state(c, conn_parse_cmd);
                break;
            case READ_ERROR:
                conn_set_state(c, conn_closing);
                break;
            case READ_MEMORY_ERROR:
                break;
            }
            break;

进入conn_read此时才调用函数try_read_network函数读出请求(set xxx\r\n):

 static enum try_read_result try_read_network(conn *c) {
    enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
    int res;
    int num_allocs = 0;
    assert(c != NULL);
    if (c->rcurr != c->rbuf) {
        if (c->rbytes != 0) /* otherwise there's nothing to copy */
            memmove(c->rbuf, c->rcurr, c->rbytes);
        c->rcurr = c->rbuf;
    }
    while (1) {
        if (c->rbytes >= c->rsize) {//读buffer空间扩充
            //。。。
        }
        int avail = c->rsize - c->rbytes; //读buffer的空间还剩余多少大小可以用
        res = read(c->sfd, c->rbuf + c->rbytes, avail); //往剩下的可用的地方里塞
        if (res > 0) {
            gotdata = READ_DATA_RECEIVED;
            /**
            rbytes是当前指针rcurr至读buffer末尾的数据大小,这里可简单地理解为对rbytes的初始化。
            */
            c->rbytes += res;
            if (res == avail) { //可能还没读完,此时读buffer可用空间满了,那么下次循环会进行读buffer空间扩充
                continue;
            } else {
                break; //socket的可读数据都读完了
            }
        }
//。。。
    }
    return gotdata;
}

try_read_network函数就是从socket中把数据读到c->rbuf中去而已,同时初始化一些变量例如rbytes等,读取数据成功则返回READ_DATA_RECEIVED,状态机 conn_set_state(c, conn_parse_cmd);进入conn_parse_cmd状态:

  case conn_parse_cmd :
            /**
            try_read_network后,到达conn_parse_cmd状态,但try_read_network并不确保每次到达
            的数据都足够一个完整的cmd(ascii协议情况下往往是没有"\r\n",即回车换行),
            所以下面的try_read_command之所以叫try就是这个原因,
            当读到的数据还不够成为一个cmd的时候,返回0,conn继续进入conn_waiting状态等待更多的数据到达。
            */
            if (try_read_command(c) == 0) {
                /* wee need more data! */
                conn_set_state(c, conn_waiting);
            }
            break;

进行conn_parse_cmd主要是调用try_read_command函数读取命令,上面注释也说明了数据不够一个cmd的情况,下面我们进入try_read_command,看看try_read_command不返回0时,也就是足够一个cmd后是怎么解析这个cmd的(只说明tcp ascii协议的情况):

static int try_read_command(conn *c) {
char *el, *cont;
        if (c->rbytes == 0)  //读buffer没有待解析的数据
            return 0;
        el = memchr(c->rcurr, '\n', c->rbytes); //找第一个命令的末尾,即换行符
        if (!el) {
            //。。。
            /*
            如果没有找到换行符,则说明读到的数据还不足以成为一个完整的命令,
            返回0
            */
            return 0;
        }
        cont = el + 1; //下一个命令的开头 
        /*
        下面这个if的作用是把el指向当前命令最后一个有效字符的下一个字符,即\r
        目的是为了在命令后面插上一个\0,字符串结束符。
        例如 GET abc\r\n******,变成GET abc\0\n*****,这样以后读出的字符串就是一个命令。
        */
        if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
            el--;
        }
        *el = '\0';

        c->last_cmd_time = current_time;
        process_command(c, c->rcurr); //执行命令。分析详见process_command
        //当前命令执行完之后,把当前指针rcurr指向 下一个命令的开头,并调用rbytes(剩余未处理字节数大小)
        //逻辑上相当于把已处理的命令去掉。
        c->rbytes -= (cont - c->rcurr);
        c->rcurr = cont;
    }
    return 1;
}

上面try_read_command把命令读出(其实只是简单地找出一个完整的命令,在后面加个\0而已)。

在这里插一下memcached的SET命令的协议,或者你可以看memcached/doc/protocol.txt中的说明:

完成一个SET命令,其实需要两行,也就是需要按两次回车换行“\r\n”,第一行叫“命令行”,格式是SET key flags exptime bytes\r\n,如SET name 0 0 5\r\n, 键为name,flags标志位可暂时不管,超时设为0,value的字节长度是4。然后才有第二行叫“数据行”,格式为:value\r\n,例如:calix\r\n。这两行分别敲下去,SET命令才算完成。

所以处理SET命令时上面的try_read_command首先处理的是SET name 0 0 5\r\n这个“命令行”。

看看进入process_command函数如何执行:

/**
这里就是对命令的解析和执行了
(其实准确来说,这里只是执行了命令的一半(例如如果是SET命令,则是“命令行”部分),
然后根据命令类型再次改变conn_state使程序再次进入状态机,完成命令的
另一半工作,后面详说)
command此时的指针值等于conn的rcurr
*/
static void process_command(conn *c, char *command) {
    token_t tokens[MAX_TOKENS];
    size_t ntokens;
    int comm; //命令类型
    c->msgcurr = 0;
    c->msgused = 0;
    c->iovused = 0;
    if (add_msghdr(c) != 0) {
        out_of_memory(c, "SERVER_ERROR out of memory preparing response");
        return;
    }
    /**
    下面这个tokenize_command是一个词法分析,把command分解成一个个token
    */
    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
    //下面是对上面分解出来的token再进行语法分析,解析命令,下面的comm变量为最终解析出来命令类型
    if (ntokens >= 3 &&
        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
        process_get_command(c, tokens, ntokens, false);
    } else if ((ntokens == 6 || ntokens == 7) &&
               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
        //add/set/replace/prepend/append为“更新”命令,调用同一个函数执行命令。详见process_update_command定义处
        process_update_command(c, tokens, ntokens, comm, false);
    }
   //。。。  
}

上面的代码可以看出首先我们要对命令进行“解析”,词法语法分析等等(属于编译原理知识,在这不详讲),最终我们的set name 0 0 5\r\n命令会进入process_update_command函数中执行:

static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
        out_string(c, "CLIENT_ERROR bad command line format"); //key过长,out_string函数的作用是输出响应,
        //详见out_string定义处
        return;
    }
    key = tokens[KEY_TOKEN].value; //键名
    nkey = tokens[KEY_TOKEN].length; //键长度
    //下面这个if同时把命令相应的参数(如缓存超时时间等)赋值给相应变量:exptime_int等
    if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
           && safe_strtol(tokens[3].value, &exptime_int)
           && safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }
    exptime = exptime_int;
    if (exptime < 0)
        exptime = REALTIME_MAXDELTA + 1;
    //在这里执行内存分配工作。详见内存管理篇
    it = item_alloc(key, nkey, flags, realtime(exptime), vlen); 
    ITEM_set_cas(it, req_cas_id);
    c->item = it; //将item指针指向分配的item空间
    c->ritem = ITEM_data(it); //将 ritem 指向 it->data中要存放 value 的空间地址
    c->rlbytes = it->nbytes; //data的大小
    c->cmd = comm; //命令类型
    conn_set_state(c, conn_nread); //继续调用状态机,执行命令的另一半工作。
}

process_update_command函数最终执行了item_alloc为我们要set的数据(称为item)分配了内存。同时,为c对象赋了相应的一些值。

但是其实这里仅仅是为item分配了空间,还没有把value塞进去,因为我们仅仅执行了SET命令的“命令行“部分,根据“命令行”部分的信息分配空间。代码最后一行看到在这里,我们又把c的状态变成了conn_nread,等“数据行”达到,epoll事件触发状态机下一次循环进入conn_nread分支,其实就是完成SET命令的第二部分,读出“数据行”:

 case conn_nread:
            /**
            由process_update_command执行后进入此状态,process_update_command函数只执行了add/set/replace 等命令的一半,
            剩下的一半由这里完成。
            例如如果是上面的set命令,process_update_command只完成了“命令行”部分,分配了item空间,
            但还没有把value塞到对应的 item中去。因此,在这一半要完成的动作就是把value的数据从socket中读出来,
            塞到刚拿到的item空间中去
            */
            /*
            下面的rlbytes字段表示要读的“value数据”还剩下多少字节 (注意与"rbytes"的区别)
            如果是第一次由process_update_command进入到此,rlbytes此时在process_update_command中被初始化为item->nbytes, 
            即value的总字节数,SET name 0 0 5\r\n中的5。
            */
            if (c->rlbytes == 0) {
                /**
                注意rlbytes为0才读完,否则状态机一直会进来这个conn_nread分支继续读value数据,
                读完就调用complete_nread完成收尾工作,程序会跟着complete_nread进入下一个
                状态。所以执行完complete_nread会break; 
                */
                complete_nread(c);
                break;
            }
            //如果还有数据没读完,继续往下执行。可知,下面的动作就是继续从buffer中读value数据往item中的data的value位置塞。

            if (c->rbytes > 0) { 
                /**
                 进入到这个if,是因为有可能先前读到的buffer已经有“数据行”部分,因为一次事件通知,
                 不保证socket可读数据只有一个\r\n。
               */
                /**
                取rbytes与rlbytes中最小的值。
                为啥?
                因为这里我们的目的是剩下的还没读的value的字节,而rlbytes代表的是还剩下的字节数
                如果rlbytes比rbytes小,只读rlbytes长度就够了,rbytes中多出来的部分不是我们这个时候想要的
                如果rbytes比rlbytes小,即使你要rlbytes这么多,但buffer中没有这么多给你读。
                */
                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
                if (c->ritem != c->rcurr) {
                    memmove(c->ritem, c->rcurr, tocopy); //往分配的item中塞,即为key设置value的过程
                }
                c->ritem += tocopy;
                c->rlbytes -= tocopy;
                c->rcurr += tocopy;
                c->rbytes -= tocopy;
                if (c->rlbytes == 0) {
                    break;
                }
            }
            //这里往往是我们先前读到buffer的数据还没足够的情况下,从socket中读。
            res = read(c->sfd, c->ritem, c->rlbytes);//往分配的item中塞,即为key设置value的过程
            if (res > 0) {
                if (c->rcurr == c->ritem) {
                    c->rcurr += res;
                }
                c->ritem += res;
                c->rlbytes -= res;
                break;
            }

上面主要通过这一行 res = read(c->sfd, c->ritem, c->rlbytes); 把value塞到刚分配出来的item空间,完成“数据行”部分的工作,逻辑上就是对key“赋值”。赋值结束后,调用complete_nread做一些收尾的工作,在本篇“状态机”篇只需了解它的作用是向客户端输出命令执行结果(即往socket写“STORED”):

 static void complete_nread(conn *c) {
//。。。
        complete_nread_ascii(c);
//。。。
}

static void complete_nread_ascii(conn *c) {
     ret = store_item(it, comm, c);
     switch (ret)
     {
      case STORED:
          out_string(c, "STORED");
          break;
      //。。。
      }
    //。。。
}

static void out_string(conn *c, const char *str) {
    size_t len;
    c->msgcurr = 0;
    c->msgused = 0;
    c->iovused = 0;
    add_msghdr(c);
    len = strlen(str);

    memcpy(c->wbuf, str, len);
    memcpy(c->wbuf + len, "\r\n", 2);
    c->wbytes = len + 2;
    c->wcurr = c->wbuf;

    conn_set_state(c, conn_write);
    c->write_and_go = conn_new_cmd;
    return;
}

进入状态机conn_write状态进行输出:

 
        case conn_write:
           //。。。
           /* fall through... */
        case conn_mwrite:
              transmit(c);
           //。。。


static enum transmit_result transmit(conn *c) {
    //。。。
    res = sendmsg(c->sfd, m, 0);
   //。。。
}

最后通过调用sendmsg把我们的”STORED”字符串响应给客户端。

附上 处理 SET 命令状态机的状态转换图:

drive_machine

本文中我们分析了memcached是怎么利用状态机的方式对请求进行解析和处理,以及SET命令的代码实现细节。而在执行SET命令的时候,我们知道会调用item_alloc函数给数据分配空间,而到底item_alloc背后是怎么实现的?请看下一篇:《Memcached源码分析之内存管理》

转载请注明:Calix » Memcached源码分析之请求处理(状态机)

喜欢 (14)or分享 (0)
发表我的评论
取消评论
表情

亲~ 写下昵称哦~

  • 昵称 (必填)
  • 网址
(2)个小伙伴在吐槽
  1. 不错,唯一就你的文章让我读懂了状态机;(⊙﹏⊙)b感谢
    MT2016-08-24 23:35 回复