nginx-0.0.3-2004-03-31-19:26:46 import
diff --git a/src/core/nginx.c b/src/core/nginx.c
index 8dc6102..9c35e08 100644
--- a/src/core/nginx.c
+++ b/src/core/nginx.c
@@ -174,10 +174,6 @@
}
}
- if (ccf->worker_processes == NGX_CONF_UNSET) {
- ccf->worker_processes = 1;
- }
-
if (ccf->pid.len == 0) {
ccf->pid.len = sizeof(NGINX_PID) - 1;
ccf->pid.data = NGINX_PID;
diff --git a/src/core/ngx_atomic.h b/src/core/ngx_atomic.h
index b95e4cc..0ec5a87 100644
--- a/src/core/ngx_atomic.h
+++ b/src/core/ngx_atomic.h
@@ -25,6 +25,7 @@
NGX_SMP_LOCK
" xaddl %0, %2; "
+ " incl %0; "
: "=q" (old) : "0" (1), "m" (*value));
@@ -40,6 +41,7 @@
NGX_SMP_LOCK
" xaddl %0, %1; "
+ " decl %0; "
: "=q" (old) : "0" (-1), "m" (*value));
@@ -65,6 +67,15 @@
return res;
}
+
+#elif (WIN32)
+
+#define ngx_atomic_inc(x) InterlockedIncrement
+#define ngx_atomic_dec(x) InterlockedDecrement
+#define ngx_atomic_cmp_set(lock, old, set) \
+ InterlockedCompareExchange(lock, set, old)
+
+
#else
typedef volatile uint32_t ngx_atomic_t;
diff --git a/src/core/ngx_file.c b/src/core/ngx_file.c
index 48cff46..0e9344e 100644
--- a/src/core/ngx_file.c
+++ b/src/core/ngx_file.c
@@ -56,7 +56,7 @@
ngx_create_hashed_filename(file, path);
-#if 0
+#if 1
file->fd = ngx_open_tempfile(file->name.data, persistent);
#else
file->fd = ngx_open_tempfile(file->name.data, 1);
diff --git a/src/event/modules/ngx_kqueue_module.c b/src/event/modules/ngx_kqueue_module.c
index c49e198..7a6b539 100644
--- a/src/event/modules/ngx_kqueue_module.c
+++ b/src/event/modules/ngx_kqueue_module.c
@@ -21,7 +21,7 @@
static int ngx_kqueue_add_event(ngx_event_t *ev, int event, u_int flags);
static int ngx_kqueue_del_event(ngx_event_t *ev, int event, u_int flags);
static int ngx_kqueue_set_event(ngx_event_t *ev, int filter, u_int flags);
-static int ngx_kqueue_process_events(ngx_log_t *log);
+static int ngx_kqueue_process_events(ngx_cycle_t *cycle);
#if (NGX_THREADS)
static void ngx_kqueue_thread_handler(ngx_event_t *ev);
#endif
@@ -343,10 +343,10 @@
}
-static ngx_int_t ngx_kqueue_process_events(ngx_log_t *log)
+static ngx_int_t ngx_kqueue_process_events(ngx_cycle_t *cycle)
{
int events;
- ngx_int_t instance, i;
+ ngx_int_t i, instance;
ngx_err_t err;
ngx_msec_t timer;
ngx_event_t *ev;
@@ -370,6 +370,18 @@
ngx_old_elapsed_msec = ngx_elapsed_msec;
+ if (ngx_accept_mutex) {
+ if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
+ return NGX_ERROR;
+ }
+
+#if 1
+ if (ngx_accept_token == 0 && timer == 0) {
+ /* STUB */ timer = 500;
+ }
+#endif
+ }
+
if (timer) {
ts.tv_sec = timer / 1000;
ts.tv_nsec = (timer % 1000) * 1000000;
@@ -379,7 +391,8 @@
tp = NULL;
}
- ngx_log_debug1(NGX_LOG_DEBUG_EVENT, log, 0, "kevent timer: %d", timer);
+ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
+ "kevent timer: %d", timer);
events = kevent(ngx_kqueue, change_list, nchanges, event_list, nevents, tp);
@@ -394,40 +407,54 @@
ngx_gettimeofday(&tv);
ngx_time_update(tv.tv_sec);
- ngx_log_debug1(NGX_LOG_DEBUG_EVENT, log, 0, "kevent events: %d", events);
+ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
+ "kevent events: %d", events);
delta = ngx_elapsed_msec;
ngx_elapsed_msec = tv.tv_sec * 1000 + tv.tv_usec / 1000 - ngx_start_msec;
if (err) {
ngx_log_error((err == NGX_EINTR) ? NGX_LOG_INFO : NGX_LOG_ALERT,
- log, err, "kevent() failed");
+ cycle->log, err, "kevent() failed");
+
+ if (ngx_accept_token) {
+ *ngx_accept_mutex = 0;
+ }
+
return NGX_ERROR;
}
if (timer) {
delta = ngx_elapsed_msec - delta;
- ngx_log_debug2(NGX_LOG_DEBUG_EVENT, log, 0,
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"kevent timer: %d, delta: %d", timer, (int) delta);
} else {
if (events == 0) {
- ngx_log_error(NGX_LOG_ALERT, log, 0,
+ ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"kevent() returned no events without timeout");
+
+ if (ngx_accept_token) {
+ *ngx_accept_mutex = 0;
+ }
+
return NGX_ERROR;
}
}
-#if (NGX_THREADS0)
if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) {
+
+ if (ngx_accept_token) {
+ *ngx_accept_mutex = 0;
+ }
+
return NGX_ERROR;
}
-#endif
for (i = 0; i < events; i++) {
- ngx_log_debug6(NGX_LOG_DEBUG_EVENT, log, 0,
+ ngx_log_debug6(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
(event_list[i].ident > 0x8000000
&& event_list[i].ident != (unsigned) -1) ?
@@ -440,7 +467,7 @@
event_list[i].data, event_list[i].udata);
if (event_list[i].flags & EV_ERROR) {
- ngx_log_error(NGX_LOG_ALERT, log, event_list[i].data,
+ ngx_log_error(NGX_LOG_ALERT, cycle->log, event_list[i].data,
"kevent() error on %d", event_list[i].ident);
continue;
}
@@ -454,15 +481,16 @@
instance = (uintptr_t) ev & 1;
ev = (ngx_event_t *) ((uintptr_t) ev & (uintptr_t) ~1);
+ ev->returned_instance = instance;
- if (ev->active == 0 || ev->instance != instance) {
+ if (!ev->active || ev->instance != instance) {
/*
* the stale event from a file descriptor
* that was just closed in this iteration
*/
- ngx_log_debug1(NGX_LOG_DEBUG_EVENT, log, 0,
+ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"kevent: stale event " PTR_FMT, ev);
continue;
}
@@ -494,30 +522,29 @@
break;
default:
- ngx_log_error(NGX_LOG_ALERT, log, 0,
+ ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"unexpected kevent() filter %d",
event_list[i].filter);
continue;
}
-#if (NGX_THREADS0)
- if (ngx_threaded) {
+#if 0
+ if (ngx_threaded || ngx_accept_token) {
+#endif
+ if (ngx_accept_token) {
- if (ev->light) {
-
- /*
- * The light events are the accept event,
- * or the event that waits in the mutex queue - we need to
- * remove it from the mutex queue before the inserting into
- * the posted events queue.
- */
-
+ if (ev->accept) {
ngx_mutex_unlock(ngx_posted_events_mutex);
ev->event_handler(ev);
if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) {
+
+ if (ngx_accept_token) {
+ *ngx_accept_mutex = 0;
+ }
+
return NGX_ERROR;
}
@@ -529,36 +556,55 @@
continue;
}
-#endif
-
ev->event_handler(ev);
}
-#if (NGX_THREADS0)
ngx_mutex_unlock(ngx_posted_events_mutex);
-#endif
+
+ if (ngx_accept_token) {
+ *ngx_accept_mutex = 0;
+ }
if (timer && delta) {
ngx_event_expire_timers((ngx_msec_t) delta);
}
-#if (NGX_THREADS0)
- if (!ngx_threaded) {
+#if (NGX_THREADS)
+ if (ngx_threaded) {
+ return NGX_OK;
}
#endif
- /* TODO: non-thread mode only */
-
for ( ;; ) {
ev = (ngx_event_t *) ngx_posted_events;
+ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
+ "kevent: posted event " PTR_FMT, ev);
+
if (ev == NULL) {
break;
}
ngx_posted_events = ev->next;
+ if ((!ev->posted && !ev->active)
+ || ev->instance != ev->returned_instance)
+ {
+ /*
+ * the stale event from a file descriptor
+ * that was just closed in this iteration
+ */
+
+ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
+ "kevent: stale event " PTR_FMT, ev);
+ continue;
+ }
+
+ if (ev->posted) {
+ ev->posted = 0;
+ }
+
ev->event_handler(ev);
}
@@ -575,8 +621,9 @@
instance = (uintptr_t) ev & 1;
ev = (ngx_event_t *) ((uintptr_t) ev & (uintptr_t) ~1);
- if (ev->active == 0 || ev->instance != instance) {
-
+ if ((!ev->posted && !ev->active)
+ || ev->instance != ev->returned_instance)
+ {
/*
* the stale event from a file descriptor
* that was just closed in this iteration
@@ -587,6 +634,10 @@
return;
}
+ if (ev->posted) {
+ ev->posted = 0;
+ }
+
ev->event_handler(ev);
}
diff --git a/src/event/ngx_event.c b/src/event/ngx_event.c
index e46824e..5678c84 100644
--- a/src/event/ngx_event.c
+++ b/src/event/ngx_event.c
@@ -233,6 +233,8 @@
rev->available = 0;
+ rev->accept = 1;
+
#if (HAVE_DEFERRED_ACCEPT)
rev->deferred_accept = s[i].deferred_accept;
#endif
@@ -273,7 +275,9 @@
} else {
rev->event_handler = &ngx_event_accept;
- ngx_add_event(rev, NGX_READ_EVENT, 0);
+ if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
+ return NGX_ERROR;
+ }
}
#else
@@ -281,9 +285,13 @@
rev->event_handler = &ngx_event_accept;
if (ngx_event_flags & NGX_USE_SIGIO_EVENT) {
- ngx_add_conn(c);
+ if (ngx_add_conn(c) == NGX_ERROR) {
+ return NGX_ERROR;
+ }
} else {
- ngx_add_event(rev, NGX_READ_EVENT, 0);
+ if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
+ return NGX_ERROR;
+ }
}
#endif
diff --git a/src/event/ngx_event.h b/src/event/ngx_event.h
index 84a70be..049b7b7 100644
--- a/src/event/ngx_event.h
+++ b/src/event/ngx_event.h
@@ -66,6 +66,7 @@
/* used to detect the stale events in kqueue, rt signals and epoll */
unsigned char instance:1;
+ unsigned char returned_instance:1;
/*
* the event was passed or would be passed to a kernel;
@@ -75,11 +76,13 @@
unsigned char disabled:1;
+ unsigned char posted:1;
+
/* the ready event; in aio mode 0 means that no operation can be posted */
unsigned char ready:1;
/* aio operation is complete */
- unsigned char complete:1;
+ unsigned short complete:1;
unsigned short eof:1;
unsigned short error:1;
@@ -93,6 +96,8 @@
unsigned short unexpected_eof:1;
+ unsigned short accept:1;
+
unsigned short deferred_accept:1;
/* TODO: aio_eof and kq_eof can be the single pending_eof */
@@ -178,7 +183,7 @@
int (*add_conn)(ngx_connection_t *c);
int (*del_conn)(ngx_connection_t *c, u_int flags);
- int (*process)(ngx_log_t *log);
+ int (*process)(ngx_cycle_t *cycle);
int (*init)(ngx_cycle_t *cycle);
void (*done)(ngx_cycle_t *cycle);
} ngx_event_actions_t;
@@ -391,6 +396,9 @@
#if (NGX_THREADS)
extern ngx_mutex_t *ngx_posted_events_mutex;
#endif
+extern ngx_atomic_t *ngx_accept_mutex;
+extern ngx_uint_t ngx_accept_token;
+
extern int ngx_event_flags;
extern ngx_module_t ngx_events_module;
@@ -403,6 +411,10 @@
void ngx_event_accept(ngx_event_t *ev);
+ngx_int_t ngx_trylock_accept_mutex(ngx_cycle_t *cycle);
+ngx_int_t ngx_disable_accept_events(ngx_cycle_t *cycle);
+ngx_int_t ngx_enable_accept_events(ngx_cycle_t *cycle);
+
#if (WIN32)
void ngx_event_acceptex(ngx_event_t *ev);
diff --git a/src/event/ngx_event_accept.c b/src/event/ngx_event_accept.c
index ac64cf3..36f2cfe 100644
--- a/src/event/ngx_event_accept.c
+++ b/src/event/ngx_event_accept.c
@@ -14,9 +14,13 @@
static size_t ngx_accept_log_error(void *data, char *buf, size_t len);
+ngx_atomic_t *ngx_accept_mutex;
+ngx_uint_t ngx_accept_token;
+
+
void ngx_event_accept(ngx_event_t *ev)
{
- ngx_uint_t instance, accepted;
+ ngx_uint_t instance, rinstance, winstance, accepted;
socklen_t len;
struct sockaddr *sa;
ngx_err_t err;
@@ -205,6 +209,8 @@
#endif
instance = rev->instance;
+ rinstance = rev->returned_instance;
+ winstance = wev->returned_instance;
ngx_memzero(rev, sizeof(ngx_event_t));
ngx_memzero(wev, sizeof(ngx_event_t));
@@ -217,7 +223,10 @@
c->socklen = len;
rev->instance = (u_char) !instance;
+ rev->returned_instance = (u_char) rinstance;
+
wev->instance = (u_char) !instance;
+ wev->returned_instance = (u_char) winstance;
rev->index = NGX_INVALID_INDEX;
wev->index = NGX_INVALID_INDEX;
@@ -295,6 +304,102 @@
}
+ngx_int_t ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
+{
+ if (*ngx_accept_mutex == 0 && ngx_atomic_cmp_set(ngx_accept_mutex, 0, 1)) {
+
+ ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
+ "accept mutex locked");
+
+ if (!ngx_accept_token) {
+ if (ngx_enable_accept_events(cycle) == NGX_ERROR) {
+ return NGX_ERROR;
+ }
+
+ ngx_accept_token = 1;
+ }
+
+ return NGX_OK;
+ }
+
+ if (ngx_accept_token) {
+ if (ngx_disable_accept_events(cycle) == NGX_ERROR) {
+ return NGX_ERROR;
+ }
+
+ ngx_accept_token = 0;
+ }
+
+ return NGX_OK;
+}
+
+
+ngx_int_t ngx_enable_accept_events(ngx_cycle_t *cycle)
+{
+ ngx_uint_t i;
+ ngx_listening_t *s;
+
+ s = cycle->listening.elts;
+ for (i = 0; i < cycle->listening.nelts; i++) {
+
+ /*
+ * we do not need to handle the Winsock sockets here (divde a socket
+ * number by 4) because this function would never called
+ * in the Winsock environment
+ */
+
+ if (ngx_event_flags & NGX_USE_SIGIO_EVENT) {
+ if (ngx_add_conn(&cycle->connections[s[i].fd]) == NGX_ERROR) {
+ return NGX_ERROR;
+ }
+
+ } else {
+ if (ngx_add_event(&cycle->read_events[s[i].fd], NGX_READ_EVENT, 0)
+ == NGX_ERROR)
+ {
+ return NGX_ERROR;
+ }
+ }
+ }
+
+ return NGX_OK;
+}
+
+
+ngx_int_t ngx_disable_accept_events(ngx_cycle_t *cycle)
+{
+ ngx_uint_t i;
+ ngx_listening_t *s;
+
+ s = cycle->listening.elts;
+ for (i = 0; i < cycle->listening.nelts; i++) {
+
+ /*
+ * we do not need to handle the Winsock sockets here (divde a socket
+ * number by 4) because this function would never called
+ * in the Winsock environment
+ */
+
+ if (ngx_event_flags & NGX_USE_SIGIO_EVENT) {
+ if (ngx_del_conn(&cycle->connections[s[i].fd], NGX_DISABLE_EVENT)
+ == NGX_ERROR)
+ {
+ return NGX_ERROR;
+ }
+
+ } else {
+ if (ngx_del_event(&cycle->read_events[s[i].fd], NGX_READ_EVENT,
+ NGX_DISABLE_EVENT) == NGX_ERROR)
+ {
+ return NGX_ERROR;
+ }
+ }
+ }
+
+ return NGX_OK;
+}
+
+
static size_t ngx_accept_log_error(void *data, char *buf, size_t len)
{
ngx_accept_log_ctx_t *ctx = data;
diff --git a/src/http/ngx_http_request.c b/src/http/ngx_http_request.c
index 21cce74..6b2a443 100644
--- a/src/http/ngx_http_request.c
+++ b/src/http/ngx_http_request.c
@@ -90,6 +90,20 @@
if (rev->ready) {
/* deferred accept, aio, iocp */
+
+ if (*ngx_accept_mutex) {
+ if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) {
+ ngx_http_close_connection(c);
+ return;
+ }
+
+ rev->next = ngx_posted_events;
+ ngx_posted_events = rev;
+
+ ngx_mutex_unlock(ngx_posted_events_mutex);
+ return;
+ }
+
ngx_http_init_request(rev);
return;
}
diff --git a/src/os/unix/ngx_process_cycle.c b/src/os/unix/ngx_process_cycle.c
index cc3d8a4..f2d392a 100644
--- a/src/os/unix/ngx_process_cycle.c
+++ b/src/os/unix/ngx_process_cycle.c
@@ -65,12 +65,26 @@
signo = 0;
live = 0;
+ ngx_accept_mutex = mmap(NULL, sizeof(ngx_atomic_t), PROT_READ|PROT_WRITE,
+ MAP_ANON|MAP_SHARED, -1, 0);
+
+ if (ngx_accept_mutex == NULL) {
+ ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
+ "mmap(MAP_ANON|MAP_SHARED) failed");
+ /* fatal */
+ exit(2);
+ }
+
for ( ;; ) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "new cycle");
ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx,
ngx_core_module);
+ if (ccf->worker_processes == NGX_CONF_UNSET) {
+ ccf->worker_processes = 1;
+ }
+
if (ngx_process == NGX_PROCESS_MASTER) {
for (i = 0; i < (ngx_uint_t) ccf->worker_processes; i++) {
ngx_spawn_process(cycle, ngx_worker_process_cycle, NULL,
@@ -150,7 +164,7 @@
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"worker cycle");
- ngx_process_events(cycle->log);
+ ngx_process_events(cycle);
live = 0;
}
@@ -361,6 +375,10 @@
ngx_process = NGX_PROCESS_WORKER;
ngx_last_process = 0;
+ if (ngx_accept_mutex) {
+ ngx_accept_token = 1;
+ }
+
ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
if (ccf->group != (gid_t) NGX_CONF_UNSET) {
@@ -442,7 +460,7 @@
for ( ;; ) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");
- ngx_process_events(cycle->log);
+ ngx_process_events(cycle);
if (ngx_terminate) {
ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "exiting");
@@ -473,7 +491,7 @@
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");
- ngx_process_events(cycle->log);
+ ngx_process_events(cycle);
if (ngx_reopen) {
ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "reopen logs");