libuv-svace-build

Форк
0
/
test-threadpool-cancel.c 
418 строк · 11.2 Кб
1
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2
 *
3
 * Permission is hereby granted, free of charge, to any person obtaining a copy
4
 * of this software and associated documentation files (the "Software"), to
5
 * deal in the Software without restriction, including without limitation the
6
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7
 * sell copies of the Software, and to permit persons to whom the Software is
8
 * furnished to do so, subject to the following conditions:
9
 *
10
 * The above copyright notice and this permission notice shall be included in
11
 * all copies or substantial portions of the Software.
12
 *
13
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19
 * IN THE SOFTWARE.
20
 */
21

22
#include "uv.h"
23
#include "task.h"
24

25
#ifdef _WIN32
26
# define putenv _putenv
27
#endif
28

29
#define INIT_CANCEL_INFO(ci, what)                                            \
30
  do {                                                                        \
31
    (ci)->reqs = (what);                                                      \
32
    (ci)->nreqs = ARRAY_SIZE(what);                                           \
33
    (ci)->stride = sizeof((what)[0]);                                         \
34
  }                                                                           \
35
  while (0)
36

37
struct cancel_info {
38
  void* reqs;
39
  unsigned nreqs;
40
  unsigned stride;
41
  uv_timer_t timer_handle;
42
};
43

44
struct random_info {
45
  uv_random_t random_req;
46
  char buf[1];
47
};
48

49
static unsigned fs_cb_called;
50
static unsigned done_cb_called;
51
static unsigned done2_cb_called;
52
static unsigned timer_cb_called;
53
static uv_work_t pause_reqs[4];
54
static uv_sem_t pause_sems[ARRAY_SIZE(pause_reqs)];
55

56

57
static void work_cb(uv_work_t* req) {
58
  uv_sem_wait(pause_sems + (req - pause_reqs));
59
}
60

61

62
static void done_cb(uv_work_t* req, int status) {
63
  uv_sem_destroy(pause_sems + (req - pause_reqs));
64
}
65

66

67
static void saturate_threadpool(void) {
68
  uv_loop_t* loop;
69
  char buf[64];
70
  size_t i;
71

72
  snprintf(buf,
73
           sizeof(buf),
74
           "UV_THREADPOOL_SIZE=%lu",
75
           (unsigned long)ARRAY_SIZE(pause_reqs));
76
  putenv(buf);
77

78
  loop = uv_default_loop();
79
  for (i = 0; i < ARRAY_SIZE(pause_reqs); i += 1) {
80
    ASSERT_OK(uv_sem_init(pause_sems + i, 0));
81
    ASSERT_OK(uv_queue_work(loop, pause_reqs + i, work_cb, done_cb));
82
  }
83
}
84

85

86
static void unblock_threadpool(void) {
87
  size_t i;
88

89
  for (i = 0; i < ARRAY_SIZE(pause_reqs); i += 1)
90
    uv_sem_post(pause_sems + i);
91
}
92

93

94
static int known_broken(uv_req_t* req) {
95
  if (req->type != UV_FS)
96
    return 0;
97

98
#ifdef __linux__
99
  /* TODO(bnoordhuis) make cancellation work with io_uring */
100
  switch (((uv_fs_t*) req)->fs_type) {
101
    case UV_FS_CLOSE:
102
    case UV_FS_FDATASYNC:
103
    case UV_FS_FSTAT:
104
    case UV_FS_FSYNC:
105
    case UV_FS_LINK:
106
    case UV_FS_LSTAT:
107
    case UV_FS_MKDIR:
108
    case UV_FS_OPEN:
109
    case UV_FS_READ:
110
    case UV_FS_RENAME:
111
    case UV_FS_STAT:
112
    case UV_FS_SYMLINK:
113
    case UV_FS_WRITE:
114
    case UV_FS_UNLINK:
115
      return 1;
116
    default:  /* Squelch -Wswitch warnings. */
117
      break;
118
  }
119
#endif
120

121
  return 0;
122
}
123

124

125
static void fs_cb(uv_fs_t* req) {
126
  ASSERT_NE(known_broken((uv_req_t*) req) || \
127
      req->result == UV_ECANCELED, 0);
128
  uv_fs_req_cleanup(req);
129
  fs_cb_called++;
130
}
131

