nginx-0.0.1-2003-02-06-20:21:13 import
diff --git a/src/event/modules/ngx_devpoll_module.c b/src/event/modules/ngx_devpoll_module.c
index 13971ad..e2e7750 100644
--- a/src/event/modules/ngx_devpoll_module.c
+++ b/src/event/modules/ngx_devpoll_module.c
@@ -16,6 +16,7 @@
#error "/dev/poll is not supported on this platform"
#endif
+static int ngx_devpoll_set_event(ngx_event_t *ev, int event, u_int flags);
/* STUB */
#define DEVPOLL_NCHANGES 512
@@ -137,7 +138,7 @@
}
-int ngx_devpoll_set_event(ngx_event_t *ev, int event, u_int flags)
+static int ngx_devpoll_set_event(ngx_event_t *ev, int event, u_int flags)
{
int n;
ngx_connection_t *c;
@@ -192,7 +193,6 @@
int events, n, i;
ngx_msec_t timer, delta;
ngx_err_t err;
- ngx_event_t *ev;
ngx_connection_t *c;
struct dvpoll dvp;
struct timeval tv;
@@ -233,7 +233,7 @@
nchanges = 0;
- if (timer != INFTIM) {
+ if ((int) timer != INFTIM) {
gettimeofday(&tv, NULL);
delta = tv.tv_sec * 1000 + tv.tv_usec / 1000 - delta;
@@ -305,7 +305,7 @@
}
}
- if (timer != INFTIM) {
+ if ((int) timer != INFTIM) {
ngx_event_expire_timers(delta);
}
diff --git a/src/event/modules/ngx_iocp_module.c b/src/event/modules/ngx_iocp_module.c
new file mode 100644
index 0000000..f474c4c
--- /dev/null
+++ b/src/event/modules/ngx_iocp_module.c
@@ -0,0 +1,137 @@
+
+#include <ngx_config.h>
+
+#include <ngx_core.h>
+#include <ngx_log.h>
+#include <ngx_errno.h>
+#include <ngx_time.h>
+#include <ngx_connection.h>
+#include <ngx_event.h>
+#include <ngx_event_timer.h>
+
+#include <ngx_iocp_module.h>
+
+
+int ngx_iocp_threads = 0;;
+
+
+static HANDLE iocp;
+static ngx_event_t *timer_queue;
+
+
+int ngx_iocp_init(int max_connections, ngx_log_t *log)
+{
+ iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
+ NULL, 0, ngx_iocp_threads);
+
+ if (iocp == NULL) {
+ ngx_log_error(NGX_LOG_EMERG, log, ngx_errno,
+ "CreateIoCompletionPort() failed");
+ return NGX_ERROR;
+ }
+
+ timer_queue = ngx_event_init_timer(log);
+ if (timer_queue == NULL) {
+ return NGX_ERROR;
+ }
+
+ ngx_event_actions.process = ngx_iocp_process_events;
+
+ ngx_event_flags = NGX_HAVE_AIO_EVENT|NGX_HAVE_IOCP_EVENT;
+
+ return NGX_OK;
+}
+
+
+int ngx_iocp_add_event(ngx_event_t *ev)
+{
+ ngx_connection_t *c;
+
+ c = (ngx_connection_t *) ev->data;
+
+ ngx_log_debug(ev->log, "iocp: %d, %08x:%08x" _ c->fd _ ev _ &ev->ovlp);
+
+ if (CreateIoCompletionPort((HANDLE) c->fd, iocp, (DWORD) ev, 0) == NULL) {
+ ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
+ "CreateIoCompletionPort() failed");
+ return NGX_ERROR;
+ }
+
+ return NGX_OK;
+}
+
+
+int ngx_iocp_process_events(ngx_log_t *log)
+{
+ int rc;
+ size_t bytes;
+ ngx_err_t err;
+ ngx_msec_t timer, delta;
+ ngx_event_t *ev, *e;
+ ngx_event_ovlp_t *ovlp;
+
+ ngx_log_debug(log, "iocp");
+
+ timer = ngx_event_find_timer();
+
+ if (timer) {
+ delta = ngx_msec();
+
+ } else {
+ timer = INFINITE;
+ delta = 0;
+ }
+
+ ngx_log_debug(log, "iocp timer: %d" _ timer);
+
+#if 1
+ rc = GetQueuedCompletionStatus(iocp, &bytes, (LPDWORD) &e,
+ (LPOVERLAPPED *) &ovlp, timer);
+ ngx_log_debug(log, "iocp: %d, %d:%08x:%08x" _ rc _ bytes _ e _ ovlp);
+ if (rc == 0) {
+#else
+ if (GetQueuedCompletionStatus(iocp, &bytes, (LPDWORD) &e,
+ (LPOVERLAPPED *) &ovlp, timer) == 0) {
+#endif
+ err = ngx_errno;
+
+ if (ovlp == NULL) {
+ if (err != WAIT_TIMEOUT) {
+ ngx_log_error(NGX_LOG_ALERT, log, err,
+ "GetQueuedCompletionStatus() failed");
+
+ return NGX_ERROR;
+ }
+
+ } else {
+ ovlp->error = err;
+ }
+ }
+
+ if (timer != INFINITE) {
+ delta = ngx_msec() - delta;
+ }
+
+ if (ovlp) {
+ ev = ovlp->event;
+
+ngx_log_debug(log, "iocp ev: %08x" _ ev);
+
+ if (ev == e) {
+ ev->ready = 1;
+ ev->available = bytes;
+ }
+
+ngx_log_debug(log, "iocp ev: %08x" _ ev->event_handler);
+
+ if (ev->event_handler(ev) == NGX_ERROR) {
+ ev->close_handler(ev);
+ }
+ }
+
+ if (timer != INFINITE) {
+ ngx_event_expire_timers(delta);
+ }
+
+ return NGX_OK;
+}
diff --git a/src/event/modules/ngx_iocp_module.h b/src/event/modules/ngx_iocp_module.h
new file mode 100644
index 0000000..d7f2f51
--- /dev/null
+++ b/src/event/modules/ngx_iocp_module.h
@@ -0,0 +1,15 @@
+#ifndef _NGX_IOCP_MODULE_H_INCLUDED_
+#define _NGX_IOCP_MODULE_H_INCLUDED_
+
+
+#include <ngx_types.h>
+#include <ngx_log.h>
+#include <ngx_event.h>
+
+
+int ngx_iocp_init(int max_connections, ngx_log_t *log);
+int ngx_iocp_add_event(ngx_event_t *ev);
+int ngx_iocp_process_events(ngx_log_t *log);
+
+
+#endif /* _NGX_IOCP_MODULE_H_INCLUDED_ */
diff --git a/src/event/modules/ngx_kqueue_module.c b/src/event/modules/ngx_kqueue_module.c
index abf3b6e..da8450e 100644
--- a/src/event/modules/ngx_kqueue_module.c
+++ b/src/event/modules/ngx_kqueue_module.c
@@ -23,7 +23,11 @@
/* should be per-thread */
+#if 1
+int kq;
+#else
static int kq;
+#endif
static struct kevent *change_list, *event_list;
static unsigned int nchanges;
static int nevents;
@@ -63,7 +67,12 @@
ngx_event_actions.process = ngx_kqueue_process_events;
ngx_event_flags = NGX_HAVE_LEVEL_EVENT
- |NGX_HAVE_ONESHOT_EVENT|NGX_HAVE_CLEAR_EVENT;
+ |NGX_HAVE_ONESHOT_EVENT
+#if (HAVE_AIO_EVENT)
+ |NGX_HAVE_AIO_EVENT;
+#else
+ |NGX_HAVE_CLEAR_EVENT;
+#endif
#endif
return NGX_OK;
@@ -221,10 +230,19 @@
for (i = 0; i < events; i++) {
#if (NGX_DEBUG_EVENT)
- ngx_log_debug(log, "kevent: %d: ft:%d f:%08x ff:%08x d:%d ud:%08x" _
- event_list[i].ident _ event_list[i].filter _
- event_list[i].flags _ event_list[i].fflags _
- event_list[i].data _ event_list[i].udata);
+ if (event_list[i].ident > 0x8000000) {
+ ngx_log_debug(log,
+ "kevent: %08x: ft:%d f:%08x ff:%08x d:%d ud:%08x" _
+ event_list[i].ident _ event_list[i].filter _
+ event_list[i].flags _ event_list[i].fflags _
+ event_list[i].data _ event_list[i].udata);
+ } else {
+ ngx_log_debug(log,
+ "kevent: %d: ft:%d f:%08x ff:%08x d:%d ud:%08x" _
+ event_list[i].ident _ event_list[i].filter _
+ event_list[i].flags _ event_list[i].fflags _
+ event_list[i].data _ event_list[i].udata);
+ }
#endif
if (event_list[i].flags & EV_ERROR) {
@@ -243,7 +261,6 @@
case EVFILT_READ:
case EVFILT_WRITE:
- ev->ready = 1;
ev->available = event_list[i].data;
if (event_list[i].flags & EV_EOF) {
@@ -255,12 +272,18 @@
ngx_del_timer(ev);
}
+ /* fall through */
+
+ case EVFILT_AIO:
+ ev->ready = 1;
+
if (ev->event_handler(ev) == NGX_ERROR) {
ev->close_handler(ev);
}
break;
+
default:
ngx_log_error(NGX_LOG_ALERT, log, 0,
"unknown kevent filter %d" _ event_list[i].filter);
diff --git a/src/event/modules/ngx_kqueue_module.h b/src/event/modules/ngx_kqueue_module.h
index c561920..568476e 100644
--- a/src/event/modules/ngx_kqueue_module.h
+++ b/src/event/modules/ngx_kqueue_module.h
@@ -14,4 +14,10 @@
int ngx_kqueue_process_events(ngx_log_t *log);
+#if 1
+extern int kq;
+#endif
+
+
+
#endif /* _NGX_KQUEUE_MODULE_H_INCLUDED_ */
diff --git a/src/event/modules/ngx_poll_module.c b/src/event/modules/ngx_poll_module.c
index e27031c..939340e 100644
--- a/src/event/modules/ngx_poll_module.c
+++ b/src/event/modules/ngx_poll_module.c
@@ -13,7 +13,7 @@
/* should be per-thread */
static struct pollfd *event_list;
-static unsigned int nevents;
+static u_int nevents;
static ngx_event_t **event_index;
static ngx_event_t **ready_index;
@@ -140,7 +140,8 @@
int ngx_poll_process_events(ngx_log_t *log)
{
- int i, ready, nready, found;
+ int ready, found;
+ u_int i, nready;
ngx_msec_t timer, delta;
ngx_err_t err;
ngx_event_t *ev;
@@ -172,7 +173,7 @@
ngx_log_debug(log, "poll ready %d" _ ready);
- if (timer != INFTIM) {
+ if ((int) timer != INFTIM) {
delta = ngx_msec() - delta;
} else {
@@ -256,7 +257,7 @@
ngx_log_error(NGX_LOG_ALERT, log, 0, "poll ready != events");
}
- if (timer != INFTIM) {
+ if ((int) timer != INFTIM) {
ngx_event_expire_timers(delta);
}
diff --git a/src/event/modules/ngx_select_module.c b/src/event/modules/ngx_select_module.c
index fe461c3..50d030c 100644
--- a/src/event/modules/ngx_select_module.c
+++ b/src/event/modules/ngx_select_module.c
@@ -23,7 +23,7 @@
static int max_fd;
#endif
-static int nevents;
+static u_int nevents;
static ngx_event_t **event_index;
static ngx_event_t **ready_index;
@@ -177,8 +177,8 @@
int ngx_select_process_events(ngx_log_t *log)
{
- int ready, found, nready;
- u_int i;
+ int ready, found;
+ u_int i, nready;
ngx_msec_t timer, delta;
ngx_event_t *ev;
ngx_connection_t *c;
diff --git a/src/event/ngx_event.c b/src/event/ngx_event.c
index 63cf999..063385d 100644
--- a/src/event/ngx_event.c
+++ b/src/event/ngx_event.c
@@ -12,16 +12,24 @@
#include <ngx_event_accept.h>
#include <ngx_select_module.h>
+
#if (HAVE_POLL)
#include <ngx_poll_module.h>
#endif
+
#if (HAVE_DEVPOLL)
#include <ngx_devpoll_module.h>
#endif
+
#if (HAVE_KQUEUE)
#include <ngx_kqueue_module.h>
#endif
+#if (HAVE_IOCP)
+#include <ngx_event_acceptex.h>
+#include <ngx_iocp_module.h>
+#endif
+
ngx_connection_t *ngx_connections;
ngx_event_t *ngx_read_events, *ngx_write_events;
@@ -68,7 +76,10 @@
ngx_devpoll_init,
#endif
#if (HAVE_KQUEUE)
- ngx_kqueue_init
+ ngx_kqueue_init,
+#endif
+#if (HAVE_IOCP)
+ ngx_iocp_init
#endif
};
@@ -86,6 +97,10 @@
/* STUB */
int max_connections = 512;
+#if (HAVE_IOCP)
+ ngx_event_type = NGX_IOCP_EVENT;
+#endif
+
if (ngx_init_events(max_connections, log) == NGX_ERROR) {
exit(1);
}
@@ -127,16 +142,38 @@
ngx_memcpy(ev->log, c->log, sizeof(ngx_log_t));
c->read = ev;
ev->data = c;
- ev->event_handler = &ngx_event_accept;
- ev->listening = 1;
ev->index = NGX_INVALID_INDEX;
+#if 0
+ ev->listening = 1;
+#endif
ev->available = 0;
#if (HAVE_DEFERRED_ACCEPT)
ev->deferred_accept = s[i].deferred_accept;
#endif
+
+#if (HAVE_IOCP)
+
+ if (ngx_event_flags & NGX_HAVE_IOCP_EVENT) {
+ ev->event_handler = &ngx_event_acceptex;
+
+ if (ngx_iocp_add_event(ev) == NGX_ERROR) {
+ return NGX_ERROR;
+ }
+
+ ngx_event_post_acceptex(&s[i], 1);
+
+ } else {
+ ev->event_handler = &ngx_event_accept;
+ }
+
+#else
+
+ ev->event_handler = &ngx_event_accept;
ngx_add_event(ev, NGX_READ_EVENT, 0);
+
+#endif
}
}
diff --git a/src/event/ngx_event.h b/src/event/ngx_event.h
index a918810..d81be38 100644
--- a/src/event/ngx_event.h
+++ b/src/event/ngx_event.h
@@ -15,6 +15,15 @@
typedef struct ngx_event_s ngx_event_t;
+#if (HAVE_IOCP)
+typedef struct {
+ WSAOVERLAPPED ovlp;
+ ngx_event_t *event;
+ int error;
+} ngx_event_ovlp_t;
+#endif
+
+
struct ngx_event_s {
void *data;
@@ -45,11 +54,11 @@
/* otherwise: */
/* accept: 1 if accept many, 0 otherwise */
- /* flags - int are probably faster on write then bits ??? */
-
unsigned oneshot:1;
+#if 0
unsigned listening:1;
+#endif
unsigned write:1;
unsigned active:1;
@@ -66,11 +75,24 @@
#if (HAVE_DEFERRED_ACCEPT)
unsigned deferred_accept:1;
#endif
+
#if (HAVE_KQUEUE)
unsigned eof:1;
int error;
#endif
+
+#if (HAVE_AIO)
+
+#if (HAVE_IOCP)
+ ngx_event_ovlp_t ovlp;
+#else
+ struct aiocb aiocb;
+#endif
+
+#endif
+
+
#if 0
void *thr_ctx; /* event thread context if $(CC) doesn't
understand __thread declaration
@@ -94,6 +116,10 @@
#if (HAVE_KQUEUE)
NGX_KQUEUE_EVENT,
#endif
+#if (HAVE_IOCP)
+ NGX_IOCP_EVENT,
+#endif
+ NGX_DUMMY_EVENT /* avoid comma at end of enumerator list */
} ngx_event_type_e ;
typedef struct {
@@ -125,6 +151,10 @@
/* No need to add or delete event filters - overlapped, aio_read, aioread */
#define NGX_HAVE_AIO_EVENT 16
+/* Need to add socket or halde only once - i/o completion port.
+ It also requires to set HAVE_AIO_EVENT and NGX_HAVE_AIO_EVENT */
+#define NGX_HAVE_IOCP_EVENT 32
+
/* Event filter is deleted before closing file. Has no meaning
for select, poll, epoll.
@@ -187,43 +217,26 @@
#define ngx_process_events ngx_event_actions.process
#define ngx_add_event ngx_event_actions.add
#define ngx_del_event ngx_event_actions.del
+
#if 0
#define ngx_add_timer ngx_event_actions.timer
#else
#define ngx_add_timer ngx_event_add_timer
#endif
+
+#if (HAVE_IOCP_EVENT)
+#define ngx_event_recv ngx_event_wsarecv
+#elif (HAVE_AIO_EVENT)
+#define ngx_event_recv ngx_event_aio_read
+#else
#define ngx_event_recv ngx_event_recv_core
+#endif
#endif
#define ngx_del_timer ngx_event_del_timer
-#if 0
-ngx_inline static void ngx_del_timer(ngx_event_t *ev)
-{
-#if (NGX_DEBUG_EVENT)
- /* STUB - we can not cast (ngx_connection_t *) here */
- ngx_log_debug(ev->log, "del timer: %d" _ *(int *)(ev->data));
-#endif
-
- if (ev->timer_prev) {
- ev->timer_prev->timer_next = ev->timer_next;
- }
-
- if (ev->timer_next) {
- ev->timer_next->timer_delta += ev->timer_delta;
- ev->timer_next->timer_prev = ev->timer_prev;
- ev->timer_next = NULL;
- }
-
- if (ev->timer_prev) {
- ev->timer_prev = NULL;
- }
-}
-#endif
-
-
extern ngx_event_t *ngx_read_events;
extern ngx_event_t *ngx_write_events;
@@ -236,6 +249,10 @@
#endif
+ssize_t ngx_event_recv_core(ngx_connection_t *c, char *buf, size_t size);
+int ngx_event_close_connection(ngx_event_t *ev);
+
+
void ngx_pre_thread(ngx_array_t *ls, ngx_pool_t *pool, ngx_log_t *log);
void ngx_worker(ngx_log_t *log);
diff --git a/src/event/ngx_event_accept.c b/src/event/ngx_event_accept.c
index 14422af..2d64c0d 100644
--- a/src/event/ngx_event_accept.c
+++ b/src/event/ngx_event_accept.c
@@ -31,7 +31,9 @@
ev->ready = 0;
+#if 0
/* DEBUG */ ev->available++;
+#endif
do {
ngx_test_null(pool, ngx_create_pool(ls->pool_size, ev->log), NGX_OK);
@@ -55,13 +57,40 @@
return NGX_OK;
}
-#if !(HAVE_INHERITED_NONBLOCK)
+
+#if (HAVE_INHERITED_NONBLOCK)
+
+#if (HAVE_AIO_EVENT)
+ if ((ngx_event_flags & NGX_HAVE_AIO_EVENT)) {
+ if (ngx_blocking(s) == -1) {
+ ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
+ ngx_blocking_n " %s failed", ls->addr_text.data);
+ return NGX_OK;
+ }
+ }
+#endif
+
+#else /* !HAVE_INHERITED_NONBLOCK */
+
+#if (HAVE_AIO_EVENT)
+ if (!(ngx_event_flags & NGX_HAVE_AIO_EVENT)) {
+ if (ngx_nonblocking(s) == -1) {
+ ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
+ ngx_nonblocking_n " %s failed", ls->addr_text.data);
+ return NGX_OK;
+ }
+ }
+#else
if (ngx_nonblocking(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
ngx_nonblocking_n " %s failed", ls->addr_text.data);
+ return NGX_OK;
}
#endif
+#endif /* HAVE_INHERITED_NONBLOCK */
+
+
rev = &ngx_read_events[s];
wev = &ngx_write_events[s];
c = &ngx_connections[s];
@@ -88,9 +117,15 @@
c->fd = s;
c->unexpected_eof = 1;
wev->write = 1;
- wev->ready = 1;
- wev->timer = rev->timer = 10000;
+#if (HAVE_AIO_EVENT)
+ if (!(ngx_event_flags & NGX_HAVE_AIO_EVENT)) {
+ wev->ready = 1;
+ }
+#endif
+
+ /* STUB ? */ wev->timer = rev->timer = 10000;
+
wev->timer_handler = rev->timer_handler = ngx_event_close_connection;
wev->close_handler = rev->close_handler = ngx_event_close_connection;
diff --git a/src/event/ngx_event_acceptex.c b/src/event/ngx_event_acceptex.c
new file mode 100644
index 0000000..83b3c42
--- /dev/null
+++ b/src/event/ngx_event_acceptex.c
@@ -0,0 +1,150 @@
+
+#include <ngx_config.h>
+
+#include <ngx_core.h>
+#include <ngx_types.h>
+#include <ngx_log.h>
+#include <ngx_listen.h>
+#include <ngx_connection.h>
+#include <ngx_event.h>
+#include <ngx_event_close.h>
+#include <ngx_iocp_module.h>
+
+#include <ngx_event_acceptex.h>
+
+
+
+/* This function should always return NGX_OK even there are some failures
+ because if we return NGX_ERROR then listening socket would be closed */
+
+int ngx_event_acceptex(ngx_event_t *ev)
+{
+ ngx_connection_t *c;
+
+ c = (ngx_connection_t *) ev->data;
+
+ if (ev->ovlp.error) {
+ ngx_log_error(NGX_LOG_CRIT, ev->log, ev->ovlp.error,
+ "AcceptEx(%s) falied", c->addr_text.data);
+ return NGX_OK;
+ }
+
+ GetAcceptExSockaddrs(c->data, 0,
+ c->socklen + 16, c->socklen + 16,
+ &c->local_sockaddr, &c->local_socklen,
+ &c->sockaddr, &c->socklen);
+
+ ngx_event_post_acceptex(c->listening, 1);
+
+ /* STUB: InterlockedInc() */
+ c->number = ngx_connection_counter++;
+
+ c->handler(c);
+
+ return NGX_OK;
+
+}
+
+
+int ngx_event_post_acceptex(ngx_listen_t *ls, int n)
+{
+ int i;
+ u_int rcvd;
+ ngx_err_t err;
+ ngx_pool_t *pool;
+ ngx_event_t *rev, *wev;
+ ngx_socket_t s;
+ ngx_connection_t *c;
+
+ for (i = 0; i < n; i++) {
+
+ /* TODO: look up reused sockets */
+
+ ngx_log_debug(ls->log, "socket: %x" _ ls->flags);
+
+ s = ngx_socket(ls->family, ls->type, ls->protocol, ls->flags);
+
+ if (s == -1) {
+ ngx_log_error(NGX_LOG_ALERT, ls->log, ngx_socket_errno,
+ ngx_socket_n " for AcceptEx(%s) falied",
+ ls->addr_text.data);
+
+ return NGX_ERROR;
+ }
+
+ ngx_test_null(pool, ngx_create_pool(ls->pool_size, ls->log), NGX_ERROR);
+
+ rev = &ngx_read_events[s];
+ wev = &ngx_write_events[s];
+ c = &ngx_connections[s];
+
+ ngx_memzero(rev, sizeof(ngx_event_t));
+ ngx_memzero(wev, sizeof(ngx_event_t));
+ ngx_memzero(c, sizeof(ngx_connection_t));
+
+ c->pool = pool;
+
+ rev->index = wev->index = NGX_INVALID_INDEX;
+
+ rev->ovlp.event = rev;
+ wev->ovlp.event = wev;
+
+ rev->data = wev->data = c;
+ c->read = rev;
+ c->write = wev;
+
+ c->family = ls->family;
+ c->socklen = ls->socklen;
+ c->addr = ls->addr;
+ c->addr_text_max_len = ls->addr_text_max_len;
+ c->post_accept_timeout = ls->post_accept_timeout;
+
+ c->listening = ls;
+ c->fd = s;
+
+ c->unexpected_eof = 1;
+ wev->write = 1;
+
+ c->handler = ls->handler;
+ rev->event_handler = ngx_event_acceptex;
+
+ wev->timer_handler = rev->timer_handler = ngx_event_close_connection;
+ wev->close_handler = rev->close_handler = ngx_event_close_connection;
+
+ c->ctx = ls->ctx;
+ c->servers = ls->servers;
+
+ ngx_test_null(c->data, ngx_palloc(pool, 2 * (c->socklen + 16)),
+ NGX_ERROR);
+ ngx_test_null(c->local_sockaddr, ngx_palloc(pool, c->socklen),
+ NGX_ERROR);
+ ngx_test_null(c->sockaddr, ngx_palloc(pool, c->socklen),
+ NGX_ERROR);
+
+ ngx_test_null(c->log, ngx_palloc(c->pool, sizeof(ngx_log_t)),
+ NGX_ERROR);
+ ngx_memcpy(c->log, ls->log, sizeof(ngx_log_t));
+ rev->log = wev->log = c->log;
+
+ if (ngx_iocp_add_event(rev) == NGX_ERROR) {
+ return NGX_ERROR;
+ }
+
+ if (AcceptEx(ls->fd, s, c->data, 0,
+ c->socklen + 16, c->socklen + 16,
+ &rcvd, (LPOVERLAPPED) &rev->ovlp) == 0) {
+
+ err = ngx_socket_errno;
+ if (err == WSA_IO_PENDING) {
+ return NGX_OK;
+ }
+
+ ngx_log_error(NGX_LOG_ALERT, ls->log, err,
+ "AcceptEx(%s) falied", ls->addr_text.data);
+
+ return NGX_ERROR;
+ }
+ }
+
+ return NGX_OK;
+}
diff --git a/src/event/ngx_event_acceptex.h b/src/event/ngx_event_acceptex.h
new file mode 100644
index 0000000..8d55af1
--- /dev/null
+++ b/src/event/ngx_event_acceptex.h
@@ -0,0 +1,13 @@
+#ifndef _NGX_EVENT_ACCEPTEX_H_INCLUDED_
+#define _NGX_EVENT_ACCEPTEX_H_INCLUDED_
+
+
+#include <ngx_listen.h>
+#include <ngx_event.h>
+
+
+int ngx_event_acceptex(ngx_event_t *ev);
+int ngx_event_post_acceptex(ngx_listen_t *ls, int n);
+
+
+#endif /* _NGX_EVENT_ACCEPTEX_H_INCLUDED_ */
diff --git a/src/event/ngx_event_aio_read.c b/src/event/ngx_event_aio_read.c
new file mode 100644
index 0000000..4561cf4
--- /dev/null
+++ b/src/event/ngx_event_aio_read.c
@@ -0,0 +1,113 @@
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+#include <ngx_errno.h>
+#include <ngx_log.h>
+#include <ngx_recv.h>
+#include <ngx_connection.h>
+#include <ngx_event.h>
+
+#if (HAVE_KQUEUE)
+#include <ngx_kqueue_module.h>
+#endif
+
+
+/*
+ The data is ready - 3 syscalls:
+ aio_read(), aio_error(), aio_return()
+ The data is not ready - 4 (kqueue) or 5 syscalls:
+ aio_read(), aio_error(), notifiction,
+ aio_error(), aio_return()
+ aio_cancel(), aio_error()
+*/
+
+ssize_t ngx_event_aio_read(ngx_connection_t *c, char *buf, size_t size)
+{
+ int rc, first, canceled;
+ ngx_event_t *ev;
+
+ ev = c->read;
+
+ canceled = 0;
+
+ if (ev->timedout) {
+ ngx_set_socket_errno(NGX_ETIMEDOUT);
+ ngx_log_error(NGX_LOG_ERR, ev->log, 0, "aio_read() timed out");
+
+ rc = aio_cancel(c->fd, &ev->aiocb);
+ if (rc == -1) {
+ ngx_log_error(NGX_LOG_CRIT, ev->log, ngx_errno,
+ "aio_cancel() failed");
+ return NGX_ERROR;
+ }
+
+ ngx_log_debug(ev->log, "aio_cancel: %d" _ rc);
+
+ canceled = 1;
+
+ ev->ready = 1;
+ }
+
+ first = 0;
+
+ if (!ev->ready) {
+ ngx_memzero(&ev->aiocb, sizeof(struct aiocb));
+
+ ev->aiocb.aio_fildes = c->fd;
+ ev->aiocb.aio_buf = buf;
+ ev->aiocb.aio_nbytes = size;
+
+#if (HAVE_KQUEUE)
+ ev->aiocb.aio_sigevent.sigev_notify_kqueue = kq;
+ ev->aiocb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
+ ev->aiocb.aio_sigevent.sigev_value.sigval_ptr = ev;
+#endif
+
+ if (aio_read(&ev->aiocb) == -1) {
+ ngx_log_error(NGX_LOG_CRIT, ev->log, ngx_errno,
+ "aio_read() failed");
+ return NGX_ERROR;
+ }
+
+ ngx_log_debug(ev->log, "aio_read: OK");
+
+ ev->active = 1;
+ first = 1;
+ }
+
+ ev->ready = 0;
+
+ rc = aio_error(&ev->aiocb);
+ if (rc == -1) {
+ ngx_log_error(NGX_LOG_CRIT, ev->log, ngx_errno, "aio_error() failed");
+ return NGX_ERROR;
+ }
+
+ if (rc != 0) {
+ if (rc == NGX_EINPROGRESS) {
+ if (!first) {
+ ngx_log_error(NGX_LOG_CRIT, ev->log, rc,
+ "aio_read() still in progress");
+ }
+ return NGX_AGAIN;
+ }
+
+ if (rc == NGX_ECANCELED && canceled) {
+ return NGX_ERROR;
+ }
+
+ ngx_log_error(NGX_LOG_CRIT, ev->log, rc, "aio_read() failed");
+ return NGX_ERROR;
+ }
+
+ rc = aio_return(&ev->aiocb);
+ if (rc == -1) {
+ ngx_log_error(NGX_LOG_CRIT, ev->log, ngx_errno, "aio_return() failed");
+
+ return NGX_ERROR;
+ }
+
+ ngx_log_debug(ev->log, "aio_read: %d" _ rc);
+
+ return rc;
+}
diff --git a/src/event/ngx_event_aio_write.c b/src/event/ngx_event_aio_write.c
new file mode 100644
index 0000000..00da4f4
--- /dev/null
+++ b/src/event/ngx_event_aio_write.c
@@ -0,0 +1,115 @@
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+#include <ngx_errno.h>
+#include <ngx_log.h>
+#include <ngx_recv.h>
+#include <ngx_connection.h>
+
+#if (HAVE_KQUEUE)
+#include <ngx_kqueue_module.h>
+#endif
+
+
+/*
+ The data is ready - 3 syscalls:
+ aio_write(), aio_error(), aio_return()
+ The data is not ready - 4 (kqueue) or 5 syscalls:
+ aio_write(), aio_error(), notifiction,
+ aio_error(), aio_return()
+ aio_cancel(), aio_error()
+*/
+
+ssize_t ngx_event_aio_write(ngx_connection_t *c, char *buf, size_t size)
+{
+ int rc, first, canceled;
+ ngx_event_t *ev;
+
+ ev = c->write;
+
+ canceled = 0;
+
+ngx_log_debug(ev->log, "aio: ev->ready: %d" _ ev->ready);
+ngx_log_debug(ev->log, "aio: aiocb: %08x" _ &ev->aiocb);
+
+ if (ev->timedout) {
+ ngx_set_socket_errno(NGX_ETIMEDOUT);
+ ngx_log_error(NGX_LOG_ERR, ev->log, 0, "aio_write() timed out");
+
+ rc = aio_cancel(c->fd, &ev->aiocb);
+ if (rc == -1) {
+ ngx_log_error(NGX_LOG_CRIT, ev->log, ngx_errno,
+ "aio_cancel() failed");
+ return NGX_ERROR;
+ }
+
+ ngx_log_debug(ev->log, "aio_cancel: %d" _ rc);
+
+ canceled = 1;
+
+ ev->ready = 1;
+ }
+
+ first = 0;
+
+ if (!ev->ready) {
+ ngx_memzero(&ev->aiocb, sizeof(struct aiocb));
+
+ ev->aiocb.aio_fildes = c->fd;
+ ev->aiocb.aio_buf = buf;
+ ev->aiocb.aio_nbytes = size;
+
+#if (HAVE_KQUEUE)
+ ev->aiocb.aio_sigevent.sigev_notify_kqueue = kq;
+ ev->aiocb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
+ ev->aiocb.aio_sigevent.sigev_value.sigval_ptr = ev;
+#endif
+
+ if (aio_write(&ev->aiocb) == -1) {
+ ngx_log_error(NGX_LOG_CRIT, ev->log, ngx_errno,
+ "aio_write() failed");
+ return NGX_ERROR;
+ }
+
+ ngx_log_debug(ev->log, "aio_write: OK");
+
+ ev->active = 1;
+ first = 1;
+ }
+
+ ev->ready = 0;
+
+ rc = aio_error(&ev->aiocb);
+ if (rc == -1) {
+ ngx_log_error(NGX_LOG_CRIT, ev->log, ngx_errno, "aio_error() failed");
+ return NGX_ERROR;
+ }
+
+ if (rc != 0) {
+ if (rc == NGX_EINPROGRESS) {
+ if (!first) {
+ ngx_log_error(NGX_LOG_CRIT, ev->log, rc,
+ "aio_write() still in progress");
+ }
+ return NGX_AGAIN;
+ }
+
+ if (rc == NGX_ECANCELED && canceled) {
+ return NGX_ERROR;
+ }
+
+ ngx_log_error(NGX_LOG_CRIT, ev->log, rc, "aio_write() failed");
+ return NGX_ERROR;
+ }
+
+ rc = aio_return(&ev->aiocb);
+ if (rc == -1) {
+ ngx_log_error(NGX_LOG_CRIT, ev->log, ngx_errno, "aio_return() failed");
+
+ return NGX_ERROR;
+ }
+
+ ngx_log_debug(ev->log, "aio_write: %d" _ rc);
+
+ return rc;
+}
diff --git a/src/event/ngx_event_close.c b/src/event/ngx_event_close.c
index c829c71..8c8fd1b 100644
--- a/src/event/ngx_event_close.c
+++ b/src/event/ngx_event_close.c
@@ -27,7 +27,7 @@
ngx_del_event(c->write, NGX_WRITE_EVENT, NGX_CLOSE_EVENT);
if ((rc = ngx_close_socket(c->fd)) == -1)
- ngx_log_error(NGX_LOG_ERR, c->log, ngx_socket_errno,
+ ngx_log_error(NGX_LOG_ALERT, c->log, ngx_socket_errno,
"ngx_event_close: close failed");
c->fd = -1;
diff --git a/src/event/ngx_event_recv.c b/src/event/ngx_event_recv.c
index 4b01a4c..4fa95c3 100644
--- a/src/event/ngx_event_recv.c
+++ b/src/event/ngx_event_recv.c
@@ -6,7 +6,7 @@
#include <ngx_recv.h>
#include <ngx_connection.h>
-int ngx_event_recv_core(ngx_connection_t *c, char *buf, size_t size)
+ssize_t ngx_event_recv_core(ngx_connection_t *c, char *buf, size_t size)
{
int n;
ngx_err_t err;
@@ -20,9 +20,23 @@
#if (HAVE_KQUEUE)
ngx_log_debug(c->log, "ngx_event_recv: eof:%d, avail:%d, err:%d" _
c->read->eof _ c->read->available _ c->read->error);
-#if !(USE_KQUEUE)
- if (ngx_event_type == NGX_KQUEUE_EVENT)
#endif
+
+#if (USE_KQUEUE)
+
+ if (c->read->eof && c->read->available == 0) {
+ if (c->read->error) {
+ ngx_log_error(NGX_LOG_ERR, c->log, c->read->error,
+ "recv() failed");
+ return NGX_ERROR;
+ }
+
+ return 0;
+ }
+
+#elif (HAVE_KQUEUE)
+
+ if (ngx_event_type == NGX_KQUEUE_EVENT) {
if (c->read->eof && c->read->available == 0) {
if (c->read->error) {
ngx_log_error(NGX_LOG_ERR, c->log, c->read->error,
@@ -32,6 +46,8 @@
return 0;
}
+ }
+
#endif
n = ngx_recv(c->fd, buf, size, 0);
@@ -48,11 +64,16 @@
return NGX_ERROR;
}
-#if (HAVE_KQUEUE)
-#if !(USE_KQUEUE)
- if (ngx_event_type == NGX_KQUEUE_EVENT)
-#endif
+#if (USE_KQUEUE)
+
+ c->read->available -= n;
+
+#elif (HAVE_KQUEUE)
+
+ if (ngx_event_type == NGX_KQUEUE_EVENT) {
c->read->available -= n;
+ }
+
#endif
return n;
diff --git a/src/event/ngx_event_timer.h b/src/event/ngx_event_timer.h
index 7f1816a..227f702 100644
--- a/src/event/ngx_event_timer.h
+++ b/src/event/ngx_event_timer.h
@@ -15,8 +15,6 @@
void ngx_event_expire_timers(ngx_msec_t timer);
-extern ngx_event_t *ngx_timer_queue;
-
ngx_inline static void ngx_event_del_timer(ngx_event_t *ev)
{
@@ -41,4 +39,4 @@
}
-#endif _NGX_EVENT_TIMER_H_INCLUDED_
+#endif /* _NGX_EVENT_TIMER_H_INCLUDED_ */
diff --git a/src/event/ngx_event_write.c b/src/event/ngx_event_write.c
index 35949ec..9e7670c 100644
--- a/src/event/ngx_event_write.c
+++ b/src/event/ngx_event_write.c
@@ -11,11 +11,11 @@
#include <ngx_event_write.h>
-ngx_chain_t *ngx_event_write(ngx_connection_t *c, ngx_chain_t *in,
- off_t flush)
+ngx_chain_t *ngx_event_write(ngx_connection_t *c, ngx_chain_t *in, off_t flush)
{
- int rc;
- char *last;
+ int rc, i, last;
+ u_int flags;
+ char *prev;
off_t sent;
ngx_iovec_t *iov;
ngx_array_t *header, *trailer;
@@ -24,6 +24,7 @@
ch = in;
file = NULL;
+ last = 0;
ngx_test_null(header, ngx_create_array(c->pool, 10, sizeof(ngx_iovec_t)),
(ngx_chain_t *) -1);
@@ -36,12 +37,12 @@
trailer->nelts = 0;
if (ch->hunk->type & NGX_HUNK_IN_MEMORY) {
- last = NULL;
+ prev = NULL;
iov = NULL;
while (ch && (ch->hunk->type & NGX_HUNK_IN_MEMORY))
{
- if (last == ch->hunk->pos.mem) {
+ if (prev == ch->hunk->pos.mem) {
iov->ngx_iov_len += ch->hunk->last.mem - ch->hunk->pos.mem;
} else {
@@ -49,7 +50,11 @@
(ngx_chain_t *) -1);
iov->ngx_iov_base = ch->hunk->pos.mem;
iov->ngx_iov_len = ch->hunk->last.mem - ch->hunk->pos.mem;
- last = ch->hunk->last.mem;
+ prev = ch->hunk->last.mem;
+ }
+
+ if (ch->hunk->type & NGX_HUNK_LAST) {
+ last = 1;
}
ch = ch->next;
@@ -59,6 +64,10 @@
if (ch && (ch->hunk->type & NGX_HUNK_FILE)) {
file = ch->hunk;
ch = ch->next;
+
+ if (ch->hunk->type & NGX_HUNK_LAST) {
+ last = 1;
+ }
}
#if (HAVE_MAX_SENDFILE_IOVEC)
@@ -68,12 +77,12 @@
} else {
#endif
if (ch && ch->hunk->type & NGX_HUNK_IN_MEMORY) {
- last = NULL;
+ prev = NULL;
iov = NULL;
while (ch && (ch->hunk->type & NGX_HUNK_IN_MEMORY)) {
- if (last == ch->hunk->pos.mem) {
+ if (prev == ch->hunk->pos.mem) {
iov->ngx_iov_len +=
ch->hunk->last.mem - ch->hunk->pos.mem;
@@ -83,7 +92,11 @@
iov->ngx_iov_base = ch->hunk->pos.mem;
iov->ngx_iov_len =
ch->hunk->last.mem - ch->hunk->pos.mem;
- last = ch->hunk->last.mem;
+ prev = ch->hunk->last.mem;
+ }
+
+ if (ch->hunk->type & NGX_HUNK_LAST) {
+ last = 1;
}
ch = ch->next;
@@ -91,19 +104,47 @@
}
if (file) {
- rc = ngx_sendfile(c->fd,
+ flags = ngx_sendfile_flags;
+#if (HAVE_SENDFILE_DISCONNECT)
+ if (last && c->close) {
+ flags |= HAVE_SENDFILE_DISCONNECT;
+ }
+#endif
+ rc = ngx_sendfile(c,
(ngx_iovec_t *) header->elts, header->nelts,
file->file->fd, file->pos.file,
(size_t) (file->last.file - file->pos.file),
(ngx_iovec_t *) trailer->elts, trailer->nelts,
- &sent, c->log);
+ &sent, flags);
+
+#if (HAVE_AIO_EVENT) && !(HAVE_IOCP_EVENT)
+ } else if (ngx_event_flags & NGX_HAVE_AIO_EVENT) {
+
+ sent = 0;
+ rc = NGX_AGAIN;
+ iov = (ngx_iovec_t *) header->elts;
+ for (i = 0; i < header->nelts; i++) {
+ rc = ngx_event_aio_write(c, iov[i].ngx_iov_base,
+ iov[i].ngx_iov_len);
+
+ if (rc > 0) {
+ sent += rc;
+ } else {
+ break;
+ }
+
+ if (rc < (int) iov->ngx_iov_len) {
+ break;
+ }
+ }
+#endif
} else {
rc = ngx_sendv(c, (ngx_iovec_t *) header->elts, header->nelts);
sent = rc > 0 ? rc: 0;
#if (NGX_DEBUG_EVENT_WRITE)
- ngx_log_debug(c->log, "sendv: " QD_FMT _ sent);
+ ngx_log_debug(c->log, "sendv: " OFF_FMT _ sent);
#endif
}
#if (HAVE_MAX_SENDFILE_IOVEC)
@@ -118,7 +159,7 @@
for (ch = in; ch; ch = ch->next) {
#if (NGX_DEBUG_EVENT_WRITE)
- ngx_log_debug(c->log, "event write: %x " QX_FMT " " QD_FMT _
+ ngx_log_debug(c->log, "event write: %x " QX_FMT " " OFF_FMT _
ch->hunk->type _
ch->hunk->pos.file _
ch->hunk->last.file - ch->hunk->pos.file);
@@ -129,7 +170,7 @@
ch->hunk->pos.file = ch->hunk->last.file;
#if (NGX_DEBUG_EVENT_WRITE)
- ngx_log_debug(c->log, "event write: " QX_FMT " 0 " QD_FMT _
+ ngx_log_debug(c->log, "event write: " QX_FMT " 0 " OFF_FMT _
ch->hunk->pos.file _ sent);
#endif
@@ -144,7 +185,7 @@
ch->hunk->pos.file += sent;
#if (NGX_DEBUG_EVENT_WRITE)
- ngx_log_debug(c->log, "event write: " QX_FMT " " QD_FMT _
+ ngx_log_debug(c->log, "event write: " QX_FMT " " OFF_FMT _
ch->hunk->pos.file _
ch->hunk->last.file - ch->hunk->pos.file);
#endif
diff --git a/src/event/ngx_event_wsarecv.c b/src/event/ngx_event_wsarecv.c
new file mode 100644
index 0000000..20c0c1b
--- /dev/null
+++ b/src/event/ngx_event_wsarecv.c
@@ -0,0 +1,97 @@
+
+#include <ngx_config.h>
+
+#include <ngx_core.h>
+#include <ngx_errno.h>
+#include <ngx_log.h>
+#include <ngx_connection.h>
+#include <ngx_event.h>
+
+
+ssize_t ngx_event_wsarecv(ngx_connection_t *c, char *buf, size_t size)
+{
+ int rc;
+ u_int flags;
+ size_t bytes;
+ ngx_err_t err;
+ WSABUF wsabuf[1];
+ ngx_event_t *ev;
+ LPWSAOVERLAPPED_COMPLETION_ROUTINE handler;
+
+ ev = c->read;
+
+/* DEBUG */ bytes = 0;
+
+ if (ev->timedout) {
+ ngx_set_socket_errno(NGX_ETIMEDOUT);
+ ngx_log_error(NGX_LOG_ERR, ev->log, 0, "WSARecv() timed out");
+
+ return NGX_ERROR;
+ }
+
+ if (ev->ready) {
+ ev->ready = 0;
+
+#if (HAVE_IOCP_EVENT) /* iocp */
+
+ if (ngx_event_flags & NGX_HAVE_IOCP_EVENT) {
+ if (ev->ovlp.error) {
+ ngx_log_error(NGX_LOG_ERR, c->log, ev->ovlp.error,
+ "WSARecv() failed");
+ return NGX_ERROR;
+ }
+
+ return ev->available;
+ }
+
+#endif
+
+ if (WSAGetOverlappedResult(c->fd, (LPWSAOVERLAPPED) &ev->ovlp,
+ &bytes, 0, NULL) == 0) {
+ err = ngx_socket_errno;
+ ngx_log_error(NGX_LOG_CRIT, ev->log, err,
+ "WSARecv() or WSAGetOverlappedResult() failed");
+
+ return NGX_ERROR;
+ }
+
+ return bytes;
+ }
+
+ ngx_memzero(&ev->ovlp, sizeof(WSAOVERLAPPED));
+ wsabuf[0].buf = buf;
+ wsabuf[0].len = size;
+ flags = 0;
+
+#if 0
+ handler = ev->handler;
+#else
+ handler = NULL;
+#endif
+
+ rc = WSARecv(c->fd, wsabuf, 1, &bytes, &flags,
+ (LPWSAOVERLAPPED) &ev->ovlp, handler);
+
+ ngx_log_debug(ev->log, "WSARecv: %d:%d" _ rc _ bytes);
+
+ if (rc == -1) {
+ err = ngx_socket_errno;
+ if (err == WSA_IO_PENDING) {
+ return NGX_AGAIN;
+
+ } else {
+ ngx_log_error(NGX_LOG_CRIT, ev->log, err, "WSARecv() failed");
+ return NGX_ERROR;
+ }
+ }
+
+#if (HAVE_IOCP_EVENT) /* iocp */
+
+ if (ngx_event_flags & NGX_HAVE_IOCP_EVENT) {
+ return NGX_AGAIN;
+ }
+
+#endif
+
+ return bytes;
+}