libuv-svace-build
478 строк · 10.0 Кб
1/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2*
3* Permission is hereby granted, free of charge, to any person obtaining a copy
4* of this software and associated documentation files (the "Software"), to
5* deal in the Software without restriction, including without limitation the
6* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7* sell copies of the Software, and to permit persons to whom the Software is
8* furnished to do so, subject to the following conditions:
9*
10* The above copyright notice and this permission notice shall be included in
11* all copies or substantial portions of the Software.
12*
13* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19* IN THE SOFTWARE.
20*/
21
22#include "task.h"23#include "uv.h"24
25#include <math.h>26#include <stdio.h>27
28
29static int TARGET_CONNECTIONS;30#define WRITE_BUFFER_SIZE 819231#define MAX_SIMULTANEOUS_CONNECTS 10032
33#define PRINT_STATS 034#define STATS_INTERVAL 1000 /* msec */35#define STATS_COUNT 536
37
38static void do_write(uv_stream_t*);39static void maybe_connect_some(void);40
41static uv_req_t* req_alloc(void);42static void req_free(uv_req_t* uv_req);43
44static void buf_alloc(uv_handle_t* handle, size_t size, uv_buf_t* buf);45static void buf_free(const uv_buf_t* buf);46
47static uv_loop_t* loop;48
49static uv_tcp_t tcpServer;50static uv_pipe_t pipeServer;51static uv_stream_t* server;52static struct sockaddr_in listen_addr;53static struct sockaddr_in connect_addr;54
55static int64_t start_time;56
57static int max_connect_socket = 0;58static int max_read_sockets = 0;59static int read_sockets = 0;60static int write_sockets = 0;61
62static int64_t nrecv = 0;63static int64_t nrecv_total = 0;64static int64_t nsent = 0;65static int64_t nsent_total = 0;66
67static int stats_left = 0;68
69static char write_buffer[WRITE_BUFFER_SIZE];70
71/* Make this as large as you need. */
72#define MAX_WRITE_HANDLES 100073
74static stream_type type;75
76static uv_tcp_t tcp_write_handles[MAX_WRITE_HANDLES];77static uv_pipe_t pipe_write_handles[MAX_WRITE_HANDLES];78
79static uv_timer_t timer_handle;80
81
82static double gbit(int64_t bytes, int64_t passed_ms) {83double gbits = ((double)bytes / (1024 * 1024 * 1024)) * 8;84return gbits / ((double)passed_ms / 1000);85}
86
87
88static void show_stats(uv_timer_t* handle) {89int64_t diff;90int i;91
92#if PRINT_STATS93fprintf(stderr, "connections: %d, write: %.1f gbit/s\n",94write_sockets,95gbit(nsent, STATS_INTERVAL));96fflush(stderr);97#endif98
99/* Exit if the show is over */100if (!--stats_left) {101
102uv_update_time(loop);103diff = uv_now(loop) - start_time;104
105fprintf(stderr, "%s_pump%d_client: %.1f gbit/s\n",106type == TCP ? "tcp" : "pipe",107write_sockets,108gbit(nsent_total, diff));109fflush(stderr);110
111for (i = 0; i < write_sockets; i++) {112if (type == TCP)113uv_close((uv_handle_t*) &tcp_write_handles[i], NULL);114else115uv_close((uv_handle_t*) &pipe_write_handles[i], NULL);116}117
118exit(0);119}120
121/* Reset read and write counters */122nrecv = 0;123nsent = 0;124}
125
126
127static void read_show_stats(void) {128int64_t diff;129
130uv_update_time(loop);131diff = uv_now(loop) - start_time;132
133fprintf(stderr, "%s_pump%d_server: %.1f gbit/s\n",134type == TCP ? "tcp" : "pipe",135max_read_sockets,136gbit(nrecv_total, diff));137fflush(stderr);138}
139
140
141
142static void read_sockets_close_cb(uv_handle_t* handle) {143free(handle);144read_sockets--;145
146/* If it's past the first second and everyone has closed their connection147* Then print stats.
148*/
149if (uv_now(loop) - start_time > 1000 && read_sockets == 0) {150read_show_stats();151uv_close((uv_handle_t*)server, NULL);152}153}
154
155
156static void start_stats_collection(void) {157int r;158
159/* Show-stats timer */160stats_left = STATS_COUNT;161r = uv_timer_init(loop, &timer_handle);162ASSERT_OK(r);163r = uv_timer_start(&timer_handle, show_stats, STATS_INTERVAL, STATS_INTERVAL);164ASSERT_OK(r);165
166uv_update_time(loop);167start_time = uv_now(loop);168}
169
170
171static void read_cb(uv_stream_t* stream, ssize_t bytes, const uv_buf_t* buf) {172if (nrecv_total == 0) {173ASSERT_OK(start_time);174uv_update_time(loop);175start_time = uv_now(loop);176}177
178if (bytes < 0) {179uv_close((uv_handle_t*)stream, read_sockets_close_cb);180return;181}182
183buf_free(buf);184
185nrecv += bytes;186nrecv_total += bytes;187}
188
189
190static void write_cb(uv_write_t* req, int status) {191ASSERT_OK(status);192
193req_free((uv_req_t*) req);194
195nsent += sizeof write_buffer;196nsent_total += sizeof write_buffer;197
198do_write((uv_stream_t*) req->handle);199}
200
201
202static void do_write(uv_stream_t* stream) {203uv_write_t* req;204uv_buf_t buf;205int r;206
207buf.base = (char*) &write_buffer;208buf.len = sizeof write_buffer;209
210req = (uv_write_t*) req_alloc();211r = uv_write(req, stream, &buf, 1, write_cb);212ASSERT_OK(r);213}
214
215
216static void connect_cb(uv_connect_t* req, int status) {217int i;218
219if (status) {220fprintf(stderr, "%s", uv_strerror(status));221fflush(stderr);222}223ASSERT_OK(status);224
225write_sockets++;226req_free((uv_req_t*) req);227
228maybe_connect_some();229
230if (write_sockets == TARGET_CONNECTIONS) {231start_stats_collection();232
233/* Yay! start writing */234for (i = 0; i < write_sockets; i++) {235if (type == TCP)236do_write((uv_stream_t*) &tcp_write_handles[i]);237else238do_write((uv_stream_t*) &pipe_write_handles[i]);239}240}241}
242
243
244static void maybe_connect_some(void) {245uv_connect_t* req;246uv_tcp_t* tcp;247uv_pipe_t* pipe;248int r;249
250while (max_connect_socket < TARGET_CONNECTIONS &&251max_connect_socket < write_sockets + MAX_SIMULTANEOUS_CONNECTS) {252if (type == TCP) {253tcp = &tcp_write_handles[max_connect_socket++];254
255r = uv_tcp_init(loop, tcp);256ASSERT_OK(r);257
258req = (uv_connect_t*) req_alloc();259r = uv_tcp_connect(req,260tcp,261(const struct sockaddr*) &connect_addr,262connect_cb);263ASSERT_OK(r);264} else {265pipe = &pipe_write_handles[max_connect_socket++];266
267r = uv_pipe_init(loop, pipe, 0);268ASSERT_OK(r);269
270req = (uv_connect_t*) req_alloc();271uv_pipe_connect(req, pipe, TEST_PIPENAME, connect_cb);272}273}274}
275
276
277static void connection_cb(uv_stream_t* s, int status) {278uv_stream_t* stream;279int r;280
281ASSERT_PTR_EQ(server, s);282ASSERT_OK(status);283
284if (type == TCP) {285stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));286r = uv_tcp_init(loop, (uv_tcp_t*)stream);287ASSERT_OK(r);288} else {289stream = (uv_stream_t*)malloc(sizeof(uv_pipe_t));290r = uv_pipe_init(loop, (uv_pipe_t*)stream, 0);291ASSERT_OK(r);292}293
294r = uv_accept(s, stream);295ASSERT_OK(r);296
297r = uv_read_start(stream, buf_alloc, read_cb);298ASSERT_OK(r);299
300read_sockets++;301max_read_sockets++;302}
303
304
305/*
306* Request allocator
307*/
308
309typedef struct req_list_s {310union uv_any_req uv_req;311struct req_list_s* next;312} req_list_t;313
314
315static req_list_t* req_freelist = NULL;316
317
318static uv_req_t* req_alloc(void) {319req_list_t* req;320
321req = req_freelist;322if (req != NULL) {323req_freelist = req->next;324return (uv_req_t*) req;325}326
327req = (req_list_t*) malloc(sizeof *req);328return (uv_req_t*) req;329}
330
331
332static void req_free(uv_req_t* uv_req) {333req_list_t* req = (req_list_t*) uv_req;334
335req->next = req_freelist;336req_freelist = req;337}
338
339
340/*
341* Buffer allocator
342*/
343
344typedef struct buf_list_s {345uv_buf_t uv_buf_t;346struct buf_list_s* next;347} buf_list_t;348
349
350static buf_list_t* buf_freelist = NULL;351
352
353static void buf_alloc(uv_handle_t* handle, size_t size, uv_buf_t* buf) {354buf_list_t* ab;355
356ab = buf_freelist;357if (ab != NULL)358buf_freelist = ab->next;359else {360ab = malloc(size + sizeof(*ab));361ab->uv_buf_t.len = size;362ab->uv_buf_t.base = (char*) (ab + 1);363}364
365*buf = ab->uv_buf_t;366}
367
368
369static void buf_free(const uv_buf_t* buf) {370buf_list_t* ab = (buf_list_t*) buf->base - 1;371ab->next = buf_freelist;372buf_freelist = ab;373}
374
375
376HELPER_IMPL(tcp_pump_server) {377int r;378
379type = TCP;380loop = uv_default_loop();381
382ASSERT_OK(uv_ip4_addr("0.0.0.0", TEST_PORT, &listen_addr));383
384/* Server */385server = (uv_stream_t*)&tcpServer;386r = uv_tcp_init(loop, &tcpServer);387ASSERT_OK(r);388r = uv_tcp_bind(&tcpServer, (const struct sockaddr*) &listen_addr, 0);389ASSERT_OK(r);390r = uv_listen((uv_stream_t*)&tcpServer, MAX_WRITE_HANDLES, connection_cb);391ASSERT_OK(r);392
393notify_parent_process();394uv_run(loop, UV_RUN_DEFAULT);395
396return 0;397}
398
399
400HELPER_IMPL(pipe_pump_server) {401int r;402type = PIPE;403
404loop = uv_default_loop();405
406/* Server */407server = (uv_stream_t*)&pipeServer;408r = uv_pipe_init(loop, &pipeServer, 0);409ASSERT_OK(r);410r = uv_pipe_bind(&pipeServer, TEST_PIPENAME);411ASSERT_OK(r);412r = uv_listen((uv_stream_t*)&pipeServer, MAX_WRITE_HANDLES, connection_cb);413ASSERT_OK(r);414
415notify_parent_process();416uv_run(loop, UV_RUN_DEFAULT);417
418MAKE_VALGRIND_HAPPY(loop);419return 0;420}
421
422
423static void tcp_pump(int n) {424ASSERT_LE(n, MAX_WRITE_HANDLES);425TARGET_CONNECTIONS = n;426type = TCP;427
428loop = uv_default_loop();429
430ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &connect_addr));431
432/* Start making connections */433maybe_connect_some();434
435uv_run(loop, UV_RUN_DEFAULT);436
437MAKE_VALGRIND_HAPPY(loop);438}
439
440
441static void pipe_pump(int n) {442ASSERT_LE(n, MAX_WRITE_HANDLES);443TARGET_CONNECTIONS = n;444type = PIPE;445
446loop = uv_default_loop();447
448/* Start making connections */449maybe_connect_some();450
451uv_run(loop, UV_RUN_DEFAULT);452
453MAKE_VALGRIND_HAPPY(loop);454}
455
456
457BENCHMARK_IMPL(tcp_pump100_client) {458tcp_pump(100);459return 0;460}
461
462
463BENCHMARK_IMPL(tcp_pump1_client) {464tcp_pump(1);465return 0;466}
467
468
469BENCHMARK_IMPL(pipe_pump100_client) {470pipe_pump(100);471return 0;472}
473
474
475BENCHMARK_IMPL(pipe_pump1_client) {476pipe_pump(1);477return 0;478}
479