ClickHouse

Форк
0
673 строки · 24.1 Кб
1
#include "Keeper.h"
2

3
#include <Common/ClickHouseRevision.h>
4
#include <Common/getMultipleKeysFromConfig.h>
5
#include <Common/DNSResolver.h>
6
#include <Interpreters/DNSCacheUpdater.h>
7
#include <Coordination/Defines.h>
8
#include <Common/Config/ConfigReloader.h>
9
#include <filesystem>
10
#include <IO/UseSSL.h>
11
#include <Core/ServerUUID.h>
12
#include <Common/logger_useful.h>
13
#include <Common/CgroupsMemoryUsageObserver.h>
14
#include <Common/ErrorHandlers.h>
15
#include <Common/assertProcessUserMatchesDataOwner.h>
16
#include <Common/makeSocketAddress.h>
17
#include <Server/waitServersToFinish.h>
18
#include <Server/CloudPlacementInfo.h>
19
#include <base/getMemoryAmount.h>
20
#include <base/scope_guard.h>
21
#include <base/safeExit.h>
22
#include <Poco/Net/NetException.h>
23
#include <Poco/Net/TCPServerParams.h>
24
#include <Poco/Net/TCPServer.h>
25
#include <Poco/Util/HelpFormatter.h>
26
#include <Poco/Environment.h>
27
#include <sys/stat.h>
28
#include <pwd.h>
29

30
#include <Interpreters/Context.h>
31

32
#include <Coordination/FourLetterCommand.h>
33
#include <Coordination/KeeperAsynchronousMetrics.h>
34

35
#include <Server/HTTP/HTTPServer.h>
36
#include <Server/HTTPHandlerFactory.h>
37
#include <Server/KeeperReadinessHandler.h>
38
#include <Server/PrometheusMetricsWriter.h>
39
#include <Server/TCPServer.h>
40

41
#include "Core/Defines.h"
42
#include "config.h"
43
#include <Common/config_version.h>
44
#include "config_tools.h"
45

46

47
#if USE_SSL
48
#    include <Poco/Net/Context.h>
49
#    include <Poco/Net/SecureServerSocket.h>
50
#    include <Server/CertificateReloader.h>
51
#endif
52

53
#include <Server/ProtocolServerAdapter.h>
54
#include <Server/KeeperTCPHandlerFactory.h>
55

56
#include <Disks/registerDisks.h>
57

58
#include <incbin.h>
59
/// A minimal file used when the keeper is run without installation
60
INCBIN(keeper_resource_embedded_xml, SOURCE_DIR "/programs/keeper/keeper_embedded.xml");
61

62
int mainEntryClickHouseKeeper(int argc, char ** argv)
63
{
64
    DB::Keeper app;
65

66
    try
67
    {
68
        return app.run(argc, argv);
69
    }
70
    catch (...)
71
    {
72
        std::cerr << DB::getCurrentExceptionMessage(true) << "\n";
73
        auto code = DB::getCurrentExceptionCode();
74
        return code ? code : 1;
75
    }
76
}
77

78
#ifdef CLICKHOUSE_KEEPER_STANDALONE_BUILD

/// Weak symbols don't work correctly on Darwin, so the standalone build
/// provides this no-op definition to avoid linker errors.
void collectCrashLog(Int32, UInt64, const String &, const StackTrace &)
{
}

#endif
87

88
namespace DB
89
{
90

91
/// Error codes referenced in this file; they are only declared here (extern).
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int NETWORK_ERROR;
    extern const int NO_ELEMENTS_IN_CONFIG;
    extern const int SUPPORT_IS_DISABLED;
}
98

99
/// Binds `socket` to the address built from `host` and `port` and puts it into the listening state.
/// The `listen_reuse_port` (default: false) and `listen_backlog` (default: 64) settings are read
/// from the application config. Returns the address the socket was bound to.
Poco::Net::SocketAddress Keeper::socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, [[maybe_unused]] bool secure) const
{
    auto bind_address = makeSocketAddress(host, port, &logger());

    const bool reuse_port = config().getBool("listen_reuse_port", false);
    socket.bind(bind_address, /* reuseAddress = */ true, /* reusePort = */ reuse_port);

    const auto backlog = config().getUInt("listen_backlog", 64);
    socket.listen(/* backlog = */ backlog);

    return bind_address;
}
107