132

133
static void getaddrinfo_cb(uv_getaddrinfo_t* req,
134
                           int status,
135
                           struct addrinfo* res) {
136
  ASSERT_EQ(status, UV_EAI_CANCELED);
137
  ASSERT_NULL(res);
138
  uv_freeaddrinfo(res);  /* Should not crash. */
139
}
140

141

142
static void getnameinfo_cb(uv_getnameinfo_t* handle,
143
                           int status,
144
                           const char* hostname,
145
                           const char* service) {
146
  ASSERT_EQ(status, UV_EAI_CANCELED);
147
  ASSERT_NULL(hostname);
148
  ASSERT_NULL(service);
149
}
150

151

152
static void work2_cb(uv_work_t* req) {
153
  ASSERT(0 && "work2_cb called");
154
}
155

156

157
static void done2_cb(uv_work_t* req, int status) {
158
  ASSERT_EQ(status, UV_ECANCELED);
159
  done2_cb_called++;
160
}
161

162

163
static void timer_cb(uv_timer_t* handle) {
164
  struct cancel_info* ci;
165
  uv_req_t* req;
166
  unsigned i;
167

168
  ci = container_of(handle, struct cancel_info, timer_handle);
169

170
  for (i = 0; i < ci->nreqs; i++) {
171
    req = (uv_req_t*) ((char*) ci->reqs + i * ci->stride);
172
    ASSERT(known_broken(req) || 0 == uv_cancel(req));
173
  }
174

175
  uv_close((uv_handle_t*) &ci->timer_handle, NULL);
176
  unblock_threadpool();
177
  timer_cb_called++;
178
}
179

180

181
static void nop_done_cb(uv_work_t* req, int status) {
182
  ASSERT_EQ(status, UV_ECANCELED);
183
  done_cb_called++;
184
}
185

186

187
static void nop_random_cb(uv_random_t* req, int status, void* buf, size_t len) {
188
  struct random_info* ri;
189

190
  ri = container_of(req, struct random_info, random_req);
191

192
  ASSERT_EQ(status, UV_ECANCELED);
193
  ASSERT_PTR_EQ(buf, (void*) ri->buf);
194
  ASSERT_EQ(len, sizeof(ri->buf));
195

196
  done_cb_called++;
197
}
198

199

