libuv-svace-build

Форк
0
/
benchmark-pump.c 
478 строк · 10.0 Кб
1
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

22
#include "task.h"
#include "uv.h"

#include <math.h>
#include <stdio.h>


/* Number of client connections to drive; set by tcp_pump()/pipe_pump(). */
static int TARGET_CONNECTIONS;
#define WRITE_BUFFER_SIZE           8192
#define MAX_SIMULTANEOUS_CONNECTS   100

#define PRINT_STATS                 0
#define STATS_INTERVAL              1000 /* msec */
#define STATS_COUNT                 5


static void do_write(uv_stream_t*);
static void maybe_connect_some(void);

static uv_req_t* req_alloc(void);
static void req_free(uv_req_t* uv_req);

static void buf_alloc(uv_handle_t* handle, size_t size, uv_buf_t* buf);
static void buf_free(const uv_buf_t* buf);

static uv_loop_t* loop;

static uv_tcp_t tcpServer;
static uv_pipe_t pipeServer;
static uv_stream_t* server;
static struct sockaddr_in listen_addr;
static struct sockaddr_in connect_addr;

/* Timestamp (uv_now) when pumping started; basis for the gbit/s figures. */
static int64_t start_time;

static int max_connect_socket = 0;
static int max_read_sockets = 0;
static int read_sockets = 0;
static int write_sockets = 0;

/* Bytes received/sent during the current stats interval and in total. */
static int64_t nrecv = 0;
static int64_t nrecv_total = 0;
static int64_t nsent = 0;
static int64_t nsent_total = 0;

/* show_stats ticks remaining before the client prints totals and exits. */
static int stats_left = 0;

static char write_buffer[WRITE_BUFFER_SIZE];

/* Make this as large as you need. */
#define MAX_WRITE_HANDLES 1000

static stream_type type;

static uv_tcp_t tcp_write_handles[MAX_WRITE_HANDLES];
static uv_pipe_t pipe_write_handles[MAX_WRITE_HANDLES];

static uv_timer_t timer_handle;


/* Convert a byte count over an elapsed time in milliseconds into
 * gigabits per second. */
static double gbit(int64_t bytes, int64_t passed_ms) {
  double seconds = (double) passed_ms / 1000;
  double gigabits = ((double) bytes / (1024 * 1024 * 1024)) * 8;
  return gigabits / seconds;
}


/* Periodic timer callback on the client. Optionally prints per-interval
 * throughput; after the final tick it prints the overall write rate,
 * closes every write handle and terminates the process. */
static void show_stats(uv_timer_t* handle) {
  int64_t elapsed;
  int i;

#if PRINT_STATS
  fprintf(stderr, "connections: %d, write: %.1f gbit/s\n",
          write_sockets,
          gbit(nsent, STATS_INTERVAL));
  fflush(stderr);
#endif

  /* Exit if the show is over */
  if (!--stats_left) {
    uv_update_time(loop);
    elapsed = uv_now(loop) - start_time;

    fprintf(stderr, "%s_pump%d_client: %.1f gbit/s\n",
            type == TCP ? "tcp" : "pipe",
            write_sockets,
            gbit(nsent_total, elapsed));
    fflush(stderr);

    for (i = 0; i < write_sockets; i++) {
      uv_handle_t* h = type == TCP
          ? (uv_handle_t*) &tcp_write_handles[i]
          : (uv_handle_t*) &pipe_write_handles[i];
      uv_close(h, NULL);
    }

    exit(0);
  }

  /* Reset read and write counters */
  nrecv = 0;
  nsent = 0;
}


static void read_show_stats(void) {
128
  int64_t diff;
129

130
  uv_update_time(loop);
131
  diff = uv_now(loop) - start_time;
132

133
  fprintf(stderr, "%s_pump%d_server: %.1f gbit/s\n",
134
          type == TCP ? "tcp" : "pipe",
135
          max_read_sockets,
136
          gbit(nrecv_total, diff));
137
  fflush(stderr);
138
}
139

140

141

142
/* Close callback for accepted read streams. Releases the heap-allocated
 * handle; once every reader has disconnected (past the 1s warm-up) it
 * prints the server stats and shuts down the listening server. */
static void read_sockets_close_cb(uv_handle_t* handle) {
  free(handle);
  read_sockets--;

  /* If it's past the first second and everyone has closed their connection
   * Then print stats.
   */
  if (read_sockets == 0 && uv_now(loop) - start_time > 1000) {
    read_show_stats();
    uv_close((uv_handle_t*) server, NULL);
  }
}


static void start_stats_collection(void) {
157
  int r;
158

159
  /* Show-stats timer */
160
  stats_left = STATS_COUNT;
161
  r = uv_timer_init(loop, &timer_handle);
162
  ASSERT_OK(r);
163
  r = uv_timer_start(&timer_handle, show_stats, STATS_INTERVAL, STATS_INTERVAL);
164
  ASSERT_OK(r);
165

166
  uv_update_time(loop);
167
  start_time = uv_now(loop);
168
}
169

170

