ClickHouse

Форк
0
/
MetricsTransmitter.cpp 
134 строки · 4.2 Кб
1
#include "MetricsTransmitter.h"
2

3
#include <Common/AsynchronousMetrics.h>
4

5
#include <Common/CurrentMetrics.h>
6
#include <Common/Exception.h>
7
#include <Common/setThreadName.h>
8

9
#include <Daemon/BaseDaemon.h>
10

11
#include <Poco/Util/Application.h>
12
#include <Poco/Util/LayeredConfiguration.h>
13

14

15
namespace DB
16
{
17

18
MetricsTransmitter::MetricsTransmitter(
19
    const Poco::Util::AbstractConfiguration & config, const std::string & config_name_, const AsynchronousMetrics & async_metrics_)
20
    : async_metrics(async_metrics_), config_name(config_name_)
21
{
22
    interval_seconds = config.getInt(config_name + ".interval", 60);
23
    send_events = config.getBool(config_name + ".events", true);
24
    send_events_cumulative = config.getBool(config_name + ".events_cumulative", false);
25
    send_metrics = config.getBool(config_name + ".metrics", true);
26
    send_asynchronous_metrics = config.getBool(config_name + ".asynchronous_metrics", true);
27

28
    thread = ThreadFromGlobalPool{&MetricsTransmitter::run, this};
29
}
30

31

32
MetricsTransmitter::~MetricsTransmitter()
33
{
34
    try
35
    {
36
        {
37
            std::lock_guard lock{mutex};
38
            quit = true;
39
        }
40

41
        cond.notify_one();
42

43
        thread->join();
44
    }
45
    catch (...)
46
    {
47
        DB::tryLogCurrentException(__PRETTY_FUNCTION__);
48
    }
49
}
50

51

52
void MetricsTransmitter::run()
53
{
54
    const std::string thread_name = "MetrTx" + std::to_string(interval_seconds);
55
    setThreadName(thread_name.c_str());
56

57
    const auto get_next_time = [](size_t seconds)
58
    {
59
        /// To avoid time drift and transmit values exactly each interval:
60
        ///  next time aligned to system seconds
61
        /// (60s -> every minute at 00 seconds, 5s -> every minute:[00, 05, 15 ... 55]s, 3600 -> every hour:00:00
62
        return std::chrono::system_clock::time_point(
63
            (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()) / seconds) * seconds
64
            + std::chrono::seconds(seconds));
65
    };
66

67
    std::vector<ProfileEvents::Count> prev_counters(ProfileEvents::end());
68

69
    std::unique_lock lock{mutex};
70

71
    while (true)
72
    {
73
        if (cond.wait_until(lock, get_next_time(interval_seconds), [this] { return quit; }))
74
            break;
75

76
        transmit(prev_counters);
77
    }
78
}
79

80

81
void MetricsTransmitter::transmit(std::vector<ProfileEvents::Count> & prev_counters)
82
{
83
    auto async_metrics_values = async_metrics.getValues();
84

85
    GraphiteWriter::KeyValueVector<ssize_t> key_vals{};
86
    key_vals.reserve(ProfileEvents::end() + CurrentMetrics::end() + async_metrics_values.size());
87

88
    if (send_events)
89
    {
90
        for (ProfileEvents::Event i = ProfileEvents::Event(0), end = ProfileEvents::end(); i < end; ++i)
91
        {
92
            const auto counter = ProfileEvents::global_counters[i].load(std::memory_order_relaxed);
93
            const auto counter_increment = counter - prev_counters[i];
94
            prev_counters[i] = counter;
95

96
            std::string key{ProfileEvents::getName(static_cast<ProfileEvents::Event>(i))};
97
            key_vals.emplace_back(profile_events_path_prefix + key, counter_increment);
98
        }
99
    }
100

101
    if (send_events_cumulative)
102
    {
103
        for (ProfileEvents::Event i = ProfileEvents::Event(0), end = ProfileEvents::end(); i < end; ++i)
104
        {
105
            const auto counter = ProfileEvents::global_counters[i].load(std::memory_order_relaxed);
106
            std::string key{ProfileEvents::getName(static_cast<ProfileEvents::Event>(i))};
107
            key_vals.emplace_back(profile_events_cumulative_path_prefix + key, counter);
108
        }
109
    }
110

111
    if (send_metrics)
112
    {
113
        for (CurrentMetrics::Metric i = CurrentMetrics::Metric(0), end = CurrentMetrics::end(); i < end; ++i)
114
        {
115
            const auto value = CurrentMetrics::values[i].load(std::memory_order_relaxed);
116

117
            std::string key{CurrentMetrics::getName(static_cast<CurrentMetrics::Metric>(i))};
118
            key_vals.emplace_back(current_metrics_path_prefix + key, value);
119
        }
120
    }
121

122
    if (send_asynchronous_metrics)
123
    {
124
        for (const auto & name_value : async_metrics_values)
125
        {
126
            key_vals.emplace_back(asynchronous_metrics_path_prefix + name_value.first, name_value.second.value);
127
        }
128
    }
129

130
    if (!key_vals.empty())
131
        BaseDaemon::instance().writeToGraphite(key_vals, config_name);
132
}
133

134
}
135

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.