200
TEST_IMPL(threadpool_cancel_getaddrinfo) {
201
  uv_getaddrinfo_t reqs[4];
202
  struct cancel_info ci;
203
  struct addrinfo hints;
204
  uv_loop_t* loop;
205
  int r;
206

207
  INIT_CANCEL_INFO(&ci, reqs);
208
  loop = uv_default_loop();
209
  saturate_threadpool();
210

211
  r = uv_getaddrinfo(loop, reqs + 0, getaddrinfo_cb, "fail", NULL, NULL);
212
  ASSERT_OK(r);
213

214
  r = uv_getaddrinfo(loop, reqs + 1, getaddrinfo_cb, NULL, "fail", NULL);
215
  ASSERT_OK(r);
216

217
  r = uv_getaddrinfo(loop, reqs + 2, getaddrinfo_cb, "fail", "fail", NULL);
218
  ASSERT_OK(r);
219

220
  r = uv_getaddrinfo(loop, reqs + 3, getaddrinfo_cb, "fail", NULL, &hints);
221
  ASSERT_OK(r);
222

223
  ASSERT_OK(uv_timer_init(loop, &ci.timer_handle));
224
  ASSERT_OK(uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
225
  ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
226
  ASSERT_EQ(1, timer_cb_called);
227

228
  MAKE_VALGRIND_HAPPY(loop);
229
  return 0;
230
}
231

232

233
TEST_IMPL(threadpool_cancel_getnameinfo) {
234
  uv_getnameinfo_t reqs[4];
235
  struct sockaddr_in addr4;
236
  struct cancel_info ci;
237
  uv_loop_t* loop;
238
  int r;
239

240
  r = uv_ip4_addr("127.0.0.1", 80, &addr4);
241
  ASSERT_OK(r);
242

243
  INIT_CANCEL_INFO(&ci, reqs);
244
  loop = uv_default_loop();
245
  saturate_threadpool();
246

247
  r = uv_getnameinfo(loop, reqs + 0, getnameinfo_cb, (const struct sockaddr*)&addr4, 0);
248
  ASSERT_OK(r);
249

250
  r = uv_getnameinfo(loop, reqs + 1, getnameinfo_cb, (const struct sockaddr*)&addr4, 0);
251
  ASSERT_OK(r);
252

253
  r = uv_getnameinfo(loop, reqs + 2, getnameinfo_cb, (const struct sockaddr*)&addr4, 0);
254
  ASSERT_OK(r);
255

256
  r = uv_getnameinfo(loop, reqs + 3, getnameinfo_cb, (const struct sockaddr*)&addr4, 0);
257
  ASSERT_OK(r);
258

259
  ASSERT_OK(uv_timer_init(loop, &ci.timer_handle));
260
  ASSERT_OK(uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
261
  ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
262
  ASSERT_EQ(1, timer_cb_called);
263

264
  MAKE_VALGRIND_HAPPY(loop);
265
  return 0;
266
}
267

268

269
TEST_IMPL(threadpool_cancel_random) {
270
  struct random_info req;
271
  uv_loop_t* loop;
272

273
  saturate_threadpool();
274
  loop = uv_default_loop();
275
  ASSERT_OK(uv_random(loop,
276
                      &req.random_req,
277
                      &req.buf,
278
                      sizeof(req.buf),
279
                      0,
280
                      nop_random_cb));
281
  ASSERT_OK(uv_cancel((uv_req_t*) &req));
282
  ASSERT_OK(done_cb_called);
283
  unblock_threadpool();
284
  ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
285
  ASSERT_EQ(1, done_cb_called);
286

287
  MAKE_VALGRIND_HAPPY(loop);
288
  return 0;
289
}
290

291

292
TEST_IMPL(threadpool_cancel_work) {
293
  struct cancel_info ci;
294
  uv_work_t reqs[16];
295
  uv_loop_t* loop;
296
  unsigned i;
297

298
  INIT_CANCEL_INFO(&ci, reqs);
299
  loop = uv_default_loop();
300
  saturate_threadpool();
301

302
  for (i = 0; i < ARRAY_SIZE(reqs); i++)
303
    ASSERT_OK(uv_queue_work(loop, reqs + i, work2_cb, done2_cb));
304

305
  ASSERT_OK(uv_timer_init(loop, &ci.timer_handle));
306
  ASSERT_OK(uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
307
  ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
308
  ASSERT_EQ(1, timer_cb_called);
309
  ASSERT_EQ(ARRAY_SIZE(reqs), done2_cb_called);
310

311
  MAKE_VALGRIND_HAPPY(loop);
312
  return 0;
313
}
314

315

316
TEST_IMPL(threadpool_cancel_fs) {
317
  struct cancel_info ci;
318
  uv_fs_t reqs[26];
319
  uv_loop_t* loop;
320
  unsigned n;
321
  uv_buf_t iov;
322

323
  INIT_CANCEL_INFO(&ci, reqs);
324
  loop = uv_default_loop();
325
  saturate_threadpool();
326
  iov = uv_buf_init(NULL, 0);
327

328
  /* Needs to match ARRAY_SIZE(fs_reqs). */
329
  n = 0;
330
  ASSERT_OK(uv_fs_chmod(loop, reqs + n++, "/", 0, fs_cb));
331
  ASSERT_OK(uv_fs_chown(loop, reqs + n++, "/", 0, 0, fs_cb));
332
  ASSERT_OK(uv_fs_close(loop, reqs + n++, 0, fs_cb));
333
  ASSERT_OK(uv_fs_fchmod(loop, reqs + n++, 0, 0, fs_cb));
334
  ASSERT_OK(uv_fs_fchown(loop, reqs + n++, 0, 0, 0, fs_cb));
335
  ASSERT_OK(uv_fs_fdatasync(loop, reqs + n++, 0, fs_cb));
336
  ASSERT_OK(uv_fs_fstat(loop, reqs + n++, 0, fs_cb));
337
  ASSERT_OK(uv_fs_fsync(loop, reqs + n++, 0, fs_cb));
338
  ASSERT_OK(uv_fs_ftruncate(loop, reqs + n++, 0, 0, fs_cb));
339
  ASSERT_OK(uv_fs_futime(loop, reqs + n++, 0, 0, 0, fs_cb));
340
  ASSERT_OK(uv_fs_link(loop, reqs + n++, "/", "/", fs_cb));
341
  ASSERT_OK(uv_fs_lstat(loop, reqs + n++, "/", fs_cb));
342
  ASSERT_OK(uv_fs_mkdir(loop, reqs + n++, "/", 0, fs_cb));
343
  ASSERT_OK(uv_fs_open(loop, reqs + n++, "/", 0, 0, fs_cb));
344
  ASSERT_OK(uv_fs_read(loop, reqs + n++, -1, &iov, 1, 0, fs_cb));
345
  ASSERT_OK(uv_fs_scandir(loop, reqs + n++, "/", 0, fs_cb));
346
  ASSERT_OK(uv_fs_readlink(loop, reqs + n++, "/", fs_cb));
347
  ASSERT_OK(uv_fs_realpath(loop, reqs + n++, "/", fs_cb));
348
  ASSERT_OK(uv_fs_rename(loop, reqs + n++, "/", "/", fs_cb));
349
  ASSERT_OK(uv_fs_mkdir(loop, reqs + n++, "/", 0, fs_cb));
350
  ASSERT_OK(uv_fs_sendfile(loop, reqs + n++, 0, 0, 0, 0, fs_cb));
351
  ASSERT_OK(uv_fs_stat(loop, reqs + n++, "/", fs_cb));
352
  ASSERT_OK(uv_fs_symlink(loop, reqs + n++, "/", "/", 0, fs_cb));
353
  ASSERT_OK(uv_fs_unlink(loop, reqs + n++, "/", fs_cb));
354
  ASSERT_OK(uv_fs_utime(loop, reqs + n++, "/", 0, 0, fs_cb));
355
  ASSERT_OK(uv_fs_write(loop, reqs + n++, -1, &iov, 1, 0, fs_cb));
356
  ASSERT_EQ(n, ARRAY_SIZE(reqs));
357

358
  ASSERT_OK(uv_timer_init(loop, &ci.timer_handle));
359
  ASSERT_OK(uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
360
  ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
361
  ASSERT_EQ(n, fs_cb_called);
362
  ASSERT_EQ(1, timer_cb_called);
363

364

365
  MAKE_VALGRIND_HAPPY(loop);
366
  return 0;
367
}
368

369

370
TEST_IMPL(threadpool_cancel_single) {
371
  uv_loop_t* loop;
372
  uv_work_t req;
373

374
  saturate_threadpool();
375
  loop = uv_default_loop();
376
  ASSERT_OK(uv_queue_work(loop, &req, (uv_work_cb) abort, nop_done_cb));
377
  ASSERT_OK(uv_cancel((uv_req_t*) &req));
378
  ASSERT_OK(done_cb_called);
379
  unblock_threadpool();
380
  ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
381
  ASSERT_EQ(1, done_cb_called);
382

383
  MAKE_VALGRIND_HAPPY(loop);
384
  return 0;
385
}
386

387

388
static void after_busy_cb(uv_work_t* req, int status) {
389
  ASSERT_OK(status);
390
  done_cb_called++;
391
}
392

393
static void busy_cb(uv_work_t* req) {
394
  uv_sem_post((uv_sem_t*) req->data);
395
  /* Assume that calling uv_cancel() takes less than 10ms. */
396
  uv_sleep(10);
397
}
398

399
TEST_IMPL(threadpool_cancel_when_busy) {
400
  uv_sem_t sem_lock;
401
  uv_work_t req;
402

403
  req.data = &sem_lock;
404

405
  ASSERT_OK(uv_sem_init(&sem_lock, 0));
406
  ASSERT_OK(uv_queue_work(uv_default_loop(), &req, busy_cb, after_busy_cb));
407

408
  uv_sem_wait(&sem_lock);
409

410
  ASSERT_EQ(uv_cancel((uv_req_t*) &req), UV_EBUSY);
411
  ASSERT_OK(uv_run(uv_default_loop(), UV_RUN_DEFAULT));
412
  ASSERT_EQ(1, done_cb_called);
413

414
  uv_sem_destroy(&sem_lock);
415

416
  MAKE_VALGRIND_HAPPY(uv_default_loop());
417
  return 0;
418
}
419

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.