电竞比分网-中国电竞赛事及体育赛事平台

分享

nginx中upstream的設(shè)計和實(shí)現(xiàn)(二)

 enrol 2012-02-19

nginx中upstream的設(shè)計和實(shí)現(xiàn)(二)

WP Greet Box icon
Hello there! If you are new here, you might want to subscribe to the RSS feed for updates on this topic.

原創(chuàng)文章,轉(zhuǎn)載請注明: 轉(zhuǎn)載自pagefault

本文鏈接地址: nginx中upstream的設(shè)計和實(shí)現(xiàn)(二)

這次主要來看upstream的幾個相關(guān)的hook函數(shù)。

首先要知道,對于upstream,同時有兩個連接,一個時client和nginx,一個是nginx和upstream,這個時候就會有兩個回調(diào),然后上篇blog中,我們能看到在upstream中,會改變read_event_handler和write_event_handler,不過這里有三個條件,分別是
1 沒有使用cache,
2 不忽略client的提前終止
3 不是post_action

1
2
3
4
5
6
//條件賦值
    if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
//然后給讀寫handler賦值
        r->read_event_handler = ngx_http_upstream_rd_check_broken_connection;
        r->write_event_handler = ngx_http_upstream_wr_check_broken_connection;
    }


然后我們來看這個兩個函數(shù),這兩個都會調(diào)用ngx_http_upstream_check_broken_connection,因此我們就先來詳細(xì)分析這個函數(shù)。

這個函數(shù)主要是用來檢測client的連接是否完好。因此它使用了MSG_PEEK這個參數(shù),也就是預(yù)讀,然后通過recv的返回值來判斷是否連接已經(jīng)斷開。

這里的代碼分為兩部分,第一部分是本身連接在進(jìn)入這個回調(diào)函數(shù)之前連接都已經(jīng)有錯誤了,這個時候如果是水平觸發(fā),則刪除事件,然后finalize這個upstream(沒有cache’的情況下),否則就直接finalize這個upstream。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
    c = r->connection;
    u = r->upstream;
//如果連接已經(jīng)出現(xiàn)錯誤。
    if (c->error) {
//如果是水平觸發(fā)
        if ((ngx_event_flags & NGX_USE_LEVEL_EVENT) && ev->active) {
 
            event = ev->write ? NGX_WRITE_EVENT : NGX_READ_EVENT;
//刪除事件
            if (ngx_del_event(ev, event, 0) != NGX_OK) {
                ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }
        }
 
        if (!u->cacheable) {
//清理upstream request
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_CLIENT_CLOSED_REQUEST);
        }
 
        return;
    }

緊接著就是第二部分,這部分的工作就是預(yù)讀?。眰€字節(jié),然后來判斷是否連接已經(jīng)被client斷掉。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
//讀?。眰€字節(jié)
    n = recv(c->fd, buf, 1, MSG_PEEK);
 
    err = ngx_socket_errno;
 
    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ev->log, err,
                   "http upstream recv(): %d", n);
 
    if (ev->write && (n >= 0 || err == NGX_EAGAIN)) {
        return;
    }
//如果水平觸發(fā)則刪除事件
    if ((ngx_event_flags & NGX_USE_LEVEL_EVENT) && ev->active) {
 
        event = ev->write ? NGX_WRITE_EVENT : NGX_READ_EVENT;
 
        if (ngx_del_event(ev, event, 0) != NGX_OK) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
    }
//如果還有數(shù)據(jù),則直接返回
    if (n > 0) {
        return;
    }
 
    if (n == -1) {
        if (err == NGX_EAGAIN) {
            return;
        }
 
        ev->error = 1;
 
    } else { /* n == 0 */
        err = 0;
    }
 
//到達(dá)這里說明有錯誤產(chǎn)生了
    ev->eof = 1;
//設(shè)置錯誤,可以看到這個值在函數(shù)一開始有檢測.
    c->error = 1;
//如果沒有cache,則finalize upstream request
    if (!u->cacheable && u->peer.connection) {
        ngx_log_error(NGX_LOG_INFO, ev->log, err,
                      "client closed prematurely connection, "
                      "so upstream connection is closed too");
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_CLIENT_CLOSED_REQUEST);
        return;
    }
 
    ngx_log_error(NGX_LOG_INFO, ev->log, err,
                  "client closed prematurely connection");
//如果有cache,并且后端的upstream還在處理,則此時繼續(xù)處理upstream,忽略對端的錯誤.
    if (u->peer.connection == NULL) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_CLIENT_CLOSED_REQUEST);
    }