108
void Keeper::createServer(const std::string & listen_host, const char * port_name, bool listen_try, CreateServerFunc && func) const
109
{
110
    /// For testing purposes, user may omit tcp_port or http_port or https_port in configuration file.
111
    if (!config().has(port_name))
112
        return;
113

114
    auto port = config().getInt(port_name);
115
    try
116
    {
117
        func(port);
118
    }
119
    catch (const Poco::Exception &)
120
    {
121
        if (listen_try)
122
        {
123
            LOG_WARNING(&logger(), "Listen [{}]:{} failed: {}. If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, "
124
                "then consider to "
125
                "specify not disabled IPv4 or IPv6 address to listen in <listen_host> element of configuration "
126
                "file. Example for disabled IPv6: <listen_host>0.0.0.0</listen_host> ."
127
                " Example for disabled IPv4: <listen_host>::</listen_host>",
128
                listen_host, port, getCurrentExceptionMessage(false));
129
        }
130
        else
131
        {
132
            throw Exception(ErrorCodes::NETWORK_ERROR, "Listen [{}]:{} failed: {}", listen_host, port, getCurrentExceptionMessage(false));
133
        }
134
    }
135
}
136

137
void Keeper::uninitialize()
138
{
139
    logger().information("shutting down");
140
    BaseDaemon::uninitialize();
141
}
142

143
int Keeper::run()
144
{
145
    if (config().hasOption("help"))
146
    {
147
        Poco::Util::HelpFormatter help_formatter(Keeper::options());
148
        auto header_str = fmt::format("{0} [OPTION] [-- [ARG]...]\n"
149
#if ENABLE_CLICKHOUSE_KEEPER_CLIENT
150
                                      "{0} client [OPTION]\n"
151
#endif
152
                                      "positional arguments can be used to rewrite config.xml properties, for example, --http_port=8010",
153
                                      commandName());
154
        help_formatter.setHeader(header_str);
155
        help_formatter.format(std::cout);
156
        return 0;
157
    }
158
    if (config().hasOption("version"))
159
    {
160
        std::cout << VERSION_NAME << " keeper version " << VERSION_STRING << VERSION_OFFICIAL << "." << std::endl;
161
        return 0;
162
    }
163

164
    return Application::run(); // NOLINT
165
}
166

167
/// Registers the embedded XML resource as "keeper_config.xml", runs the base daemon
/// initialization and logs the startup message together with OS information.
void Keeper::initialize(Poco::Util::Application & self)
{
    const std::string_view embedded_config(
        reinterpret_cast<const char *>(gkeeper_resource_embedded_xmlData),
        gkeeper_resource_embedded_xmlSize);
    ConfigProcessor::registerEmbeddedConfig("keeper_config.xml", embedded_config);

    BaseDaemon::initialize(self);

    auto & app_logger = logger();
    app_logger.information("starting up");

    LOG_INFO(&app_logger, "OS Name = {}, OS Version = {}, OS Architecture = {}",
        Poco::Environment::osName(),
        Poco::Environment::osVersion(),
        Poco::Environment::osArchitecture());
}
179

180
std::string Keeper::getDefaultConfigFileName() const
181
{
182
    return "keeper_config.xml";
183
}
184

185
void Keeper::handleCustomArguments(const std::string & arg, [[maybe_unused]] const std::string & value) // NOLINT
186
{
187
    if (arg == "force-recovery")
188
    {
189
        assert(value.empty());
190
        config().setBool("keeper_server.force_recovery", true);
191
        return;
192
    }
193

194
    throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid argument {} provided", arg);
195
}
196

197
/// Declares the Keeper-specific command-line options (`help`, `version`, `force-recovery`)
/// and then appends the options defined by BaseDaemon.
void Keeper::defineOptions(Poco::Util::OptionSet & options)
{
    Poco::Util::Option help_option("help", "h", "show help and exit");
    help_option.required(false).repeatable(false).binding("help");
    options.addOption(help_option);

    Poco::Util::Option version_option("version", "V", "show version and exit");
    version_option.required(false).repeatable(false).binding("version");
    options.addOption(version_option);

    /// Unlike the two options above, this one is not bound to a config key:
    /// it is dispatched to handleCustomArguments().
    Poco::Util::Option force_recovery_option(
        "force-recovery", "force-recovery", "Force recovery mode allowing Keeper to overwrite cluster configuration without quorum");
    force_recovery_option
        .required(false)
        .repeatable(false)
        .noArgument()
        .callback(Poco::Util::OptionCallback<Keeper>(this, &Keeper::handleCustomArguments));
    options.addOption(force_recovery_option);

    BaseDaemon::defineOptions(options);
}
217

218
namespace
219
{
220

221
struct KeeperHTTPContext : public IHTTPContext
222
{
223
    explicit KeeperHTTPContext(ContextPtr context_)
224
        : context(std::move(context_))
225
    {}
226

227
    uint64_t getMaxHstsAge() const override
228
    {
229
        return context->getConfigRef().getUInt64("keeper_server.hsts_max_age", 0);
230
    }
231

232
    uint64_t getMaxUriSize() const override
233
    {
234
        return context->getConfigRef().getUInt64("keeper_server.http_max_uri_size", 1048576);
235
    }
236

237
    uint64_t getMaxFields() const override
238
    {
239
        return context->getConfigRef().getUInt64("keeper_server.http_max_fields", 1000000);
240
    }
241

242
    uint64_t getMaxFieldNameSize() const override
243
    {
244
        return context->getConfigRef().getUInt64("keeper_server.http_max_field_name_size", 128 * 1024);
245
    }
246

247
    uint64_t getMaxFieldValueSize() const override
248
    {
249
        return context->getConfigRef().getUInt64("keeper_server.http_max_field_value_size", 128 * 1024);
250
    }
251

252
    uint64_t getMaxChunkSize() const override
253
    {
254
        return context->getConfigRef().getUInt64("keeper_server.http_max_chunk_size", 100_GiB);
255
    }
256

257
    Poco::Timespan getReceiveTimeout() const override
258
    {
259
        return {context->getConfigRef().getInt64("keeper_server.http_receive_timeout", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0};
260
    }
261

262
    Poco::Timespan getSendTimeout() const override
263
    {
264
        return {context->getConfigRef().getInt64("keeper_server.http_send_timeout", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0};
265
    }
266

267
    ContextPtr context;
268
};
269

270
/// Creates a KeeperHTTPContext that wraps the global context instance.
HTTPContextPtr httpContext()
{
    auto global_context_instance = Context::getGlobalContextInstance();
    return std::make_shared<KeeperHTTPContext>(std::move(global_context_instance));
}
274

275
}
276

277
int Keeper::main(const std::vector<std::string> & /*args*/)
278
try
279
{
280
    Poco::Logger * log = &logger();
281

282
    UseSSL use_ssl;
283

284
    MainThreadStatus::getInstance();
285

286
#if !defined(NDEBUG) || !defined(__OPTIMIZE__)
287
    LOG_WARNING(log, "Keeper was built in debug mode. It will work slowly.");
288
#endif
289

290
#if defined(SANITIZER)
291
    LOG_WARNING(log, "Keeper was built with sanitizer. It will work slowly.");
292
#endif
293

294
    if (!config().has("keeper_server"))
295
        throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Keeper configuration (<keeper_server> section) not found in config");
296

297
    auto updateMemorySoftLimitInConfig = [&](Poco::Util::AbstractConfiguration & config)
298
    {
299
        UInt64 memory_soft_limit = 0;
300
        if (config.has("keeper_server.max_memory_usage_soft_limit"))
301
        {
302
            memory_soft_limit = config.getUInt64("keeper_server.max_memory_usage_soft_limit");
303
        }
304

305
        /// if memory soft limit is not set, we will use default value
306
        if (memory_soft_limit == 0)
307
        {
308
            Float64 ratio = 0.9;
309
            if (config.has("keeper_server.max_memory_usage_soft_limit_ratio"))
310
                ratio = config.getDouble("keeper_server.max_memory_usage_soft_limit_ratio");
311

312
            size_t physical_server_memory = getMemoryAmount();
313
            if (ratio > 0 && physical_server_memory > 0)
314
            {
315
                memory_soft_limit = static_cast<UInt64>(physical_server_memory * ratio);
316
                config.setUInt64("keeper_server.max_memory_usage_soft_limit", memory_soft_limit);
317
            }
318
        }
319
        LOG_INFO(log, "keeper_server.max_memory_usage_soft_limit is set to {}", formatReadableSizeWithBinarySuffix(memory_soft_limit));
320
    };
321

322
    updateMemorySoftLimitInConfig(config());
323

324
    std::string path;
325

326
    if (config().has("keeper_server.storage_path"))
327
    {
328
        path = config().getString("keeper_server.storage_path");
329
    }
330
    else if (config().has("keeper_server.log_storage_path"))
331
    {
332
        path = std::filesystem::path(config().getString("keeper_server.log_storage_path")).parent_path();
333
    }
334
    else if (config().has("keeper_server.snapshot_storage_path"))
335
    {
336
        path = std::filesystem::path(config().getString("keeper_server.snapshot_storage_path")).parent_path();
337
    }
338
    else if (std::filesystem::is_directory(std::filesystem::path{config().getString("path", DBMS_DEFAULT_PATH)} / "coordination"))
339
    {
340
        throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG,
341
                        "By default 'keeper_server.storage_path' could be assigned to {}, but the directory {} already exists. Please specify 'keeper_server.storage_path' in the keeper configuration explicitly",
342
                        KEEPER_DEFAULT_PATH, String{std::filesystem::path{config().getString("path", DBMS_DEFAULT_PATH)} / "coordination"});
343
    }
344
    else
345
    {
346
        path = KEEPER_DEFAULT_PATH;
347
    }
348

349
    std::filesystem::create_directories(path);
350

351
    /// Check that the process user id matches the owner of the data.
352
    assertProcessUserMatchesDataOwner(path, [&](const std::string & message){ LOG_WARNING(log, fmt::runtime(message)); });
353

354
    DB::ServerUUID::load(path + "/uuid", log);
355

356
    std::string include_from_path = config().getString("include_from", "/etc/metrika.xml");
357

358
    if (config().has(DB::PlacementInfo::PLACEMENT_CONFIG_PREFIX))
359
    {
360
        PlacementInfo::PlacementInfo::instance().initialize(config());
361
    }
362

363
    GlobalThreadPool::initialize(
364
        config().getUInt("max_thread_pool_size", 100),
365
        config().getUInt("max_thread_pool_free_size", 1000),
366
        config().getUInt("thread_pool_queue_size", 10000)
367
    );
368
    /// Wait for all threads to avoid possible use-after-free (for example logging objects can be already destroyed).
369
    SCOPE_EXIT({
370
        Stopwatch watch;
371
        LOG_INFO(log, "Waiting for background threads");
372
        GlobalThreadPool::instance().shutdown();
373
        LOG_INFO(log, "Background threads finished in {} ms", watch.elapsedMilliseconds());
374
    });
375

376
    static ServerErrorHandler error_handler;
377
    Poco::ErrorHandler::set(&error_handler);
378

379
    /// Initialize DateLUT early, to not interfere with running time of first query.
380
    LOG_DEBUG(log, "Initializing DateLUT.");
381
    DateLUT::serverTimezoneInstance();
382
    LOG_TRACE(log, "Initialized DateLUT with time zone '{}'.", DateLUT::serverTimezoneInstance().getTimeZone());
383

384
    /// Don't want to use DNS cache
385
    DNSResolver::instance().setDisableCacheFlag();
386

387
    Poco::ThreadPool server_pool(3, config().getUInt("max_connections", 1024));
388
    std::mutex servers_lock;
389
    auto servers = std::make_shared<std::vector<ProtocolServerAdapter>>();
390

391
    auto shared_context = Context::createShared();
392
    auto global_context = Context::createGlobal(shared_context.get());
393

394
    global_context->makeGlobalContext();
395
    global_context->setApplicationType(Context::ApplicationType::KEEPER);
396
    global_context->setPath(path);
397
    global_context->setRemoteHostFilter(config());
398

399
    if (config().has("macros"))
400
        global_context->setMacros(std::make_unique<Macros>(config(), "macros", log));
401

402
    registerDisks(/*global_skip_access_check=*/false);
403

404
    /// This object will periodically calculate some metrics.
405
    KeeperAsynchronousMetrics async_metrics(
406
        global_context,
407
        config().getUInt("asynchronous_metrics_update_period_s", 1),
408
        [&]() -> std::vector<ProtocolServerMetrics>
409
        {
410
            std::vector<ProtocolServerMetrics> metrics;
411

412
            std::lock_guard lock(servers_lock);
413
            metrics.reserve(servers->size());
414
            for (const auto & server : *servers)
415
                metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()});
416
            return metrics;
417
        }
418
    );
419

420
    std::vector<std::string> listen_hosts = DB::getMultipleValuesFromConfig(config(), "", "listen_host");
421

422
    bool listen_try = config().getBool("listen_try", false);
423
    if (listen_hosts.empty())
424
    {
425
        listen_hosts.emplace_back("::1");
426
        listen_hosts.emplace_back("127.0.0.1");
427
        listen_try = true;
428
    }
429

430
    /// Initialize keeper RAFT. Do nothing if no keeper_server in config.
431
    global_context->initializeKeeperDispatcher(/* start_async = */ false);
432
    FourLetterCommandFactory::registerCommands(*global_context->getKeeperDispatcher());
433

434
    auto config_getter = [&] () -> const Poco::Util::AbstractConfiguration &
435
    {
436
        return global_context->getConfigRef();
437
    };
438

439
    auto tcp_receive_timeout = config().getInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC);
440
    auto tcp_send_timeout = config().getInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC);
441

442
    for (const auto & listen_host : listen_hosts)
443
    {
444
        /// TCP Keeper
445
        const char * port_name = "keeper_server.tcp_port";
446
        createServer(listen_host, port_name, listen_try, [&](UInt16 port)
447
        {
448
            Poco::Net::ServerSocket socket;
449
            auto address = socketBindListen(socket, listen_host, port);
450
            socket.setReceiveTimeout(Poco::Timespan{tcp_receive_timeout, 0});
451
            socket.setSendTimeout(Poco::Timespan{tcp_send_timeout, 0});
452
            servers->emplace_back(
453
                listen_host,
454
                port_name,
455
                "Keeper (tcp): " + address.toString(),
456
                std::make_unique<TCPServer>(
457
                    new KeeperTCPHandlerFactory(
458
                        config_getter, global_context->getKeeperDispatcher(),
459
                        tcp_receive_timeout, tcp_send_timeout, false), server_pool, socket));
460
        });
461

462
        const char * secure_port_name = "keeper_server.tcp_port_secure";
463
        createServer(listen_host, secure_port_name, listen_try, [&](UInt16 port)
464
        {
465
#if USE_SSL
466
            Poco::Net::SecureServerSocket socket;
467
            auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
468
            socket.setReceiveTimeout(Poco::Timespan{tcp_receive_timeout, 0});
469
            socket.setSendTimeout(Poco::Timespan{tcp_send_timeout, 0});
470
            servers->emplace_back(
471
                listen_host,
472
                secure_port_name,
473
                "Keeper with secure protocol (tcp_secure): " + address.toString(),
474
                std::make_unique<TCPServer>(
475
                    new KeeperTCPHandlerFactory(
476
                        config_getter, global_context->getKeeperDispatcher(),
477
                        tcp_receive_timeout, tcp_send_timeout, true), server_pool, socket));
478
#else
479
            UNUSED(port);
480
            throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.");
481
#endif
482
        });
483

484
        const auto & config = config_getter();
485
        auto http_context = httpContext();
486
        Poco::Timespan keep_alive_timeout(config.getUInt("keep_alive_timeout", 10), 0);
487
        Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams;
488
        http_params->setTimeout(http_context->getReceiveTimeout());
489
        http_params->setKeepAliveTimeout(keep_alive_timeout);
490

491
        /// Prometheus (if defined and not setup yet with http_port)
492
        port_name = "prometheus.port";
493
        createServer(
494
            listen_host,
495
            port_name,
496
            listen_try,
497
            [&, my_http_context = std::move(http_context)](UInt16 port) mutable
498
            {
499
                Poco::Net::ServerSocket socket;
500
                auto address = socketBindListen(socket, listen_host, port);
501
                socket.setReceiveTimeout(my_http_context->getReceiveTimeout());
502
                socket.setSendTimeout(my_http_context->getSendTimeout());
503
                auto metrics_writer = std::make_shared<KeeperPrometheusMetricsWriter>(config, "prometheus", async_metrics);
504
                servers->emplace_back(
505
                    listen_host,
506
                    port_name,
507
                    "Prometheus: http://" + address.toString(),
508
                    std::make_unique<HTTPServer>(
509
                        std::move(my_http_context),
510
                        createPrometheusMainHandlerFactory(*this, config_getter(), metrics_writer, "PrometheusHandler-factory"),
511
                        server_pool,
512
                        socket,
513
                        http_params));
514
            });
515

516
        /// HTTP control endpoints
517
        port_name = "keeper_server.http_control.port";
518
        createServer(listen_host, port_name, listen_try, [&](UInt16 port) mutable
519
        {
520
            auto my_http_context = httpContext();
521
            Poco::Timespan my_keep_alive_timeout(config.getUInt("keep_alive_timeout", 10), 0);
522
            Poco::Net::HTTPServerParams::Ptr my_http_params = new Poco::Net::HTTPServerParams;
523
            my_http_params->setTimeout(my_http_context->getReceiveTimeout());
524
            my_http_params->setKeepAliveTimeout(my_keep_alive_timeout);
525

526
            Poco::Net::ServerSocket socket;
527
            auto address = socketBindListen(socket, listen_host, port);
528
            socket.setReceiveTimeout(my_http_context->getReceiveTimeout());
529
            socket.setSendTimeout(my_http_context->getSendTimeout());
530
            servers->emplace_back(
531
                listen_host,
532
                port_name,
533
                "HTTP Control: http://" + address.toString(),
534
                std::make_unique<HTTPServer>(
535
                    std::move(my_http_context), createKeeperHTTPControlMainHandlerFactory(config_getter(), global_context->getKeeperDispatcher(), "KeeperHTTPControlHandler-factory"), server_pool, socket, http_params)
536
                    );
537
        });
538
    }
