`

Redis代码阅读3--Redis网络监听(3)

阅读更多

是介绍Redis网络监听的最后一篇文章,着重分析定时时间处理函数serverCron,这个函数其实已经和网络监听没多大关系了,当时因为其绑定在Redis自定义的事件库的定时事件上,所以放到一起来讲。serverCron的这个函数对Redis的正常运行来说很重要,对于Redis的使用者来说,最重要的就是能够迅速直观地看到Redis的当前的运行状况(keys,sizes,memory等),serverCron就能够使用户得知这些信息,此外,serverCron这个方法定时周期地运行,还承担了AOF Write,VM Swap,BGSAVE,Rehash的操作,使得Redis的运行更加平稳。还是来直接通过代码来分析:

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    int j, loops = server.cronloops;
    REDIS_NOTUSED(eventLoop);
    REDIS_NOTUSED(id);
    REDIS_NOTUSED(clientData);

    /* We take a cached value of the unix time in the global state because
     * with virtual memory and aging there is to store the current time
     * in objects at every object access, and accuracy is not needed.
     * To access a global var is faster than calling time(NULL) */
    server.unixtime = time(NULL);
    /* We have just 22 bits per object for LRU information.
     * So we use an (eventually wrapping) LRU clock with 10 seconds resolution.
     * 2^22 bits with 10 seconds resoluton is more or less 1.5 years.
     *
     * Note that even if this will wrap after 1.5 years it's not a problem,
     * everything will still work but just some object will appear younger
     * to Redis. But for this to happen a given object should never be touched
     * for 1.5 years.
     *
     * Note that you can change the resolution altering the
     * REDIS_LRU_CLOCK_RESOLUTION define.
     */
    updateLRUClock();

    /* We received a SIGTERM, shutting down here in a safe way, as it is
     * not ok doing so inside the signal handler. */
    if (server.shutdown_asap) {
        if (prepareForShutdown() == REDIS_OK) exit(0);
        redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
    }

    /* Show some info about non-empty databases */
    for (j = 0; j < server.dbnum; j++) {
        long long size, used, vkeys;

        size = dictSlots(server.db[j].dict);
        used = dictSize(server.db[j].dict);
        vkeys = dictSize(server.db[j].expires);
        if (!(loops % 50) && (used || vkeys)) {
            redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
            /* dictPrintStats(server.dict); */
        }
    }

    /* We don't want to resize the hash tables while a bacground saving
     * is in progress: the saving child is created using fork() that is
     * implemented with a copy-on-write semantic in most modern systems, so
     * if we resize the HT while there is the saving child at work actually
     * a lot of memory movements in the parent will cause a lot of pages
     * copied. */
    if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1) {
        if (!(loops % 10)) tryResizeHashTables();
        if (server.activerehashing) incrementallyRehash();
    }

    /* Show information about connected clients */
    if (!(loops % 50)) {
        redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use",
            listLength(server.clients)-listLength(server.slaves),
            listLength(server.slaves),
            zmalloc_used_memory());
    }

    /* Close connections of timedout clients */
    if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients)
        closeTimedoutClients();

    /* Check if a background saving or AOF rewrite in progress terminated */
    if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
        int statloc;
        pid_t pid;

        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            if (pid == server.bgsavechildpid) {
                backgroundSaveDoneHandler(statloc);
            } else {
                backgroundRewriteDoneHandler(statloc);
            }
            updateDictResizePolicy();
        }
    } else {
        /* If there is not a background saving in progress check if
         * we have to save now */
         time_t now = time(NULL);
         for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;

            if (server.dirty >= sp->changes &&
                now-server.lastsave > sp->seconds) {
                redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, sp->seconds);
                rdbSaveBackground(server.dbfilename);
                break;
            }
         }
    }

    /* Expire a few keys per cycle, only if this is a master.
     * On slaves we wait for DEL operations synthesized by the master
     * in order to guarantee a strict consistency. */
    if (server.masterhost == NULL) activeExpireCycle();

    /* Swap a few keys on disk if we are over the memory limit and VM
     * is enbled. Try to free objects from the free list first. */
    if (vmCanSwapOut()) {
        while (server.vm_enabled && zmalloc_used_memory() >
                server.vm_max_memory)
        {
            int retval = (server.vm_max_threads == 0) ?
                        vmSwapOneObjectBlocking() :
                        vmSwapOneObjectThreaded();
            if (retval == REDIS_ERR && !(loops % 300) &&
                zmalloc_used_memory() >
                (server.vm_max_memory+server.vm_max_memory/10))
            {
                redisLog(REDIS_WARNING,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!");
            }
            /* Note that when using threade I/O we free just one object,
             * because anyway when the I/O thread in charge to swap this
             * object out will finish, the handler of completed jobs
             * will try to swap more objects if we are still out of memory. */
            if (retval == REDIS_ERR || server.vm_max_threads > 0) break;
        }
    }

    /* Replication cron function -- used to reconnect to master and
     * to detect transfer failures. */
    if (!(loops % 10)) replicationCron();

    server.cronloops++;
    return 100;
}

/* This function gets called every time Redis is entering the
 * main loop of the event driven library, that is, before to sleep
 * for ready file descriptors. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    REDIS_NOTUSED(eventLoop);
    listNode *ln;
    redisClient *c;

    /* Awake clients that got all the swapped keys they requested */
    if (server.vm_enabled && listLength(server.io_ready_clients)) {
        listIter li;

        listRewind(server.io_ready_clients,&li);
        while((ln = listNext(&li))) {
            c = ln->value;
            struct redisCommand *cmd;

            /* Resume the client. */
            listDelNode(server.io_ready_clients,ln);
            c->flags &= (~REDIS_IO_WAIT);
            server.vm_blocked_clients--;
            aeCreateFileEvent(server.el, c->fd, AE_READABLE,
                readQueryFromClient, c);
            cmd = lookupCommand(c->argv[0]->ptr);
            redisAssert(cmd != NULL);
            call(c,cmd);
            resetClient(c);
            /* There may be more data to process in the input buffer. */
            if (c->querybuf && sdslen(c->querybuf) > 0)
                processInputBuffer(c);
        }
    }

    /* Try to process pending commands for clients that were just unblocked. */
    while (listLength(server.unblocked_clients)) {
        ln = listFirst(server.unblocked_clients);
        redisAssert(ln != NULL);
        c = ln->value;
        listDelNode(server.unblocked_clients,ln);
        c->flags &= ~REDIS_UNBLOCKED;

        /* Process remaining data in the input buffer. */
        if (c->querybuf && sdslen(c->querybuf) > 0)
            processInputBuffer(c);
    }

    /* Write the AOF buffer on disk */
    flushAppendOnlyFile();
}

 i.    首先将server.cronloops的值赋给loops,server.cronloops指的是serverCron函数的运行次数,每运行一次serverCron函数,server.cronloops++,server.cronloops的内部执行逻辑随着server.cronloops值的不同而改变;
ii.    用server.unixtime = time(NULL)来保存当前时间,因为在virtual memory and aging的时候,需要知道每次Object的access时间,但是这个时间不需要很精确,所以通过全局变量来获取时间比调用time(NULL)快多了;
iii.    记录Redis的最大内存使用量;如果收到了SIGTERM信号,则试图终止Redis
iv.    serverCron方法每运行50次显示Redis内各个非空的DB的使用情况(used,keys,sizes)及当前连接的clients,使用的内存大小;
v.    serverCron方法每运行10次,将试图进行一次Rehash,如果一个a bacground saving正在进行,则不进行rehash,以免造成部分数据丢失;
vi.    关闭timeout的clients;
vii.    如果在执行BGSAVE期间,client执行了bgrewriteaof这个命令,则在serverCron将开始执行a scheduled AOF rewrite
viii.    如果当前Redis正在进行BGSAVE或者AOF rewrite,则check BGSAVE或者AOF rewrite是否已经终止,如果终止则调用相应的函数处理(backgroundSaveDoneHandler/backgroundRewriteDoneHandler),如果当前没有BGSAVE或者AOF rewrite操作,则判断是否进行此类操作,如果需要,则触发此类操作;
ix.    如果有AOF buffer flush操作被暂停了,则每次调用serverCron的时候,恢复AOF buffer flush操作
x.    如果是Master,则周期性地使某些key(随即挑选的)过期,注意这个操作仅仅只针对Master,如果是slaves,则只有通过master的del操作来同步key,以做到强一致性;
xi.    VM的Swap操作
xii.    每运行10次,进行replicationCron,如果存在slaves的话
xiii.    返回100,表示serverCron方法每100毫秒被调用一次,这一点在processTimeEvent这个方法里得以体现:

if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                aeDeleteTimeEvent(eventLoop, id);
            }
            

通过上面的分析,ServerCron侧重在Rehash,VM Swap, AOF write,BGSAVE等操作,而这些操作都是耗时,而且影响Redis对Clients的响应速度的,因此我们在实际应用的时候可以根据具体情况通过改变类似这样的操作:”loops % 10“来决定上述耗时操作的执行频率,有空我会测试下在不同频率下,redis在压力测试下的性能。

 

此次, Redis的网络监听部分都介绍完了。再回过头来看前面提到的几个问题:

1.         Redis支持 epoll select kquque,,通过配置文件来决定采取哪一种

2.         支持文件读写事件和定时事件

3.         采用数组来维护文件事件,链表来保存定时事件(在查找定时事件时,性能不高,有待提高)

4.         Redis Server单线程响应事件,按照先后顺序来响应事件,因此单台 Redis服务器的吞吐量会随着连接的 clients越来越多而下降,可以通过增加更多的 Redis服务器来解决这个问题

5.         Redis在很多代码里都考虑到了尽快地响应各种事件,如在 aeProcessEvent里面,轮询的 wait时间等于当前时间和最近的定时事件响应时间的差值;每次进入轮询 wait之前,在 beforesleep方法里先响应刚刚 unblock clients等。

 

分享到:
评论

相关推荐

    Redis集群下过期key监听的实现代码

    1. 前言 在使用redis集群时,发现...关于Redis集群配置代码此处不贴,直接贴配置监听类代码! redis.host1: 10.113.56.68 redis.port1: 7030 redis.host2: 10.113.56.68 redis.port2: 7031 redis.host3: 10.113.56.6

    逆向工程极速搭建SSM-Redis-activeMQ-Quartz整合项目

    一键生成webXml、initXml、contextXml等系统配置文件 配置控制生成输出文件(listenter、filter、quartz、activeMQ、redis、webXml、initXml、contextXml) 项目可统计在线人数,无操作超时退出,监听网络请求,...

    监听redis过期key,做对应业务处理

    1.springboot集成redis,并监听redis过期key做相应的业务处理

    监听redis的key的变化

    监听redis的key的变化

    docker-redis-cluster:一个 Redis 集群 Docker 镜像

    它将启动 6 个 Redis 服务器,监听7000~7005端口和一个主管以确保所有服务器都启动。 在所有服务器启动后,redis-trib 将创建一个 Redis 集群。 docker run -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -...

    redis过期监听.docx

    php+redis 键值过期自动监听 可以用作定时任务 当redis键值过期的时候就会触发回调方法,然后执行自己的程序,比如30分钟订单自动取消

    redis主从模式与哨兵模式例子

    # 哨兵监听的主服务器 sentinel myid 09a71e001825114de399e0b6214c4b6f5449ab3a # 3秒内mymaster没响应,则认为mymaster宕机 sentinel deny-scripts-reconfig yes # 如果10秒后,mymaster仍没启动过来,则启动...

    flycache:一致性哈希的redis分片存储 (Redis sharding by consistent-hashing)

    功能弹性伸缩,提供控制面板,管理员可以增加和删除Redis节点Redis运行状态监控,报警Redis故障或者网络故障的灾难应对原理consistent-hashing一致性hashzookeeper保持一致性和监听文章测试如果您愿意捐助一下项目,...

    springboot2.7.9+Redis6.2.5实现订单超时处理+数据超时+订单超时监听

    核心技术:Redis开启过期监听 springboot整合redis,springboot配置redis监听。 案例中通过访问addOrder方法实现向mysql数据库添加一条数据并在redis中设置该条数据的有效期为10秒,10秒过后触发redis监听,在监听中...

    redis-pubsub-websocket-server:基于websocket的redis发布订阅协议服务器

    WebSocket服务器上的Redis介绍基于websocket的redis发布订阅协议服务器此服务在localhost上监听2000扩展软件架构服务端示例客户端示例安装教程1.需要先安装redis redis默认监听6379扩展2.安装依赖yarn install使用...

    解决Spring session(redis存储方式)监听导致创建大量redisMessageListenerContailner-X线程问题

    主要介绍了解决Spring session(redis存储方式)监听导致创建大量redisMessageListenerContailner-X线程问题,需要的朋友可以参考下

    mac下redis安装、设置、启动停止方法详解

    需要下载release版本,下载地址: http://download.redis.io/releases/ 我这里下载的是: ...解压到/usr/local/redis目录中,然后...如上图,redis-server启动成功,并监听6379端口。 常用命令说明 redis-server redis

    redis-rs:Redis库防锈

    修订版 Redis-rs是Rust的高级Redis库。 它通过非常灵活但底层的API,提供对所有Redis功能的便捷访问。 它使用可自定义的类型转换特征,以便任何操作都可以返回所需类型的结果。 这带来了非常愉快的开发经验。 该板条...

    Redis 配置文件1

    1. Redis默认不是以守护进程的方式运行,可以通过该配置项修改,使 3. 指定Redis监听端口,默认端口为6379,作者在自己的一篇博文中解 4. 绑定的

    laravel-redis-socket:使用laravel,redis和socket.io在没有数据库的情况下向用户发送实时通知

    laravel-redis-socket 使用laravel,redis和socket.io在没有数据库的情况下向用户发送实时通知 首先,克隆后,您必须创建一个虚拟主机。 安装套件 npm install express ioredis socket.io --save composer require...

    spring boot+redis 监听过期Key的操作方法

    主要介绍了spring boot+redis 监听过期Key,本文通过示例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

    redis安装配置.docx

    redis安装配置 安装和配置Redis通常需要一系列步骤,以下是一般的指南: ### 1. 下载Redis 访问[Redis官方网站](https://redis.io/download)下载最新版本的Redis。你可以选择下载源码并手动编译,或者下载预编译...

    Redis实现系统监控

    为了收集数据进行监控和分析,请构建一个能持续创建并维护计数器的工具,该工具可以任意添加和删除计算器,每个计数器可以针对不同的指标数据,并可以以不同时间精度存储最新的120个数据样本,请实现如下功能: ...

    redis架构分析.docx

    一、前言 因为近期项目中开始使用Redis,为了更好的理解Redis并应用在适合的业务场景,需要对Redis设计与实现深入的理解。...5.网络监听服务启动前的准备工作 6.开启事件监听,开始接受客户端的请求

Global site tag (gtag.js) - Google Analytics