然后我們來看nginx如何連接后端的upstream,在上篇blog的結(jié)束的時候,我們看到最終會調(diào)用ngx_http_upstream_connect來進(jìn)入連接upstream的處理,因此我們來詳細(xì)分析這個函數(shù)以及相關(guān)的函數(shù)。

函數(shù)一開始是初始化請求開始事件一些參數(shù)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
..........................................................
//取得upstream的狀態(tài)
    u->state = ngx_array_push(r->upstream_states);
    if (u->state == NULL) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }
 
    ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));
 
    tp = ngx_timeofday();
//初始化時間
    u->state->response_sec = tp->sec;
    u->state->response_msec = tp->msec;

然后是調(diào)用ngx_event_connect_peer開始連接后端upstream.并且對返回值進(jìn)行處理,等下會詳細(xì)分析ngx_event_connect_peer這個函數(shù).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//連接后端
    rc = ngx_event_connect_peer(&u->peer);
 
    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "http upstream connect: %i", rc);
 
    if (rc == NGX_ERROR) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }
//這個是很關(guān)鍵的一個結(jié)構(gòu)peer,后面的blog會詳細(xì)分析
    u->state->peer = u->peer.name;
 
    if (rc == NGX_BUSY) {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no live upstreams");
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_NOLIVE);
        return;
    }
 
    if (rc == NGX_DECLINED) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }

當(dāng)返回值為NGX_OK或者NGX_AGAIN的話,就說明連接成功或者暫時異步的連接還沒成功,所以需要掛載upstream端的回調(diào)函數(shù).這里要注意就是NGX_AGAIN的情況,因?yàn)槭钱惒降腸onnect,所以可能會連接不成功。所以如果返回NGX_AGAIN的話,需要掛載寫函數(shù).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    /* rc == NGX_OK || rc == NGX_AGAIN */
 
    c = u->peer.connection;
 
    c->data = r;
 
    c->write->handler = ngx_http_upstream_handler;
    c->read->handler = ngx_http_upstream_handler;
//開始掛載回調(diào)函數(shù),一個是讀,一個是寫。
    u->write_event_handler = ngx_http_upstream_send_request_handler;
    u->read_event_handler = ngx_http_upstream_process_header;
 
    c->sendfile &= r->connection->sendfile;
    u->output.sendfile = c->sendfile;
 
    c->pool = r->pool;
    c->log = r->connection->log;
    c->read->log = c->log;
    c->write->log = c->log;
 
    /* init or reinit the ngx_output_chain() and ngx_chain_writer() contexts */
 
    u->writer.out = NULL;
    u->writer.last = &u->writer.out;
    u->writer.connection = c;
    u->writer.limit = 0;

然后時對request_body的一些處理以及如果request_sent已經(jīng)設(shè)置,也就是這個upstream已經(jīng)發(fā)送過一部分?jǐn)?shù)據(jù)了,此時需要重新初始化upstream.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
    if (u->request_sent) {
//重新初始化upstream
        if (ngx_http_upstream_reinit(r, u) != NGX_OK) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
    }
//如果request_body存在的話,保存request_body
    if (r->request_body
        && r->request_body->buf
        && r->request_body->temp_file
        && r == r->main)
    {
        /*
         * the r->request_body->buf can be reused for one request only,
         * the subrequests should allocate their own temporay bufs
         */
 
        u->output.free = ngx_alloc_chain_link(r->pool);
        if (u->output.free == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
//保存到output
        u->output.free->buf = r->request_body->buf;
        u->output.free->next = NULL;
        u->output.allocated = 1;
//重置request_body
        r->request_body->buf->pos = r->request_body->buf->start;
        r->request_body->buf->last = r->request_body->buf->start;
        r->request_body->buf->tag = u->output.tag;
    }

最后則是先判斷rc的返回值,如果是NGX_AGAIN,則說明連接沒有返回,則設(shè)置定時器,然后返回,否則說明連接成功,這時就需要發(fā)送請求到后端。

1
2
3
4
5
6
7
    if (rc == NGX_AGAIN) {
//添加定時器
        ngx_add_timer(c->write, u->conf->connect_timeout);
        return;
    }
 
    ngx_http_upstream_send_request(r, u);

緊接著我們來看最后的兩個函數(shù),分別是上面的ngx_event_connect_peer和ngx_http_upstream_send_request,我們來一個個看。

先來看ngx_event_connect_peer。它主要是用來連接后端,函數(shù)比較長,一部分一部分來看。

下面這部分主要是建立socket,然后設(shè)置屬性,從連接池取出來connection.這里后面的一部分和我們前面client請求上來之后,我們初始化connect類似.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
//取得我們將要發(fā)送的upstream對端
    rc = pc->get(pc, pc->data);
    if (rc != NGX_OK) {
        return rc;
    }
//新建socket
    s = ngx_socket(pc->sockaddr->sa_family, SOCK_STREAM, 0);
 
    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0, "socket %d", s);
 
    if (s == -1) {
        ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                      ngx_socket_n " failed");
        return NGX_ERROR;
    }
 
