`
wsql
  • 浏览: 11872705 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
文章分类
社区版块
存档分类
最新评论

nginx 源码学习笔记(十七)—— ngx_worker_process_cycle子进程执行

 
阅读更多

上一节主要讲解主进程如何开启子进程,并且讲解了主进程做的一些操作,这一节主要学习子进程处理函数ngx_worker_process_cycle

src/os/unix/ngx_process_cycle.c

static void
ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
{
    ngx_uint_t         i;
    ngx_connection_t  *c;
    
    //在master中,ngx_process被设置为NGX_PROCESS_MASTER
    ngx_process = NGX_PROCESS_WORKER;
    
    //初始化
    ngx_worker_process_init(cycle, 1);

    ngx_setproctitle("worker process");

#if (NGX_THREADS)
    //暂不讲解线程代码
#endif

    for ( ;; ) {
        //如果进程退出,关闭所有连接
        if (ngx_exiting) {

            c = cycle->connections;

            for (i = 0; i < cycle->connection_n; i++) {

                /* THREAD: lock */

                if (c[i].fd != -1 && c[i].idle) {
                    c[i].close = 1;
                    c[i].read->handler(c[i].read);
                }
            }

            if (ngx_event_timer_rbtree.root == ngx_event_timer_rbtree.sentinel)
            {
                ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");

                ngx_worker_process_exit(cycle);
            }
        }

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");
        
        //处理时间和计时
        ngx_process_events_and_timers(cycle);
        
        //收到NGX_CMD_TERMINATE命令
        if (ngx_terminate) {
            ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
            
            //清理后进程退出,会调用所有模块的钩子exit_process
            ngx_worker_process_exit(cycle);
        }
        
        //收到NGX_CMD_QUIT命令
        if (ngx_quit) {
            ngx_quit = 0;
            ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0,
                          "gracefully shutting down");
            ngx_setproctitle("worker process is shutting down");
            //如果进程没有"正在退出"
            if (!ngx_exiting) {
                //关闭监听socket,设置退出正在状态
                ngx_close_listening_sockets(cycle);
                ngx_exiting = 1;
            }
        }
        //收到NGX_CMD_REOPEN命令,重新打开log
        if (ngx_reopen) {
            ngx_reopen = 0;
            ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs");
            ngx_reopen_files(cycle, -1);
        }
    }
}


下面来看一下子进程的初始化操作:

1.全局性的设置,根据全局的配置信息设置执行环境、优先级、限制、setgid、setuid、信号初始化等;

2.调用所有模块的钩子init_process;

src/os/unix/ngx_process_cycle.c

for (i = 0; ngx_modules[i]; i++) {
    if (ngx_modules[i]->init_process) {
        if (ngx_modules[i]->init_process(cycle) == NGX_ERROR) {
            /* fatal */
            exit(2);
        }
    }
}


3.关闭不使用的socket,关闭当前worker的channel[0]句柄和其他worker的channel[1]句柄,当前worker会使用其他worker的channel[0]句柄发送消息,使用当前worker的channel[1]句柄监听可读事件:

src/os/unix/ngx_process_cycle.c

for (n = 0; n < ngx_last_process; n++) {
    //遍历所有的worker进程
    if (ngx_processes[n].pid == -1) {
        continue;
    }
    if (n == ngx_process_slot) {
        如果是自己
        continue;
    }

    if (ngx_processes[n].channel[1] == -1) {
        continue;
    }
    //关闭所有其他worker进程channel[1]句柄(用于监听)
    if (close(ngx_processes[n].channel[1]) == -1) {
        ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                      "close() channel failed");
    }
}
//关闭自己的channel[0]句柄(用于发送信息)
if (close(ngx_processes[ngx_process_slot].channel[0]) == -1) {
    ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                  "close() channel failed");
}
//这也就是为什么,用其他worker的channel[0]句柄发消息
//用当前的worker的channel[1]句柄监听可读时间


4.在当前worker的channel[1]句柄监听可读事件:

if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT,
                          ngx_channel_handler)
    == NGX_ERROR)
{
    /* fatal */
    exit(2);
}


ngx_add_channel_event把句柄ngx_channel(当前worker的channel[1])上建立的连接的可读事件加入事件监控队列,事件处理函数为ngx_channel_hanlder(ngx_event_t *ev)。当有可读事件的时候,ngx_channel_handler负责处理消息,具体代码可以查看src/os/unix/ngx_channel.c,过程如下:

src/os/unix/ngx_process_cycle.c

static void
ngx_channel_handler(ngx_event_t *ev)
{
    ngx_int_t          n;
    ngx_channel_t      ch;
    ngx_connection_t  *c;

    if (ev->timedout) {
        ev->timedout = 0;
        return;
    }

    c = ev->data;

    ngx_log_debug0(NGX_LOG_DEBUG_CORE, ev->log, 0, "channel handler");

    for ( ;; ) {
        //从channel[1]中读取消息
        n = ngx_read_channel(c->fd, &ch, sizeof(ngx_channel_t), ev->log);

        ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0, "channel: %i", n);

        if (n == NGX_ERROR) {

            if (ngx_event_flags & NGX_USE_EPOLL_EVENT) {
                ngx_del_conn(c, 0);
            }

            ngx_close_connection(c);
            return;
        }

        if (ngx_event_flags & NGX_USE_EVENTPORT_EVENT) {
            if (ngx_add_event(ev, NGX_READ_EVENT, 0) == NGX_ERROR) {
                return;
            }
        }

        if (n == NGX_AGAIN) {
            return;
        }

        ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0,
                       "channel command: %d", ch.command);
        //处理消息命令
        switch (ch.command) {

        case NGX_CMD_QUIT:
            ngx_quit = 1;
            break;

        case NGX_CMD_TERMINATE:
            ngx_terminate = 1;
            break;

        case NGX_CMD_REOPEN:
            ngx_reopen = 1;
            break;

        case NGX_CMD_OPEN_CHANNEL:

            ngx_log_debug3(NGX_LOG_DEBUG_CORE, ev->log, 0,
                           "get channel s:%i pid:%P fd:%d",
                           ch.slot, ch.pid, ch.fd);

            ngx_processes[ch.slot].pid = ch.pid;
            ngx_processes[ch.slot].channel[0] = ch.fd;
            break;

        case NGX_CMD_CLOSE_CHANNEL:

            ngx_log_debug4(NGX_LOG_DEBUG_CORE, ev->log, 0,
                           "close channel s:%i pid:%P our:%P fd:%d",
                           ch.slot, ch.pid, ngx_processes[ch.slot].pid,
                           ngx_processes[ch.slot].channel[0]);

            if (close(ngx_processes[ch.slot].channel[0]) == -1) {
                ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
                              "close() channel failed");
            }

            ngx_processes[ch.slot].channel[0] = -1;
            break;
        }
    }
}


总结:再看总结之前希望给位同学阅读了这一节和上一节的内容和代码:

该总结来自于http://blog.csdn.net/lu_ming/article/details/5151930感谢作者无私奉献

1. ngx_start_worker_processes()函数,这个函数按指定数目n,以ngx_worker_process_cycle()函数为参数调用ngx_spawn_process()创建work进程并初始化相关资源和属性;执行子进程的执行函数ngx_worker_process_cycle;向之前已经创建的所有worker广播当前创建的worker进程的信息;每个进程打开一个通道(ngx_pass_open_channel())。
ngx_start_worker_processes()函数的主要逻辑如下:


a) 首先要说明的是参数respawn有两种含义:type 即创建新进程式的方式,如NGX_PROCESS_RESPAWN, NGX_PROCESS_JUST_RESPAWN…,是负值;另一种表示进程信息表的下标,此时是非负;
b) 查找ngx_processes[]进程信息表中一个空项;
c) 若不是分离的子进程(respawn != NGX_PROCESS_DETACHED),
i) 创建一对已经连接的无名socket;
ii) 设置socket(channel[0]和channel[1]))为非阻塞模式;
iii) 开启channel[0]的消息驱动IO;
iv) 设置channel[0]的属主,控制channel[0]的SIGIO信号只发给这个进程;
v) 设置channel[0]和channel[1]的FD_CLOEXEC属性(进程执行了exec后关闭socket);
vi) 取得用于监听可读事件的socket(即ngx_channel = ngx_processes[s].channel[1];);
d) 分离的子进程时,ngx_processes[s].channel[0] = -1;
ngx_processes[s].channel[1] = -1;
e) 设置当前子进程的进程表项索引;ngx_process_slot = s;
f) 创建子进程;
g) 子进程式中,设置当前子进程的进程id,运行执行函数;
h) respawn >= 0时,直接返回pid;
i) 设置其他的进程表项字段;
ngx_processes[s].proc = proc;
ngx_processes[s].data = data;
ngx_processes[s].name = name;
ngx_processes[s].exiting = 0;
j) 根据respawn表示的类型,按不同方式设置表项。

创建socketpair用于进程间通信,master进程为每个worker创建一对socket, master进程空间打开所有socketpair的channel[0],channel[1]两端句柄。 当创建一个worker的时候,这个worker会继承master当前已经创建并打开的所有socketpair,这个worker初始化的时候(调用ngx_worker_process_init)会关闭掉本进程对应socketpair的channel[0]和其他worker对应的channel[1],保持打开本进程对应socketpair的channel[1]和其他worker对应的channel[0],并监听本进程对应socketpair的channel[1]的可读事件。这样,每个worker就拥有了其他worker的channel[0],可以sendmsg(channel[0], ...)向其他worker发送消息。

先于当前worker创建的worker通过继承得到了其channel[0],但是之后创建的进程的channel[0]该如何获得呢,答案在上面(ngx_start_worker_processes) master在创建并启动完成一个worker之后,会调用ngx_pass_open_channel 把这个worker的channel[0]和进程id、在进程表中的偏移slot广播(ngx_write_channel())给所有其他已经创建的worker,这样,创建完所有进程之后,每个worker就获得了所有其他worker的channel[0]了。

ngx_worker_process_cycle()函数是每个进程的实际工作内容。这个函数中首先调用ngx_create_thread()初始化各线程。我们知道每个线程都有一个启动处理函数,nginx的线程处理函数为ngx_worker_thread_cycle(),内部过程中最重要的是对ngx_event_thread_process_posted()函数的调用,用于实际处理每一次请求。

初始化线程结束后,首先调用ngx_process_events_and_timers()函数,该函数继续调用ngx_process_events接口监听事件,一般情况下对应的函数是ngx_epoll_process_events(),如果使用的是其它种 类的IO模型,则应该实现相应的实际函数。这个接口负责把事件投递到ngx_posted_events事件队列里,并在 ngx_event_thread_process_posted()函数中进行处理。

2. ngx_start_cache_manager_processes()函数,这个函数在ngx_cycle全局对象的path数组中,检查是否有manager和loader函数。有manage函数,创建缓冲管理进程,打开相应的通道,负责管理工作;有loader函数,创建缓冲管理进程,打开相应通道,负责载入工作。

3. ngx_pass_open_channel()函数,这个函数在全局的ngx_processes数组中所有pid不为-1,channel[0]不为-1的进程打开通道。也就是为创建的每个work进程打开一个通道,发送消息(ngx_write_channel())给工作进程。

4. ngx_signal_worker_processes()函数,这个函数主要是向各个work进程发送一个信号。如果是ngx_signal_value(NGX_REOPEN_SIGNAL)信号,设ngx_processes中的标志exiting为1。

5. ngx_reap_children()函数,这个函数清理要退出的进程。ngx_processes数组中每个项保存对应一个进程的信号,有pid,status,channel等。如果有子进程还没有退出exiting或! detached,则live设为1。

ngx_master_process_exit()函数,这个函数删除pid文件,接着调用模块中的exit_master钩子函数,销毁内存池。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics