ClickHouse
688 строк · 23.9 Кб
1#include <unistd.h>
2#include <cstdlib>
3#include <csignal>
4#include <iostream>
5#include <iomanip>
6#include <optional>
7#include <random>
8#include <string_view>
9#include <pcg_random.hpp>
10#include <Poco/Util/Application.h>
11#include <Common/Stopwatch.h>
12#include <Common/ThreadPool.h>
13#include <AggregateFunctions/ReservoirSampler.h>
14#include <AggregateFunctions/registerAggregateFunctions.h>
15#include <boost/program_options.hpp>
16#include <Common/ConcurrentBoundedQueue.h>
17#include <Common/Exception.h>
18#include <Common/randomSeed.h>
19#include <Common/clearPasswordFromCommandLine.h>
20#include <IO/ReadBufferFromFileDescriptor.h>
21#include <IO/WriteBufferFromFile.h>
22#include <IO/ReadHelpers.h>
23#include <IO/WriteHelpers.h>
24#include <IO/Operators.h>
25#include <IO/ConnectionTimeouts.h>
26#include <IO/UseSSL.h>
27#include <QueryPipeline/RemoteQueryExecutor.h>
28#include <Interpreters/Context.h>
29#include <Client/Connection.h>
30#include <Common/InterruptListener.h>
31#include <Common/Config/ConfigProcessor.h>
32#include <Common/Config/getClientConfigPath.h>
33#include <Common/TerminalSize.h>
34#include <Common/StudentTTest.h>
35#include <Common/CurrentMetrics.h>
36#include <Common/ErrorCodes.h>
37#include <Core/BaseSettingsProgramOptions.h>
38
39
/** A tool for evaluating ClickHouse performance.
  * The tool emulates a workload with a fixed number of simultaneously executing queries.
  */
