ClickHouse
134 строки · 4.2 Кб
#include "MetricsTransmitter.h"

#include <Common/AsynchronousMetrics.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
#include <Common/setThreadName.h>

#include <Daemon/BaseDaemon.h>

#include <Poco/Exception.h>
#include <Poco/Util/Application.h>
#include <Poco/Util/LayeredConfiguration.h>
13
14
15namespace DB
16{
17
18MetricsTransmitter::MetricsTransmitter(
19const Poco::Util::AbstractConfiguration & config, const std::string & config_name_, const AsynchronousMetrics & async_metrics_)
20: async_metrics(async_metrics_), config_name(config_name_)
21{
22interval_seconds = config.getInt(config_name + ".interval", 60);
23send_events = config.getBool(config_name + ".events", true);
24send_events_cumulative = config.getBool(config_name + ".events_cumulative", false);
25send_metrics = config.getBool(config_name + ".metrics", true);
26send_asynchronous_metrics = config.getBool(config_name + ".asynchronous_metrics", true);
27
28thread = ThreadFromGlobalPool{&MetricsTransmitter::run, this};
29}
30
31
32MetricsTransmitter::~MetricsTransmitter()
33{
34try
35{
36{
37std::lock_guard lock{mutex};
38quit = true;
39}
40
41cond.notify_one();
42
43thread->join();
44}
45catch (...)
46{
47DB::tryLogCurrentException(__PRETTY_FUNCTION__);
48}
49}
50
51
52void MetricsTransmitter::run()
53{
54const std::string thread_name = "MetrTx" + std::to_string(interval_seconds);
55setThreadName(thread_name.c_str());
56
57const auto get_next_time = [](size_t seconds)
58{
59/// To avoid time drift and transmit values exactly each interval:
60/// next time aligned to system seconds
61/// (60s -> every minute at 00 seconds, 5s -> every minute:[00, 05, 15 ... 55]s, 3600 -> every hour:00:00
62return std::chrono::system_clock::time_point(
63(std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()) / seconds) * seconds
64+ std::chrono::seconds(seconds));
65};
66
67std::vector<ProfileEvents::Count> prev_counters(ProfileEvents::end());
68
69std::unique_lock lock{mutex};
70
71while (true)
72{
73if (cond.wait_until(lock, get_next_time(interval_seconds), [this] { return quit; }))
74break;
75
76transmit(prev_counters);
77}
78}
79
80
81void MetricsTransmitter::transmit(std::vector<ProfileEvents::Count> & prev_counters)
82{
83auto async_metrics_values = async_metrics.getValues();
84
85GraphiteWriter::KeyValueVector<ssize_t> key_vals{};
86key_vals.reserve(ProfileEvents::end() + CurrentMetrics::end() + async_metrics_values.size());
87
88if (send_events)
89{
90for (ProfileEvents::Event i = ProfileEvents::Event(0), end = ProfileEvents::end(); i < end; ++i)
91{
92const auto counter = ProfileEvents::global_counters[i].load(std::memory_order_relaxed);
93const auto counter_increment = counter - prev_counters[i];
94prev_counters[i] = counter;
95
96std::string key{ProfileEvents::getName(static_cast<ProfileEvents::Event>(i))};
97key_vals.emplace_back(profile_events_path_prefix + key, counter_increment);
98}
99}
100
101if (send_events_cumulative)
102{
103for (ProfileEvents::Event i = ProfileEvents::Event(0), end = ProfileEvents::end(); i < end; ++i)
104{
105const auto counter = ProfileEvents::global_counters[i].load(std::memory_order_relaxed);
106std::string key{ProfileEvents::getName(static_cast<ProfileEvents::Event>(i))};
107key_vals.emplace_back(profile_events_cumulative_path_prefix + key, counter);
108}
109}
110
111if (send_metrics)
112{
113for (CurrentMetrics::Metric i = CurrentMetrics::Metric(0), end = CurrentMetrics::end(); i < end; ++i)
114{
115const auto value = CurrentMetrics::values[i].load(std::memory_order_relaxed);
116
117std::string key{CurrentMetrics::getName(static_cast<CurrentMetrics::Metric>(i))};
118key_vals.emplace_back(current_metrics_path_prefix + key, value);
119}
120}
121
122if (send_asynchronous_metrics)
123{
124for (const auto & name_value : async_metrics_values)
125{
126key_vals.emplace_back(asynchronous_metrics_path_prefix + name_value.first, name_value.second.value);
127}
128}
129
130if (!key_vals.empty())
131BaseDaemon::instance().writeToGraphite(key_vals, config_name);
132}
133
134}
135