Original article. If you repost it, please credit the source: reposted from pagefault
This time we mainly look at several hook functions related to upstream.
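First, keep in mind that an upstream request involves two connections at the same time: one between the client and nginx, and one between nginx and the upstream. That means there are two sets of callbacks. In the previous post we saw that the upstream code replaces read_event_handler and write_event_handler on the request, but only when three conditions hold:
1 the response is not being cached/stored (in the code, u->store is unset),
2 early termination by the client is not ignored (ignore_client_abort is off),
3 the request is not a post_action.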
    // conditional assignment
    if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
        // then assign the read and write handlers
        r->read_event_handler = ngx_http_upstream_rd_check_broken_connection;
        r->write_event_handler = ngx_http_upstream_wr_check_broken_connection;
    }
Now let's look at these two functions. Both of them call ngx_http_upstream_check_broken_connection, so we analyze that function in detail first.
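This function checks whether the client connection is still intact. It calls recv with the MSG_PEEK flag, that is, it peeks at the pending data without consuming it, and then uses the return value of recv to decide whether the connection has been closed.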
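The code has two parts. The first part handles the case where the connection was already in error before this callback was entered. If the event model is level-triggered, the event is deleted first. After that, in either case, the upstream is finalized, provided the response is not cacheable.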
    c = r->connection;
    u = r->upstream;

    // the connection is already in error
    if (c->error) {
        // level-triggered event model
        if ((ngx_event_flags & NGX_USE_LEVEL_EVENT) && ev->active) {

            event = ev->write ? NGX_WRITE_EVENT : NGX_READ_EVENT;

            // delete the event
            if (ngx_del_event(ev, event, 0) != NGX_OK) {
                ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }
        }

        if (!u->cacheable) {
            // clean up the upstream request
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_CLIENT_CLOSED_REQUEST);
        }

        return;
    }
Next comes the second part. Its job is to peek at 1 byte and then decide whether the client has already closed the connection.
    // peek at 1 byte
    n = recv(c->fd, buf, 1, MSG_PEEK);

    err = ngx_socket_errno;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ev->log, err,
                   "http upstream recv(): %d", n);

    if (ev->write && (n >= 0 || err == NGX_EAGAIN)) {
        return;
    }

    // level-triggered: delete the event
    if ((ngx_event_flags & NGX_USE_LEVEL_EVENT) && ev->active) {

        event = ev->write ? NGX_WRITE_EVENT : NGX_READ_EVENT;

        if (ngx_del_event(ev, event, 0) != NGX_OK) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
    }

    // there is still data to read, so just return
    if (n > 0) {
        return;
    }

    if (n == -1) {
        if (err == NGX_EAGAIN) {
            return;
        }

        ev->error = 1;

    } else { /* n == 0 */
        err = 0;
    }

    // reaching this point means the client side has failed or closed
    ev->eof = 1;
    // set the error flag; this is the value tested at the top of the function
    c->error = 1;

    // not cacheable: finalize the upstream request
    if (!u->cacheable && u->peer.connection) {
        ngx_log_error(NGX_LOG_INFO, ev->log, err,
                      "client closed prematurely connection, "
                      "so upstream connection is closed too");
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_CLIENT_CLOSED_REQUEST);
        return;
    }

    ngx_log_error(NGX_LOG_INFO, ev->log, err,
                  "client closed prematurely connection");

    // cacheable and the upstream is still being processed: keep going with
    // the upstream and ignore the client-side error
    if (u->peer.connection == NULL) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_CLIENT_CLOSED_REQUEST);
    }
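The peek technique is easy to try outside nginx. Below is a minimal sketch, not nginx code: the enum and the function name peek_peer_state are made up for illustration, and MSG_DONTWAIT is assumed to be available (it is on Linux and the BSDs) so that the call never blocks even on a blocking socket.

```c
#include <errno.h>
#include <sys/socket.h>
#include <sys/types.h>

/* Hypothetical result codes for this sketch; these are not nginx names. */
enum peer_state { PEER_ALIVE, PEER_HAS_DATA, PEER_CLOSED, PEER_ERROR };

/* Peek at one byte without consuming it and classify the connection. */
static enum peer_state
peek_peer_state(int fd)
{
    char     buf[1];
    ssize_t  n;

    /* MSG_PEEK leaves the byte in the socket buffer for the real reader */
    n = recv(fd, buf, 1, MSG_PEEK | MSG_DONTWAIT);

    if (n > 0) {
        return PEER_HAS_DATA;      /* data is pending, the peer is still there */
    }

    if (n == 0) {
        return PEER_CLOSED;        /* orderly shutdown by the peer */
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK) {
        return PEER_ALIVE;         /* nothing to read, but no error either */
    }

    return PEER_ERROR;             /* a real error, for example ECONNRESET */
}
```

The three branches correspond to n > 0, n == 0 and n == -1 in the nginx listing above. Because the byte is only peeked, calling the function repeatedly gives the same answer until somebody actually reads the data.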
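Next, let's see how nginx connects to the upstream backend. At the end of the previous post we saw that ngx_http_upstream_connect is eventually called to start connecting to the upstream, so let's analyze this function and the functions it relies on in detail.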
The function begins by initializing some per-request bookkeeping: the upstream state entry and the start time.
    ..........................................................

    // get a slot for this upstream's state
    u->state = ngx_array_push(r->upstream_states);
    if (u->state == NULL) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));

    tp = ngx_timeofday();
    // initialize the time
    u->state->response_sec = tp->sec;
    u->state->response_msec = tp->msec;
Then it calls ngx_event_connect_peer to start connecting to the upstream backend and handles the return value. We analyze ngx_event_connect_peer in detail a little later.
    // connect to the backend
    rc = ngx_event_connect_peer(&u->peer);

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "http upstream connect: %i", rc);

    if (rc == NGX_ERROR) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    // peer is a key structure; a later post analyzes it in detail
    u->state->peer = u->peer.name;

    if (rc == NGX_BUSY) {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no live upstreams");
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_NOLIVE);
        return;
    }

    if (rc == NGX_DECLINED) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }
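If the return value is NGX_OK or NGX_AGAIN, the connection either succeeded or is an asynchronous connect that has not completed yet, so the callbacks for the upstream side have to be installed. Pay attention to the NGX_AGAIN case: because the connect is asynchronous, it may still fail later, so a write handler must be installed when NGX_AGAIN is returned.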
    /* rc == NGX_OK || rc == NGX_AGAIN */

    c = u->peer.connection;

    c->data = r;

    c->write->handler = ngx_http_upstream_handler;
    c->read->handler = ngx_http_upstream_handler;

    // install the callbacks, one for reading and one for writing
    u->write_event_handler = ngx_http_upstream_send_request_handler;
    u->read_event_handler = ngx_http_upstream_process_header;

    c->sendfile &= r->connection->sendfile;
    u->output.sendfile = c->sendfile;

    c->pool = r->pool;
    c->log = r->connection->log;
    c->read->log = c->log;
    c->write->log = c->log;

    /* init or reinit the ngx_output_chain() and ngx_chain_writer() contexts */

    u->writer.out = NULL;
    u->writer.last = &u->writer.out;
    u->writer.connection = c;
    u->writer.limit = 0;
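Notice that both events of the upstream connection get the same handler, ngx_http_upstream_handler, while the real work is stored in u->read_event_handler and u->write_event_handler. As far as I can tell from the nginx source, the shared handler simply forwards to one of the two depending on whether the event is a write event. The toy model below sketches that two-level dispatch. Every toy_ name is hypothetical and the structures are heavily simplified.

```c
#include <stddef.h>

/* Toy stand-ins for the nginx structures; all names are hypothetical. */
typedef struct toy_upstream_s  toy_upstream_t;
typedef void (*toy_handler_pt)(toy_upstream_t *u);

struct toy_upstream_s {
    toy_handler_pt  read_event_handler;
    toy_handler_pt  write_event_handler;
    int             reads;    /* how often the read callback ran */
    int             writes;   /* how often the write callback ran */
};

typedef struct {
    int              write;   /* 1 for a write event, 0 for a read event */
    toy_upstream_t  *data;    /* back pointer, like c->data in nginx */
} toy_event_t;

static void toy_process_header(toy_upstream_t *u) { u->reads++; }
static void toy_send_request(toy_upstream_t *u)   { u->writes++; }

/* The single handler installed on both events; it forwards by direction. */
static void
toy_upstream_handler(toy_event_t *ev)
{
    toy_upstream_t  *u = ev->data;

    if (ev->write) {
        u->write_event_handler(u);

    } else {
        u->read_event_handler(u);
    }
}
```

The benefit of this layout is that later stages can swap u->read_event_handler or u->write_event_handler without touching the event registration itself.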
Then comes some handling of request_body. In addition, if request_sent is already set, meaning that part of the request has already been sent to an upstream, the upstream has to be reinitialized.
    if (u->request_sent) {
        // reinitialize the upstream
        if (ngx_http_upstream_reinit(r, u) != NGX_OK) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
    }

    // if there is a request_body, save its buffer
    if (r->request_body
        && r->request_body->buf
        && r->request_body->temp_file
        && r == r->main)
    {
        /*
         * the r->request_body->buf can be reused for one request only,
         * the subrequests should allocate their own temporary bufs
         */

        u->output.free = ngx_alloc_chain_link(r->pool);
        if (u->output.free == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        // save it into output
        u->output.free->buf = r->request_body->buf;
        u->output.free->next = NULL;
        u->output.allocated = 1;

        // reset the request_body buffer
        r->request_body->buf->pos = r->request_body->buf->start;
        r->request_body->buf->last = r->request_body->buf->start;
        r->request_body->buf->tag = u->output.tag;
    }
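Finally, rc is checked. If it is NGX_AGAIN, the connect has not completed yet, so a timer is set and the function returns. Otherwise the connection has succeeded and the request has to be sent to the backend.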
    if (rc == NGX_AGAIN) {
        // add the connect timer
        ngx_add_timer(c->write, u->conf->connect_timeout);
        return;
    }

    ngx_http_upstream_send_request(r, u);
Now let's look at the last two functions, ngx_event_connect_peer and ngx_http_upstream_send_request from above, one at a time.
First, ngx_event_connect_peer. Its main job is to connect to the backend. The function is fairly long, so we go through it piece by piece.
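The following part creates the socket, sets its options, and takes a connection from the connection pool. The latter half is similar to the connection initialization we saw earlier for an incoming client request.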
    // pick the upstream peer we are going to send to
    rc = pc->get(pc, pc->data);
    if (rc != NGX_OK) {
        return rc;
    }

    // create the socket
    s = ngx_socket(pc->sockaddr->sa_family, SOCK_STREAM, 0);

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0, "socket %d", s);

    if (s == -1) {
        ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                      ngx_socket_n " failed");
        return NGX_ERROR;
    }

    // get a connection
    c = ngx_get_connection(s, pc->log);

    if (c == NULL) {
        if (ngx_close_socket(s) == -1) {
            ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                          ngx_close_socket_n " failed");
        }

        return NGX_ERROR;
    }

    // set the size of rcvbuf
    if (pc->rcvbuf) {
        if (setsockopt(s, SOL_SOCKET, SO_RCVBUF,
                       (const void *) &pc->rcvbuf, sizeof(int)) == -1)
        {
            ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                          "setsockopt(SO_RCVBUF) failed");
            goto failed;
        }
    }

    // make the socket non-blocking
    if (ngx_nonblocking(s) == -1) {
        ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                      ngx_nonblocking_n " failed");

        goto failed;
    }

    if (pc->local) {
        if (bind(s, pc->local->sockaddr, pc->local->socklen) == -1) {
            ngx_log_error(NGX_LOG_CRIT, pc->log, ngx_socket_errno,
                          "bind(%V) failed", &pc->local->name);

            goto failed;
        }
    }

    // install the corresponding read and write functions
    c->recv = ngx_recv;
    c->send = ngx_send;
    c->recv_chain = ngx_recv_chain;
    c->send_chain = ngx_send_chain;

    c->sendfile = 1;

    c->log_error = pc->log_error;

    if (pc->sockaddr->sa_family != AF_INET) {
        c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED;
        c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;

    #if (NGX_SOLARIS)
        /* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */
        c->sendfile = 0;
    #endif
    }

    rev = c->read;
    wev = c->write;

    rev->log = pc->log;
    wev->log = pc->log;

    pc->connection = c;

    c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);

    #if (NGX_THREADS)

    /* TODO: lock event when call completion handler */

    rev->lock = pc->lock;
    wev->lock = pc->lock;
    rev->own_lock = &c->lock;
    wev->own_lock = &c->lock;

    #endif

    if (ngx_add_conn) {
        // add the read and write events
        if (ngx_add_conn(c) == NGX_ERROR) {
            goto failed;
        }
    }
Once the socket is set up, nginx starts connecting to the upstream backend. This code is a good example of how well-written code handles errors.
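The following part handles the case where connect returns -1 and err is not NGX_EINPROGRESS. NGX_EINPROGRESS means that connect was called on a non-blocking socket and returned before the connection was established. That errno does not indicate a failure, so it has to be filtered out.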
    rc = connect(s, pc->sockaddr, pc->socklen);

    if (rc == -1) {
        err = ngx_socket_errno;

        // check the error number
        if (err != NGX_EINPROGRESS
    #if (NGX_WIN32)
            /* Winsock returns WSAEWOULDBLOCK (NGX_EAGAIN) */
            && err != NGX_EAGAIN
    #endif
            )
        {
            if (err == NGX_ECONNREFUSED
    #if (NGX_LINUX)
                /*
                 * Linux returns EAGAIN instead of ECONNREFUSED
                 * for unix sockets if listen queue is full
                 */
                || err == NGX_EAGAIN
    #endif
                || err == NGX_ECONNRESET
                || err == NGX_ENETDOWN
                || err == NGX_ENETUNREACH
                || err == NGX_EHOSTDOWN
                || err == NGX_EHOSTUNREACH)
            {
                level = NGX_LOG_ERR;

            } else {
                level = NGX_LOG_CRIT;
            }

            ngx_log_error(level, c->log, err, "connect() to %V failed",
                          pc->name);

            // return declined
            return NGX_DECLINED;
        }
    }
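The idea behind the two log levels is that some connect errors are routine in production (the backend is down or unreachable) while the rest point to a local problem. Here is a minimal sketch of that classification as a standalone function. The enum and the name connect_errno_level are assumptions made for this example, and EHOSTDOWN is assumed to exist on the platform (it does on Linux and the BSDs, but it is not in POSIX).

```c
#include <errno.h>

/* Hypothetical severity levels for this sketch; not nginx's constants. */
enum log_level { LOG_LEVEL_ERR, LOG_LEVEL_CRIT };

/* Map a connect() errno to the severity it deserves in the log. */
static enum log_level
connect_errno_level(int err)
{
    switch (err) {

    case ECONNREFUSED:
    case ECONNRESET:
    case ENETDOWN:
    case ENETUNREACH:
    case EHOSTDOWN:
    case EHOSTUNREACH:
        /* the peer or the network is down: expected from time to time */
        return LOG_LEVEL_ERR;

    default:
        /* anything else (EBADF, EINVAL, ...) likely means a local problem */
        return LOG_LEVEL_CRIT;
    }
}
```

Whatever the level, the nginx code above returns NGX_DECLINED, which lets the caller move on to the next upstream.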
The next part handles a successful connect and the case where the error number is NGX_EINPROGRESS.
    // if the current event model supports add_conn, the events were already
    // added at the beginning, so when rc == -1 we can return right away
    if (ngx_add_conn) {
        if (rc == -1) {

            /* NGX_EINPROGRESS */

            return NGX_AGAIN;
        }

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected");

        wev->ready = 1;

        return NGX_OK;
    }

    ..............................................

    // add the read event
    if (ngx_add_event(rev, NGX_READ_EVENT, event) != NGX_OK) {
        goto failed;
    }

    if (rc == -1) {

        /* NGX_EINPROGRESS */

        // the error number is EINPROGRESS: add the write event
        if (ngx_add_event(wev, NGX_WRITE_EVENT, event) != NGX_OK) {
            goto failed;
        }

        return NGX_AGAIN;
    }

    ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected");

    wev->ready = 1;

    return NGX_OK;

    ..............................................
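Stripped of the nginx event machinery, the three outcomes of a non-blocking connect can be sketched with plain POSIX calls. This is only an illustration under my own naming: start_connect and the CONN_ constants are hypothetical, and the Windows and Linux special cases from the listing above are left out.

```c
#include <errno.h>
#include <fcntl.h>
#include <sys/socket.h>

/* Hypothetical return codes for this sketch. */
enum { CONN_FAILED = -1, CONN_OK = 0, CONN_IN_PROGRESS = 1 };

/* Put fd into non-blocking mode and start connecting it to addr. */
static int
start_connect(int fd, const struct sockaddr *addr, socklen_t len)
{
    int  flags;

    flags = fcntl(fd, F_GETFL, 0);

    if (flags == -1 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        return CONN_FAILED;
    }

    if (connect(fd, addr, len) == 0) {
        return CONN_OK;             /* completed immediately */
    }

    if (errno == EINPROGRESS) {
        /* not an error: wait for writability, then check SO_ERROR */
        return CONN_IN_PROGRESS;
    }

    return CONN_FAILED;             /* a real error such as ECONNREFUSED */
}
```

CONN_OK, CONN_IN_PROGRESS and CONN_FAILED play roughly the roles of NGX_OK, NGX_AGAIN and NGX_DECLINED in ngx_event_connect_peer. The caller is expected to register for a write event in the in-progress case.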
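Finally, let's look at the implementation of ngx_http_upstream_send_request, which sends data to the upstream backend. There is one thing to watch out for here. On Linux, when a non-blocking connect has not completed and a write event is registered, the write event being reported does not by itself mean that the connection succeeded. You still have to call getsockopt and check SO_ERROR; only if there is no error is the connection known to be established. The following excerpt is from the connect(2) manual page: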
SOL_SOCKET
to determine whether connect() completed successfully (SO_ERROR is zero) or unsuccessfully (SO_ERROR is one of the usual error codes listed
here, explaining the reason for the failure).
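I looked at the kernel code for this. If the connection fails, for example because the peer is unreachable, the kernel sets sk_err_soft on the socket, whereas tcp_poll only checks sk_err. SO_ERROR, on the other hand, checks both errors. The comments in the kernel read as follows: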
* @sk_err: last error
* @sk_err_soft: errors that don't cause failure but are the cause of a
* persistent failure not just 'timed out'
As I understand it, sk_err in the kernel represents layer-4 errors, while sk_err_soft represents errors from the lower layers.
In nginx, ngx_http_upstream_test_connect is where the connection is checked for failure (by calling getsockopt).
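The SO_ERROR check itself is only a few lines. The sketch below is not the body of ngx_http_upstream_test_connect; it is a minimal standalone version of the same idea, and the function name pending_socket_error is made up for this example.

```c
#include <errno.h>
#include <sys/socket.h>

/*
 * Return 0 if no error is pending on the socket, otherwise the errno
 * value that explains why the connect (or the socket) failed.
 */
static int
pending_socket_error(int fd)
{
    int        err = 0;
    socklen_t  len = sizeof(err);

    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) == -1) {
        return errno;   /* getsockopt itself failed, for example EBADF */
    }

    return err;         /* 0 means the connect completed successfully */
}
```

A typical caller would invoke this once the write event fires after an in-progress connect, and treat any non-zero result as a failed connection. Note that reading SO_ERROR also clears the pending error, so the result should be kept if it is needed later.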
The data is then sent by calling ngx_output_chain. As we know, ngx_output_chain normally runs through the filter chain, but an upstream clearly has no use for the filter chain. So how does nginx deal with that? When the upstream is initialized, u->output.output_filter has already been changed to ngx_chain_writer:
    u->output.output_filter = ngx_chain_writer;
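This is the usual function-pointer trick: the generic output routine calls whatever sink is stored in its context, so replacing one pointer redirects the data. The toy model below illustrates it. All toy_ names are hypothetical, and the real ngx_output_chain works on buffer chains rather than a flat byte array.

```c
#include <stddef.h>

/* Toy model of an output context; the names are not nginx's. */
typedef int (*toy_output_filter_pt)(void *ctx, const char *data, size_t len);

typedef struct {
    toy_output_filter_pt   output_filter;  /* where finished data goes */
    void                  *filter_ctx;     /* argument for the sink */
} toy_output_ctx_t;

typedef struct {
    size_t  bytes_written;
} toy_writer_t;

/* Stand-in for a direct writer: it only accounts for the bytes "sent". */
static int
toy_chain_writer(void *ctx, const char *data, size_t len)
{
    toy_writer_t  *w = ctx;

    (void) data;
    w->bytes_written += len;

    return 0;
}

/* The generic output routine does not know which sink it is feeding. */
static int
toy_output_chain(toy_output_ctx_t *out, const char *data, size_t len)
{
    return out->output_filter(out->filter_ctx, data, len);
}
```

With output_filter pointing at the writer, data handed to toy_output_chain goes straight to the connection-level writer and never touches a filter chain, which is the effect the assignment above achieves for the upstream.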
Finally there is some error handling. Let's look at the code:
    static void
    ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u)
    {
        ngx_int_t          rc;
        ngx_connection_t  *c;

        c = u->peer.connection;

        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                       "http upstream send request");

        // if test connect fails, the connection failed, so move on to the
        // next upstream and return
        if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
            ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
            return;
        }

        c->log->action = "sending request to upstream";

        // send the data; u->output.output_filter has already been replaced
        rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs);

        u->request_sent = 1;

        ........................................................

        // as with client requests, NGX_AGAIN means the data has not been
        // fully sent yet, so register the write event
        if (rc == NGX_AGAIN) {
            ngx_add_timer(c->write, u->conf->send_timeout);

            if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {
                ngx_http_upstream_finalize_request(r, u,
                                                   NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }

            return;
        }

        /* rc == NGX_OK */

        // reset tcp_cork; the principle is the same as in the keepalive
        // handling covered earlier
        if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) {
            if (ngx_tcp_push(c->fd) == NGX_ERROR) {
                ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno,
                              ngx_tcp_push_n " failed");
                ngx_http_upstream_finalize_request(r, u,
                                                   NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }

            c->tcp_nopush = NGX_TCP_NOPUSH_UNSET;
        }

        ngx_add_timer(c->read, u->conf->read_timeout);

    #if 1
        // if the connection is already readable, start parsing the header
        if (c->read->ready) {

            /* post aio operation */

            /*
             * TODO comment
             * although we can post aio operation just in the end
             * of ngx_http_upstream_connect() CHECK IT !!!
             * it's better to do here because we postpone header buffer allocation
             */

            ngx_http_upstream_process_header(r, u);
            return;
        }
    #endif

        ...........................................
    }
In the next post I will analyze in detail how nginx parses the data coming from the backend and how it sends that data to the client.