43
44namespace CurrentMetrics
45{
46extern const Metric LocalThread;
47extern const Metric LocalThreadActive;
48extern const Metric LocalThreadScheduled;
49}
50
51namespace DB
52{
53
54using Ports = std::vector<UInt16>;
55static constexpr std::string_view DEFAULT_CLIENT_NAME = "benchmark";
56
/// Error codes thrown by this tool; declared extern, defined elsewhere.
namespace ErrorCodes
{
    extern const int CANNOT_BLOCK_SIGNAL;   /// thrown by worker threads when SIGINT cannot be masked
    extern const int EMPTY_DATA_PASSED;     /// thrown when no query was read from stdin
}
62
63class Benchmark : public Poco::Util::Application
64{
65public:
66Benchmark(unsigned concurrency_,
67double delay_,
68Strings && hosts_,
69Ports && ports_,
70bool round_robin_,
71bool cumulative_,
72bool secure_,
73const String & default_database_,
74const String & user_,
75const String & password_,
76const String & quota_key_,
77const String & stage,
78bool randomize_,
79size_t max_iterations_,
80double max_time_,
81size_t confidence_,
82const String & query_id_,
83const String & query_to_execute_,
84size_t max_consecutive_errors_,
85bool continue_on_errors_,
86bool reconnect_,
87bool display_client_side_time_,
88bool print_stacktrace_,
89const Settings & settings_)
90:
91round_robin(round_robin_),
92concurrency(concurrency_),
93delay(delay_),
94queue(concurrency),
95randomize(randomize_),
96cumulative(cumulative_),
97max_iterations(max_iterations_),
98max_time(max_time_),
99confidence(confidence_),
100query_id(query_id_),
101query_to_execute(query_to_execute_),
102continue_on_errors(continue_on_errors_),
103max_consecutive_errors(max_consecutive_errors_),
104reconnect(reconnect_),
105display_client_side_time(display_client_side_time_),
106print_stacktrace(print_stacktrace_),
107settings(settings_),
108shared_context(Context::createShared()),
109global_context(Context::createGlobal(shared_context.get())),
110pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, concurrency)
111{
112const auto secure = secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable;
113size_t connections_cnt = std::max(ports_.size(), hosts_.size());
114
115connections.reserve(connections_cnt);
116comparison_info_total.reserve(round_robin ? 1 : connections_cnt);
117comparison_info_per_interval.reserve(round_robin ? 1 : connections_cnt);
118
119for (size_t i = 0; i < connections_cnt; ++i)
120{
121UInt16 cur_port = i >= ports_.size() ? 9000 : ports_[i];
122std::string cur_host = i >= hosts_.size() ? "localhost" : hosts_[i];
123
124connections.emplace_back(std::make_unique<ConnectionPool>(
125concurrency,
126cur_host, cur_port,
127default_database_, user_, password_, quota_key_,
128/* cluster_= */ "",
129/* cluster_secret_= */ "",
130/* client_name_= */ std::string(DEFAULT_CLIENT_NAME),
131Protocol::Compression::Enable,
132secure));
133
134if (!round_robin || comparison_info_per_interval.empty())
135{
136comparison_info_per_interval.emplace_back(std::make_shared<Stats>());
137comparison_info_total.emplace_back(std::make_shared<Stats>());
138}
139}
140
141global_context->makeGlobalContext();
142global_context->setSettings(settings);
143global_context->setClientName(std::string(DEFAULT_CLIENT_NAME));
144global_context->setQueryKindInitial();
145
146std::cerr << std::fixed << std::setprecision(3);
147
148/// This is needed to receive blocks with columns of AggregateFunction data type
149/// (example: when using stage = 'with_mergeable_state')
150registerAggregateFunctions();
151
152query_processing_stage = QueryProcessingStage::fromString(stage);
153}
154
155void initialize(Poco::Util::Application & self [[maybe_unused]]) override
156{
157std::string home_path;
158const char * home_path_cstr = getenv("HOME"); // NOLINT(concurrency-mt-unsafe)
159if (home_path_cstr)
160home_path = home_path_cstr;
161
162std::optional<std::string> config_path;
163if (config().has("config-file"))
164config_path.emplace(config().getString("config-file"));
165else
166config_path = getClientConfigPath(home_path);
167if (config_path.has_value())
168{
169ConfigProcessor config_processor(*config_path);
170auto loaded_config = config_processor.loadConfig();
171config().add(loaded_config.configuration);
172}
173}
174
175int main(const std::vector<std::string> &) override
176{
177readQueries();
178runBenchmark();
179return 0;
180}
181
182private:
183using Entry = ConnectionPool::Entry;
184using EntryPtr = std::shared_ptr<Entry>;
185using EntryPtrs = std::vector<EntryPtr>;
186
187bool round_robin;
188unsigned concurrency;
189double delay;
190
191using Query = std::string;
192using Queries = std::vector<Query>;
193Queries queries;
194
195using Queue = ConcurrentBoundedQueue<Query>;
196Queue queue;
197
198using ConnectionPoolUniq = std::unique_ptr<ConnectionPool>;
199using ConnectionPoolUniqs = std::vector<ConnectionPoolUniq>;
200ConnectionPoolUniqs connections;
201
202bool randomize;
203bool cumulative;
204size_t max_iterations;
205double max_time;
206size_t confidence;
207String query_id;
208String query_to_execute;
209bool continue_on_errors;
210size_t max_consecutive_errors;
211bool reconnect;
212bool display_client_side_time;
213bool print_stacktrace;
214const Settings & settings;
215SharedContextHolder shared_context;
216ContextMutablePtr global_context;
217QueryProcessingStage::Enum query_processing_stage;
218
219std::atomic<size_t> consecutive_errors{0};
220
221/// Don't execute new queries after timelimit or SIGINT or exception
222std::atomic<bool> shutdown{false};
223
224std::atomic<size_t> queries_executed{0};
225
226struct Stats
227{
228std::atomic<size_t> queries{0};
229size_t errors = 0;
230size_t read_rows = 0;
231size_t read_bytes = 0;
232size_t result_rows = 0;
233size_t result_bytes = 0;
234
235using Sampler = ReservoirSampler<double>;
236Sampler sampler {1 << 16};
237
238void add(double duration, size_t read_rows_inc, size_t read_bytes_inc, size_t result_rows_inc, size_t result_bytes_inc)
239{
240++queries;
241read_rows += read_rows_inc;
242read_bytes += read_bytes_inc;
243result_rows += result_rows_inc;
244result_bytes += result_bytes_inc;
245sampler.insert(duration);
246}
247
248void clear()
249{
250queries = 0;
251read_rows = 0;
252read_bytes = 0;
253result_rows = 0;
254result_bytes = 0;
255sampler.clear();
256}
257};
258
259using MultiStats = std::vector<std::shared_ptr<Stats>>;
260MultiStats comparison_info_per_interval;
261MultiStats comparison_info_total;
262StudentTTest t_test;
263
264Stopwatch total_watch;
265Stopwatch delay_watch;
266
267std::mutex mutex;
268
269ThreadPool pool;
270
271void readQueries()
272{
273if (query_to_execute.empty())
274{
275ReadBufferFromFileDescriptor in(STDIN_FILENO);
276
277while (!in.eof())
278{
279String query;
280readText(query, in);
281assertChar('\n', in);
282
283if (!query.empty())
284queries.emplace_back(std::move(query));
285}
286
287if (queries.empty())
288throw Exception(ErrorCodes::EMPTY_DATA_PASSED, "Empty list of queries.");
289}
290else
291{
292queries.emplace_back(query_to_execute);
293}
294
295
296std::cerr << "Loaded " << queries.size() << " queries.\n";
297}
298
299
300void printNumberOfQueriesExecuted(size_t num)
301{
302std::cerr << "\nQueries executed: " << num;
303if (queries.size() > 1)
304std::cerr << " (" << (num * 100.0 / queries.size()) << "%)";
305std::cerr << ".\n";
306}
307
308/// Try push new query and check cancellation conditions
309bool tryPushQueryInteractively(const String & query, InterruptListener & interrupt_listener)
310{
311bool inserted = false;
312
313while (!inserted)
314{
315inserted = queue.tryPush(query, 100);
316
317if (shutdown)
318{
319/// An exception occurred in a worker
320return false;
321}
322
323if (max_time > 0 && total_watch.elapsedSeconds() >= max_time)
324{
325std::cout << "Stopping launch of queries."
326<< " Requested time limit " << max_time << " seconds is exhausted.\n";
327return false;
328}
329
330if (interrupt_listener.check())
331{
332std::cout << "Stopping launch of queries. SIGINT received." << std::endl;
333return false;
334}
335
336double seconds = delay_watch.elapsedSeconds();
337if (delay > 0 && seconds > delay)
338{
339printNumberOfQueriesExecuted(queries_executed);
340cumulative
341? report(comparison_info_total, total_watch.elapsedSeconds())
342: report(comparison_info_per_interval, seconds);
343delay_watch.restart();
344}
345}
346
347return true;
348}
349
350void runBenchmark()
351{
352pcg64 generator(randomSeed());
353std::uniform_int_distribution<size_t> distribution(0, queries.size() - 1);
354
355try
356{
357for (size_t i = 0; i < concurrency; ++i)
358pool.scheduleOrThrowOnError([this]() mutable { thread(); });
359}
360catch (...)
361{
362shutdown = true;
363pool.wait();
364throw;
365}
366
367InterruptListener interrupt_listener;
368delay_watch.restart();
369
370/// Push queries into queue
371for (size_t i = 0; !max_iterations || i < max_iterations; ++i)
372{
373size_t query_index = randomize ? distribution(generator) : i % queries.size();
374
375if (!tryPushQueryInteractively(queries[query_index], interrupt_listener))
376{
377shutdown = true;
378break;
379}
380}
381
382/// Now we don't block the Ctrl+C signal and second signal will terminate the program without waiting.
383interrupt_listener.unblock();
384
385pool.wait();
386total_watch.stop();
387
388printNumberOfQueriesExecuted(queries_executed);
389report(comparison_info_total, total_watch.elapsedSeconds());
390}
391
392
393void thread()
394{
395Query query;
396
397/// Randomly choosing connection index
398pcg64 generator(randomSeed());
399std::uniform_int_distribution<size_t> distribution(0, connections.size() - 1);
400
401/// In these threads we do not accept INT signal.
402sigset_t sig_set;
403if (sigemptyset(&sig_set)
404|| sigaddset(&sig_set, SIGINT)
405|| pthread_sigmask(SIG_BLOCK, &sig_set, nullptr))
406{
407throw ErrnoException(ErrorCodes::CANNOT_BLOCK_SIGNAL, "Cannot block signal");
408}
409
410while (true)
411{
412bool extracted = false;
413
414while (!extracted)
415{
416extracted = queue.tryPop(query, 100);
417
418if (shutdown || (max_iterations && queries_executed == max_iterations))
419return;
420}
421
422const auto connection_index = distribution(generator);
423try
424{
425execute(query, connection_index);
426consecutive_errors = 0;
427}
428catch (...)
429{
430std::lock_guard lock(mutex);
431std::cerr << "An error occurred while processing the query " << "'" << query << "'"
432<< ": " << getCurrentExceptionMessage(false) << std::endl;
433if (!(continue_on_errors || max_consecutive_errors > ++consecutive_errors))
434{
435shutdown = true;
436throw;
437}
438else
439{
440std::cerr << getCurrentExceptionMessage(print_stacktrace,
441true /*check embedded stack trace*/) << std::endl;
442
443size_t info_index = round_robin ? 0 : connection_index;
444++comparison_info_per_interval[info_index]->errors;
445++comparison_info_total[info_index]->errors;
446}
447}
448// Count failed queries toward executed, so that we'd reach
449// max_iterations even if every run fails.
450++queries_executed;
451}
452}
453
454void execute(Query & query, size_t connection_index)
455{
456Stopwatch watch;
457
458ConnectionPool::Entry entry = connections[connection_index]->get(
459ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings));
460
461if (reconnect)
462entry->disconnect();
463
464RemoteQueryExecutor executor(
465*entry, query, {}, global_context, nullptr, Scalars(), Tables(), query_processing_stage);
466if (!query_id.empty())
467executor.setQueryId(query_id);
468
469Progress progress;
470executor.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });
471
472executor.sendQuery(ClientInfo::QueryKind::INITIAL_QUERY);
473
474ProfileInfo info;
475while (Block block = executor.readBlock())
476info.update(block);
477
478executor.finish();
479
480double duration = (display_client_side_time || progress.elapsed_ns == 0)
481? watch.elapsedSeconds()
482: progress.elapsed_ns / 1e9;
483
484std::lock_guard lock(mutex);
485
486size_t info_index = round_robin ? 0 : connection_index;
487comparison_info_per_interval[info_index]->add(duration, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
488comparison_info_total[info_index]->add(duration, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
489t_test.add(info_index, duration);
490}
491
492void report(MultiStats & infos, double seconds)
493{
494std::lock_guard lock(mutex);
495
496std::cerr << "\n";
497for (size_t i = 0; i < infos.size(); ++i)
498{
499const auto & info = infos[i];
500
501/// Avoid zeros, nans or exceptions
502if (0 == info->queries)
503return;
504
505std::string connection_description = connections[i]->getDescription();
506if (round_robin)
507{
508connection_description.clear();
509for (const auto & conn : connections)
510{
511if (!connection_description.empty())
512connection_description += ", ";
513connection_description += conn->getDescription();
514}
515}
516std::cerr
517<< connection_description << ", "
518<< "queries: " << info->queries << ", ";
519if (info->errors)
520{
521std::cerr << "errors: " << info->errors << ", ";
522}
523std::cerr
524<< "QPS: " << (info->queries / seconds) << ", "
525<< "RPS: " << (info->read_rows / seconds) << ", "
526<< "MiB/s: " << (info->read_bytes / seconds / 1048576) << ", "
527<< "result RPS: " << (info->result_rows / seconds) << ", "
528<< "result MiB/s: " << (info->result_bytes / seconds / 1048576) << "."
529<< "\n";
530}
531std::cerr << "\n";
532
533auto print_percentile = [&](double percent)
534{
535std::cerr << percent << "%\t\t";
536for (const auto & info : infos)
537{
538std::cerr << info->sampler.quantileNearest(percent / 100.0) << " sec.\t";
539}
540std::cerr << "\n";
541};
542
543for (int percent = 0; percent <= 90; percent += 10)
544print_percentile(percent);
545
546print_percentile(95);
547print_percentile(99);
548print_percentile(99.9);
549print_percentile(99.99);
550
551std::cerr << "\n" << t_test.compareAndReport(confidence).second << "\n";
552
553if (!cumulative)
554{
555for (auto & info : infos)
556info->clear();
557}
558}
559
560public:
561
562~Benchmark() override
563{
564shutdown = true;
565}
566};
567
568}
569
570
571int mainEntryClickHouseBenchmark(int argc, char ** argv)
572{
573using namespace DB;
574bool print_stacktrace = true;
575
576try
577{
578using boost::program_options::value;
579
580/// Note: according to the standard, subsequent calls to getenv can mangle previous result.
581/// So we copy the results to std::string.
582std::optional<std::string> env_user_str;
583std::optional<std::string> env_password_str;
584std::optional<std::string> env_quota_key_str;
585
586const char * env_user = getenv("CLICKHOUSE_USER"); // NOLINT(concurrency-mt-unsafe)
587if (env_user != nullptr)
588env_user_str.emplace(std::string(env_user));
589
590const char * env_password = getenv("CLICKHOUSE_PASSWORD"); // NOLINT(concurrency-mt-unsafe)
591if (env_password != nullptr)
592env_password_str.emplace(std::string(env_password));
593
594const char * env_quota_key = getenv("CLICKHOUSE_QUOTA_KEY"); // NOLINT(concurrency-mt-unsafe)
595if (env_quota_key != nullptr)
596env_quota_key_str.emplace(std::string(env_quota_key));
597
598boost::program_options::options_description desc = createOptionsDescription("Allowed options", getTerminalWidth());
599desc.add_options()
600("help", "produce help message")
601("query,q", value<std::string>()->default_value(""), "query to execute")
602("concurrency,c", value<unsigned>()->default_value(1), "number of parallel queries")
603("delay,d", value<double>()->default_value(1), "delay between intermediate reports in seconds (set 0 to disable reports)")
604("stage", value<std::string>()->default_value("complete"), "request query processing up to specified stage: complete,fetch_columns,with_mergeable_state,with_mergeable_state_after_aggregation,with_mergeable_state_after_aggregation_and_limit")
605("iterations,i", value<size_t>()->default_value(0), "amount of queries to be executed")
606("timelimit,t", value<double>()->default_value(0.), "stop launch of queries after specified time limit")
607("randomize,r", "randomize order of execution")
608("host,h", value<Strings>()->multitoken(), "list of hosts")
609("port", value<Ports>()->multitoken(), "list of ports")
610("roundrobin", "Instead of comparing queries for different --host/--port just pick one random --host/--port for every query and send query to it.")
611("cumulative", "prints cumulative data instead of data per interval")
612("secure,s", "Use TLS connection")
613("user,u", value<std::string>()->default_value(env_user_str.value_or("default")), "")
614("password", value<std::string>()->default_value(env_password_str.value_or("")), "")
615("quota_key", value<std::string>()->default_value(env_quota_key_str.value_or("")), "")
616("database", value<std::string>()->default_value("default"), "")
617("stacktrace", "print stack traces of exceptions")
618("confidence", value<size_t>()->default_value(5), "set the level of confidence for T-test [0=80%, 1=90%, 2=95%, 3=98%, 4=99%, 5=99.5%(default)")
619("query_id", value<std::string>()->default_value(""), "")
620("max-consecutive-errors", value<size_t>()->default_value(0), "set number of allowed consecutive errors")
621("ignore-error,continue_on_errors", "continue testing even if a query fails")
622("reconnect", "establish new connection for every query")
623("client-side-time", "display the time including network communication instead of server-side time; note that for server versions before 22.8 we always display client-side time")
624;
625
626Settings settings;
627addProgramOptions(settings, desc);
628
629boost::program_options::variables_map options;
630boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options);
631boost::program_options::notify(options);
632
633clearPasswordFromCommandLine(argc, argv);
634
635if (options.count("help"))
636{
637std::cout << "Usage: " << argv[0] << " [options] < queries.txt\n";
638std::cout << desc << "\n";
639std::cout << "\nSee also: https://clickhouse.com/docs/en/operations/utilities/clickhouse-benchmark/\n";
640return 0;
641}
642
643print_stacktrace = options.count("stacktrace");
644
645/// NOTE Maybe clickhouse-benchmark should also respect .xml configuration of clickhouse-client.
646
647UInt16 default_port = options.count("secure") ? DBMS_DEFAULT_SECURE_PORT : DBMS_DEFAULT_PORT;
648
649UseSSL use_ssl;
650Ports ports = options.count("port")
651? options["port"].as<Ports>()
652: Ports({default_port});
653
654Strings hosts = options.count("host") ? options["host"].as<Strings>() : Strings({"localhost"});
655
656Benchmark benchmark(
657options["concurrency"].as<unsigned>(),
658options["delay"].as<double>(),
659std::move(hosts),
660std::move(ports),
661options.count("roundrobin"),
662options.count("cumulative"),
663options.count("secure"),
664options["database"].as<std::string>(),
665options["user"].as<std::string>(),
666options["password"].as<std::string>(),
667options["quota_key"].as<std::string>(),
668options["stage"].as<std::string>(),
669options.count("randomize"),
670options["iterations"].as<size_t>(),
671options["timelimit"].as<double>(),
672options["confidence"].as<size_t>(),
673options["query_id"].as<std::string>(),
674options["query"].as<std::string>(),
675options["max-consecutive-errors"].as<size_t>(),
676options.count("ignore-error"),
677options.count("reconnect"),
678options.count("client-side-time"),
679print_stacktrace,
680settings);
681return benchmark.run();
682}
683catch (...)
684{
685std::cerr << getCurrentExceptionMessage(print_stacktrace, true) << std::endl;
686return getCurrentExceptionCode();
687}
688}
689