171
/* Server-side read callback: starts the clock on the first bytes seen,
 * accounts received bytes and recycles the buffer. A negative `bytes`
 * signals error/EOF and closes the stream. */
static void read_cb(uv_stream_t* stream, ssize_t bytes, const uv_buf_t* buf) {
  if (nrecv_total == 0) {
    /* First data observed: start the server-side throughput clock. */
    ASSERT_OK(start_time);
    uv_update_time(loop);
    start_time = uv_now(loop);
  }

  if (bytes < 0) {
    /* Error or EOF. Return the buffer handed out by buf_alloc to the
     * freelist (it was previously leaked on this path), then close. */
    if (buf->base != NULL)
      buf_free(buf);
    uv_close((uv_handle_t*) stream, read_sockets_close_cb);
    return;
  }

  buf_free(buf);

  nrecv += bytes;
  nrecv_total += bytes;
}


/* Client-side write completion: recycle the request, account the bytes
 * written, and immediately queue the next write to keep the pump full. */
static void write_cb(uv_write_t* req, int status) {
  uv_stream_t* stream;

  ASSERT_OK(status);

  /* Grab the handle before returning the request to the freelist:
   * reading req->handle after req_free() was a use-after-free. */
  stream = (uv_stream_t*) req->handle;
  req_free((uv_req_t*) req);

  nsent += sizeof write_buffer;
  nsent_total += sizeof write_buffer;

  do_write(stream);
}


/* Queue one WRITE_BUFFER_SIZE write on `stream`; write_cb re-arms it,
 * so each stream always has exactly one write in flight. */
static void do_write(uv_stream_t* stream) {
  uv_write_t* wr;
  uv_buf_t buf;

  buf.base = write_buffer;
  buf.len = sizeof(write_buffer);

  wr = (uv_write_t*) req_alloc();
  ASSERT_OK(uv_write(wr, stream, &buf, 1, write_cb));
}


/* Client-side connect completion. Keeps opening connections until
 * TARGET_CONNECTIONS are up, then starts the stats timer and begins
 * pumping data on every handle. */
static void connect_cb(uv_connect_t* req, int status) {
  int i;

  if (status) {
    fprintf(stderr, "%s", uv_strerror(status));
    fflush(stderr);
  }
  ASSERT_OK(status);

  write_sockets++;
  req_free((uv_req_t*) req);

  maybe_connect_some();

  if (write_sockets != TARGET_CONNECTIONS)
    return;

  start_stats_collection();

  /* Yay! start writing */
  for (i = 0; i < write_sockets; i++) {
    uv_stream_t* stream = type == TCP
        ? (uv_stream_t*) &tcp_write_handles[i]
        : (uv_stream_t*) &pipe_write_handles[i];
    do_write(stream);
  }
}


static void maybe_connect_some(void) {
245
  uv_connect_t* req;
246
  uv_tcp_t* tcp;
247
  uv_pipe_t* pipe;
248
  int r;
249

250
  while (max_connect_socket < TARGET_CONNECTIONS &&
251
         max_connect_socket < write_sockets + MAX_SIMULTANEOUS_CONNECTS) {
252
    if (type == TCP) {
253
      tcp = &tcp_write_handles[max_connect_socket++];
254

255
      r = uv_tcp_init(loop, tcp);
256
      ASSERT_OK(r);
257

258
      req = (uv_connect_t*) req_alloc();
259
      r = uv_tcp_connect(req,
260
                         tcp,
261
                         (const struct sockaddr*) &connect_addr,
262
                         connect_cb);
263
      ASSERT_OK(r);
264
    } else {
265
      pipe = &pipe_write_handles[max_connect_socket++];
266

267
      r = uv_pipe_init(loop, pipe, 0);
268
      ASSERT_OK(r);
269

270
      req = (uv_connect_t*) req_alloc();
271
      uv_pipe_connect(req, pipe, TEST_PIPENAME, connect_cb);
272
    }
273
  }
274
}
275

276

277
/* Server-side accept callback: allocate a handle of the active stream
 * type, accept the incoming connection and start reading from it. */
static void connection_cb(uv_stream_t* s, int status) {
  uv_stream_t* stream;
  int r;

  ASSERT_PTR_EQ(server, s);
  ASSERT_OK(status);

  if (type == TCP) {
    stream = (uv_stream_t*) malloc(sizeof(uv_tcp_t));
    ASSERT_NOT_NULL(stream);  /* abort rather than deref NULL on OOM */
    r = uv_tcp_init(loop, (uv_tcp_t*) stream);
    ASSERT_OK(r);
  } else {
    stream = (uv_stream_t*) malloc(sizeof(uv_pipe_t));
    ASSERT_NOT_NULL(stream);
    r = uv_pipe_init(loop, (uv_pipe_t*) stream, 0);
    ASSERT_OK(r);
  }

  r = uv_accept(s, stream);
  ASSERT_OK(r);

  r = uv_read_start(stream, buf_alloc, read_cb);
  ASSERT_OK(r);

  read_sockets++;
  max_read_sockets++;
}


/*
306
 * Request allocator
307
 */
308

309
typedef struct req_list_s {
310
  union uv_any_req uv_req;
311
  struct req_list_s* next;
312
} req_list_t;
313