//取得連接
    c = ngx_get_connection(s, pc->log);
 
    if (c == NULL) {
        if (ngx_close_socket(s) == -1) {
            ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                          ngx_close_socket_n "failed");
        }
 
        return NGX_ERROR;
    }
//設(shè)置rcvbuf的大小
    if (pc->rcvbuf) {
        if (setsockopt(s, SOL_SOCKET, SO_RCVBUF,
                       (const void *) &pc->rcvbuf, sizeof(int)) == -1)
        {
            ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                          "setsockopt(SO_RCVBUF) failed");
            goto failed;
        }
    }
//設(shè)置非阻塞
    if (ngx_nonblocking(s) == -1) {
        ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                      ngx_nonblocking_n " failed");
 
        goto failed;
    }
 
    if (pc->local) {
        if (bind(s, pc->local->sockaddr, pc->local->socklen) == -1) {
            ngx_log_error(NGX_LOG_CRIT, pc->log, ngx_socket_errno,
                          "bind(%V) failed", &pc->local->name);
 
            goto failed;
        }
    }
//開始掛載對應(yīng)的讀寫函數(shù).
    c->recv = ngx_recv;
    c->send = ngx_send;
    c->recv_chain = ngx_recv_chain;
    c->send_chain = ngx_send_chain;
 
    c->sendfile = 1;
 
    c->log_error = pc->log_error;
 
    if (pc->sockaddr->sa_family != AF_INET) {
        c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED;
        c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;
 
#if (NGX_SOLARIS)
        /* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */
        c->sendfile = 0;
#endif
    }
 
    rev = c->read;
    wev = c->write;
 
    rev->log = pc->log;
    wev->log = pc->log;
 
    pc->connection = c;
 
    c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
 
#if (NGX_THREADS)
 
    /* TODO: lock event when call completion handler */
 
    rev->lock = pc->lock;
    wev->lock = pc->lock;
    rev->own_lock = &c->lock;
    wev->own_lock = &c->lock;
 
#endif
 
    if (ngx_add_conn) {
//添加讀寫事件
        if (ngx_add_conn(c) == NGX_ERROR) {
            goto failed;
        }
    }

等socket設(shè)置完畢,nginx就開始連接后端的upstream,這段代碼可以學(xué)習(xí)一個好的代碼是如何處理錯誤的,

下面這段主要是處理當(dāng)返回值為-1,并且err不等于NGX_EINPROGRESS的時候,而NGX_EINPROGRESS表示非阻塞的socket,然后connect,然后連接還沒有完成,可是提前返回,就回設(shè)置這個errno.這個error不算出錯,因此需要過濾掉.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
    rc = connect(s, pc->sockaddr, pc->socklen);
 
    if (rc == -1) {
        err = ngx_socket_errno;
 
//判斷錯誤號
        if (err != NGX_EINPROGRESS
#if (NGX_WIN32)
            /* Winsock returns WSAEWOULDBLOCK (NGX_EAGAIN) */
            && err != NGX_EAGAIN
#endif
            )
        {
            if (err == NGX_ECONNREFUSED
#if (NGX_LINUX)
                /*
                 * Linux returns EAGAIN instead of ECONNREFUSED
                 * for unix sockets if listen queue is full
                 */
                || err == NGX_EAGAIN
#endif
                || err == NGX_ECONNRESET
                || err == NGX_ENETDOWN
                || err == NGX_ENETUNREACH
                || err == NGX_EHOSTDOWN
                || err == NGX_EHOSTUNREACH)
            {
                level = NGX_LOG_ERR;
 
            } else {
                level = NGX_LOG_CRIT;
            }
 
            ngx_log_error(level, c->log, err, "connect() to %V failed",
                          pc->name);
//返回declined
            return NGX_DECLINED;
        }
    }

