/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "uv-common.h"

#if !defined(_WIN32)
# include "unix/internal.h"
#endif

#include <stdlib.h>

#define MAX_THREADPOOL_SIZE 1024

static uv_once_t once = UV_ONCE_INIT;
static uv_cond_t cond;
static uv_mutex_t mutex;
static unsigned int idle_threads;
static unsigned int slow_io_work_running;
static unsigned int nthreads;
static uv_thread_t* threads;
static uv_thread_t default_threads[4];
static struct uv__queue exit_message;
static struct uv__queue wq;
static struct uv__queue run_slow_work_message;
static struct uv__queue slow_io_pending_wq;

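/* At most half of the pool, rounded up, may run slow I/O work at any given
 * time; e.g. with the default pool of 4 threads the threshold below is
 * (4 + 1) / 2 == 2 threads.
 */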
static unsigned int slow_work_thread_threshold(void) {
  return (nthreads + 1) / 2;
}

static void uv__cancelled(struct uv__work* w) {
  abort();
}


/* To avoid deadlock with uv_cancel() it's crucial that the worker
 * never holds the global mutex and the loop-local mutex at the same time.
 */
static void worker(void* arg) {
  struct uv__work* w;
  struct uv__queue* q;
  int is_slow_work;

  uv_sem_post((uv_sem_t*) arg);
  arg = NULL;

  uv_mutex_lock(&mutex);
  for (;;) {
    /* `mutex` should always be locked at this point. */

    /* Keep waiting while either no work is present or only slow I/O
       and we're at the threshold for that. */
    while (uv__queue_empty(&wq) ||
           (uv__queue_head(&wq) == &run_slow_work_message &&
            uv__queue_next(&run_slow_work_message) == &wq &&
            slow_io_work_running >= slow_work_thread_threshold())) {
      idle_threads += 1;
      uv_cond_wait(&cond, &mutex);
      idle_threads -= 1;
    }

    q = uv__queue_head(&wq);
    if (q == &exit_message) {
      uv_cond_signal(&cond);
      uv_mutex_unlock(&mutex);
      break;
    }

    uv__queue_remove(q);
    uv__queue_init(q);  /* Signal uv_cancel() that the work req is executing. */

    is_slow_work = 0;
    if (q == &run_slow_work_message) {
      /* If we're at the slow I/O threshold, re-schedule until after all
         other work in the queue is done. */
      if (slow_io_work_running >= slow_work_thread_threshold()) {
        uv__queue_insert_tail(&wq, q);
        continue;
      }

      /* If we encountered a request to run slow I/O work but there is none
         to run, that means it's cancelled => Start over. */
      if (uv__queue_empty(&slow_io_pending_wq))
        continue;

      is_slow_work = 1;
      slow_io_work_running++;

      q = uv__queue_head(&slow_io_pending_wq);
      uv__queue_remove(q);
      uv__queue_init(q);

      /* If there is more slow I/O work, schedule it to be run as well. */
      if (!uv__queue_empty(&slow_io_pending_wq)) {
        uv__queue_insert_tail(&wq, &run_slow_work_message);
        if (idle_threads > 0)
          uv_cond_signal(&cond);
      }
    }

    uv_mutex_unlock(&mutex);

    w = uv__queue_data(q, struct uv__work, wq);
    w->work(w);

    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;  /* Signal uv_cancel() that the work req is done
                        executing. */
    uv__queue_insert_tail(&w->loop->wq, &w->wq);
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);

    /* Lock `mutex` since that is expected at the start of the next
     * iteration. */
    uv_mutex_lock(&mutex);
    if (is_slow_work) {
      /* `slow_io_work_running` is protected by `mutex`. */
      slow_io_work_running--;
    }
  }
}


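/* Note: `run_slow_work_message` is a sentinel queue node rather than a work
 * request. At most one instance of it is ever linked into `wq`; post() below
 * uses uv__queue_empty() on the sentinel itself to detect whether slow I/O
 * dispatch is already scheduled, and worker() recognizes it by address.
 */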
static void post(struct uv__queue* q, enum uv__work_kind kind) {
  uv_mutex_lock(&mutex);
  if (kind == UV__WORK_SLOW_IO) {
    /* Insert into a separate queue. */
    uv__queue_insert_tail(&slow_io_pending_wq, q);
    if (!uv__queue_empty(&run_slow_work_message)) {
      /* Running slow I/O tasks is already scheduled => Nothing to do here.
         The worker that runs said other task will schedule this one as well. */
      uv_mutex_unlock(&mutex);
      return;
    }
    q = &run_slow_work_message;
  }

  uv__queue_insert_tail(&wq, q);
  if (idle_threads > 0)
    uv_cond_signal(&cond);
  uv_mutex_unlock(&mutex);
}


#ifdef __MVS__
/* TODO(itodorov) - zos: revisit when Woz compiler is available. */
__attribute__((destructor))
#endif
void uv__threadpool_cleanup(void) {
  unsigned int i;

  if (nthreads == 0)
    return;

#ifndef __MVS__
  /* TODO(gabylb) - zos: revisit when Woz compiler is available. */
  post(&exit_message, UV__WORK_CPU);
#endif

  for (i = 0; i < nthreads; i++)
    if (uv_thread_join(threads + i))
      abort();

  if (threads != default_threads)
    uv__free(threads);

  uv_mutex_destroy(&mutex);
  uv_cond_destroy(&cond);

  threads = NULL;
  nthreads = 0;
}


static void init_threads(void) {
  uv_thread_options_t config;
  unsigned int i;
  const char* val;
  uv_sem_t sem;

  nthreads = ARRAY_SIZE(default_threads);
  val = getenv("UV_THREADPOOL_SIZE");
  if (val != NULL)
    nthreads = atoi(val);
  if (nthreads == 0)
    nthreads = 1;
  if (nthreads > MAX_THREADPOOL_SIZE)
    nthreads = MAX_THREADPOOL_SIZE;

  threads = default_threads;
  if (nthreads > ARRAY_SIZE(default_threads)) {
    threads = uv__malloc(nthreads * sizeof(threads[0]));
    if (threads == NULL) {
      nthreads = ARRAY_SIZE(default_threads);
      threads = default_threads;
    }
  }

  if (uv_cond_init(&cond))
    abort();

  if (uv_mutex_init(&mutex))
    abort();

  uv__queue_init(&wq);
  uv__queue_init(&slow_io_pending_wq);
  uv__queue_init(&run_slow_work_message);

  if (uv_sem_init(&sem, 0))
    abort();

  config.flags = UV_THREAD_HAS_STACK_SIZE;
  config.stack_size = 8u << 20;  /* 8 MB */

  for (i = 0; i < nthreads; i++)
    if (uv_thread_create_ex(threads + i, &config, worker, &sem))
      abort();

  for (i = 0; i < nthreads; i++)
    uv_sem_wait(&sem);

  uv_sem_destroy(&sem);
}
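/* Sizing note: the pool defaults to ARRAY_SIZE(default_threads) == 4 workers
 * and honors UV_THREADPOOL_SIZE, clamped to [1, MAX_THREADPOOL_SIZE]. The
 * variable is read once, when the pool is lazily initialized; e.g. running
 * with UV_THREADPOOL_SIZE=8 in the environment yields an 8-thread pool.
 */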


#ifndef _WIN32
static void reset_once(void) {
  uv_once_t child_once = UV_ONCE_INIT;
  memcpy(&once, &child_once, sizeof(child_once));
}
#endif


static void init_once(void) {
#ifndef _WIN32
  /* Re-initialize the threadpool after fork.
   * Note that this discards the global mutex and condition as well
   * as the work queue.
   */
  if (pthread_atfork(NULL, NULL, &reset_once))
    abort();
#endif
  init_threads();
}
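/* Fork note: worker threads do not survive fork(). Only `once` is reset in
 * the child, so the pool is rebuilt from scratch the next time the child
 * submits work; work queued in the parent is not carried over.
 */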


void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);
  w->loop = loop;
  w->work = work;
  w->done = done;
  post(&w->wq, kind);
}


/* TODO(bnoordhuis) teach libuv how to cancel file operations
 * that go through io_uring instead of the thread pool.
 */
static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
  int cancelled;

  uv_once(&once, init_once);  /* Ensure |mutex| is initialized. */
  uv_mutex_lock(&mutex);
  uv_mutex_lock(&w->loop->wq_mutex);

  cancelled = !uv__queue_empty(&w->wq) && w->work != NULL;
  if (cancelled)
    uv__queue_remove(&w->wq);

  uv_mutex_unlock(&w->loop->wq_mutex);
  uv_mutex_unlock(&mutex);

  if (!cancelled)
    return UV_EBUSY;

  w->work = uv__cancelled;
  uv_mutex_lock(&loop->wq_mutex);
  uv__queue_insert_tail(&loop->wq, &w->wq);
  uv_async_send(&loop->wq_async);
  uv_mutex_unlock(&loop->wq_mutex);

  return 0;
}
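/* uv__work_cancel() keys off the state that worker() encodes in `w`:
 *   - pending:   w->wq is linked into a queue and w->work != NULL
 *                => dequeued here and completed with UV_ECANCELED;
 *   - executing: worker() emptied w->wq via uv__queue_init()
 *                => UV_EBUSY;
 *   - done:      w->work == NULL => UV_EBUSY.
 */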


void uv__work_done(uv_async_t* handle) {
  struct uv__work* w;
  uv_loop_t* loop;
  struct uv__queue* q;
  struct uv__queue wq;
  int err;
  int nevents;

  loop = container_of(handle, uv_loop_t, wq_async);
  uv_mutex_lock(&loop->wq_mutex);
  uv__queue_move(&loop->wq, &wq);
  uv_mutex_unlock(&loop->wq_mutex);

  nevents = 0;

  while (!uv__queue_empty(&wq)) {
    q = uv__queue_head(&wq);
    uv__queue_remove(q);

    w = container_of(q, struct uv__work, wq);
    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    w->done(w, err);
    nevents++;
  }

  /* This check accomplishes 2 things:
   * 1. Even if the queue was empty, the call to uv__work_done() should count
   *    as an event, which will have been added by the event loop when
   *    calling this callback.
   * 2. Prevents accidental wrap around in case nevents == 0.
   */
  if (nevents > 1) {
    /* Subtract 1 to counter the call to uv__work_done(). */
    uv__metrics_inc_events(loop, nevents - 1);
    if (uv__get_internal_fields(loop)->current_timeout == 0)
      uv__metrics_inc_events_waiting(loop, nevents - 1);
  }
}


static void uv__queue_work(struct uv__work* w) {
  uv_work_t* req = container_of(w, uv_work_t, work_req);

  req->work_cb(req);
}


static void uv__queue_done(struct uv__work* w, int err) {
  uv_work_t* req;

  req = container_of(w, uv_work_t, work_req);
  uv__req_unregister(req->loop, req);

  if (req->after_work_cb == NULL)
    return;

  req->after_work_cb(req, err);
}


int uv_queue_work(uv_loop_t* loop,
                  uv_work_t* req,
                  uv_work_cb work_cb,
                  uv_after_work_cb after_work_cb) {
  if (work_cb == NULL)
    return UV_EINVAL;

  uv__req_init(loop, req, UV_WORK);
  req->loop = loop;
  req->work_cb = work_cb;
  req->after_work_cb = after_work_cb;
  uv__work_submit(loop,
                  &req->work_req,
                  UV__WORK_CPU,
                  uv__queue_work,
                  uv__queue_done);
  return 0;
}
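/* Usage sketch (caller side; illustrative, not part of this file):
 *
 *   static void work_cb(uv_work_t* req) {
 *     // Runs on a threadpool thread; must not touch the loop.
 *   }
 *
 *   static void after_cb(uv_work_t* req, int status) {
 *     // Runs on the loop thread; status is UV_ECANCELED if uv_cancel()
 *     // won the race, 0 otherwise.
 *   }
 *
 *   uv_work_t req;
 *   uv_queue_work(uv_default_loop(), &req, work_cb, after_cb);
 *   uv_run(uv_default_loop(), UV_RUN_DEFAULT);
 */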


int uv_cancel(uv_req_t* req) {
  struct uv__work* wreq;
  uv_loop_t* loop;

  switch (req->type) {
  case UV_FS:
    loop = ((uv_fs_t*) req)->loop;
    wreq = &((uv_fs_t*) req)->work_req;
    break;
  case UV_GETADDRINFO:
    loop = ((uv_getaddrinfo_t*) req)->loop;
    wreq = &((uv_getaddrinfo_t*) req)->work_req;
    break;
  case UV_GETNAMEINFO:
    loop = ((uv_getnameinfo_t*) req)->loop;
    wreq = &((uv_getnameinfo_t*) req)->work_req;
    break;
  case UV_RANDOM:
    loop = ((uv_random_t*) req)->loop;
    wreq = &((uv_random_t*) req)->work_req;
    break;
  case UV_WORK:
    loop = ((uv_work_t*) req)->loop;
    wreq = &((uv_work_t*) req)->work_req;
    break;
  default:
    return UV_EINVAL;
  }

  return uv__work_cancel(loop, req, wreq);
}