| |
| /* |
| * Copyright (C) Igor Sysoev |
| * Copyright (C) Nginx, Inc. |
| */ |
| |
| |
| #include <ngx_config.h> |
| #include <ngx_core.h> |
| #include <ngx_event.h> |
| #include <ngx_event_pipe.h> |
| |
| |
| static ngx_int_t ngx_event_pipe_read_upstream(ngx_event_pipe_t *p); |
| static ngx_int_t ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p); |
| |
| static ngx_int_t ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p); |
| static ngx_inline void ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf); |
| static ngx_int_t ngx_event_pipe_drain_chains(ngx_event_pipe_t *p); |
| |
| |
| ngx_int_t |
| ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write) |
| { |
| ngx_int_t rc; |
| ngx_uint_t flags; |
| ngx_event_t *rev, *wev; |
| |
| for ( ;; ) { |
| if (do_write) { |
| p->log->action = "sending to client"; |
| |
| rc = ngx_event_pipe_write_to_downstream(p); |
| |
| if (rc == NGX_ABORT) { |
| return NGX_ABORT; |
| } |
| |
| if (rc == NGX_BUSY) { |
| return NGX_OK; |
| } |
| } |
| |
| p->read = 0; |
| p->upstream_blocked = 0; |
| |
| p->log->action = "reading upstream"; |
| |
| if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) { |
| return NGX_ABORT; |
| } |
| |
| if (!p->read && !p->upstream_blocked) { |
| break; |
| } |
| |
| do_write = 1; |
| } |
| |
| if (p->upstream->fd != (ngx_socket_t) -1) { |
| rev = p->upstream->read; |
| |
| flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0; |
| |
| if (ngx_handle_read_event(rev, flags) != NGX_OK) { |
| return NGX_ABORT; |
| } |
| |
| if (!rev->delayed) { |
| if (rev->active && !rev->ready) { |
| ngx_add_timer(rev, p->read_timeout); |
| |
| } else if (rev->timer_set) { |
| ngx_del_timer(rev); |
| } |
| } |
| } |
| |
| if (p->downstream->fd != (ngx_socket_t) -1 |
| && p->downstream->data == p->output_ctx) |
| { |
| wev = p->downstream->write; |
| if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) { |
| return NGX_ABORT; |
| } |
| |
| if (!wev->delayed) { |
| if (wev->active && !wev->ready) { |
| ngx_add_timer(wev, p->send_timeout); |
| |
| } else if (wev->timer_set) { |
| ngx_del_timer(wev); |
| } |
| } |
| } |
| |
| return NGX_OK; |
| } |
| |
| |
| static ngx_int_t |
| ngx_event_pipe_read_upstream(ngx_event_pipe_t *p) |
| { |
| off_t limit; |
| ssize_t n, size; |
| ngx_int_t rc; |
| ngx_buf_t *b; |
| ngx_msec_t delay; |
| ngx_chain_t *chain, *cl, *ln; |
| |
| if (p->upstream_eof || p->upstream_error || p->upstream_done) { |
| return NGX_OK; |
| } |
| |
| #if (NGX_THREADS) |
| |
| if (p->aio) { |
| ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, |
| "pipe read upstream: aio"); |
| return NGX_AGAIN; |
| } |
| |
| if (p->writing) { |
| ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, |
| "pipe read upstream: writing"); |
| |
| rc = ngx_event_pipe_write_chain_to_temp_file(p); |
| |
| if (rc != NGX_OK) { |
| return rc; |
| } |
| } |
| |
| #endif |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, |
| "pipe read upstream: %d", p->upstream->read->ready); |
| |
| for ( ;; ) { |
| |
| if (p->upstream_eof || p->upstream_error || p->upstream_done) { |
| break; |
| } |
| |
| if (p->preread_bufs == NULL && !p->upstream->read->ready) { |
| break; |
| } |
| |
| if (p->preread_bufs) { |
| |
| /* use the pre-read bufs if they exist */ |
| |
| chain = p->preread_bufs; |
| p->preread_bufs = NULL; |
| n = p->preread_size; |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, |
| "pipe preread: %z", n); |
| |
| if (n) { |
| p->read = 1; |
| } |
| |
| } else { |
| |
| #if (NGX_HAVE_KQUEUE) |
| |
| /* |
| * kqueue notifies about the end of file or a pending error. |
| * This test allows not to allocate a buf on these conditions |
| * and not to call c->recv_chain(). |
| */ |
| |
| if (p->upstream->read->available == 0 |
| && p->upstream->read->pending_eof |
| #if (NGX_SSL) |
| && !p->upstream->ssl |
| #endif |
| ) |
| { |
| p->upstream->read->ready = 0; |
| p->upstream->read->eof = 1; |
| p->upstream_eof = 1; |
| p->read = 1; |
| |
| if (p->upstream->read->kq_errno) { |
| p->upstream->read->error = 1; |
| p->upstream_error = 1; |
| p->upstream_eof = 0; |
| |
| ngx_log_error(NGX_LOG_ERR, p->log, |
| p->upstream->read->kq_errno, |
| "kevent() reported that upstream " |
| "closed connection"); |
| } |
| |
| break; |
| } |
| #endif |
| |
| if (p->limit_rate) { |
| if (p->upstream->read->delayed) { |
| break; |
| } |
| |
| limit = (off_t) p->limit_rate * (ngx_time() - p->start_sec + 1) |
| - p->read_length; |
| |
| if (limit <= 0) { |
| p->upstream->read->delayed = 1; |
| delay = (ngx_msec_t) (- limit * 1000 / p->limit_rate + 1); |
| ngx_add_timer(p->upstream->read, delay); |
| break; |
| } |
| |
| } else { |
| limit = 0; |
| } |
| |
| if (p->free_raw_bufs) { |
| |
| /* use the free bufs if they exist */ |
| |
| chain = p->free_raw_bufs; |
| if (p->single_buf) { |
| p->free_raw_bufs = p->free_raw_bufs->next; |
| chain->next = NULL; |
| } else { |
| p->free_raw_bufs = NULL; |
| } |
| |
| } else if (p->allocated < p->bufs.num) { |
| |
| /* allocate a new buf if it's still allowed */ |
| |
| b = ngx_create_temp_buf(p->pool, p->bufs.size); |
| if (b == NULL) { |
| return NGX_ABORT; |
| } |
| |
| p->allocated++; |
| |
| chain = ngx_alloc_chain_link(p->pool); |
| if (chain == NULL) { |
| return NGX_ABORT; |
| } |
| |
| chain->buf = b; |
| chain->next = NULL; |
| |
| } else if (!p->cacheable |
| && p->downstream->data == p->output_ctx |
| && p->downstream->write->ready |
| && !p->downstream->write->delayed) |
| { |
| /* |
| * if the bufs are not needed to be saved in a cache and |
| * a downstream is ready then write the bufs to a downstream |
| */ |
| |
| p->upstream_blocked = 1; |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, |
| "pipe downstream ready"); |
| |
| break; |
| |
| } else if (p->cacheable |
| || p->temp_file->offset < p->max_temp_file_size) |
| { |
| |
| /* |
| * if it is allowed, then save some bufs from p->in |
| * to a temporary file, and add them to a p->out chain |
| */ |
| |
| rc = ngx_event_pipe_write_chain_to_temp_file(p); |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, |
| "pipe temp offset: %O", p->temp_file->offset); |
| |
| if (rc == NGX_BUSY) { |
| break; |
| } |
| |
| if (rc != NGX_OK) { |
| return rc; |
| } |
| |
| chain = p->free_raw_bufs; |
| if (p->single_buf) { |
| p->free_raw_bufs = p->free_raw_bufs->next; |
| chain->next = NULL; |
| } else { |
| p->free_raw_bufs = NULL; |
| } |
| |
| } else { |
| |
| /* there are no bufs to read in */ |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, |
| "no pipe bufs to read in"); |
| |
| break; |
| } |
| |
| n = p->upstream->recv_chain(p->upstream, chain, limit); |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, |
| "pipe recv chain: %z", n); |
| |
| if (p->free_raw_bufs) { |
| chain->next = p->free_raw_bufs; |
| } |
| p->free_raw_bufs = chain; |
| |
| if (n == NGX_ERROR) { |
| p->upstream_error = 1; |
| break; |
| } |
| |
| if (n == NGX_AGAIN) { |
| if (p->single_buf) { |
| ngx_event_pipe_remove_shadow_links(chain->buf); |
| } |
| |
| break; |
| } |
| |
| p->read = 1; |
| |
| if (n == 0) { |
| p->upstream_eof = 1; |
| break; |
| } |
| } |
| |
| delay = p->limit_rate ? (ngx_msec_t) n * 1000 / p->limit_rate : 0; |
| |
| p->read_length += n; |
| cl = chain; |
| p->free_raw_bufs = NULL; |
| |
| while (cl && n > 0) { |
| |
| ngx_event_pipe_remove_shadow_links(cl->buf); |
| |
| size = cl->buf->end - cl->buf->last; |
| |
| if (n >= size) { |
| cl->buf->last = cl->buf->end; |
| |
| /* STUB */ cl->buf->num = p->num++; |
| |
| if (p->input_filter(p, cl->buf) == NGX_ERROR) { |
| return NGX_ABORT; |
| } |
| |
| n -= size; |
| ln = cl; |
| cl = cl->next; |
| ngx_free_chain(p->pool, ln); |
| |
| } else { |
| cl->buf->last += n; |
| n = 0; |
| } |
| } |
| |
| if (cl) { |
| for (ln = cl; ln->next; ln = ln->next) { /* void */ } |
| |
| ln->next = p->free_raw_bufs; |
| p->free_raw_bufs = cl; |
| } |
| |
| if (delay > 0) { |
| p->upstream->read->delayed = 1; |
| ngx_add_timer(p->upstream->read, delay); |
| break; |
| } |
| } |
| |
| #if (NGX_DEBUG) |
| |
| for (cl = p->busy; cl; cl = cl->next) { |
| ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0, |
| "pipe buf busy s:%d t:%d f:%d " |
| "%p, pos %p, size: %z " |
| "file: %O, size: %O", |
| (cl->buf->shadow ? 1 : 0), |
| cl->buf->temporary, cl->buf->in_file, |
| cl->buf->start, cl->buf->pos, |
| cl->buf->last - cl->buf->pos, |
| cl->buf->file_pos, |
| cl->buf->file_last - cl->buf->file_pos); |
| } |
| |
| for (cl = p->out; cl; cl = cl->next) { |
| ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0, |
| "pipe buf out s:%d t:%d f:%d " |
| "%p, pos %p, size: %z " |
| "file: %O, size: %O", |
| (cl->buf->shadow ? 1 : 0), |
| cl->buf->temporary, cl->buf->in_file, |
| cl->buf->start, cl->buf->pos, |
| cl->buf->last - cl->buf->pos, |
| cl->buf->file_pos, |
| cl->buf->file_last - cl->buf->file_pos); |
| } |
| |
| for (cl = p->in; cl; cl = cl->next) { |
| ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0, |
| "pipe buf in s:%d t:%d f:%d " |
| "%p, pos %p, size: %z " |
| "file: %O, size: %O", |
| (cl->buf->shadow ? 1 : 0), |
| cl->buf->temporary, cl->buf->in_file, |
| cl->buf->start, cl->buf->pos, |
| cl->buf->last - cl->buf->pos, |
| cl->buf->file_pos, |
| cl->buf->file_last - cl->buf->file_pos); |
| } |
| |
| for (cl = p->free_raw_bufs; cl; cl = cl->next) { |
| ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0, |
| "pipe buf free s:%d t:%d f:%d " |
| "%p, pos %p, size: %z " |
| "file: %O, size: %O", |
| (cl->buf->shadow ? 1 : 0), |
| cl->buf->temporary, cl->buf->in_file, |
| cl->buf->start, cl->buf->pos, |
| cl->buf->last - cl->buf->pos, |
| cl->buf->file_pos, |
| cl->buf->file_last - cl->buf->file_pos); |
| } |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, |
| "pipe length: %O", p->length); |
| |
| #endif |
| |
| if (p->free_raw_bufs && p->length != -1) { |
| cl = p->free_raw_bufs; |
| |
| if (cl->buf->last - cl->buf->pos >= p->length) { |
| |
| p->free_raw_bufs = cl->next; |
| |
| /* STUB */ cl->buf->num = p->num++; |
| |
| if (p->input_filter(p, cl->buf) == NGX_ERROR) { |
| return NGX_ABORT; |
| } |
| |
| ngx_free_chain(p->pool, cl); |
| } |
| } |
| |
| if (p->length == 0) { |
| p->upstream_done = 1; |
| p->read = 1; |
| } |
| |
| if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) { |
| |
| /* STUB */ p->free_raw_bufs->buf->num = p->num++; |
| |
| if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) { |
| return NGX_ABORT; |
| } |
| |
| p->free_raw_bufs = p->free_raw_bufs->next; |
| |
| if (p->free_bufs && p->buf_to_file == NULL) { |
| for (cl = p->free_raw_bufs; cl; cl = cl->next) { |
| if (cl->buf->shadow == NULL) { |
| ngx_pfree(p->pool, cl->buf->start); |
| } |
| } |
| } |
| } |
| |
| if (p->cacheable && (p->in || p->buf_to_file)) { |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, |
| "pipe write chain"); |
| |
| rc = ngx_event_pipe_write_chain_to_temp_file(p); |
| |
| if (rc != NGX_OK) { |
| return rc; |
| } |
| } |
| |
| return NGX_OK; |
| } |
| |
| |
| static ngx_int_t |
| ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p) |
| { |
| u_char *prev; |
| size_t bsize; |
| ngx_int_t rc; |
| ngx_uint_t flush, flushed, prev_last_shadow; |
| ngx_chain_t *out, **ll, *cl; |
| ngx_connection_t *downstream; |
| |
| downstream = p->downstream; |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, |
| "pipe write downstream: %d", downstream->write->ready); |
| |
| #if (NGX_THREADS) |
| |
| if (p->writing) { |
| rc = ngx_event_pipe_write_chain_to_temp_file(p); |
| |
| if (rc == NGX_ABORT) { |
| return NGX_ABORT; |
| } |
| } |
| |
| #endif |
| |
| flushed = 0; |
| |
| for ( ;; ) { |
| if (p->downstream_error) { |
| return ngx_event_pipe_drain_chains(p); |
| } |
| |
| if (p->upstream_eof || p->upstream_error || p->upstream_done) { |
| |
| /* pass the p->out and p->in chains to the output filter */ |
| |
| for (cl = p->busy; cl; cl = cl->next) { |
| cl->buf->recycled = 0; |
| } |
| |
| if (p->out) { |
| ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, |
| "pipe write downstream flush out"); |
| |
| for (cl = p->out; cl; cl = cl->next) { |
| cl->buf->recycled = 0; |
| } |
| |
| rc = p->output_filter(p->output_ctx, p->out); |
| |
| if (rc == NGX_ERROR) { |
| p->downstream_error = 1; |
| return ngx_event_pipe_drain_chains(p); |
| } |
| |
| p->out = NULL; |
| } |
| |
| if (p->writing) { |
| break; |
| } |
| |
| if (p->in) { |
| ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, |
| "pipe write downstream flush in"); |
| |
| for (cl = p->in; cl; cl = cl->next) { |
| cl->buf->recycled = 0; |
| } |
| |
| rc = p->output_filter(p->output_ctx, p->in); |
| |
| if (rc == NGX_ERROR) { |
| p->downstream_error = 1; |
| return ngx_event_pipe_drain_chains(p); |
| } |
| |
| p->in = NULL; |
| } |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, |
| "pipe write downstream done"); |
| |
| /* TODO: free unused bufs */ |
| |
| p->downstream_done = 1; |
| break; |
| } |
| |
| if (downstream->data != p->output_ctx |
| || !downstream->write->ready |
| || downstream->write->delayed) |
| { |
| break; |
| } |
| |
| /* bsize is the size of the busy recycled bufs */ |
| |
| prev = NULL; |
| bsize = 0; |
| |
| for (cl = p->busy; cl; cl = cl->next) { |
| |
| if (cl->buf->recycled) { |
| if (prev == cl->buf->start) { |
| continue; |
| } |
| |
| bsize += cl->buf->end - cl->buf->start; |
| prev = cl->buf->start; |
| } |
| } |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, |
| "pipe write busy: %uz", bsize); |
| |
| out = NULL; |
| |
| if (bsize >= (size_t) p->busy_size) { |
| flush = 1; |
| goto flush; |
| } |
| |
| flush = 0; |
| ll = NULL; |
| prev_last_shadow = 1; |
| |
| for ( ;; ) { |
| if (p->out) { |
| cl = p->out; |
| |
| if (cl->buf->recycled) { |
| ngx_log_error(NGX_LOG_ALERT, p->log, 0, |
| "recycled buffer in pipe out chain"); |
| } |
| |
| p->out = p->out->next; |
| |
| } else if (!p->cacheable && !p->writing && p->in) { |
| cl = p->in; |
| |
| ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0, |
| "pipe write buf ls:%d %p %z", |
| cl->buf->last_shadow, |
| cl->buf->pos, |
| cl->buf->last - cl->buf->pos); |
| |
| if (cl->buf->recycled && prev_last_shadow) { |
| if (bsize + cl->buf->end - cl->buf->start > p->busy_size) { |
| flush = 1; |
| break; |
| } |
| |
| bsize += cl->buf->end - cl->buf->start; |
| } |
| |
| prev_last_shadow = cl->buf->last_shadow; |
| |
| p->in = p->in->next; |
| |
| } else { |
| break; |
| } |
| |
| cl->next = NULL; |
| |
| if (out) { |
| *ll = cl; |
| } else { |
| out = cl; |
| } |
| ll = &cl->next; |
| } |
| |
| flush: |
| |
| ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0, |
| "pipe write: out:%p, f:%ui", out, flush); |
| |
| if (out == NULL) { |
| |
| if (!flush) { |
| break; |
| } |
| |
| /* a workaround for AIO */ |
| if (flushed++ > 10) { |
| return NGX_BUSY; |
| } |
| } |
| |
| rc = p->output_filter(p->output_ctx, out); |
| |
| ngx_chain_update_chains(p->pool, &p->free, &p->busy, &out, p->tag); |
| |
| if (rc == NGX_ERROR) { |
| p->downstream_error = 1; |
| return ngx_event_pipe_drain_chains(p); |
| } |
| |
| for (cl = p->free; cl; cl = cl->next) { |
| |
| if (cl->buf->temp_file) { |
| if (p->cacheable || !p->cyclic_temp_file) { |
| continue; |
| } |
| |
| /* reset p->temp_offset if all bufs had been sent */ |
| |
| if (cl->buf->file_last == p->temp_file->offset) { |
| p->temp_file->offset = 0; |
| } |
| } |
| |
| /* TODO: free buf if p->free_bufs && upstream done */ |
| |
| /* add the free shadow raw buf to p->free_raw_bufs */ |
| |
| if (cl->buf->last_shadow) { |
| if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) { |
| return NGX_ABORT; |
| } |
| |
| cl->buf->last_shadow = 0; |
| } |
| |
| cl->buf->shadow = NULL; |
| } |
| } |
| |
| return NGX_OK; |
| } |
| |
| |
| static ngx_int_t |
| ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p) |
| { |
| ssize_t size, bsize, n; |
| ngx_buf_t *b; |
| ngx_uint_t prev_last_shadow; |
| ngx_chain_t *cl, *tl, *next, *out, **ll, **last_out, **last_free; |
| |
| #if (NGX_THREADS) |
| |
| if (p->writing) { |
| |
| if (p->aio) { |
| return NGX_AGAIN; |
| } |
| |
| out = p->writing; |
| p->writing = NULL; |
| |
| n = ngx_write_chain_to_temp_file(p->temp_file, NULL); |
| |
| if (n == NGX_ERROR) { |
| return NGX_ABORT; |
| } |
| |
| goto done; |
| } |
| |
| #endif |
| |
| if (p->buf_to_file) { |
| out = ngx_alloc_chain_link(p->pool); |
| if (out == NULL) { |
| return NGX_ABORT; |
| } |
| |
| out->buf = p->buf_to_file; |
| out->next = p->in; |
| |
| } else { |
| out = p->in; |
| } |
| |
| if (!p->cacheable) { |
| |
| size = 0; |
| cl = out; |
| ll = NULL; |
| prev_last_shadow = 1; |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, |
| "pipe offset: %O", p->temp_file->offset); |
| |
| do { |
| bsize = cl->buf->last - cl->buf->pos; |
| |
| ngx_log_debug4(NGX_LOG_DEBUG_EVENT, p->log, 0, |
| "pipe buf ls:%d %p, pos %p, size: %z", |
| cl->buf->last_shadow, cl->buf->start, |
| cl->buf->pos, bsize); |
| |
| if (prev_last_shadow |
| && ((size + bsize > p->temp_file_write_size) |
| || (p->temp_file->offset + size + bsize |
| > p->max_temp_file_size))) |
| { |
| break; |
| } |
| |
| prev_last_shadow = cl->buf->last_shadow; |
| |
| size += bsize; |
| ll = &cl->next; |
| cl = cl->next; |
| |
| } while (cl); |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "size: %z", size); |
| |
| if (ll == NULL) { |
| return NGX_BUSY; |
| } |
| |
| if (cl) { |
| p->in = cl; |
| *ll = NULL; |
| |
| } else { |
| p->in = NULL; |
| p->last_in = &p->in; |
| } |
| |
| } else { |
| p->in = NULL; |
| p->last_in = &p->in; |
| } |
| |
| #if (NGX_THREADS) |
| if (p->thread_handler) { |
| p->temp_file->thread_write = 1; |
| p->temp_file->file.thread_task = p->thread_task; |
| p->temp_file->file.thread_handler = p->thread_handler; |
| p->temp_file->file.thread_ctx = p->thread_ctx; |
| } |
| #endif |
| |
| n = ngx_write_chain_to_temp_file(p->temp_file, out); |
| |
| if (n == NGX_ERROR) { |
| return NGX_ABORT; |
| } |
| |
| #if (NGX_THREADS) |
| |
| if (n == NGX_AGAIN) { |
| p->writing = out; |
| p->thread_task = p->temp_file->file.thread_task; |
| return NGX_AGAIN; |
| } |
| |
| done: |
| |
| #endif |
| |
| if (p->buf_to_file) { |
| p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos; |
| n -= p->buf_to_file->last - p->buf_to_file->pos; |
| p->buf_to_file = NULL; |
| out = out->next; |
| } |
| |
| if (n > 0) { |
| /* update previous buffer or add new buffer */ |
| |
| if (p->out) { |
| for (cl = p->out; cl->next; cl = cl->next) { /* void */ } |
| |
| b = cl->buf; |
| |
| if (b->file_last == p->temp_file->offset) { |
| p->temp_file->offset += n; |
| b->file_last = p->temp_file->offset; |
| goto free; |
| } |
| |
| last_out = &cl->next; |
| |
| } else { |
| last_out = &p->out; |
| } |
| |
| cl = ngx_chain_get_free_buf(p->pool, &p->free); |
| if (cl == NULL) { |
| return NGX_ABORT; |
| } |
| |
| b = cl->buf; |
| |
| ngx_memzero(b, sizeof(ngx_buf_t)); |
| |
| b->tag = p->tag; |
| |
| b->file = &p->temp_file->file; |
| b->file_pos = p->temp_file->offset; |
| p->temp_file->offset += n; |
| b->file_last = p->temp_file->offset; |
| |
| b->in_file = 1; |
| b->temp_file = 1; |
| |
| *last_out = cl; |
| } |
| |
| free: |
| |
| for (last_free = &p->free_raw_bufs; |
| *last_free != NULL; |
| last_free = &(*last_free)->next) |
| { |
| /* void */ |
| } |
| |
| for (cl = out; cl; cl = next) { |
| next = cl->next; |
| |
| cl->next = p->free; |
| p->free = cl; |
| |
| b = cl->buf; |
| |
| if (b->last_shadow) { |
| |
| tl = ngx_alloc_chain_link(p->pool); |
| if (tl == NULL) { |
| return NGX_ABORT; |
| } |
| |
| tl->buf = b->shadow; |
| tl->next = NULL; |
| |
| *last_free = tl; |
| last_free = &tl->next; |
| |
| b->shadow->pos = b->shadow->start; |
| b->shadow->last = b->shadow->start; |
| |
| ngx_event_pipe_remove_shadow_links(b->shadow); |
| } |
| } |
| |
| return NGX_OK; |
| } |
| |
| |
| /* the copy input filter */ |
| |
| ngx_int_t |
| ngx_event_pipe_copy_input_filter(ngx_event_pipe_t *p, ngx_buf_t *buf) |
| { |
| ngx_buf_t *b; |
| ngx_chain_t *cl; |
| |
| if (buf->pos == buf->last) { |
| return NGX_OK; |
| } |
| |
| if (p->upstream_done) { |
| ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, |
| "input data after close"); |
| return NGX_OK; |
| } |
| |
| if (p->length == 0) { |
| p->upstream_done = 1; |
| |
| ngx_log_error(NGX_LOG_WARN, p->log, 0, |
| "upstream sent more data than specified in " |
| "\"Content-Length\" header"); |
| |
| return NGX_OK; |
| } |
| |
| cl = ngx_chain_get_free_buf(p->pool, &p->free); |
| if (cl == NULL) { |
| return NGX_ERROR; |
| } |
| |
| b = cl->buf; |
| |
| ngx_memcpy(b, buf, sizeof(ngx_buf_t)); |
| b->shadow = buf; |
| b->tag = p->tag; |
| b->last_shadow = 1; |
| b->recycled = 1; |
| buf->shadow = b; |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "input buf #%d", b->num); |
| |
| if (p->in) { |
| *p->last_in = cl; |
| } else { |
| p->in = cl; |
| } |
| p->last_in = &cl->next; |
| |
| if (p->length == -1) { |
| return NGX_OK; |
| } |
| |
| if (b->last - b->pos > p->length) { |
| |
| ngx_log_error(NGX_LOG_WARN, p->log, 0, |
| "upstream sent more data than specified in " |
| "\"Content-Length\" header"); |
| |
| b->last = b->pos + p->length; |
| p->upstream_done = 1; |
| |
| return NGX_OK; |
| } |
| |
| p->length -= b->last - b->pos; |
| |
| return NGX_OK; |
| } |
| |
| |
| static ngx_inline void |
| ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf) |
| { |
| ngx_buf_t *b, *next; |
| |
| b = buf->shadow; |
| |
| if (b == NULL) { |
| return; |
| } |
| |
| while (!b->last_shadow) { |
| next = b->shadow; |
| |
| b->temporary = 0; |
| b->recycled = 0; |
| |
| b->shadow = NULL; |
| b = next; |
| } |
| |
| b->temporary = 0; |
| b->recycled = 0; |
| b->last_shadow = 0; |
| |
| b->shadow = NULL; |
| |
| buf->shadow = NULL; |
| } |
| |
| |
| ngx_int_t |
| ngx_event_pipe_add_free_buf(ngx_event_pipe_t *p, ngx_buf_t *b) |
| { |
| ngx_chain_t *cl; |
| |
| cl = ngx_alloc_chain_link(p->pool); |
| if (cl == NULL) { |
| return NGX_ERROR; |
| } |
| |
| if (p->buf_to_file && b->start == p->buf_to_file->start) { |
| b->pos = p->buf_to_file->last; |
| b->last = p->buf_to_file->last; |
| |
| } else { |
| b->pos = b->start; |
| b->last = b->start; |
| } |
| |
| b->shadow = NULL; |
| |
| cl->buf = b; |
| |
| if (p->free_raw_bufs == NULL) { |
| p->free_raw_bufs = cl; |
| cl->next = NULL; |
| |
| return NGX_OK; |
| } |
| |
| if (p->free_raw_bufs->buf->pos == p->free_raw_bufs->buf->last) { |
| |
| /* add the free buf to the list start */ |
| |
| cl->next = p->free_raw_bufs; |
| p->free_raw_bufs = cl; |
| |
| return NGX_OK; |
| } |
| |
| /* the first free buf is partially filled, thus add the free buf after it */ |
| |
| cl->next = p->free_raw_bufs->next; |
| p->free_raw_bufs->next = cl; |
| |
| return NGX_OK; |
| } |
| |
| |
| static ngx_int_t |
| ngx_event_pipe_drain_chains(ngx_event_pipe_t *p) |
| { |
| ngx_chain_t *cl, *tl; |
| |
| for ( ;; ) { |
| if (p->busy) { |
| cl = p->busy; |
| p->busy = NULL; |
| |
| } else if (p->out) { |
| cl = p->out; |
| p->out = NULL; |
| |
| } else if (p->in) { |
| cl = p->in; |
| p->in = NULL; |
| |
| } else { |
| return NGX_OK; |
| } |
| |
| while (cl) { |
| if (cl->buf->last_shadow) { |
| if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) { |
| return NGX_ABORT; |
| } |
| |
| cl->buf->last_shadow = 0; |
| } |
| |
| cl->buf->shadow = NULL; |
| tl = cl->next; |
| cl->next = p->free; |
| p->free = cl; |
| cl = tl; |
| } |
| } |
| } |