然后就是下面的部門就是處理連接成功和錯誤號為NGX_EINPROGRESS的情況,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//如果當(dāng)前的事件模型支持add_conn,則事件在開始已經(jīng)加好了,因此如果rc==-1則直接返回
    if (ngx_add_conn) {
        if (rc == -1) {
 
            /* NGX_EINPROGRESS */
 
            return NGX_AGAIN;
        }
 
        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected");
 
        wev->ready = 1;
 
        return NGX_OK;
    }
..............................................
//添加可讀事件
    if (ngx_add_event(rev, NGX_READ_EVENT, event) != NGX_OK) {
        goto failed;
    }
 
    if (rc == -1) {
 
        /* NGX_EINPROGRESS */
//如果錯誤號是 EINPROGRES 添加可寫事件
        if (ngx_add_event(wev, NGX_WRITE_EVENT, event) != NGX_OK) {
            goto failed;
        }
 
        return NGX_AGAIN;
    }
 
    ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected");
 
    wev->ready = 1;
 
    return NGX_OK;
..............................................

最后我們來看下ngx_http_upstream_send_request的實(shí)現(xiàn),這個函數(shù)是用來發(fā)送數(shù)據(jù)到后端的upstream,然后這里有一個需要注意的地方,那就是在linux下當(dāng)非阻塞的connect,然后沒有連接完成,如果掛載寫事件,此時如果寫事件上報上來,并不代表連接成功,此時還需要調(diào)用getsockopt來判斷SO_ERROR,如果沒有錯誤才能保證連接成功。

SOL_SOCKET
to determine whether connect() completed successfully (SO_ERROR is zero) or unsuccessfully (SO_ERROR is one of the usual error codes listed
here, explaining the reason for the failure).

這里我看了下內(nèi)核的代碼,就是如果連接失敗,比如對端不可達(dá),內(nèi)核會設(shè)置sock->sk_soft_err,而在tcp_poll中只會檢測sk_err , 對應(yīng)的SO_ERROR會檢測這兩個錯誤。在內(nèi)核里面的注釋是這樣子的

* @sk_err: last error
* @sk_err_soft: errors that don’t cause failure but are the cause of a
* persistent failure not just ‘timed out’

這個按照我的理解,內(nèi)核里面的sk_err 表示4層的錯誤,而sk_err_soft下層的錯誤.

在nginx中是在ngx_http_upstream_test_connect中對連接是否斷開進(jìn)行判斷的(調(diào)用getsockopt).

然后發(fā)送數(shù)據(jù)則是調(diào)用ngx_output_chain,不過這里我們知道在ngx_output_chain中會依次調(diào)用filter鏈,可是upstream明顯不需要調(diào)用filter鏈,那么nginx是怎么做的呢,是這樣子的,在upstream的初始化的時候,已經(jīng)講u->output.output_filter改成ngx_chain_writer了:

1
u->output.output_filter = ngx_chain_writer;

最后就是一些對錯誤的處理,我們來看代碼

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
static void
ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ngx_int_t          rc;
    ngx_connection_t  *c;
 
    c = u->peer.connection;
 
    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http upstream send request");
//如果test connect失敗,則說明連接失敗,于是跳到下一個upstream,然后返回
    if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }
 
    c->log->action = "sending request to upstream";
//發(fā)送數(shù)據(jù),這里的u->output.output_filter已經(jīng)被修改過了
    rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs);
 
    u->request_sent = 1;
........................................................
//和request的處理類似,如果again,則說明數(shù)據(jù)沒有發(fā)送完畢,此時掛載寫事件.
    if (rc == NGX_AGAIN) {
        ngx_add_timer(c->write, u->conf->send_timeout);
 
        if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
 
        return;
    }
 
    /* rc == NGX_OK */
//設(shè)置tcp_cork,遠(yuǎn)離和前面的keepalive部分的處理類似
    if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) {
        if (ngx_tcp_push(c->fd) == NGX_ERROR) {
            ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno,
                          ngx_tcp_push_n " failed");
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
 
        c->tcp_nopush = NGX_TCP_NOPUSH_UNSET;
    }
 
    ngx_add_timer(c->read, u->conf->read_timeout);
 
#if 1
//如果讀也可以了,則開始解析頭
    if (c->read->ready) {
 
        /* post aio operation */
 
        /*
         * TODO comment
         * although we can post aio operation just in the end
         * of ngx_http_upstream_connect() CHECK IT !!!
         * it's better to do here because we postpone header buffer allocation
         */
 
        ngx_http_upstream_process_header(r, u);
        return;
    }
#endif
...........................................
}

在下一篇blog里面,我會詳細(xì)的分析nginx對后端來的數(shù)據(jù)如何解析以及如何發(fā)送數(shù)據(jù)到client.

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多