539

540
    for (auto & server : *servers)
541
    {
542
        server.start();
543
        LOG_INFO(log, "Listening for {}", server.getDescription());
544
    }
545

546
    async_metrics.start();
547

548
    zkutil::EventPtr unused_event = std::make_shared<Poco::Event>();
549
    zkutil::ZooKeeperNodeCache unused_cache([] { return nullptr; });
550

551
    const std::string cert_path = config().getString("openSSL.server.certificateFile", "");
552
    const std::string key_path = config().getString("openSSL.server.privateKeyFile", "");
553

554
    std::vector<std::string> extra_paths = {include_from_path};
555
    if (!cert_path.empty())
556
        extra_paths.emplace_back(cert_path);
557
    if (!key_path.empty())
558
        extra_paths.emplace_back(key_path);
559

560
    /// ConfigReloader have to strict parameters which are redundant in our case
561
    auto main_config_reloader = std::make_unique<ConfigReloader>(
562
        config_path,
563
        extra_paths,
564
        config().getString("path", KEEPER_DEFAULT_PATH),
565
        std::move(unused_cache),
566
        unused_event,
567
        [&](ConfigurationPtr config, bool /* initial_loading */)
568
        {
569
            updateLevels(*config, logger());
570

571
            updateMemorySoftLimitInConfig(*config);
572

573
            if (config->has("keeper_server"))
574
                global_context->updateKeeperConfiguration(*config);
575

576
#if USE_SSL
577
            CertificateReloader::instance().tryLoad(*config);
578
#endif
579
        },
580
        /* already_loaded = */ false);  /// Reload it right now (initial loading)
