`

nginx 自定义协议 扩展模块开发

 
阅读更多
原文地址:http://blog.chinaunix.net/uid-26443921-id-3018781.html






公司内部协议均是固定包长的二进制协议,对于内部服务器通信来说足够了,但接口服务器还是采用了http协议,毕竟通用,况且私有二进制协议对外非常不好友,更何况还易遭防火墙拦截;写一个通用且配置功能强大的http server是比较困难的。项目组写的http框架非常难用,仅仅达到能用而已,效率低下,不灵活等等;
      在接触了nginx后,被其能扩展的特性深深吸引了,于是尝试为项目组的框架写一个能一个扩展模块,需求蛮明确的:就是将http协议转成服务器内部的二进制协议;
      在网上找资料,资料比较稀少,大多是一个简单的hello world例子,比较少参考性;《Emiller的Nginx模块开发心得.pdf》相对而言是一个完善的文档;但看了之后还是感觉一头雾水,不甚明了;最好的文档就是代码,下载了 nginx-1.0.8 源码;source insight 建项目,看代码,析流程;渐渐nginx流程在脑海中明晰起来;
      看代码熟悉nginx花3天时间;着手写代码到代码完成1天半,测试休bug到完成目标需求花费1天,为了写这个扩展,把整个周末都搭进去了,晚上还熬了下夜,最后看着内部服务器的数据通过扩展模块中介到nginx输出,还是有点小成就感的;
      废话少说,直接上代码:
       xdrive.rar  
      注:因代码中夹杂了些公司项目的业务,这些代码在protocal文件夹下,被我从压缩包中剔除了,但绝对不影响代码整个流程完整性;


      nginx 只支持c代码,扩展模块中加入了不少c++代码,也懒得去搞其他方法了,直接修改了 auto/make 文件,改动如下:

CPP = g++
LINK = \$(CPP) ##采用g++来链接
##line=338 below was changed by kevin_zhong on 2011-11-14

        ngx_obj=`echo $ngx_obj \
            | sed -e "s#^\(.*\.\)cpp\\$#$ngx_objs_dir\1$ngx_objext#g" \
                  -e "s#^\(.*\.\)cc\\$#$ngx_objs_dir\1$ngx_objext#g" \
                  -e "s#^\(.*\.\)c\\$#$ngx_objs_dir\1$ngx_objext#g" \
                  -e "s#^\(.*\.\)S\\$#$ngx_objs_dir\1$ngx_objext#g"`

        ngx_post_suffix=`echo $ngx_src \
            | sed -e "s#^.*\(\.c\)\\$#\1#g" \
                 -e "s#^.*\(\.cc\)\\$#\1#g" \
                 -e "s#^.*\(\.cpp\)\\$#\1#g"`

        if [ "$ngx_post_suffix"x = ".cpp"x ];then
            ngx_cc="\$(CPP) $ngx_compile_opt \$(CFLAGS) $ngx_use_pch \$(ALL_INCS) $ADDON_INCS"
        else
            ngx_cc="\$(CC) $ngx_compile_opt \$(CFLAGS) $ngx_use_pch \$(ALL_INCS) $ADDON_INCS"
        fi
上面的脚本是判断源代码后缀,如果是c++则生成makefile语句采用g++,否则采用gcc;


下面是具体代码分析:
/*
* Copyright (C) Igor Sysoev; kevin_zhong
* mail: qq2000zhong@gmail.com
* date: 2011-11-13
*/

//因是cpp文件,固包含c头文件需要 extern c
extern "C" {
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include "ngx_chain_util.h"
}

//与服务器内部通信二进制协议实现
#include "ngx_thrift_transport.h"
#include "ngx_xdrive_datagram.h"
#include "protocal/rc_updator_types.h"

using namespace xdrive::msg::rc_updator;
using namespace xdrive;

/*
* 扩展模块需要3个业务相关输入变量,uid,path,recusive
* 参考nginx.conf中的配置写法
*/

typedef struct
{
        ngx_http_upstream_conf_t upstream;

        //将uid和path以及recusive在配置中的index找出来,以后create request的时候需要
        ngx_int_t uid_index;
        ngx_int_t path_index;
        ngx_int_t recusive_index;
}
ngx_http_xdrive_rc_loc_conf_t;

/*
* 注明下,这个模块和网上诸多模块以及nginx特有模块差别最大的地方是:
*
* 1, 因为项目组的二进制协议不是流式协议,即必须将数据包全部收完整后,
* 才能调用decode解码,所以不能像其他模块那样采用流,即不能一边接
* 受数据,一边发送数据;只能先将数据全部缓存起来,等到收集到完整的resp包,
* 再一次性解码,然后再转换成 json 类格式一次性输出,这是这类协议最大最明显的缺点;
*
* 2,虽然从后端server收到的resp content length是确定的,但经过转换(从二进制到json类)
* 后,content len 已经变得不相等,且很不好计算;所以只能采用 chunk 方式返回给client
*
* 3,网上有的,或者<Emiller的Nginx模块开发心得.pdf>中有的,都不提,参考即可;
*/

typedef struct
{
        ngx_http_request_t *request;
        ngx_chain_pair_t body_buff;
        ngx_chain_t * tail_buff;
        uint64_t uid;
        ngx_str_t path;
        bool recusive;

        //后端剩余接受包体长度,即还有多少个字节等待从后端读取,本来不需要这个length的
        //开始代码是存储 r.out_headers.content_len_n,u->length = r.out_headers.content_len_n
        //upstream通过u->length==0判断后端数据是否接受完毕,但这样client回复包将得到一个不正确
        //的 content len,导致接受http包体数据异常...
        //参考 ngx_http_upstream.c:2391
        int rest_length;
}
ngx_http_xdrive_rc_ctx_t;


static ngx_int_t ngx_http_xdrive_rc_add_variables(ngx_conf_t *cf);
static ngx_int_t ngx_http_xdrive_rc_create_request(ngx_http_request_t *r);
static ngx_int_t ngx_http_xdrive_rc_reinit_request(ngx_http_request_t *r);
static ngx_int_t ngx_http_xdrive_rc_process_header(ngx_http_request_t *r);
static ngx_int_t ngx_http_xdrive_rc_filter_init(void *data);
static ngx_int_t ngx_http_xdrive_rc_filter(void *data, ssize_t bytes);
static void ngx_http_xdrive_rc_abort_request(ngx_http_request_t *r);
static void ngx_http_xdrive_rc_finalize_request(ngx_http_request_t *r, ngx_int_t rc);

static void *ngx_http_xdrive_rc_create_loc_conf(ngx_conf_t *cf);
static char *ngx_http_xdrive_rc_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child);

static char *ngx_http_xdrive_rc_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);


static ngx_conf_bitmask_t ngx_http_xdrive_rc_next_upstream_masks[] = {
        { ngx_string("error"), NGX_HTTP_UPSTREAM_FT_ERROR },
        { ngx_string("timeout"), NGX_HTTP_UPSTREAM_FT_TIMEOUT },
        { ngx_string("invalid_header"), NGX_HTTP_UPSTREAM_FT_INVALID_HEADER },
        { ngx_string("not_found"), NGX_HTTP_UPSTREAM_FT_HTTP_404 },
        { ngx_string("off"), NGX_HTTP_UPSTREAM_FT_OFF },
        { ngx_null_string, 0 }
};

/*
* 参数设置,不可变,注意和变量的区别
*/
static ngx_command_t ngx_http_xdrive_rc_commands[] = {
        { ngx_string("xdrive_rc_pass"),
          NGX_HTTP_LOC_CONF | NGX_HTTP_LIF_CONF | NGX_CONF_TAKE1,
          ngx_http_xdrive_rc_pass,
          NGX_HTTP_LOC_CONF_OFFSET,
          0,
          NULL },

        { ngx_string("xdrive_rc_connect_timeout"),
          NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
          ngx_conf_set_msec_slot,
          NGX_HTTP_LOC_CONF_OFFSET,
          offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.connect_timeout),
          NULL },

        { ngx_string("xdrive_rc_send_timeout"),
          NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
          ngx_conf_set_msec_slot,
          NGX_HTTP_LOC_CONF_OFFSET,
          offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.send_timeout),
          NULL },

        { ngx_string("xdrive_rc_buffer_size"),
          NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
          ngx_conf_set_size_slot,
          NGX_HTTP_LOC_CONF_OFFSET,
          offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.buffer_size),
          NULL },

        { ngx_string("xdrive_rc_read_timeout"),
          NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
          ngx_conf_set_msec_slot,
          NGX_HTTP_LOC_CONF_OFFSET,
          offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.read_timeout),
          NULL },

        { ngx_string("xdrive_rc_next_upstream"),
          NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_1MORE,
          ngx_conf_set_bitmask_slot,
          NGX_HTTP_LOC_CONF_OFFSET,
          offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.next_upstream),
          &ngx_http_xdrive_rc_next_upstream_masks },

        ngx_null_command
};


static ngx_http_module_t ngx_http_xdrive_rc_module_ctx = {
        ngx_http_xdrive_rc_add_variables, /* preconfiguration */
        NULL, /* postconfiguration */

        NULL, /* create main configuration */
        NULL, /* init main configuration */

        NULL, /* create server configuration */
        NULL, /* merge server configuration */

        ngx_http_xdrive_rc_create_loc_conf, /* create location configration */
        ngx_http_xdrive_rc_merge_loc_conf /* merge location configration */
};


ngx_module_t ngx_http_xdrive_rc_module = {
        NGX_MODULE_V1,
        &ngx_http_xdrive_rc_module_ctx, /* module context */
        ngx_http_xdrive_rc_commands, /* module directives */
        NGX_HTTP_MODULE, /* module type */
        NULL, /* init master */
        NULL, /* init module */
        NULL, /* init process */
        NULL, /* init thread */
        NULL, /* exit thread */
        NULL, /* exit process */
        NULL, /* exit master */
        NGX_MODULE_V1_PADDING
};

//业务相关变量,get_handler = NULL,因为这三个是从conf里面通过
//正则匹配得到的,为什么不直接通过 get handler 从http requeset里面获取了
//因为这样更灵活,conf可以随时改,比如现在 uid 是从 url 里面获取,但如果
//业务需要uid放在 query_string,这时候就只需要改配置即可了
//思路来源于 ngx_http_memcached_module.c

static ngx_http_variable_t ngx_http_proxy_vars[] = {
        { ngx_string("uid"), NULL,
          NULL, 0,
          NGX_HTTP_VAR_CHANGEABLE | NGX_HTTP_VAR_NOCACHEABLE | NGX_HTTP_VAR_NOHASH,
          0 },
        { ngx_string("path"), NULL,
          NULL, 0,
          NGX_HTTP_VAR_CHANGEABLE | NGX_HTTP_VAR_NOCACHEABLE | NGX_HTTP_VAR_NOHASH,
          0 },
        { ngx_string("recusive"), NULL,
          NULL, 0,
          NGX_HTTP_VAR_CHANGEABLE | NGX_HTTP_VAR_NOCACHEABLE | NGX_HTTP_VAR_NOHASH,
          0 },
        { ngx_null_string, NULL,NULL,0, 0, 0 }
};


static ngx_int_t
ngx_http_xdrive_rc_handler(ngx_http_request_t *r)
{
        ngx_int_t rc;
        ngx_http_upstream_t *u;
        ngx_http_xdrive_rc_ctx_t *ctx;
        ngx_http_xdrive_rc_loc_conf_t *mlcf;

        if (!(r->method & (NGX_HTTP_GET | NGX_HTTP_HEAD)))
        {
                return NGX_HTTP_NOT_ALLOWED;
        }

        //get 请求,不需要包体
        rc = ngx_http_discard_request_body(r);

        if (rc != NGX_OK)
        {
                return rc;
        }

        if (ngx_http_set_content_type(r) != NGX_OK)
        {
                return NGX_HTTP_INTERNAL_SERVER_ERROR;
        }

        if (ngx_http_upstream_create(r) != NGX_OK)
        {
                return NGX_HTTP_INTERNAL_SERVER_ERROR;
        }

        u = r->upstream;

        ngx_str_set(&u->schema, "xdrive_rc://");//schema,没发现有什么用,打log貌似会有点用
       
        u->output.tag = (ngx_buf_tag_t)&ngx_http_xdrive_rc_module;

        mlcf = (ngx_http_xdrive_rc_loc_conf_t *)ngx_http_get_module_loc_conf(r, ngx_http_xdrive_rc_module);

        u->conf = &mlcf->upstream;

        //设置回调,网上大都只讲这里
        u->create_request = ngx_http_xdrive_rc_create_request;
        u->reinit_request = ngx_http_xdrive_rc_reinit_request;
        u->process_header = ngx_http_xdrive_rc_process_header;
        u->abort_request = ngx_http_xdrive_rc_abort_request;
        u->finalize_request = ngx_http_xdrive_rc_finalize_request;

        //分配context内存
        ctx = (ngx_http_xdrive_rc_ctx_t *)ngx_palloc(r->pool, sizeof(ngx_http_xdrive_rc_ctx_t));
        if (ctx == NULL)
        {
                return NGX_HTTP_INTERNAL_SERVER_ERROR;
        }
        ngx_memzero(ctx, sizeof(ngx_http_xdrive_rc_ctx_t));

        ctx->request = r;

        ngx_http_set_ctx(r, ctx, ngx_http_xdrive_rc_module);

        u->input_filter_init = ngx_http_xdrive_rc_filter_init;

        /*
        * 非常关键的设置,后端服务器包体数据到达的时候,upstream 会回调 input_filter,默认的
        * input_filter 是 ngx_http_upstream_non_buffered_filter(ngx_http_upstream.c:2475),默认
        * filter 就是收到数据立马发送给client;而因为需求必须将包体缓存起来,所以这里替换成了我们
        * 的回调函数,函数里面的功能就是: 缓存包体,等待包体接受完毕,解码,然后一次回复给client
        */
        u->input_filter = ngx_http_xdrive_rc_filter;
        u->input_filter_ctx = ctx;

        u->buffering = 0; //note, no buffering...cause too complicated !!

        r->main->count++;

        //不需要包体,直接初始化 upstream 即可,若需要接受包体,只需要
        //调用ngx_http_read_client_request_body(r, ngx_http_upstream_init);
        ngx_http_upstream_init(r);

        return NGX_DONE;
}


static ngx_int_t
ngx_http_xdrive_rc_create_request(ngx_http_request_t *r)
{
        size_t len;
        ngx_buf_t *b;
        ngx_chain_t *cl;
        ngx_http_xdrive_rc_ctx_t *ctx;
        ngx_http_variable_value_t *vv;
        ngx_http_xdrive_rc_loc_conf_t *mlcf;

        mlcf = (ngx_http_xdrive_rc_loc_conf_t *)ngx_http_get_module_loc_conf(r, ngx_http_xdrive_rc_module);

        //根据配置文件uid index号从变量中获取uid的变量值
        vv = ngx_http_get_indexed_variable(r, mlcf->uid_index);

        if (vv == NULL || vv->not_found || vv->len == 0)
        {
                ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                              "the \"$uid\" variable is not set");
                return NGX_ERROR;
        }

        ctx = (ngx_http_xdrive_rc_ctx_t *)ngx_http_get_module_ctx(r, ngx_http_xdrive_rc_module);
        ctx->uid = ngx_atoof(vv->data, vv->len);
        if (ctx->uid == (off_t)NGX_ERROR)
        {
                ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                              "the \"$uid\" variable is err %s set", vv->data);
                return NGX_ERROR;
        }

        //根据配置文件path index号从变量中获取path的变量值
        vv = ngx_http_get_indexed_variable(r, mlcf->path_index);
        if (vv == NULL || vv->not_found || vv->len == 0)
        {
                ngx_str_set(&ctx->path, "/");
        }
        else {
                ctx->path.data = vv->data;
                ctx->path.len = vv->len;
        }

        vv = ngx_http_get_indexed_variable(r, mlcf->recusive_index);
        if (vv == NULL || vv->not_found || vv->len == 0)
        {
                ctx->recusive = false;
        }
        else {
                ctx->recusive = ngx_atoi(vv->data, vv->len);
        }

        RcUpdateReq list_req;
        list_req._user_id = ctx->uid;
        list_req._path.assign((char *)ctx->path.data, (char *)ctx->path.data + ctx->path.len);
        list_req._recursive = ctx->recusive;

        static uint32_t seq = ngx_time();

        //编码,注意这里的变量使用的内存是从pool里面获取的,成功后,会将buf chain返回;
        //细节见具体代码,不表
        cl = ngx_datagram_encode(r->pool, r->connection->log, mlcf->upstream.buffer_size,
                                 &list_req, ++seq, 0xC01);
        if (cl == NULL)
                return NGX_ERROR;

        //准备发送
        r->upstream->request_bufs = cl;

        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                       "http xdrive_rc request uid=\"%d\", path=\"%V\", recur=%d",
                       ctx->uid, &ctx->path, ctx->recusive);

        return NGX_OK;
}


static ngx_int_t
ngx_http_xdrive_rc_reinit_request(ngx_http_request_t *r)
{
        return NGX_OK;
}

/*
* 读取二进制包体头部
*/
static ngx_int_t
ngx_http_xdrive_rc_process_header(ngx_http_request_t *r)
{
        ngx_http_upstream_t *u;
        ngx_http_xdrive_rc_ctx_t *ctx;

        u = r->upstream;

        //因包头固定长度,所以很好判断
        if (u->buffer.last - u->buffer.pos < NGX_XDRIVE_DATAGRAM_HEADER)
                return NGX_AGAIN;

        ctx = (ngx_http_xdrive_rc_ctx_t *)ngx_http_get_module_ctx(r, ngx_http_xdrive_rc_module);

        ngx_xdrive_datagram_header_t header;
        //解包头,获取最重要参数 : 包体长度,根据包体长度收包
        if (ngx_decode_header(u->buffer.pos, NGX_XDRIVE_DATAGRAM_HEADER,
                              &header, r->connection->log) != NGX_OK)
        {
                return NGX_HTTP_UPSTREAM_INVALID_HEADER;
        }

        //业务代码
        if (header._type != 0x08C01)
        {
                ngx_log_error(NGX_LOG_WARN, r->connection->log, 0,
                              "xdrive_rc ret type not legal = %d", header._type);

                return NGX_HTTP_UPSTREAM_INVALID_HEADER;
        }

        //业务代码
        if (header._status != 0)
        {
                ngx_log_error(NGX_LOG_WARN, r->connection->log, 0,
                              "xdrive_rc ret status not ok in response = %d", header._status);

                return NGX_HTTP_UPSTREAM_INVALID_HEADER;
        }

        //非常关键一句,这句意思是返回client包包体长度不定,必须采用chunk filter;
        ngx_http_clear_content_length(r);

        //因upstream不知道该从upstream收取多少包体数据(我们故意没设置包体长度)
        //所以我们必须自己处理记录剩余包体长度;
        ctx->rest_length = header._length - NGX_XDRIVE_DATAGRAM_HEADER;

        u->headers_in.status_n = NGX_HTTP_OK;
        u->state->status = NGX_HTTP_OK;

        //包头数据已经处理完毕,必须丢弃;
        u->buffer.pos += NGX_XDRIVE_DATAGRAM_HEADER;

        return NGX_OK;
}


//其实没啥用
static ngx_int_t
ngx_http_xdrive_rc_filter_init(void *data)
{
        ngx_http_xdrive_rc_ctx_t *ctx = (ngx_http_xdrive_rc_ctx_t *)data;

        ngx_http_upstream_t *u;

        u = ctx->request->upstream;

        return NGX_OK;
}

/*
* 缓存包体,等待包体接受完毕,解码,然后一次回复给client
*/
static ngx_int_t
ngx_http_xdrive_rc_filter(void *data, ssize_t bytes)
{
        ngx_http_xdrive_rc_ctx_t *ctx = (ngx_http_xdrive_rc_ctx_t *)data;

        u_char *last;
        ngx_buf_t *b;
        ngx_chain_t *cl, **ll;
        ngx_http_upstream_t *u;

        ngx_http_xdrive_rc_loc_conf_t *mlcf;

        mlcf = (ngx_http_xdrive_rc_loc_conf_t *)
               ngx_http_get_module_loc_conf(ctx->request, ngx_http_xdrive_rc_module);

        u = ctx->request->upstream;
        b = &u->buffer;

        size_t buff_size = mlcf->upstream.buffer_size;
        //assert(bytes <= buff_size);

        ctx->rest_length -= bytes;

        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ctx->request->connection->log, 0,
                       "recv resp len=%d, rest-len=%d", bytes, ctx->rest_length);

        //特殊情况下,如果包体数据很短(和缓冲区长度比),很可能一次就将包体收完了,这时候
        //直接交互内存即可,不再需要内存拷贝,否则...
        if (ctx->rest_length == 0 && ctx->body_buff._chain_head == NULL)
        {
                cl = ngx_chain_get_free_buf(ctx->request->pool, &u->free_bufs);
                ctx->body_buff._chain_head = cl;

                cl->buf->flush = 1;
                cl->buf->memory = 1;

                last = b->last;
                cl->buf->pos = last;
                b->last += bytes;
                cl->buf->last = b->last;
                cl->buf->tag = u->output.tag;
        }
        else {
                //做一次内存拷贝到 body buf 中去
                if (ngx_chain_write(ctx->request->pool, &u->free_bufs, &ctx->body_buff, buff_size,
                                    b->last, bytes) != NGX_OK)
                        return NGX_ERROR;

                b->last += bytes;
        }

        //判断upstream包体是否收完整
        if (ctx->rest_length > 0)
        {
                return NGX_OK;
        }

        //包体收完,进行解码
        RcUpdateResp list_resp;
        if (ngx_datagram_decode_body(ctx->body_buff._chain_head,
                                     ctx->request->connection->log,
                                     &list_resp) != NGX_OK)
        {
                ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0,
                              "xdrive_rc RcUpdateResp decode failed");

                return NGX_ERROR;
        }

        ngx_log_error(NGX_LOG_NOTICE, ctx->request->connection->log, 0,
                      "xdrive_rc RcUpdateResp list num=%d",
                      list_resp._action_list.size());

        //内容已经存入 list_resp 中,body buf失去作用,回收到free bufs里面去,刚好下面用
        ngx_chain_t *busy_bufs = NULL;
        ngx_chain_update_chains(&u->free_bufs, &busy_bufs, &ctx->body_buff._chain_head, b->tag);

        //transfer...
        ngx_chain_pair_t chain_pair;
        ngx_memzero(&chain_pair, sizeof(chain_pair));

        //转成 json 格式
        if (NGX_OK != ngx_chain_sprintf(ctx->request->pool, &u->free_bufs, &chain_pair, buff_size,
                                        "uid=%d, path=%V, recusive=%d, week_dcid=\"%s\", used_space=%d, list_num=%d\n",
                                        ctx->uid, &ctx->path, ctx->recusive,
                                        list_resp._weak_dcid.c_str(),
                                        list_resp._used_space,
                                        list_resp._action_list.size()
                                        ))
                return NGX_ERROR;

        //转成 json 格式
        for (size_t i = 0; i < list_resp._action_list.size(); ++i)
        {
                ActionThrft *ac = &list_resp._action_list[i];
                if (NGX_OK != ngx_chain_sprintf(ctx->request->pool, &u->free_bufs, &chain_pair, buff_size,
                                                "[path=\"%s\", node_type=%d, status=%d, gcid=%s, size=%d]\n",
                                                ac->m_path.c_str(), ac->m_node_type, ac->m_status,
                                                ac->m_gcid.c_str(), ac->m_file_size
                                                ))
                        return NGX_ERROR;
        }

        //这句非常有意思,标志这是回包最后一个buf,upstraem通过这标志得知后端收据收集处理完毕
        //关后端连接,回前端包
        chain_pair._chain_last->buf->last_buf = 1;

        for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next)
        {
                ll = &cl->next;
        }
        *ll = chain_pair._chain_head;

        return NGX_OK;
}


static void
ngx_http_xdrive_rc_abort_request(ngx_http_request_t *r)
{
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                       "abort http xdrive_rc request");
        return;
}


static void
ngx_http_xdrive_rc_finalize_request(ngx_http_request_t *r, ngx_int_t rc)
{
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                       "finalize http xdrive_rc request");
        return;
}


static void *
ngx_http_xdrive_rc_create_loc_conf(ngx_conf_t *cf)
{
        ngx_http_xdrive_rc_loc_conf_t *conf;

        conf = (ngx_http_xdrive_rc_loc_conf_t *)ngx_pcalloc(cf->pool,
                                                            sizeof(ngx_http_xdrive_rc_loc_conf_t));
        if (conf == NULL)
        {
                return NULL;
        }

        conf->upstream.connect_timeout = NGX_CONF_UNSET_MSEC;
        conf->upstream.send_timeout = NGX_CONF_UNSET_MSEC;
        conf->upstream.read_timeout = NGX_CONF_UNSET_MSEC;

        conf->upstream.buffer_size = NGX_CONF_UNSET_SIZE;

        /* the hardcoded values */
        conf->upstream.cyclic_temp_file = 0;
        conf->upstream.buffering = 0;
        conf->upstream.ignore_client_abort = 0;
        conf->upstream.send_lowat = 0;
        conf->upstream.bufs.num = 0;
        conf->upstream.busy_buffers_size = 0;
        conf->upstream.max_temp_file_size = 0;
        conf->upstream.temp_file_write_size = 0;
        conf->upstream.intercept_errors = 1;
        conf->upstream.intercept_404 = 1;
        conf->upstream.pass_request_headers = 0;
        conf->upstream.pass_request_body = 0;

        conf->uid_index = NGX_CONF_UNSET;
        conf->path_index = NGX_CONF_UNSET;
        conf->recusive_index = NGX_CONF_UNSET;
       
        return conf;
}


static char *
ngx_http_xdrive_rc_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
{
        ngx_http_xdrive_rc_loc_conf_t *prev = (ngx_http_xdrive_rc_loc_conf_t *)parent;
        ngx_http_xdrive_rc_loc_conf_t *conf = (ngx_http_xdrive_rc_loc_conf_t *)child;

        ngx_conf_merge_msec_value(conf->upstream.connect_timeout,
                                  prev->upstream.connect_timeout, 60000);

        ngx_conf_merge_msec_value(conf->upstream.send_timeout,
                                  prev->upstream.send_timeout, 60000);

        ngx_conf_merge_msec_value(conf->upstream.read_timeout,
                                  prev->upstream.read_timeout, 60000);

        ngx_conf_merge_size_value(conf->upstream.buffer_size,
                                  prev->upstream.buffer_size,
                                  (size_t)ngx_pagesize);

        ngx_conf_merge_bitmask_value(conf->upstream.next_upstream,
                                     prev->upstream.next_upstream,
                                     (NGX_CONF_BITMASK_SET
                                      | NGX_HTTP_UPSTREAM_FT_ERROR
                                      | NGX_HTTP_UPSTREAM_FT_TIMEOUT));

        if (conf->upstream.next_upstream & NGX_HTTP_UPSTREAM_FT_OFF)
        {
                conf->upstream.next_upstream = NGX_CONF_BITMASK_SET
                                               | NGX_HTTP_UPSTREAM_FT_OFF;
        }

        if (conf->upstream.upstream == NULL)
        {
                conf->upstream.upstream = prev->upstream.upstream;
        }

        if (conf->uid_index == NGX_CONF_UNSET) {
                conf->uid_index = prev->uid_index;
        }
        if (conf->path_index == NGX_CONF_UNSET) {
                conf->path_index = prev->path_index;
        }
        if (conf->recusive_index == NGX_CONF_UNSET) {
                conf->recusive_index = prev->recusive_index;
        }

        return NGX_CONF_OK;
}


static char *
ngx_http_xdrive_rc_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
        ngx_http_xdrive_rc_loc_conf_t *mlcf = (ngx_http_xdrive_rc_loc_conf_t *)conf;

        ngx_str_t *value;
        ngx_url_t u;
        ngx_http_core_loc_conf_t *clcf;

        if (mlcf->upstream.upstream)
        {
                return "is duplicate";
        }

        value = (ngx_str_t *)cf->args->elts;

        ngx_memzero(&u, sizeof(ngx_url_t));

        u.url = value[1];
        u.no_resolve = 1;

        mlcf->upstream.upstream = ngx_http_upstream_add(cf, &u, 0);
        if (mlcf->upstream.upstream == NULL)
        {
                return (char *)(NGX_CONF_ERROR);
        }

        clcf = (ngx_http_core_loc_conf_t *)ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);

        clcf->handler = ngx_http_xdrive_rc_handler;

        if (clcf->name.data[clcf->name.len - 1] == '/')
        {
                clcf->auto_redirect = 1;
        }

        //保存变量index用
        mlcf->uid_index = ngx_http_get_variable_index(cf, &ngx_http_proxy_vars[0].name);
        if (mlcf->uid_index == NGX_ERROR)
        {
                return (char *)(NGX_CONF_ERROR);
        }
        mlcf->path_index = ngx_http_get_variable_index(cf, &ngx_http_proxy_vars[1].name);
        if (mlcf->path_index == NGX_ERROR)
        {
                return (char *)(NGX_CONF_ERROR);
        }
        mlcf->recusive_index = ngx_http_get_variable_index(cf, &ngx_http_proxy_vars[2].name);
        if (mlcf->recusive_index == NGX_ERROR)
        {
                return (char *)(NGX_CONF_ERROR);
        }

        return NGX_CONF_OK;
}



static ngx_int_t
ngx_http_xdrive_rc_add_variables(ngx_conf_t *cf)
{
        ngx_http_variable_t *var, *v;

        for (v = ngx_http_proxy_vars; v->name.len; v++)
        {
                var = ngx_http_add_variable(cf, &v->name, v->flags);
                if (var == NULL)
                {
                        return NGX_ERROR;
                }

                var->get_handler = v->get_handler;
                var->data = v->data;
        }

        return NGX_OK;
}

代码中一些有意思的地方:
//和buf差不多的思想的 buf chain
typedef  struct
{
        ngx_chain_t* _chain_head;
        ngx_chain_t* _chain_pos;
        ngx_chain_t* _chain_last;
        ngx_chain_t* _chain_tail;
}
ngx_chain_pair_t;

//从buf chain中读取len长内存出来
size_t ngx_cdecl
ngx_chain_read(ngx_chain_pair_t* chain_pair
                , uint8_t *buf, uint32_t len);
//将buf写入到buf chain中
ngx_int_t ngx_cdecl
ngx_chain_write(ngx_pool_t* pool
                , ngx_chain_t** free_bufs
                , ngx_chain_pair_t* chain_pair
                , size_t write_chunk_size
                , const uint8_t *buf, uint32_t len);
//写json或者xml之类回复有用
ngx_int_t ngx_cdecl
ngx_chain_sprintf(ngx_pool_t *pool
             , ngx_chain_t **free_bufs
             , ngx_chain_pair_t *chain_pair
             , size_t write_chunk_size
             , const char *fmt, ...);

下面是nginx配置文件中的关键部分
location ~* /rc_list/([0-9]+).html$ {
                xdrive_rc_buffer_size 4096;
                set $uid $1;
                set $path /;
                set $recusive 0;
                if ($query_string ~* (|&)recusive=(0|1)(|&)) {
                        set $recusive $2;
                }
                xdrive_rc_pass 127.0.0.1:11001;
        }
解释下上面配置文件意思,将url中匹配的用户数值放入uid参数,根据后缀参数判断是否递归将值放入
recusive 参数中;扩展模块将从这三个参数中将需要的值提取出来;
思路来源于:ngx_http_memcached_module.c 模块,应该还有其他的各种各样的实现方式,不知道还有没有更简单明了的途径;
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics