`

memcache的线程模型

阅读更多

MC采用一master多worker的工作模型,由master负责accept客户端请求,然后以RR分发给worker;
-t 线程数,用于处理请求,默认为4
-b backlog队列长度,默认1024

线程结构体
typedef struct { 
    pthread_t thread_id;        /* unique ID of this thread */ 
    struct event_base *base;    /* libevent handle this thread uses */ 
    struct event notify_event;  /* listen event for notify pipe */ 
    int notify_receive_fd;      /* receiving end of notify pipe */ 
    int notify_send_fd;         /* sending end of notify pipe */ 
    CQ  new_conn_queue;         /* queue of new connections to handle */ 
} LIBEVENT_THREAD; 
每个线程都包含一个CQ队列,一条通知管道pipe 和一个libevent的实例event_base ;
当master接受新连接后,通过pipe通知worker;
无论是主线程还是workers线程全部通过libevent管理网络事件,实际上每个线程都是一个单独的libevent实例 ;

启动流程
 

 

 
 


代码实现
master调用accept等待客户端连接(accept为非阻塞调用),返回的sfd也同样被设置为非阻塞,之后分发给worker;
下面看看dispatch_conn_new内部是如何进行链接分发的。
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, 
                       int read_buffer_size, enum network_transport transport) { 
    CQ_ITEM *item = cqi_new();//创建一个conn_item
    char buf[1]; 
    int tid = (last_thread + 1) % settings.num_threads;//通过round-robin算法选择一个线程

    LIBEVENT_THREAD *thread = threads + tid;//thread数组存储了所有的工作线程 

    cq_push(thread->new_conn_queue, item);//投递item信息到Worker线程的工作队列中 

    buf[0] = 'c'; 
    //在Worker线程的notify_send_fd写入字符c,表示有连接    
    if (write(thread->notify_send_fd, buf, 1) != 1) { 
        perror("Writing to thread notify pipe"); 
    }

worker thread在管道描述符notify_send_fd的回调函数为thread_libevent_process,故一旦有数据到达立刻执行
//子线程会在PIPE管道读上面建立libevent事件,事件回调函数是thread_libevent_process
static void thread_libevent_process(int fd, short which, void *arg) { 
    LIBEVENT_THREAD *me = arg; 
    CQ_ITEM *item; 
    char buf[1]; 
 
    if (read(fd, buf, 1) != 1)//PIPE管道读取一个字节的数据 
        if (settings.verbose > 0) 
            fprintf(stderr, "Can't read from libevent pipe\n"); 
 
    switch (buf[0]) { 
    case 'c'://如果是c,则处理网络连接 
    item = cq_pop(me->new_conn_queue);//从连接队列读出Master线程投递的消息 \

    if (NULL != item) { 
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags, 
                           item->read_buffer_size, item->transport, me->base);//创建连接

conn_new里面会建立sfd的网络监听libevent事件,事件回调函数为event_handler,而event_handler的执行流程最终会进入到业务处理的状态机中。
conn *conn_new(const int sfd, enum conn_states init_state, const int event_flags, 
        const int read_buffer_size, enum network_transport transport, 
        struct event_base *base) 

    conn *c = conn_from_freelist();//获取一个空闲连接,conn是Memcached内部对网络连接的一个封装 
 
    if (NULL == c)//如果没有空闲的连接 
    { 
        if (!(c = (conn *) calloc(1, sizeof(conn))))//申请空间 
        { 
            fprintf(stderr, "calloc()\n"); 
            return NULL; 
        }MEMCACHED_CONN_CREATE(c);

//每个conn都自带读入和输出缓冲区,在进行网络收发数据时,特别方便 
        c->rbuf = (char *) malloc((size_t) c->rsize); 
        c->wbuf = (char *) malloc((size_t) c->wsize); 
        c->ilist = (item **) malloc(sizeof(item *) * c->isize); 

 //建立sfd描述符上面的event事件,事件回调函数为event_handler 
    event_set(&c->event, sfd, event_flags, event_handler, (void *) c); 
    event_base_set(base, &c->event); 
    c->ev_flags = event_flags; 
    if (event_add(&c->event, 0) == -1) 
    {        
       //如果建立libevent事件失败,将创建的conn添加到空闲列表中 
       if (conn_add_to_freelist(c)) 
        { 
            conn_free(c); 
        } 
        perror("event_add"); 
        return NULL; 
    } 

//libevent事件回调函数的处理,回调函数被调用时,表明Memcached监听的端口号有网络事件到了 
void event_handler(const int fd, const short which, void *arg) 

conn *c; 
//进入业务处理状态机 
drive_machine(c); 
 


线程通信
master和worker通过连接队列进行单向通信,即Master接受到新的客户端连接时,将sfd封装到conn_queue_item并投送给woker,Worker从其连接队列中读取conn_queue_item同客户端连接进行交互,详情参见dispatch_conn_new;
struct conn_queue_item { 
    int               sfd;//accept之后的描述符 
    enum conn_states  init_state;//连接的初始状态 
    int               event_flags;//libevent标志 
    int               read_buffer_size;//读取数据缓冲区大小 
    enum network_transport     transport;//内部通信所用的协议 
    CQ_ITEM          *next;//用于实现链表的指针 
}; 

struct conn_queue { 
    CQ_ITEM *head;//头指针,注意这里是单链表,不是双向链表 
    CQ_ITEM *tail;//尾部指针, 
    pthread_mutex_t lock;//锁 
    pthread_cond_t  cond;//条件变量 
}; 

//获取一个连接 
static CQ_ITEM *cq_pop(CQ *cq) { 
    CQ_ITEM *item; 
 
    pthread_mutex_lock(&cq->lock);//执行加锁操作 
    item = cq->head;//获得头部指针指向的数据 
    if (NULL != item) { 
        cq->head = item->next;//更新头指针信息 
        if (NULL == cq->head)//这里为空的话,则尾指针也为空,链表此时为空 
            cq->tail = NULL; 
    }
    pthread_mutex_unlock(&cq->lock);//释放锁操作 
 
    return item; 

  • 大小: 30.1 KB
  • 大小: 61.2 KB
分享到:
评论

相关推荐

    memcache多线程将ORACLE数据流生成扫描件

    NULL 博文链接:https://gaochunhu.iteye.com/blog/2325434

    memcache图形监控工具phpmemcache

    memcache图形监控工具phpmemcache,尽是一个PHP文件就可以实现对memcache的监控。 使用方法:本地测试监控机安装Apache或者下载XAMPP(Apache+MySQL+PHP+PERL),安装后把memcachephp.zip中的memcache.php文件放到...

    memcache1.2.1 for windows

    windows下memcache安装包 附带php扩展包

    memcache安装包,memcache

    memcache是一套分布式的高速缓存系统,由LiveJournal的Brad Fitzpatrick开发,但目前被许多网站使用以提升网站的访问速度,尤其对于一些大型的、需要频繁访问数据库的网站访问。

    最新windows版php_memcache.dll和memcache.exe

    最新windows的memcache模块下载 这个模块是平和php5.3的,在我的windowsxp php5.3.5上安装成功 里面有两个php库,一个php_memcache.dll.vc6 和一个php_memcache.dll.vc9 另外一个windows的memcache.exe文件,都是网上...

    Memcache完全剖析 最实用的Memcache文档

    Memcache就不用多介绍了,做开发的人都知道。 但要用得好,却并不是那么容易的事。 如果用得不好,反而得不偿失。 这篇文档短小精悍,囊括了使用过程中需要要注意的方方面面。值得一读。

    MemCache开发说明文档

    Memcache是一个高性能的分布式的内存对象缓存系统,通过在内存里维护一个统一的巨大的hash表,它能够用来存储各种格式的数据,包括图像、视频、文件以及数据库检索的结果等。简单的说就是将数据调用到内存中,然后从...

    C语言memcache代码文档

    C语言memcache代码文档C语言memcache代码文档C语言memcache代码文档C语言memcache代码文档C语言memcache代码文档C语言memcache代码文档C语言memcache代码文档

    delphi memcache MemCache.0.2.0.zip

    MemCache.0.2.0.zip Memcached Client for Delphi 客户端调用类 MemCache.0.2.0.zip Show all LinksExternal links Memcached Project This project is a delphi unit which implements a thread safe client for ...

    php5.3的memcache

    memcache用于PHP5.3非线程版本,好些都是单独版本,全版本的不好找喔。

    memcache

    将信息保持memcache中

    Memcache win32

    windows memcache 安装服务,php_memcache.dll所有版本扩展dll 安装说明 在命令行下安装Memcache,输入 ‘c:/memcached/memcached.exe -d install’。 3.启动Memcache,再输入: ‘c:/memcached/memcached.exe -d ...

    memcache-3.0.8.tgz

    php的memcache扩展,linux下的,php的memcache扩展分为两种,一种是memcache,一种是基于libmemcached的memcached,这个是memcache版本的beta版本

    memcache for linux

    linux平台使用的memcache压缩包,解压缩之后运行make && make install安装, 然后/usr/local/memcache/bin/memcache -d -m 1024 -u root -p 11211 -c 1024命令运行memcache

    php_memcache 服务扩展

    $memcache = new Memcache; $memcache->connect("localhost",11211); echo "Server's version: " . $memcache->getVersion() . "\n"; $tmp_object = new stdClass; $tmp_object->str_attr = "test"; $tmp_object->...

    Memcache win版 服务器和.net驱动

    win版的memcache,包括.net的驱动

    简单的memcache命令

    简单的memcache命令

    PHP5.5/5.6的 32&63 VC11 Memcache扩展php_memcache.dll

    PHP 添加 Memcache 扩展 : 下载包中包括如下: php_memcache-3.0.8-5.5-nts-vc11-x64.zip php_memcache-3.0.8-5.5-nts-vc11-x86.zip php_memcache-3.0.8-5.5-ts-vc11-x64.zip ...

    PHP7.x 8.0 memcache dll php_memcache.dll

    php_memcache.dll

    Erlang_Memcache.pdf

    Erlang_Memcache

Global site tag (gtag.js) - Google Analytics