libuv-svace-build

threadpool.c 
418 lines · 10.8 KB
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "uv-common.h"

#if !defined(_WIN32)
# include "unix/internal.h"
#endif

#include <stdlib.h>

#define MAX_THREADPOOL_SIZE 1024

static uv_once_t once = UV_ONCE_INIT;
static uv_cond_t cond;
static uv_mutex_t mutex;
static unsigned int idle_threads;
static unsigned int slow_io_work_running;
static unsigned int nthreads;
static uv_thread_t* threads;
static uv_thread_t default_threads[4];
static struct uv__queue exit_message;
static struct uv__queue wq;
static struct uv__queue run_slow_work_message;
static struct uv__queue slow_io_pending_wq;
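
/* The four queues above play distinct roles:
 *   wq                    - the global work queue drained by the workers;
 *   slow_io_pending_wq    - slow I/O requests parked until a slot frees up;
 *   run_slow_work_message - sentinel queued into `wq` when slow I/O work is
 *                           pending;
 *   exit_message          - sentinel that tells the workers to shut down.
 * All four are protected by `mutex`.
 */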

static unsigned int slow_work_thread_threshold(void) {
  return (nthreads + 1) / 2;
}
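
/* With the default pool of 4 threads the threshold above is (4 + 1) / 2 = 2,
 * i.e. at most half of the pool (rounded up) may be tied up in slow I/O at
 * any one time; the remaining workers stay available for other requests.
 */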

static void uv__cancelled(struct uv__work* w) {
  abort();
}


/* To avoid deadlock with uv_cancel() it's crucial that the worker
 * never holds the global mutex and the loop-local mutex at the same time.
 */
static void worker(void* arg) {
  struct uv__work* w;
  struct uv__queue* q;
  int is_slow_work;

  uv_sem_post((uv_sem_t*) arg);
  arg = NULL;

  uv_mutex_lock(&mutex);
  for (;;) {
    /* `mutex` should always be locked at this point. */

    /* Keep waiting while either no work is present or only slow I/O
       and we're at the threshold for that. */
    while (uv__queue_empty(&wq) ||
           (uv__queue_head(&wq) == &run_slow_work_message &&
            uv__queue_next(&run_slow_work_message) == &wq &&
            slow_io_work_running >= slow_work_thread_threshold())) {
      idle_threads += 1;
      uv_cond_wait(&cond, &mutex);
      idle_threads -= 1;
    }

    q = uv__queue_head(&wq);
    if (q == &exit_message) {
      uv_cond_signal(&cond);
      uv_mutex_unlock(&mutex);
      break;
    }

    uv__queue_remove(q);
    uv__queue_init(q);  /* Signal uv_cancel() that the work req is executing. */

    is_slow_work = 0;
    if (q == &run_slow_work_message) {
      /* If we're at the slow I/O threshold, re-schedule until after all
         other work in the queue is done. */
      if (slow_io_work_running >= slow_work_thread_threshold()) {
        uv__queue_insert_tail(&wq, q);
        continue;
      }

      /* If we encountered a request to run slow I/O work but there is none
         to run, that means it's cancelled => Start over. */
      if (uv__queue_empty(&slow_io_pending_wq))
        continue;

      is_slow_work = 1;
      slow_io_work_running++;

      q = uv__queue_head(&slow_io_pending_wq);
      uv__queue_remove(q);
      uv__queue_init(q);

      /* If there is more slow I/O work, schedule it to be run as well. */
      if (!uv__queue_empty(&slow_io_pending_wq)) {
        uv__queue_insert_tail(&wq, &run_slow_work_message);
        if (idle_threads > 0)
          uv_cond_signal(&cond);
      }
    }

    uv_mutex_unlock(&mutex);

    w = uv__queue_data(q, struct uv__work, wq);
    w->work(w);

    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;  /* Signal uv_cancel() that the work req is done
                        executing. */
    uv__queue_insert_tail(&w->loop->wq, &w->wq);
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);

    /* Lock `mutex` since that is expected at the start of the next
     * iteration. */
    uv_mutex_lock(&mutex);
    if (is_slow_work) {
      /* `slow_io_work_running` is protected by `mutex`. */
      slow_io_work_running--;
    }
  }
}


static void post(struct uv__queue* q, enum uv__work_kind kind) {
  uv_mutex_lock(&mutex);
  if (kind == UV__WORK_SLOW_IO) {
    /* Insert into a separate queue. */
    uv__queue_insert_tail(&slow_io_pending_wq, q);
    if (!uv__queue_empty(&run_slow_work_message)) {
      /* Running slow I/O tasks is already scheduled => Nothing to do here.
         The worker that runs said other task will schedule this one as well. */
      uv_mutex_unlock(&mutex);
      return;
    }
    q = &run_slow_work_message;
  }

  uv__queue_insert_tail(&wq, q);
  if (idle_threads > 0)
    uv_cond_signal(&cond);
  uv_mutex_unlock(&mutex);
}


#ifdef __MVS__
/* TODO(itodorov) - zos: revisit when Woz compiler is available. */
__attribute__((destructor))
#endif
void uv__threadpool_cleanup(void) {
  unsigned int i;

  if (nthreads == 0)
    return;

#ifndef __MVS__
  /* TODO(gabylb) - zos: revisit when Woz compiler is available. */
  post(&exit_message, UV__WORK_CPU);
#endif

  for (i = 0; i < nthreads; i++)
    if (uv_thread_join(threads + i))
      abort();

  if (threads != default_threads)
    uv__free(threads);

  uv_mutex_destroy(&mutex);
  uv_cond_destroy(&cond);

  threads = NULL;
  nthreads = 0;
}


static void init_threads(void) {
  uv_thread_options_t config;
  unsigned int i;
  const char* val;
  uv_sem_t sem;

  nthreads = ARRAY_SIZE(default_threads);
  val = getenv("UV_THREADPOOL_SIZE");
  if (val != NULL)
    nthreads = atoi(val);
  if (nthreads == 0)
    nthreads = 1;
  if (nthreads > MAX_THREADPOOL_SIZE)
    nthreads = MAX_THREADPOOL_SIZE;

  threads = default_threads;
  if (nthreads > ARRAY_SIZE(default_threads)) {
    threads = uv__malloc(nthreads * sizeof(threads[0]));
    if (threads == NULL) {
      nthreads = ARRAY_SIZE(default_threads);
      threads = default_threads;
    }
  }

  if (uv_cond_init(&cond))
    abort();

  if (uv_mutex_init(&mutex))
    abort();

  uv__queue_init(&wq);
  uv__queue_init(&slow_io_pending_wq);
  uv__queue_init(&run_slow_work_message);

  if (uv_sem_init(&sem, 0))
    abort();

  config.flags = UV_THREAD_HAS_STACK_SIZE;
  config.stack_size = 8u << 20;  /* 8 MB */

  for (i = 0; i < nthreads; i++)
    if (uv_thread_create_ex(threads + i, &config, worker, &sem))
      abort();

  for (i = 0; i < nthreads; i++)
    uv_sem_wait(&sem);

  uv_sem_destroy(&sem);
}
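
/* The pool size is decided once, on the first work submission: it defaults to
 * ARRAY_SIZE(default_threads) (4), can be overridden with the
 * UV_THREADPOOL_SIZE environment variable (e.g. UV_THREADPOOL_SIZE=8) and is
 * clamped to the range [1, MAX_THREADPOOL_SIZE]. Changing the variable after
 * the pool has started has no effect.
 */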


#ifndef _WIN32
static void reset_once(void) {
  uv_once_t child_once = UV_ONCE_INIT;
  memcpy(&once, &child_once, sizeof(child_once));
}
#endif


static void init_once(void) {
#ifndef _WIN32
  /* Re-initialize the threadpool after fork.
   * Note that this discards the global mutex and condition as well
   * as the work queue.
   */
  if (pthread_atfork(NULL, NULL, &reset_once))
    abort();
#endif
  init_threads();
}
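
/* reset_once() runs in the child after fork() and re-arms `once`, so the
 * child builds a fresh pool (threads, mutex, condition and queues) the next
 * time it submits work; anything still queued in the parent's pool is not
 * carried over.
 */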


void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);
  w->loop = loop;
  w->work = work;
  w->done = done;
  post(&w->wq, kind);
}
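
#if 0
/* Illustrative sketch, not compiled: how a caller inside libuv is expected to
 * drive uv__work_submit(). The `example_*` names are hypothetical;
 * uv_queue_work() below is the real in-tree example of the same pattern.
 * `work` runs on a pool thread, `done` runs later on the loop thread from
 * uv__work_done().
 */
static void example_work(struct uv__work* w) {
  /* Blocking or CPU-bound task; must not touch the loop from here. */
}

static void example_done(struct uv__work* w, int status) {
  /* Runs on the event loop thread; status is UV_ECANCELED if cancelled. */
}

static void example_submit(uv_loop_t* loop, struct uv__work* w) {
  uv__work_submit(loop, w, UV__WORK_CPU, example_work, example_done);
}
#endif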


/* TODO(bnoordhuis) teach libuv how to cancel file operations
 * that go through io_uring instead of the thread pool.
 */
static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
  int cancelled;

  uv_once(&once, init_once);  /* Ensure |mutex| is initialized. */
  uv_mutex_lock(&mutex);
  uv_mutex_lock(&w->loop->wq_mutex);

  cancelled = !uv__queue_empty(&w->wq) && w->work != NULL;
  if (cancelled)
    uv__queue_remove(&w->wq);

  uv_mutex_unlock(&w->loop->wq_mutex);
  uv_mutex_unlock(&mutex);

  if (!cancelled)
    return UV_EBUSY;

  w->work = uv__cancelled;
  uv_mutex_lock(&loop->wq_mutex);
  uv__queue_insert_tail(&loop->wq, &w->wq);
  uv_async_send(&loop->wq_async);
  uv_mutex_unlock(&loop->wq_mutex);

  return 0;
}


void uv__work_done(uv_async_t* handle) {
  struct uv__work* w;
  uv_loop_t* loop;
  struct uv__queue* q;
  struct uv__queue wq;
  int err;
  int nevents;

  loop = container_of(handle, uv_loop_t, wq_async);
  uv_mutex_lock(&loop->wq_mutex);
  uv__queue_move(&loop->wq, &wq);
  uv_mutex_unlock(&loop->wq_mutex);

  nevents = 0;

  while (!uv__queue_empty(&wq)) {
    q = uv__queue_head(&wq);
    uv__queue_remove(q);

    w = container_of(q, struct uv__work, wq);
    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    w->done(w, err);
    nevents++;
  }

  /* This check accomplishes 2 things:
   * 1. Even if the queue was empty, the call to uv__work_done() should count
   *    as an event. Which will have been added by the event loop when
   *    calling this callback.
   * 2. Prevents accidental wrap around in case nevents == 0 events == 0.
   */
  if (nevents > 1) {
    /* Subtract 1 to counter the call to uv__work_done(). */
    uv__metrics_inc_events(loop, nevents - 1);
    if (uv__get_internal_fields(loop)->current_timeout == 0)
      uv__metrics_inc_events_waiting(loop, nevents - 1);
  }
}


static void uv__queue_work(struct uv__work* w) {
  uv_work_t* req = container_of(w, uv_work_t, work_req);

  req->work_cb(req);
}


static void uv__queue_done(struct uv__work* w, int err) {
  uv_work_t* req;

  req = container_of(w, uv_work_t, work_req);
  uv__req_unregister(req->loop, req);

  if (req->after_work_cb == NULL)
    return;

  req->after_work_cb(req, err);
}


int uv_queue_work(uv_loop_t* loop,
                  uv_work_t* req,
                  uv_work_cb work_cb,
                  uv_after_work_cb after_work_cb) {
  if (work_cb == NULL)
    return UV_EINVAL;

  uv__req_init(loop, req, UV_WORK);
  req->loop = loop;
  req->work_cb = work_cb;
  req->after_work_cb = after_work_cb;
  uv__work_submit(loop,
                  &req->work_req,
                  UV__WORK_CPU,
                  uv__queue_work,
                  uv__queue_done);
  return 0;
}
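
#if 0
/* Illustrative sketch, not compiled: typical application use of
 * uv_queue_work(). The work callback runs on a pool thread; the after-work
 * callback runs on the loop thread once uv_run() processes the completion
 * delivered through wq_async. Written as a standalone program against <uv.h>.
 */
#include <stdio.h>
#include <uv.h>

static void on_work(uv_work_t* req) {
  /* Blocking or CPU-bound task; do not touch the loop from here. */
}

static void on_after_work(uv_work_t* req, int status) {
  /* status is 0 on success or UV_ECANCELED if the request was cancelled. */
  printf("work finished: %d\n", status);
}

static int example(void) {
  uv_work_t req;
  if (uv_queue_work(uv_default_loop(), &req, on_work, on_after_work))
    return 1;
  return uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}
#endif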


int uv_cancel(uv_req_t* req) {
  struct uv__work* wreq;
  uv_loop_t* loop;

  switch (req->type) {
  case UV_FS:
    loop =  ((uv_fs_t*) req)->loop;
    wreq = &((uv_fs_t*) req)->work_req;
    break;
  case UV_GETADDRINFO:
    loop =  ((uv_getaddrinfo_t*) req)->loop;
    wreq = &((uv_getaddrinfo_t*) req)->work_req;
    break;
  case UV_GETNAMEINFO:
    loop = ((uv_getnameinfo_t*) req)->loop;
    wreq = &((uv_getnameinfo_t*) req)->work_req;
    break;
  case UV_RANDOM:
    loop = ((uv_random_t*) req)->loop;
    wreq = &((uv_random_t*) req)->work_req;
    break;
  case UV_WORK:
    loop =  ((uv_work_t*) req)->loop;
    wreq = &((uv_work_t*) req)->work_req;
    break;
  default:
    return UV_EINVAL;
  }

  return uv__work_cancel(loop, req, wreq);
}
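
#if 0
/* Illustrative sketch, not compiled: cancelling a queued work request.
 * uv_cancel() only succeeds while the request is still waiting in the pool's
 * queue; once a worker has picked it up (or it has already finished) the call
 * returns UV_EBUSY. On success the after-work callback still fires on the
 * loop thread, with status == UV_ECANCELED.
 */
static void example_cancel(uv_work_t* req) {
  int r = uv_cancel((uv_req_t*) req);
  if (r == UV_EBUSY) {
    /* Too late: the work callback is running or has already completed. */
  }
}
#endif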
