`
simohayha
  • 浏览: 1387415 次
  • 性别: Icon_minigender_1
  • 来自: 火星
社区版块
存档分类
最新评论

nginx中的output chain的处理(二)

阅读更多
接着上次的分析继续,这次我们来看filter链中最关键的一个模块,那就是ngx_http_copy_filter_module模块,这个filter主要是用来将一些需要复制的buf(文件或者内存)重新复制一份然后发送给剩余的body filter,这里有个很重要的部分,那就是在这里nginx的剩余的body filter有可能会被调用多次,这个接下来我会一一阐述的。先来看它的初始化函数:

static ngx_int_t
ngx_http_copy_filter_init(ngx_conf_t *cf)
{
    ngx_http_next_filter = ngx_http_top_body_filter;
    ngx_http_top_body_filter = ngx_http_copy_filter;

    return NGX_OK;
}


可以看到,它只有body filter,而没有header filter,也就是说只有body filter才会使用这个filter。

然后这个模块对应也有一个命令,那就是output_buffers,这个命令保存值在它的conf的bufs中:

typedef struct {
    ngx_bufs_t  bufs;
} ngx_http_copy_filter_conf_t;


这里要知道在nginx的配置文件中所有的bufs的格式都是一样,个数+每个的大小。这个值我们接下来分析filter代码的时候会再次看到。

然后来看对应的merge方法,来看这个bufs的默认值是多少。

static char *
ngx_http_copy_filter_merge_conf(ngx_conf_t *cf, void *parent, void *child)
{
    ngx_http_copy_filter_conf_t *prev = parent;
    ngx_http_copy_filter_conf_t *conf = child;

//默认是1个buf,大小为32768字节
    ngx_conf_merge_bufs_value(conf->bufs, prev->bufs, 1, 32768);

    return NULL;
}


由于copy filter没有header filter,因此它的context的初始化也是放在body filter中的,而它的ctx就是ngx_output_chain_ctx_t,为什么名字是output_chain呢,这是因为copy filter的主要逻辑的处理都是放在ngx_output_chain中的,这个模块我们可以看到它是保存在core目录下的,而不是属于http目录的。

接下来我们就来看这个context的结构。

typedef struct {
//保存临时的buf
    ngx_buf_t                   *buf;
//保存了将要发送的chain
    ngx_chain_t                 *in;
//保存了已经发送完毕的chain,以便于重复利用
    ngx_chain_t                 *free;
//保存了还未发送的chain
    ngx_chain_t                 *busy;

//sendfile标记
    unsigned                     sendfile:1;
//directio标记
    unsigned                     directio:1;
#if (NGX_HAVE_ALIGNED_DIRECTIO)
    unsigned                     unaligned:1;
#endif
//是否需要在内存中保存一份(使用sendfile的话,内存中没有文件的拷贝的,而我们有时需要处理文件,此时就需要设置这个标记)
    unsigned                     need_in_memory:1;
//是否存在的buf复制一份,这里不管是存在在内存还是文件,后面会看到这两个标记的区别。
    unsigned                     need_in_temp:1;

    ngx_pool_t                  *pool;
//已经allocated的大小
    ngx_int_t                    allocated;
//对应的bufs的大小,这个值就是我们loc conf中设置的bufs
    ngx_bufs_t                   bufs;
//表示现在处于那个模块(因为upstream也会调用output_chain)
    ngx_buf_tag_t                tag;

//这个值一般是ngx_http_next_filter,也就是继续调用filter链
    ngx_output_chain_filter_pt   output_filter;
//当前filter的上下文,这里也是由于upstream也会调用output_chain
    void                        *filter_ctx;
} ngx_output_chain_ctx_t;



接下来我们来看具体函数的实现,就能更好的理解context中的这些域的意思。

来看copy_filter的body filter。

static ngx_int_t
ngx_http_copy_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
    ngx_int_t                     rc;
    ngx_connection_t             *c;
    ngx_output_chain_ctx_t       *ctx;
    ngx_http_copy_filter_conf_t  *conf;

    c = r->connection;

//获取ctx
    ctx = ngx_http_get_module_ctx(r, ngx_http_copy_filter_module);

//如果为空,则说明需要初始化ctx
    if (ctx == NULL) {
        conf = ngx_http_get_module_loc_conf(r, ngx_http_copy_filter_module);

        ctx = ngx_pcalloc(r->pool, sizeof(ngx_output_chain_ctx_t));
        if (ctx == NULL) {
            return NGX_ERROR;
        }

        ngx_http_set_ctx(r, ctx, ngx_http_copy_filter_module);

//设置对应的域
        ctx->sendfile = c->sendfile;
//可以看到如果我们给request设置filter_need_in_memory的话,ctx的这个域就会被设置
        ctx->need_in_memory = r->main_filter_need_in_memory
                              || r->filter_need_in_memory;
//和上面类似
        ctx->need_in_temp = r->filter_need_temporary;

        ctx->pool = r->pool;
        ctx->bufs = conf->bufs;
        ctx->tag = (ngx_buf_tag_t) &ngx_http_copy_filter_module;
//可以看到output_filter就是body filter的next
        ctx->output_filter = (ngx_output_chain_filter_pt) ngx_http_next_filter;
//此时filter ctx为当前的请求
        ctx->filter_ctx = r;

        r->request_output = 1;
    }
//最关键的函数,下面会详细分析。
    rc = ngx_output_chain(ctx, in);
......................................................

    return rc;
}


然后就是ngx_output_chain这个函数了,这里nginx filter的主要逻辑都在这个函数里面。下面就是这个函数的原型。

ngx_int_t
ngx_output_chain(ngx_output_chain_ctx_t *ctx, ngx_chain_t *in)


然后我们来分段看它的代码,下面这段代码可以说是一个short path,也就是说我们能直接确定所有的in chain都不需要复制的时候,我们就可以直接调用output_filter来交给剩下的filter去处理。

  
 if (ctx->in == NULL && ctx->busy == NULL) {

//下面的注释解释的很详细
        /*
         * the short path for the case when the ctx->in and ctx->busy chains
         * are empty, the incoming chain is empty too or has the single buf
         * that does not require the copy
         */

        if (in == NULL) {
            return ctx->output_filter(ctx->filter_ctx, in);
        }

//这里说明只有一个chain,并且它的buf不需要复制
        if (in->next == NULL
#if (NGX_SENDFILE_LIMIT)
            && !(in->buf->in_file && in->buf->file_last > NGX_SENDFILE_LIMIT)
#endif
            && ngx_output_chain_as_is(ctx, in->buf))
        {
            return ctx->output_filter(ctx->filter_ctx, in);
        }
    }


上面我们看到了一个函数 ngx_output_chain_as_is,这个函数很关键,下面还会再次被调用,这个函数主要用来判断是否需要复制buf。返回1,表示不需要拷贝,否则为需要拷贝
static ngx_inline ngx_int_t
ngx_output_chain_as_is(ngx_output_chain_ctx_t *ctx, ngx_buf_t *buf)
{
    ngx_uint_t  sendfile;

//是否为specialbuf,是的话返回1,也就是不用拷贝
    if (ngx_buf_special(buf)) {
        return 1;
    }

//如果buf在文件中,并且使用了directio的话,需要拷贝buf
    if (buf->in_file && buf->file->directio) {
        return 0;
    }

//sendfile标记
    sendfile = ctx->sendfile;

#if (NGX_SENDFILE_LIMIT)
//如果pos大于sendfile的限制,设置标记为0
    if (buf->in_file && buf->file_pos >= NGX_SENDFILE_LIMIT) {
        sendfile = 0;
    }

#endif

    if (!sendfile) {
//此时如果buf不在内存中,则我们就需要复制到内存一份。
        if (!ngx_buf_in_memory(buf)) {
            return 0;
        }
//否则设置in_file为0.
        buf->in_file = 0;
    }

//如果需要内存中有一份拷贝,而并不在内存中,此时返回0,表示需要拷贝
    if (ctx->need_in_memory && !ngx_buf_in_memory(buf)) {
        return 0;
    }

//如果需要内存中有拷贝,并且存在于内存中或者mmap中,则返回0.
    if (ctx->need_in_temp && (buf->memory || buf->mmap)) {
        return 0;
    }

    return 1;
}


上面有两个标记要注意,一个是need_in_memory ,这个主要是用于当我们使用sendfile的时候,nginx并不会将请求文件拷贝到内存中,而有时我们需要操作文件的内容,此时我们就需要设置这个标记(设置方法前面初始化有介绍).然后我们在body filter就能操作内容了。

第二个是need_in_temp,这个主要是用于把本来就存在于内存中的buf复制一份拷贝出来,这里有用到的模块有charset,也就是编解码 filter.

然后接下来这段是复制in chain到ctx->in的结尾.它是通过调用ngx_output_chain_add_copy来进行add copy的,这个函数比较简单,这里就不分析了,不过只有一个要注意的,那就是如果buf是存在于文件中,并且file_pos超过了sendfile limit,此时就会切割buf为两个buf,然后保存在两个chain中,最终连接起来.

if (in) {
//复制到ctx->in中.
        if (ngx_output_chain_add_copy(ctx->pool, &ctx->in, in) == NGX_ERROR) {
            return NGX_ERROR;
        }
    }



然后就是主要的逻辑处理阶段。这里nginx做的非常巧妙也非常复杂,首先是chain的重用,然后是buf的重用。

先来看chain的重用。关键的几个结构以及域,ctx的free,busy以及ctx->pool的chain域。
其中每次发送没有发完的chain就放到busy中,而已经发送完毕的就放到free中,而最后会调用  ngx_free_chain来将free的chain放入到pool->chain中,而在ngx_alloc_chain_link中,如果pool->chain中存在chain的话,就不用malloc了,而是直接返回pool->chain,我们来看相关的代码。

//链接cl到pool->chain中
#define ngx_free_chain(pool, cl)                                             \
    cl->next = pool->chain;                                                  \
    pool->chain = cl

ngx_chain_t *
ngx_alloc_chain_link(ngx_pool_t *pool)
{
    ngx_chain_t  *cl;

    cl = pool->chain;
//如果cl存在,则直接返回cl
    if (cl) {
        pool->chain = cl->next;
        return cl;
    }
//否则才会malloc chain
    cl = ngx_palloc(pool, sizeof(ngx_chain_t));
    if (cl == NULL) {
        return NULL;
    }

    return cl;
}


然后是buf的重用,严格意义上来说buf的重用是从free中的chain中取得的,当free中的buf被重用,则这个buf对应的chain就会被链接到ctx->pool中,从而这个chain就会被重用.

也就是说buf的重用是第一被考虑的,只有当这个chain的buf确定不需要被重用(或者说已经被重用)的时候,chain才会被链接到ctx->pool中被重用。

还有一个就是ctx的allocated域,这个域表示了当前的上下文中已经分配了多少个buf,blog一开始我们有提到有个output_buffer命令用来设置output的buf大小以及buf的个数。而allocated如果比output_buffer大的话,我们就需要先发送完已经存在的buf,然后才能再次重新分配buf。

来看代码,上面所说的重用以及buf的控制,代码里面都可以看的比较清晰。这里代码我们分段来看,下面这段主要是拷贝buf前所做的一些工作,比如判断是否拷贝,以及给buf分贝内存等。

//out为我们最终需要传输的chain,也就是交给剩下的filter处理的chain
 out = NULL;
//last_out为out的最后一个chain
    last_out = &out;
    last = NGX_NONE;

for ( ;; ) {

//开始遍历chain
        while (ctx->in) {

//取得当前chain的buf大小
            bsize = ngx_buf_size(ctx->in->buf);

//跳过bsize为0的buf
            if (bsize == 0 && !ngx_buf_special(ctx->in->buf)) {
                ngx_debug_point();

                ctx->in = ctx->in->next;

                continue;
            }

//判断是否需要复制buf
            if (ngx_output_chain_as_is(ctx, ctx->in->buf)) {

                /* move the chain link to the output chain */
//如果不需要复制,则直接链接chain到out,然后继续循环
                cl = ctx->in;
                ctx->in = cl->next;

                *last_out = cl;
                last_out = &cl->next;
                cl->next = NULL;

                continue;
            }

//到达这里,说明我们需要拷贝buf,这里buf最终都会被拷贝进ctx->buf中,因此这里先判断ctx->buf是否为空
            if (ctx->buf == NULL) {

//如果为空,则取得buf,这里要注意,一般来说如果没有开启directio的话,这个函数都会返回NGX_DECLINED的(具体实现可以去看这个函数的代码)。
                rc = ngx_output_chain_align_file_buf(ctx, bsize);

                if (rc == NGX_ERROR) {
                    return NGX_ERROR;
                }

//大部分情况下,都会落入这个分支
                if (rc != NGX_OK) {

//准备分配buf,首先在free中寻找可以重用的buf
                    if (ctx->free) {

                        /* get the free buf */
//得到free buf
                        cl = ctx->free;
                        ctx->buf = cl->buf;
                        ctx->free = cl->next;
//将要重用的chain链接到ctx->poll中,以便于chain的重用.
                        ngx_free_chain(ctx->pool, cl);

                    } else if (out || ctx->allocated == ctx->bufs.num) {
//如果已经等于buf的个数限制,则跳出循环,发送已经存在的buf.这里可以看到如果out存在的话,nginx会跳出循环,然后发送out,等发送完会再次处理,这里很好的体现了nginx的流式处理
                        break;

                    } else if (ngx_output_chain_get_buf(ctx, bsize) != NGX_OK) {
//这个函数也比较关键,它用来取得buf.我们接下来会详细看这个函数
                        return NGX_ERROR;
                    }
                }
            }
............................................................
    }


上面的代码分析的时候有个很关键的函数,那就是ngx_output_chain_get_buf,这个函数是当没有可重用的buf的时候,用来分配buf的。

这里只有一个要注意的,那就是如果当前的buf是位于最后一个chain的话,会有特殊处理。这里特殊处理有两个地方,一个是buf的recycled域,一个是将要分配的buf的大小。

先来说recycled域,这个域表示我们当前的buf是需要被回收的。而我们知道nginx一般情况下(比如非last buf)是会缓存一部分buf,然后再发送的(默认是1460字节),而设置了recycled的话,我们就不会让它缓存buf,也就是尽量发送出去,然后以供我们回收使用。

因此如果是最后一个buf的话,一般来说我们是不需要设置recycled域的,否则的话,需要设置recycled域。因为不是最后一个buf的话,我们可能还会需要重用一些buf,而buf只有被发送出去的话,我们才能重用。

然后就是size的大小。这里会有两个大小,一个是我们需要复制的buf的大小,一个是nginx.conf中设置的size。如果不是最后一个buf,则我们只需要分配我们设置的buf的size大小就行了。如果是最后一个buf,则就处理不太一样,下面的代码会看到。

static ngx_int_t
ngx_output_chain_get_buf(ngx_output_chain_ctx_t *ctx, off_t bsize)
{
    size_t       size;
    ngx_buf_t   *b, *in;
    ngx_uint_t   recycled;

    in = ctx->in->buf;
//可以看到这里分配的buf,每个的大小都是我们在nginx.conf中设置的size
    size = ctx->bufs.size;
//默认有设置recycled域.
    recycled = 1;
//如果当前的buf是属于最后一个chain的时候。这里我们要特殊处理。
    if (in->last_in_chain) {
//这边注释很详细,我就不解释了.
        if (bsize < (off_t) size) {

            /*
             * allocate a small temp buf for a small last buf
             * or its small last part
             */
            size = (size_t) bsize;
            recycled = 0;

        } else if (!ctx->directio
                   && ctx->bufs.num == 1
                   && (bsize < (off_t) (size + size / 4)))
        {
            /*
             * allocate a temp buf that equals to a last buf,
             * if there is no directio, the last buf size is lesser
             * than 1.25 of bufs.size and the temp buf is single
             */

            size = (size_t) bsize;
            recycled = 0;
        }
    }
//开始分配buf内存.
    b = ngx_calloc_buf(ctx->pool);
    if (b == NULL) {
        return NGX_ERROR;
    }

    if (ctx->directio) {
//directio需要对齐

        b->start = ngx_pmemalign(ctx->pool, size, NGX_DIRECTIO_BLOCK);
        if (b->start == NULL) {
            return NGX_ERROR;
        }

    } else {
//大部分情况会走到这里.
        b->start = ngx_palloc(ctx->pool, size);
        if (b->start == NULL) {
            return NGX_ERROR;
        }
    }

    b->pos = b->start;
    b->last = b->start;
    b->end = b->last + size;
//设置temporary.
    b->temporary = 1;
    b->tag = ctx->tag;
    b->recycled = recycled;

    ctx->buf = b;
//更新allocated,可以看到每分配一个就加1.
    ctx->allocated++;

    return NGX_OK;
}


然后接下来这部分就是复制buf,然后调用filter链进行发送。

//复制buf.
rc = ngx_output_chain_copy_buf(ctx);

            if (rc == NGX_ERROR) {
                return rc;
            }
//如果返回AGAIn,一般来说不会返回这个值的.
            if (rc == NGX_AGAIN) {
                if (out) {
                    break;
                }

                return rc;
            }

            /* delete the completed buf from the ctx->in chain */
//如果ctx->in中处理完毕的buf则删除当前的buf
            if (ngx_buf_size(ctx->in->buf) == 0) {
                ctx->in = ctx->in->next;
            }

            cl = ngx_alloc_chain_link(ctx->pool);
            if (cl == NULL) {
                return NGX_ERROR;
            }
//链接chain到out.
            cl->buf = ctx->buf;
            cl->next = NULL;
            *last_out = cl;
            last_out = &cl->next;
            ctx->buf = NULL;
        }

        if (out == NULL && last != NGX_NONE) {

            if (ctx->in) {
                return NGX_AGAIN;
            }

            return last;
        }
//调用filter链
        last = ctx->output_filter(ctx->filter_ctx, out);

        if (last == NGX_ERROR || last == NGX_DONE) {
            return last;
        }
//update chain,这里主要是将处理完毕的chain放入到free,没有处理完毕的放到busy中.
        ngx_chain_update_chains(&ctx->free, &ctx->busy, &out, ctx->tag);
        last_out = &out;


ngx_chain_update_chains这个函数我以前的blog有分析过,想了解的,可以看我前面的blog .






1
2
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics