在阅读本文之前,需要对 Nginx Upstream 的处理流程有所了解,可以参考我的上一篇文章《Nginx Upstream 流程分析》。

源码基于以下master分支

https://github.com/openresty/lua-nginx-module

https://github.com/openresty/lua-resty-core

balancer_by_lua 指令

Nginx Lua 模块的指令定义都在 ngx_http_lua_module.c 这个文件中,下面来看负载均衡相关的指令:

    /*
     * balancer_by_lua_block { ... }
     * Allowed only inside an upstream {} block (NGX_HTTP_UPS_CONF); takes a
     * block and no arguments. cmd->post (the last field) becomes
     * lscf->balancer.handler in ngx_http_lua_balancer_by_lua and is invoked
     * per attempt from ngx_http_lua_balancer_get_peer.
     */
    { ngx_string("balancer_by_lua_block"),
      NGX_HTTP_UPS_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS,
      ngx_http_lua_balancer_by_lua_block,
      NGX_HTTP_SRV_CONF_OFFSET,
      0,
      (void *) ngx_http_lua_balancer_handler_inline },

    /*
     * balancer_by_lua_file <path>
     * Allowed only inside an upstream {} block; takes exactly one argument.
     * ngx_http_lua_balancer_by_lua compares cmd->post against
     * ngx_http_lua_balancer_handler_file to tell the file form apart.
     */
    { ngx_string("balancer_by_lua_file"),
      NGX_HTTP_UPS_CONF|NGX_CONF_TAKE1,
      ngx_http_lua_balancer_by_lua,
      NGX_HTTP_SRV_CONF_OFFSET,
      0,
      (void *) ngx_http_lua_balancer_handler_file },

ngx_http_lua_balancer_by_lua_block

/*
 * Directive handler for `balancer_by_lua_block { ... }`.
 *
 * Temporarily redirects the parser state in *cf: cf->handler and
 * cf->handler_conf are pointed at ngx_http_lua_balancer_by_lua and `conf`,
 * so the block parser can hand the collected Lua code to that function
 * (per the original note, this dispatch happens inside ngx_conf_parse).
 * The saved parser state is restored before returning, whatever the outcome.
 *
 * Returns whatever ngx_http_lua_conf_lua_block_parse returns.
 */
char *
ngx_http_lua_balancer_by_lua_block(ngx_conf_t *cf, ngx_command_t *cmd,
    void *conf)
{
    ngx_conf_t   saved;
    char        *result;

    saved = *cf;

    cf->handler_conf = conf;
    cf->handler = ngx_http_lua_balancer_by_lua;

    /* collect and check the Lua code block */
    result = ngx_http_lua_conf_lua_block_parse(cf, cmd);

    /* undo the temporary handler override */
    *cf = saved;

    return result;
}

ngx_http_lua_balancer_by_lua

/*
 * Directive handler for balancer_by_lua_file, and (through
 * ngx_http_lua_balancer_by_lua_block) for balancer_by_lua_block.
 *
 * Records in the upstream's Lua srv conf:
 *   - the Lua source (inline code, or the rebased file path);
 *   - its cache key;
 *   - for inline code, a chunk name.
 * It also stores cmd->post as the per-request balancer handler and installs
 * ngx_http_lua_balancer_init as the upstream's peer.init_upstream hook.
 *
 * Returns NGX_CONF_OK, "is duplicate" if a balancer handler is already set,
 * or NGX_CONF_ERROR on failure.
 */
char *
ngx_http_lua_balancer_by_lua(ngx_conf_t *cf, ngx_command_t *cmd,
    void *conf)
{
    size_t                       chunkname_len;
    u_char                      *chunkname;
    u_char                      *cache_key = NULL;
    u_char                      *name;
    ngx_str_t                   *value;
    ngx_http_lua_srv_conf_t     *lscf = conf;

    ngx_http_upstream_srv_conf_t      *uscf;

    dd("enter");

    /* cmd->post must carry the per-request balancer handler */
    if (cmd->post == NULL) {
        return NGX_CONF_ERROR;
    }

    /* a second balancer_by_lua* for the same srv conf is rejected */
    if (lscf->balancer.handler) {
        return "is duplicate";
    }

    /*
     * value[1] is the directive's argument: the inline Lua code for
     * balancer_by_lua_block, or the Lua file path for balancer_by_lua_file.
     *
     * Example configuration (inline form):
     *
     *   upstream backend {
     *       balancer_by_lua_block {
     *           local balancer = require "ngx.balancer"
     *
     *           -- well, usually we calculate the peer's host and port
     *           -- according to some balancing policies instead of using
     *           -- hard-coded values like below
     *           local host = "127.0.0.2"
     *           local port = 8080
     *
     *           local ok, err = balancer.set_current_peer(host, port)
     *           if not ok then
     *               ngx.log(ngx.ERR, "failed to set the current peer: ", err)
     *               return ngx.exit(500)
     *           end
     *       }
     *
     *       keepalive 10;  # connection pool
     *   }
     */
    value = cf->args->elts;

    lscf->balancer.handler = (ngx_http_lua_srv_conf_handler_pt) cmd->post;

    if (cmd->post == ngx_http_lua_balancer_handler_file) {
        /* Lua code in an external file */
        name = ngx_http_lua_rebase_path(cf->pool, value[1].data,
                                        value[1].len);
        if (name == NULL) {
            return NGX_CONF_ERROR;
        }

        cache_key = ngx_http_lua_gen_file_cache_key(cf, value[1].data,
                                                    value[1].len);
        if (cache_key == NULL) {
            return NGX_CONF_ERROR;
        }

        lscf->balancer.src.data = name;
        lscf->balancer.src.len = ngx_strlen(name);

    } else {
        cache_key = ngx_http_lua_gen_chunk_cache_key(cf, "balancer_by_lua",
                                                     value[1].data,
                                                     value[1].len);
        if (cache_key == NULL) {
            return NGX_CONF_ERROR;
        }

        chunkname = ngx_http_lua_gen_chunk_name(cf, "balancer_by_lua",
                                                sizeof("balancer_by_lua") - 1,
                                                &chunkname_len);
        if (chunkname == NULL) {
            return NGX_CONF_ERROR;
        }

        /* Don't eval nginx variables for inline lua code */
        lscf->balancer.src = value[1];
        lscf->balancer.chunkname = chunkname;
    }

    lscf->balancer.src_key = cache_key;

    uscf = ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_module);

    /* a balancing method was already configured; it is replaced below */
    if (uscf->peer.init_upstream) {
        ngx_conf_log_error(NGX_LOG_WARN, cf, 0,
                           "load balancing method redefined");
    }

    /*
     * Key step: install ngx_http_lua_balancer_init as the upstream's
     * peer.init_upstream hook (see nginx's upstream flow for when it runs).
     */
    uscf->peer.init_upstream = ngx_http_lua_balancer_init;

    /*
     * NOTE(review): these flags presumably declare which `server`
     * parameters (weight, max_fails, fail_timeout, down) are accepted
     * in this upstream; confirm against ngx_http_upstream.c.
     */
    uscf->flags = NGX_HTTP_UPSTREAM_CREATE
                  |NGX_HTTP_UPSTREAM_WEIGHT
                  |NGX_HTTP_UPSTREAM_MAX_FAILS
                  |NGX_HTTP_UPSTREAM_FAIL_TIMEOUT
                  |NGX_HTTP_UPSTREAM_DOWN;

    return NGX_CONF_OK;
}

ngx_http_lua_balancer_init

/*
 * peer.init_upstream hook installed by ngx_http_lua_balancer_by_lua.
 *
 * First builds the regular round-robin data for the upstream. That data is
 * what ngx_http_lua_balancer_get_peer falls back to when the Lua code picks
 * no peer. It then registers ngx_http_lua_balancer_init_peer as the
 * per-request peer.init hook.
 *
 * Returns NGX_OK on success, NGX_ERROR if round-robin initialization fails.
 */
static ngx_int_t
ngx_http_lua_balancer_init(ngx_conf_t *cf,
    ngx_http_upstream_srv_conf_t *us)
{
    if (ngx_http_upstream_init_round_robin(cf, us) == NGX_OK) {
        /* called upon individual requests routed to this upstream */
        us->peer.init = ngx_http_lua_balancer_init_peer;
        return NGX_OK;
    }

    return NGX_ERROR;
}

ngx_http_lua_balancer_init_peer

/*
 * Per-request peer.init hook registered by ngx_http_lua_balancer_init.
 *
 * Allocates the Lua balancer's per-request peer data from r->pool and lets
 * the round-robin module initialize its embedded `rrp` member. It then
 * installs the Lua balancer's peer.get / peer.free callbacks, plus the
 * session callbacks when built with SSL.
 *
 * Returns NGX_OK, or NGX_ERROR on allocation or round-robin failure.
 */
static ngx_int_t
ngx_http_lua_balancer_init_peer(ngx_http_request_t *r,
    ngx_http_upstream_srv_conf_t *us)
{
    ngx_http_lua_balancer_peer_data_t  *pd;  /* per-request balancer state */

    pd = ngx_pcalloc(r->pool, sizeof(ngx_http_lua_balancer_peer_data_t));
    if (pd == NULL) {
        return NGX_ERROR;
    }

    /*
     * Keep this assignment before the round-robin init call.
     * NOTE(review): presumably this makes the round-robin init fill in our
     * embedded rrp instead of allocating its own; confirm against
     * ngx_http_upstream_round_robin.c.
     * Because rrp is the first member, ngx_http_lua_balancer_get_peer
     * receives this same pointer and treats it as the whole peer data.
     */
    r->upstream->peer.data = &pd->rrp;

    if (ngx_http_upstream_init_round_robin_peer(r, us) != NGX_OK) {
        return NGX_ERROR;
    }

    /* key step: override peer.get and peer.free (see the upstream flow) */
    r->upstream->peer.get = ngx_http_lua_balancer_get_peer;
    r->upstream->peer.free = ngx_http_lua_balancer_free_peer;

#if (NGX_HTTP_SSL)
    r->upstream->peer.set_session = ngx_http_lua_balancer_set_session;
    r->upstream->peer.save_session = ngx_http_lua_balancer_save_session;
#endif

    /* the upstream's Lua srv conf (holds balancer.handler) and the request */
    pd->conf = ngx_http_conf_upstream_srv_conf(us, ngx_http_lua_module);
    pd->request = r;

    return NGX_OK;
}

/*
 * Per-request state of the Lua balancer. It is allocated in
 * ngx_http_lua_balancer_init_peer and received as `data` by
 * ngx_http_lua_balancer_get_peer. The FFI setters reach it through
 * lmcf->balancer_peer_data.
 */
struct ngx_http_lua_balancer_peer_data_s {
    /* the round robin data must be first */
    /*
     * Reason: peer.data is set to &bp->rrp, yet get_peer uses the same
     * pointer as the whole struct. On fallback, &bp->rrp is passed to
     * ngx_http_upstream_get_round_robin_peer.
     */
    ngx_http_upstream_rr_peer_data_t    rrp;

    /* Lua srv conf of the upstream (provides balancer.handler) */
    ngx_http_lua_srv_conf_t            *conf;
    /* request being balanced */
    ngx_http_request_t                 *request;

    /* extra tries set by balancer.set_more_tries(); reset to 0 before each
     * handler run and added to r->upstream->peer.tries in get_peer */
    ngx_uint_t                          more_tries;
    /* number of get_peer invocations (attempts) so far for this request */
    ngx_uint_t                          total_tries;

    /* peer chosen by balancer.set_current_peer(); reset before each handler
     * run, and NULL/0 means "fall back to round robin" */
    struct sockaddr                    *sockaddr;
    socklen_t                           socklen;

    /* name of the chosen peer, copied into pc->name */
    ngx_str_t                          *host;
    /* NOTE(review): not referenced in the code shown here */
    in_port_t                           port;

    /* NOTE(review): not referenced in the code shown here; presumably the
     * state of the last attempt recorded by the free callback - confirm */
    int                                 last_peer_state;

#if !(HAVE_NGX_UPSTREAM_TIMEOUT_FIELDS)
    /* NOTE(review): purpose not visible in this excerpt */
    unsigned                            cloned_upstream_conf;  /* :1 */
#endif
};

ngx_http_lua_balancer_get_peer

/*
 * peer.get callback installed by ngx_http_lua_balancer_init_peer. It is
 * called once per upstream attempt, including retries.
 *
 * Runs the configured balancer_by_lua* handler in the request's Lua ctx.
 * If the handler picked a peer via balancer.set_current_peer (bp->sockaddr
 * is set), that peer is used for this attempt and any extra tries from
 * balancer.set_more_tries are added to r->upstream->peer.tries. Otherwise
 * selection falls back to nginx's round-robin algorithm.
 *
 * Return value:
 *   - NGX_ERROR if the handler itself fails.
 *   - If the handler exited with a non-OK code (ctx->exit_code, presumably
 *     from ngx.exit):
 *       - NGX_ERROR, NGX_BUSY, NGX_DECLINED and (with
 *         HAVE_BALANCER_STATUS_CODE_PATCH) codes >= NGX_HTTP_SPECIAL_RESPONSE
 *         are returned as-is;
 *       - other positive codes become NGX_ERROR;
 *       - other negative codes fall through to normal peer selection.
 *   - NGX_OK when the Lua-chosen peer is used.
 */
static ngx_int_t
ngx_http_lua_balancer_get_peer(ngx_peer_connection_t *pc, void *data)
{
    lua_State                          *L;
    ngx_int_t                           rc;
    ngx_http_request_t                 *r;
    ngx_http_lua_ctx_t                 *ctx;
    ngx_http_lua_srv_conf_t            *lscf;
    ngx_http_lua_main_conf_t           *lmcf;
    ngx_http_lua_balancer_peer_data_t  *bp = data;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                   "lua balancer peer, tries: %ui", pc->tries);

    lscf = bp->conf;

    r = bp->request;

    ngx_http_lua_assert(lscf->balancer.handler && r);

    /* reuse the request's Lua ctx (resetting it) or create a fresh one */
    ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);

    if (ctx == NULL) {
        ctx = ngx_http_lua_create_ctx(r);
        if (ctx == NULL) {
            return NGX_ERROR;
        }

        L = ngx_http_lua_get_lua_vm(r, ctx);

    } else {
        L = ngx_http_lua_get_lua_vm(r, ctx);

        dd("reset ctx");
        ngx_http_lua_reset_ctx(r, L, ctx);
    }

    /* the balancer FFI setters check for this context flag */
    ctx->context = NGX_HTTP_LUA_CONTEXT_BALANCER;

    /* forget the previous attempt's choice and count this attempt */
    bp->sockaddr = NULL;
    bp->socklen = 0;
    bp->more_tries = 0;
    bp->total_tries++;

    lmcf = ngx_http_get_module_main_conf(r, ngx_http_lua_module);

    /* balancer_by_lua does not support yielding and
     * there cannot be any conflicts among concurrent requests,
     * thus it is safe to store the peer data in the main conf.
     */
    /* publish bp so set_current_peer / set_more_tries can reach it */
    lmcf->balancer_peer_data = bp;

    rc = lscf->balancer.handler(r, lscf, L);

    if (rc == NGX_ERROR) {
        return NGX_ERROR;
    }

    if (ctx->exited && ctx->exit_code != NGX_OK) {
        rc = ctx->exit_code;
        if (rc == NGX_ERROR
            || rc == NGX_BUSY
            || rc == NGX_DECLINED
#ifdef HAVE_BALANCER_STATUS_CODE_PATCH
            || rc >= NGX_HTTP_SPECIAL_RESPONSE
#endif
        ) {
            return rc;
        }

        if (rc > NGX_OK) {
            return NGX_ERROR;
        }
    }

    /* the Lua handler picked a peer via balancer.set_current_peer() */
    if (bp->sockaddr && bp->socklen) {
        pc->sockaddr = bp->sockaddr;
        pc->socklen = bp->socklen;
        pc->cached = 0;
        pc->connection = NULL;
        pc->name = bp->host;

        /* NOTE(review): presumably clears round robin's single-server flag
         * so it does not cut retries short for the Lua-chosen peer */
        bp->rrp.peers->single = 0;

        /*
         * Extra tries requested via balancer.set_more_tries() are ADDED to
         * the remaining-attempts counter, not assigned to it.
         * NOTE: per nginx's upstream/round-robin code (not shown here),
         * peer.tries starts at the number of upstream servers, is capped
         * by proxy_next_upstream_tries, and is decremented after each
         * failed attempt. Once it reaches 0, nginx stops retrying and
         * returns an error response (typically 502).
         */
        if (bp->more_tries) {
            r->upstream->peer.tries += bp->more_tries;
        }

        dd("tries: %d", (int) r->upstream->peer.tries);

        return NGX_OK;
    }

    /* no peer chosen in Lua: fall back to nginx's round-robin selection */
    return ngx_http_upstream_get_round_robin_peer(pc, &bp->rrp);
}

到这里,对 Nginx Lua 模块的负载均衡原理应该已经有所了解了:它通过依次替换 peer.init_upstream、peer.init、peer.get、peer.free 这几个回调,把 Lua 代码挂进 upstream 流程;Lua 没有选出节点时,则回退到 ngx_http_upstream_rr_peer_data_t 上原有的 round robin 逻辑。

接下来再分析几个比较重要的 Lua API。这些 Lua API 实际上是通过 FFI 机制调用对应的 C 函数实现的。

set_current_peer

syntax: ok, err = balancer.set_current_peer(host, port)

context: balancer_by_lua*

Sets the peer address (host and port) for the current backend query (which may be a retry).

Domain names in host do not make sense. You need to use OpenResty libraries like lua-resty-dns to obtain IP address(es) from all the domain names before entering the balancer_by_lua* handler (for example, you can perform DNS lookups in an earlier phase like access_by_lua* and pass the results to the balancer_by_lua* handler via ngx.ctx).

设置当前上游节点。

/*
 * FFI backend of balancer.set_current_peer(host, port).
 *
 * Parses the address in `addr` (addr_len bytes), using `port` as the default
 * port and without DNS resolution (no_resolve). The first resulting address
 * is recorded in the request's balancer peer data, and
 * ngx_http_lua_balancer_get_peer uses it for the current attempt.
 *
 * Returns NGX_OK on success. On failure it returns NGX_ERROR and sets *err
 * to an error message. If ngx_parse_url fails without providing url.err,
 * *err is left untouched.
 *
 * Only usable while a balancer_by_lua* handler is running.
 */
int
ngx_http_lua_ffi_balancer_set_current_peer(ngx_http_request_t *r,
    const u_char *addr, size_t addr_len, int port, char **err)
{
    ngx_url_t              url;
    ngx_http_lua_ctx_t    *ctx;
    ngx_http_upstream_t   *u;

    ngx_http_lua_main_conf_t           *lmcf;
    ngx_http_lua_balancer_peer_data_t  *bp;

    if (r == NULL) {
        *err = "no request found";
        return NGX_ERROR;
    }

    u = r->upstream;

    if (u == NULL) {
        *err = "no upstream found";
        return NGX_ERROR;
    }

    ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
    if (ctx == NULL) {
        *err = "no ctx found";
        return NGX_ERROR;
    }

    if ((ctx->context & NGX_HTTP_LUA_CONTEXT_BALANCER) == 0) {
        *err = "API disabled in the current context";
        return NGX_ERROR;
    }

    lmcf = ngx_http_get_module_main_conf(r, ngx_http_lua_module);

    /* we cannot read r->upstream->peer.data here directly because
     * it could be overridden by other modules like
     * ngx_http_upstream_keepalive_module.
     */
    bp = lmcf->balancer_peer_data;
    if (bp == NULL) {
        *err = "no upstream peer data found";
        return NGX_ERROR;
    }

    /*
     * port is narrowed to in_port_t below. Reject values that do not fit
     * instead of silently truncating them (e.g. 65536 would become 0).
     * Port 0 is still accepted.
     */
    if (port < 0 || port > 65535) {
        *err = "invalid port";
        return NGX_ERROR;
    }

    ngx_memzero(&url, sizeof(ngx_url_t));

    url.url.data = ngx_palloc(r->pool, addr_len);
    if (url.url.data == NULL) {
        *err = "no memory";
        return NGX_ERROR;
    }

    ngx_memcpy(url.url.data, addr, addr_len);

    url.url.len = addr_len;
    url.default_port = (in_port_t) port;
    url.uri_part = 0;
    url.no_resolve = 1;

    if (ngx_parse_url(r->pool, &url) != NGX_OK) {
        if (url.err) {
            *err = url.err;
        }

        return NGX_ERROR;
    }

    /*
     * Hand the chosen peer to ngx_http_lua_balancer_get_peer through bp.
     * bp->host points into the url.addrs array (presumably allocated from
     * r->pool by ngx_parse_url), not into the stack-local `url` itself.
     */
    if (url.addrs && url.addrs[0].sockaddr) {
        bp->sockaddr = url.addrs[0].sockaddr;
        bp->socklen = url.addrs[0].socklen;
        bp->host = &url.addrs[0].name;

    } else {
        /* presumably reached for host names, since no_resolve is set */
        *err = "no host allowed";
        return NGX_ERROR;
    }

    return NGX_OK;
}

set_more_tries

syntax: ok, err = balancer.set_more_tries(count)

context: balancer_by_lua*

Sets the tries performed when the current attempt (which may be a retry) fails (as determined by directives like proxy_next_upstream, depending on what particular nginx upstream module you are currently using). Note that the current attempt is excluded in the count number set here.

Please note that, the total number of tries in a single downstream request cannot exceed the hard limit configured by directives like proxy_next_upstream_tries, depending on what concrete nginx upstream module you are using. When exceeding this limit, the count value will get reduced to meet the limit and the second return value will be the string "reduced tries due to limit", which is a warning, while the first return value is still a true value.

官方的这段解释不太好理解,特别是 *Note that the current attempt is excluded in the count number set here* 这一句。如果不看源码、单从字面意思理解,可能会以为是把可尝试的次数修改为 count,其实这是不对的:应该是在现有次数的基础上再多尝试 count 次。下面结合源码详细解释原因。

后面一段比较好理解:一个请求的总尝试次数不能超过 proxy_next_upstream_tries 设置的值,即最多只能尝试 proxy_next_upstream_tries 次;超出时 count 会被自动调小,并返回 "reduced tries due to limit" 警告。

/*
 * FFI backend of balancer.set_more_tries(count).
 *
 * Requests `count` extra attempts in case the current attempt fails. The
 * value is only stored in bp->more_tries. ngx_http_lua_balancer_get_peer
 * ADDS it to r->upstream->peer.tries after the balancer handler returns;
 * it is never assigned directly.
 *
 * When the upstream's next_upstream_tries limit (e.g. proxy_next_upstream_tries)
 * is non-zero, count is reduced so the total number of attempts does not
 * exceed it. In that case NGX_OK is still returned and *err is set to the
 * warning "reduced tries due to limit". On other successes *err is NULL.
 *
 * Returns NGX_ERROR with *err set in these cases:
 *   - no request, upstream, ctx or peer data;
 *   - called outside balancer_by_lua*;
 *   - count is negative.
 */
int
ngx_http_lua_ffi_balancer_set_more_tries(ngx_http_request_t *r,
    int count, char **err)
{
#if (nginx_version >= 1007005)
    ngx_uint_t             max_tries, total;
#endif
    ngx_http_lua_ctx_t    *ctx;
    ngx_http_upstream_t   *u;

    ngx_http_lua_main_conf_t           *lmcf;
    ngx_http_lua_balancer_peer_data_t  *bp;

    if (r == NULL) {
        *err = "no request found";
        return NGX_ERROR;
    }

    u = r->upstream;

    if (u == NULL) {
        *err = "no upstream found";
        return NGX_ERROR;
    }

    ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
    if (ctx == NULL) {
        *err = "no ctx found";
        return NGX_ERROR;
    }

    if ((ctx->context & NGX_HTTP_LUA_CONTEXT_BALANCER) == 0) {
        *err = "API disabled in the current context";
        return NGX_ERROR;
    }

    lmcf = ngx_http_get_module_main_conf(r, ngx_http_lua_module);

    bp = lmcf->balancer_peer_data;
    if (bp == NULL) {
        *err = "no upstream peer data found";
        return NGX_ERROR;
    }

    /*
     * count ends up in the unsigned counters bp->more_tries and
     * u->peer.tries. A negative value would wrap around there and could
     * turn into an effectively unbounded number of retries, so reject it.
     */
    if (count < 0) {
        *err = "invalid count";
        return NGX_ERROR;
    }

#if (nginx_version >= 1007005)
    /* configured hard limit, e.g. proxy_next_upstream_tries; 0 = no limit */
    max_tries = u->conf->next_upstream_tries;

    /*
     * Attempts already allowed for this request.
     *   - total_tries: attempts made so far, including the current one
     *     (incremented in get_peer before the handler runs).
     *   - peer.tries: remaining attempts; the -1 assumes it still counts
     *     the current attempt, so that attempt is not counted twice.
     */
    total = bp->total_tries + u->peer.tries - 1;

    if (max_tries && total + count > max_tries) {
        /* clamp so that total + count == max_tries, never below zero;
         * max_tries - total < count here, so the cast cannot overflow */
        count = (total < max_tries) ? (int) (max_tries - total) : 0;
        *err = "reduced tries due to limit";

    } else {
        *err = NULL;
    }
#else
    *err = NULL;
#endif

    /* applied (added to peer.tries) by ngx_http_lua_balancer_get_peer */
    bp->more_tries = count;
    return NGX_OK;
}

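从上边的代码和注释可以看出,set_more_tries 本身只是把 count(必要时按 proxy_next_upstream_tries 调小)记录到 bp->more_tries 中。balancer 处理函数返回后,ngx_http_lua_balancer_get_peer 会执行下面这行代码,把它累加到剩余尝试次数上。因此它的效果是:当前节点连接失败需要重试时(即 ngx_http_upstream_next 阶段),在原有次数的基础上再多尝试 count 次。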

r->upstream->peer.tries += bp->more_tries;

而不是 r->upstream->peer.tries = bp->more_tries,两者区别很大。实际应用中要特别注意这一点,并且一定要结合 proxy_next_upstream_tries 使用,否则就可能出现我在《Nginx Upstream 流程分析》中遇到的无限重试问题。

参考

https://github.com/openresty/lua-resty-core/blob/master/lib/ngx/balancer.md

https://www.qlee.in/openresty/2017/02/26/nginx-lua-coroutine-scheduler-1/