314

315
static req_list_t* req_freelist = NULL;
316

317

318
static uv_req_t* req_alloc(void) {
319
  req_list_t* req;
320

321
  req = req_freelist;
322
  if (req != NULL) {
323
    req_freelist = req->next;
324
    return (uv_req_t*) req;
325
  }
326

327
  req = (req_list_t*) malloc(sizeof *req);
328
  return (uv_req_t*) req;
329
}
330

331

332
static void req_free(uv_req_t* uv_req) {
333
  req_list_t* req = (req_list_t*) uv_req;
334

335
  req->next = req_freelist;
336
  req_freelist = req;
337
}
338

339

340
/*
341
 * Buffer allocator
342
 */
343

344
typedef struct buf_list_s {
345
  uv_buf_t uv_buf_t;
346
  struct buf_list_s* next;
347
} buf_list_t;
348

349

350
static buf_list_t* buf_freelist = NULL;
351

352

353
static void buf_alloc(uv_handle_t* handle, size_t size, uv_buf_t* buf) {
354
  buf_list_t* ab;
355

356
  ab = buf_freelist;
357
  if (ab != NULL)
358
    buf_freelist = ab->next;
359
  else {
360
    ab = malloc(size + sizeof(*ab));
361
    ab->uv_buf_t.len = size;
362
    ab->uv_buf_t.base = (char*) (ab + 1);
363
  }
364

365
  *buf = ab->uv_buf_t;
366
}
367

368

369
static void buf_free(const uv_buf_t* buf) {
370
  buf_list_t* ab = (buf_list_t*) buf->base - 1;
371
  ab->next = buf_freelist;
372
  buf_freelist = ab;
373
}
374

375

376
HELPER_IMPL(tcp_pump_server) {
377
  int r;
378

379
  type = TCP;
380
  loop = uv_default_loop();
381

382
  ASSERT_OK(uv_ip4_addr("0.0.0.0", TEST_PORT, &listen_addr));
383

384
  /* Server */
385
  server = (uv_stream_t*)&tcpServer;
386
  r = uv_tcp_init(loop, &tcpServer);
387
  ASSERT_OK(r);
388
  r = uv_tcp_bind(&tcpServer, (const struct sockaddr*) &listen_addr, 0);
389
  ASSERT_OK(r);
390
  r = uv_listen((uv_stream_t*)&tcpServer, MAX_WRITE_HANDLES, connection_cb);
391
  ASSERT_OK(r);
392

393
  notify_parent_process();
394
  uv_run(loop, UV_RUN_DEFAULT);
395

396
  return 0;
397
}
398

399

400
HELPER_IMPL(pipe_pump_server) {
401
  int r;
402
  type = PIPE;
403

404
  loop = uv_default_loop();
405

406
  /* Server */
407
  server = (uv_stream_t*)&pipeServer;
408
  r = uv_pipe_init(loop, &pipeServer, 0);
409
  ASSERT_OK(r);
410
  r = uv_pipe_bind(&pipeServer, TEST_PIPENAME);
411
  ASSERT_OK(r);
412
  r = uv_listen((uv_stream_t*)&pipeServer, MAX_WRITE_HANDLES, connection_cb);
413
  ASSERT_OK(r);
414

415
  notify_parent_process();
416
  uv_run(loop, UV_RUN_DEFAULT);
417

418
  MAKE_VALGRIND_HAPPY(loop);
419
  return 0;
420
}
421

422

423
static void tcp_pump(int n) {
424
  ASSERT_LE(n, MAX_WRITE_HANDLES);
425
  TARGET_CONNECTIONS = n;
426
  type = TCP;
427

428
  loop = uv_default_loop();
429

430
  ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &connect_addr));
431

432
  /* Start making connections */
433
  maybe_connect_some();
434

435
  uv_run(loop, UV_RUN_DEFAULT);
436

437
  MAKE_VALGRIND_HAPPY(loop);
438
}
439

440

441
static void pipe_pump(int n) {
442
  ASSERT_LE(n, MAX_WRITE_HANDLES);
443
  TARGET_CONNECTIONS = n;
444
  type = PIPE;
445

446
  loop = uv_default_loop();
447

448
  /* Start making connections */
449
  maybe_connect_some();
450

451
  uv_run(loop, UV_RUN_DEFAULT);
452

453
  MAKE_VALGRIND_HAPPY(loop);
454
}
455

456

457
/* Benchmark entry: 100 concurrent TCP write connections. */
BENCHMARK_IMPL(tcp_pump100_client) {
  tcp_pump(100);
  return 0;
}


/* Benchmark entry: a single TCP write connection. */
BENCHMARK_IMPL(tcp_pump1_client) {
  tcp_pump(1);
  return 0;
}


/* Benchmark entry: 100 concurrent pipe write connections. */
BENCHMARK_IMPL(pipe_pump100_client) {
  pipe_pump(100);
  return 0;
}


/* Benchmark entry: a single pipe write connection. */
BENCHMARK_IMPL(pipe_pump1_client) {
  pipe_pump(1);
  return 0;
}
479

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.