581

582
    SCOPE_EXIT({
583
        LOG_INFO(log, "Shutting down.");
584
        main_config_reloader.reset();
585

586
        async_metrics.stop();
587

588
        LOG_DEBUG(log, "Waiting for current connections to Keeper to finish.");
589
        size_t current_connections = 0;
590
        for (auto & server : *servers)
591
        {
592
            server.stop();
593
            current_connections += server.currentConnections();
594
        }
595

596
        if (current_connections)
597
            LOG_INFO(log, "Closed all listening sockets. Waiting for {} outstanding connections.", current_connections);
598
        else
599
            LOG_INFO(log, "Closed all listening sockets.");
600

601
        if (current_connections > 0)
602
            current_connections = waitServersToFinish(*servers, servers_lock, config().getInt("shutdown_wait_unfinished", 5));
603

604
        if (current_connections)
605
            LOG_INFO(log, "Closed connections to Keeper. But {} remain. Probably some users cannot finish their connections after context shutdown.", current_connections);
606
        else
607
            LOG_INFO(log, "Closed connections to Keeper.");
608

609
        global_context->shutdownKeeperDispatcher();
610

611
        /// Wait server pool to avoid use-after-free of destroyed context in the handlers
612
        server_pool.joinAll();
613

614
        LOG_DEBUG(log, "Destroyed global context.");
615

616
        if (current_connections)
617
        {
618
            LOG_INFO(log, "Will shutdown forcefully.");
619
            safeExit(0);
620
        }
621
    });
