|  | 
 | /* | 
 |  * Copyright (C) Nginx, Inc. | 
 |  * Copyright (C) Valentin V. Bartenev | 
 |  * Copyright (C) Ruslan Ermilov | 
 |  */ | 
 |  | 
 |  | 
 | #include <ngx_config.h> | 
 | #include <ngx_core.h> | 
 | #include <ngx_thread_pool.h> | 
 |  | 
 |  | 
 | typedef struct { | 
 |     ngx_array_t               pools; | 
 | } ngx_thread_pool_conf_t; | 
 |  | 
 |  | 
 | typedef struct { | 
 |     ngx_thread_task_t        *first; | 
 |     ngx_thread_task_t       **last; | 
 | } ngx_thread_pool_queue_t; | 
 |  | 
 | #define ngx_thread_pool_queue_init(q)                                         \ | 
 |     (q)->first = NULL;                                                        \ | 
 |     (q)->last = &(q)->first | 
 |  | 
 |  | 
 | struct ngx_thread_pool_s { | 
 |     ngx_thread_mutex_t        mtx; | 
 |     ngx_thread_pool_queue_t   queue; | 
 |     ngx_int_t                 waiting; | 
 |     ngx_thread_cond_t         cond; | 
 |  | 
 |     ngx_log_t                *log; | 
 |  | 
 |     ngx_str_t                 name; | 
 |     ngx_uint_t                threads; | 
 |     ngx_int_t                 max_queue; | 
 |  | 
 |     u_char                   *file; | 
 |     ngx_uint_t                line; | 
 | }; | 
 |  | 
 |  | 
 | static ngx_int_t ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, | 
 |     ngx_pool_t *pool); | 
 | static void ngx_thread_pool_destroy(ngx_thread_pool_t *tp); | 
 | static void ngx_thread_pool_exit_handler(void *data, ngx_log_t *log); | 
 |  | 
 | static void *ngx_thread_pool_cycle(void *data); | 
 | static void ngx_thread_pool_handler(ngx_event_t *ev); | 
 |  | 
 | static char *ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); | 
 |  | 
 | static void *ngx_thread_pool_create_conf(ngx_cycle_t *cycle); | 
 | static char *ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf); | 
 |  | 
 | static ngx_int_t ngx_thread_pool_init_worker(ngx_cycle_t *cycle); | 
 | static void ngx_thread_pool_exit_worker(ngx_cycle_t *cycle); | 
 |  | 
 |  | 
 | static ngx_command_t  ngx_thread_pool_commands[] = { | 
 |  | 
 |     { ngx_string("thread_pool"), | 
 |       NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE23, | 
 |       ngx_thread_pool, | 
 |       0, | 
 |       0, | 
 |       NULL }, | 
 |  | 
 |       ngx_null_command | 
 | }; | 
 |  | 
 |  | 
 | static ngx_core_module_t  ngx_thread_pool_module_ctx = { | 
 |     ngx_string("thread_pool"), | 
 |     ngx_thread_pool_create_conf, | 
 |     ngx_thread_pool_init_conf | 
 | }; | 
 |  | 
 |  | 
 | ngx_module_t  ngx_thread_pool_module = { | 
 |     NGX_MODULE_V1, | 
 |     &ngx_thread_pool_module_ctx,           /* module context */ | 
 |     ngx_thread_pool_commands,              /* module directives */ | 
 |     NGX_CORE_MODULE,                       /* module type */ | 
 |     NULL,                                  /* init master */ | 
 |     NULL,                                  /* init module */ | 
 |     ngx_thread_pool_init_worker,           /* init process */ | 
 |     NULL,                                  /* init thread */ | 
 |     NULL,                                  /* exit thread */ | 
 |     ngx_thread_pool_exit_worker,           /* exit process */ | 
 |     NULL,                                  /* exit master */ | 
 |     NGX_MODULE_V1_PADDING | 
 | }; | 
 |  | 
 |  | 
 | static ngx_str_t  ngx_thread_pool_default = ngx_string("default"); | 
 |  | 
 | static ngx_uint_t               ngx_thread_pool_task_id; | 
 | static ngx_atomic_t             ngx_thread_pool_done_lock; | 
 | static ngx_thread_pool_queue_t  ngx_thread_pool_done; | 
 |  | 
 |  | 
 | static ngx_int_t | 
 | ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool) | 
 | { | 
 |     int             err; | 
 |     pthread_t       tid; | 
 |     ngx_uint_t      n; | 
 |     pthread_attr_t  attr; | 
 |  | 
 |     if (ngx_notify == NULL) { | 
 |         ngx_log_error(NGX_LOG_ALERT, log, 0, | 
 |                "the configured event method cannot be used with thread pools"); | 
 |         return NGX_ERROR; | 
 |     } | 
 |  | 
 |     ngx_thread_pool_queue_init(&tp->queue); | 
 |  | 
 |     if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) { | 
 |         return NGX_ERROR; | 
 |     } | 
 |  | 
 |     if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) { | 
 |         (void) ngx_thread_mutex_destroy(&tp->mtx, log); | 
 |         return NGX_ERROR; | 
 |     } | 
 |  | 
 |     tp->log = log; | 
 |  | 
 |     err = pthread_attr_init(&attr); | 
 |     if (err) { | 
 |         ngx_log_error(NGX_LOG_ALERT, log, err, | 
 |                       "pthread_attr_init() failed"); | 
 |         return NGX_ERROR; | 
 |     } | 
 |  | 
 | #if 0 | 
 |     err = pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN); | 
 |     if (err) { | 
 |         ngx_log_error(NGX_LOG_ALERT, log, err, | 
 |                       "pthread_attr_setstacksize() failed"); | 
 |         return NGX_ERROR; | 
 |     } | 
 | #endif | 
 |  | 
 |     for (n = 0; n < tp->threads; n++) { | 
 |         err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp); | 
 |         if (err) { | 
 |             ngx_log_error(NGX_LOG_ALERT, log, err, | 
 |                           "pthread_create() failed"); | 
 |             return NGX_ERROR; | 
 |         } | 
 |     } | 
 |  | 
 |     (void) pthread_attr_destroy(&attr); | 
 |  | 
 |     return NGX_OK; | 
 | } | 
 |  | 
 |  | 
 | static void | 
 | ngx_thread_pool_destroy(ngx_thread_pool_t *tp) | 
 | { | 
 |     ngx_uint_t           n; | 
 |     ngx_thread_task_t    task; | 
 |     volatile ngx_uint_t  lock; | 
 |  | 
 |     ngx_memzero(&task, sizeof(ngx_thread_task_t)); | 
 |  | 
 |     task.handler = ngx_thread_pool_exit_handler; | 
 |     task.ctx = (void *) &lock; | 
 |  | 
 |     for (n = 0; n < tp->threads; n++) { | 
 |         lock = 1; | 
 |  | 
 |         if (ngx_thread_task_post(tp, &task) != NGX_OK) { | 
 |             return; | 
 |         } | 
 |  | 
 |         while (lock) { | 
 |             ngx_sched_yield(); | 
 |         } | 
 |  | 
 |         task.event.active = 0; | 
 |     } | 
 |  | 
 |     (void) ngx_thread_cond_destroy(&tp->cond, tp->log); | 
 |  | 
 |     (void) ngx_thread_mutex_destroy(&tp->mtx, tp->log); | 
 | } | 
 |  | 
 |  | 
 | static void | 
 | ngx_thread_pool_exit_handler(void *data, ngx_log_t *log) | 
 | { | 
 |     ngx_uint_t *lock = data; | 
 |  | 
 |     *lock = 0; | 
 |  | 
 |     pthread_exit(0); | 
 | } | 
 |  | 
 |  | 
 | ngx_thread_task_t * | 
 | ngx_thread_task_alloc(ngx_pool_t *pool, size_t size) | 
 | { | 
 |     ngx_thread_task_t  *task; | 
 |  | 
 |     task = ngx_pcalloc(pool, sizeof(ngx_thread_task_t) + size); | 
 |     if (task == NULL) { | 
 |         return NULL; | 
 |     } | 
 |  | 
 |     task->ctx = task + 1; | 
 |  | 
 |     return task; | 
 | } | 
 |  | 
 |  | 
 | ngx_int_t | 
 | ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task) | 
 | { | 
 |     if (task->event.active) { | 
 |         ngx_log_error(NGX_LOG_ALERT, tp->log, 0, | 
 |                       "task #%ui already active", task->id); | 
 |         return NGX_ERROR; | 
 |     } | 
 |  | 
 |     if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) { | 
 |         return NGX_ERROR; | 
 |     } | 
 |  | 
 |     if (tp->waiting >= tp->max_queue) { | 
 |         (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log); | 
 |  | 
 |         ngx_log_error(NGX_LOG_ERR, tp->log, 0, | 
 |                       "thread pool \"%V\" queue overflow: %i tasks waiting", | 
 |                       &tp->name, tp->waiting); | 
 |         return NGX_ERROR; | 
 |     } | 
 |  | 
 |     task->event.active = 1; | 
 |  | 
 |     task->id = ngx_thread_pool_task_id++; | 
 |     task->next = NULL; | 
 |  | 
 |     if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) { | 
 |         (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log); | 
 |         return NGX_ERROR; | 
 |     } | 
 |  | 
 |     *tp->queue.last = task; | 
 |     tp->queue.last = &task->next; | 
 |  | 
 |     tp->waiting++; | 
 |  | 
 |     (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log); | 
 |  | 
 |     ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0, | 
 |                    "task #%ui added to thread pool \"%V\"", | 
 |                    task->id, &tp->name); | 
 |  | 
 |     return NGX_OK; | 
 | } | 
 |  | 
 |  | 
 | static void * | 
 | ngx_thread_pool_cycle(void *data) | 
 | { | 
 |     ngx_thread_pool_t *tp = data; | 
 |  | 
 |     int                 err; | 
 |     sigset_t            set; | 
 |     ngx_thread_task_t  *task; | 
 |  | 
 | #if 0 | 
 |     ngx_time_update(); | 
 | #endif | 
 |  | 
 |     ngx_log_debug1(NGX_LOG_DEBUG_CORE, tp->log, 0, | 
 |                    "thread in pool \"%V\" started", &tp->name); | 
 |  | 
 |     sigfillset(&set); | 
 |  | 
 |     sigdelset(&set, SIGILL); | 
 |     sigdelset(&set, SIGFPE); | 
 |     sigdelset(&set, SIGSEGV); | 
 |     sigdelset(&set, SIGBUS); | 
 |  | 
 |     err = pthread_sigmask(SIG_BLOCK, &set, NULL); | 
 |     if (err) { | 
 |         ngx_log_error(NGX_LOG_ALERT, tp->log, err, "pthread_sigmask() failed"); | 
 |         return NULL; | 
 |     } | 
 |  | 
 |     for ( ;; ) { | 
 |         if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) { | 
 |             return NULL; | 
 |         } | 
 |  | 
 |         /* the number may become negative */ | 
 |         tp->waiting--; | 
 |  | 
 |         while (tp->queue.first == NULL) { | 
 |             if (ngx_thread_cond_wait(&tp->cond, &tp->mtx, tp->log) | 
 |                 != NGX_OK) | 
 |             { | 
 |                 (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log); | 
 |                 return NULL; | 
 |             } | 
 |         } | 
 |  | 
 |         task = tp->queue.first; | 
 |         tp->queue.first = task->next; | 
 |  | 
 |         if (tp->queue.first == NULL) { | 
 |             tp->queue.last = &tp->queue.first; | 
 |         } | 
 |  | 
 |         if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) { | 
 |             return NULL; | 
 |         } | 
 |  | 
 | #if 0 | 
 |         ngx_time_update(); | 
 | #endif | 
 |  | 
 |         ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0, | 
 |                        "run task #%ui in thread pool \"%V\"", | 
 |                        task->id, &tp->name); | 
 |  | 
 |         task->handler(task->ctx, tp->log); | 
 |  | 
 |         ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0, | 
 |                        "complete task #%ui in thread pool \"%V\"", | 
 |                        task->id, &tp->name); | 
 |  | 
 |         task->next = NULL; | 
 |  | 
 |         ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048); | 
 |  | 
 |         *ngx_thread_pool_done.last = task; | 
 |         ngx_thread_pool_done.last = &task->next; | 
 |  | 
 |         ngx_unlock(&ngx_thread_pool_done_lock); | 
 |  | 
 |         (void) ngx_notify(ngx_thread_pool_handler); | 
 |     } | 
 | } | 
 |  | 
 |  | 
 | static void | 
 | ngx_thread_pool_handler(ngx_event_t *ev) | 
 | { | 
 |     ngx_event_t        *event; | 
 |     ngx_thread_task_t  *task; | 
 |  | 
 |     ngx_log_debug0(NGX_LOG_DEBUG_CORE, ev->log, 0, "thread pool handler"); | 
 |  | 
 |     ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048); | 
 |  | 
 |     task = ngx_thread_pool_done.first; | 
 |     ngx_thread_pool_done.first = NULL; | 
 |     ngx_thread_pool_done.last = &ngx_thread_pool_done.first; | 
 |  | 
 |     ngx_unlock(&ngx_thread_pool_done_lock); | 
 |  | 
 |     while (task) { | 
 |         ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0, | 
 |                        "run completion handler for task #%ui", task->id); | 
 |  | 
 |         event = &task->event; | 
 |         task = task->next; | 
 |  | 
 |         event->complete = 1; | 
 |         event->active = 0; | 
 |  | 
 |         event->handler(event); | 
 |     } | 
 | } | 
 |  | 
 |  | 
 | static void * | 
 | ngx_thread_pool_create_conf(ngx_cycle_t *cycle) | 
 | { | 
 |     ngx_thread_pool_conf_t  *tcf; | 
 |  | 
 |     tcf = ngx_pcalloc(cycle->pool, sizeof(ngx_thread_pool_conf_t)); | 
 |     if (tcf == NULL) { | 
 |         return NULL; | 
 |     } | 
 |  | 
 |     if (ngx_array_init(&tcf->pools, cycle->pool, 4, | 
 |                        sizeof(ngx_thread_pool_t *)) | 
 |         != NGX_OK) | 
 |     { | 
 |         return NULL; | 
 |     } | 
 |  | 
 |     return tcf; | 
 | } | 
 |  | 
 |  | 
 | static char * | 
 | ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf) | 
 | { | 
 |     ngx_thread_pool_conf_t *tcf = conf; | 
 |  | 
 |     ngx_uint_t           i; | 
 |     ngx_thread_pool_t  **tpp; | 
 |  | 
 |     tpp = tcf->pools.elts; | 
 |  | 
 |     for (i = 0; i < tcf->pools.nelts; i++) { | 
 |  | 
 |         if (tpp[i]->threads) { | 
 |             continue; | 
 |         } | 
 |  | 
 |         if (tpp[i]->name.len == ngx_thread_pool_default.len | 
 |             && ngx_strncmp(tpp[i]->name.data, ngx_thread_pool_default.data, | 
 |                            ngx_thread_pool_default.len) | 
 |                == 0) | 
 |         { | 
 |             tpp[i]->threads = 32; | 
 |             tpp[i]->max_queue = 65536; | 
 |             continue; | 
 |         } | 
 |  | 
 |         ngx_log_error(NGX_LOG_EMERG, cycle->log, 0, | 
 |                       "unknown thread pool \"%V\" in %s:%ui", | 
 |                       &tpp[i]->name, tpp[i]->file, tpp[i]->line); | 
 |  | 
 |         return NGX_CONF_ERROR; | 
 |     } | 
 |  | 
 |     return NGX_CONF_OK; | 
 | } | 
 |  | 
 |  | 
 | static char * | 
 | ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) | 
 | { | 
 |     ngx_str_t          *value; | 
 |     ngx_uint_t          i; | 
 |     ngx_thread_pool_t  *tp; | 
 |  | 
 |     value = cf->args->elts; | 
 |  | 
 |     tp = ngx_thread_pool_add(cf, &value[1]); | 
 |  | 
 |     if (tp == NULL) { | 
 |         return NGX_CONF_ERROR; | 
 |     } | 
 |  | 
 |     if (tp->threads) { | 
 |         ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, | 
 |                            "duplicate thread pool \"%V\"", &tp->name); | 
 |         return NGX_CONF_ERROR; | 
 |     } | 
 |  | 
 |     tp->max_queue = 65536; | 
 |  | 
 |     for (i = 2; i < cf->args->nelts; i++) { | 
 |  | 
 |         if (ngx_strncmp(value[i].data, "threads=", 8) == 0) { | 
 |  | 
 |             tp->threads = ngx_atoi(value[i].data + 8, value[i].len - 8); | 
 |  | 
 |             if (tp->threads == (ngx_uint_t) NGX_ERROR || tp->threads == 0) { | 
 |                 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, | 
 |                                    "invalid threads value \"%V\"", &value[i]); | 
 |                 return NGX_CONF_ERROR; | 
 |             } | 
 |  | 
 |             continue; | 
 |         } | 
 |  | 
 |         if (ngx_strncmp(value[i].data, "max_queue=", 10) == 0) { | 
 |  | 
 |             tp->max_queue = ngx_atoi(value[i].data + 10, value[i].len - 10); | 
 |  | 
 |             if (tp->max_queue == NGX_ERROR) { | 
 |                 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, | 
 |                                    "invalid max_queue value \"%V\"", &value[i]); | 
 |                 return NGX_CONF_ERROR; | 
 |             } | 
 |  | 
 |             continue; | 
 |         } | 
 |     } | 
 |  | 
 |     if (tp->threads == 0) { | 
 |         ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, | 
 |                            "\"%V\" must have \"threads\" parameter", | 
 |                            &cmd->name); | 
 |         return NGX_CONF_ERROR; | 
 |     } | 
 |  | 
 |     return NGX_CONF_OK; | 
 | } | 
 |  | 
 |  | 
 | ngx_thread_pool_t * | 
 | ngx_thread_pool_add(ngx_conf_t *cf, ngx_str_t *name) | 
 | { | 
 |     ngx_thread_pool_t       *tp, **tpp; | 
 |     ngx_thread_pool_conf_t  *tcf; | 
 |  | 
 |     if (name == NULL) { | 
 |         name = &ngx_thread_pool_default; | 
 |     } | 
 |  | 
 |     tp = ngx_thread_pool_get(cf->cycle, name); | 
 |  | 
 |     if (tp) { | 
 |         return tp; | 
 |     } | 
 |  | 
 |     tp = ngx_pcalloc(cf->pool, sizeof(ngx_thread_pool_t)); | 
 |     if (tp == NULL) { | 
 |         return NULL; | 
 |     } | 
 |  | 
 |     tp->name = *name; | 
 |     tp->file = cf->conf_file->file.name.data; | 
 |     tp->line = cf->conf_file->line; | 
 |  | 
 |     tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cf->cycle->conf_ctx, | 
 |                                                   ngx_thread_pool_module); | 
 |  | 
 |     tpp = ngx_array_push(&tcf->pools); | 
 |     if (tpp == NULL) { | 
 |         return NULL; | 
 |     } | 
 |  | 
 |     *tpp = tp; | 
 |  | 
 |     return tp; | 
 | } | 
 |  | 
 |  | 
 | ngx_thread_pool_t * | 
 | ngx_thread_pool_get(ngx_cycle_t *cycle, ngx_str_t *name) | 
 | { | 
 |     ngx_uint_t                i; | 
 |     ngx_thread_pool_t       **tpp; | 
 |     ngx_thread_pool_conf_t   *tcf; | 
 |  | 
 |     tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx, | 
 |                                                   ngx_thread_pool_module); | 
 |  | 
 |     tpp = tcf->pools.elts; | 
 |  | 
 |     for (i = 0; i < tcf->pools.nelts; i++) { | 
 |  | 
 |         if (tpp[i]->name.len == name->len | 
 |             && ngx_strncmp(tpp[i]->name.data, name->data, name->len) == 0) | 
 |         { | 
 |             return tpp[i]; | 
 |         } | 
 |     } | 
 |  | 
 |     return NULL; | 
 | } | 
 |  | 
 |  | 
 | static ngx_int_t | 
 | ngx_thread_pool_init_worker(ngx_cycle_t *cycle) | 
 | { | 
 |     ngx_uint_t                i; | 
 |     ngx_thread_pool_t       **tpp; | 
 |     ngx_thread_pool_conf_t   *tcf; | 
 |  | 
 |     if (ngx_process != NGX_PROCESS_WORKER | 
 |         && ngx_process != NGX_PROCESS_SINGLE) | 
 |     { | 
 |         return NGX_OK; | 
 |     } | 
 |  | 
 |     tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx, | 
 |                                                   ngx_thread_pool_module); | 
 |  | 
 |     if (tcf == NULL) { | 
 |         return NGX_OK; | 
 |     } | 
 |  | 
 |     ngx_thread_pool_queue_init(&ngx_thread_pool_done); | 
 |  | 
 |     tpp = tcf->pools.elts; | 
 |  | 
 |     for (i = 0; i < tcf->pools.nelts; i++) { | 
 |         if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) { | 
 |             return NGX_ERROR; | 
 |         } | 
 |     } | 
 |  | 
 |     return NGX_OK; | 
 | } | 
 |  | 
 |  | 
 | static void | 
 | ngx_thread_pool_exit_worker(ngx_cycle_t *cycle) | 
 | { | 
 |     ngx_uint_t                i; | 
 |     ngx_thread_pool_t       **tpp; | 
 |     ngx_thread_pool_conf_t   *tcf; | 
 |  | 
 |     if (ngx_process != NGX_PROCESS_WORKER | 
 |         && ngx_process != NGX_PROCESS_SINGLE) | 
 |     { | 
 |         return; | 
 |     } | 
 |  | 
 |     tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx, | 
 |                                                   ngx_thread_pool_module); | 
 |  | 
 |     if (tcf == NULL) { | 
 |         return; | 
 |     } | 
 |  | 
 |     tpp = tcf->pools.elts; | 
 |  | 
 |     for (i = 0; i < tcf->pools.nelts; i++) { | 
 |         ngx_thread_pool_destroy(tpp[i]); | 
 |     } | 
 | } |