Upstream: pipe length and input_filter_init in buffered mode.

As long as ngx_event_pipe() has more data read from upstream than specified
in p->length it's passed to input filter even if buffer isn't yet full.  This
allows to process data with known length without relying on connection close
to signal data end.

By default p->length is set to -1 in upstream module, i.e. end of data is
indicated by connection close.  To set it from per-protocol handlers upstream
input_filter_init() now called in buffered mode (as well as in
unbuffered mode).
diff --git a/src/event/ngx_event_pipe.c b/src/event/ngx_event_pipe.c
index 6f068fe..802c65e 100644
--- a/src/event/ngx_event_pipe.c
+++ b/src/event/ngx_event_pipe.c
@@ -392,8 +392,31 @@
                        cl->buf->file_last - cl->buf->file_pos);
     }
 
+    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
+                   "pipe length: %O", p->length);
+
 #endif
 
+    if (p->free_raw_bufs && p->length != -1) {
+        cl = p->free_raw_bufs;
+
+        if (cl->buf->last - cl->buf->pos >= p->length) {
+
+            /* STUB */ cl->buf->num = p->num++;
+
+            if (p->input_filter(p, cl->buf) == NGX_ERROR) {
+                 return NGX_ABORT;
+            }
+
+            p->free_raw_bufs = cl->next;
+        }
+    }
+
+    if (p->length == 0) {
+        p->upstream_done = 1;
+        p->read = 1;
+    }
+
     if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {
 
         /* STUB */ p->free_raw_bufs->buf->num = p->num++;
@@ -848,6 +871,12 @@
     }
     p->last_in = &cl->next;
 
+    if (p->length == -1) {
+        return NGX_OK;
+    }
+
+    p->length -= b->last - b->pos;
+
     return NGX_OK;
 }
 
diff --git a/src/event/ngx_event_pipe.h b/src/event/ngx_event_pipe.h
index 00b8acf..2633467 100644
--- a/src/event/ngx_event_pipe.h
+++ b/src/event/ngx_event_pipe.h
@@ -65,6 +65,7 @@
     ssize_t            busy_size;
 
     off_t              read_length;
+    off_t              length;
 
     off_t              max_temp_file_size;
     ssize_t            temp_file_write_size;
diff --git a/src/http/ngx_http_upstream.c b/src/http/ngx_http_upstream.c
index f3730a0..62009f6 100644
--- a/src/http/ngx_http_upstream.c
+++ b/src/http/ngx_http_upstream.c
@@ -2304,6 +2304,15 @@
     p->send_timeout = clcf->send_timeout;
     p->send_lowat = clcf->send_lowat;
 
+    p->length = -1;
+
+    if (u->input_filter_init
+        && u->input_filter_init(p->input_ctx) != NGX_OK)
+    {
+        ngx_http_upstream_finalize_request(r, u, 0);
+        return;
+    }
+
     u->read_event_handler = ngx_http_upstream_process_upstream;
     r->write_event_handler = ngx_http_upstream_process_downstream;