622

623

624
    buildLoggers(config(), logger());
625
    main_config_reloader->start();
626

627
    std::optional<CgroupsMemoryUsageObserver> cgroups_memory_usage_observer;
628
    try
629
    {
630
        auto wait_time = config().getUInt64("keeper_server.cgroups_memory_observer_wait_time", 15);
631
        if (wait_time != 0)
632
        {
633
            cgroups_memory_usage_observer.emplace(std::chrono::seconds(wait_time));
634
            /// Not calling cgroups_memory_usage_observer->setLimits() here (as for the normal ClickHouse server) because Keeper controls
635
            /// its memory usage by other means (via setting 'max_memory_usage_soft_limit').
636
            cgroups_memory_usage_observer->setOnMemoryAmountAvailableChangedFn([&]() { main_config_reloader->reload(); });
637
            cgroups_memory_usage_observer->startThread();
638
        }
639
    }
640
    catch (Exception &)
641
    {
642
        tryLogCurrentException(log, "Disabling cgroup memory observer because of an error during initialization");
643
    }
644

645

646
    LOG_INFO(log, "Ready for connections.");
647

648
    waitForTerminationRequest();
649

650
    return Application::EXIT_OK;
651
}
652
catch (...)
653
{
654
    /// Poco does not provide stacktrace.
655
    tryLogCurrentException("Application");
656
    auto code = getCurrentExceptionCode();
657
    return code ? code : -1;
658
}
659

660

661
void Keeper::logRevision() const
662
{
663
    LOG_INFO(getLogger("Application"),
664
        "Starting ClickHouse Keeper {} (revision: {}, git hash: {}, build id: {}), PID {}",
665
        VERSION_STRING,
666
        ClickHouseRevision::getVersionRevision(),
667
        git_hash.empty() ? "<unknown>" : git_hash,
668
        build_id.empty() ? "<unknown>" : build_id,
669
        getpid());
670
}
671

672

673
}
674

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.