// ClickHouse source listing: 507 lines, 15.3 KB
1#include <AggregateFunctions/AggregateFunctionFactory.h>
2#include <AggregateFunctions/IAggregateFunction.h>
3#include <AggregateFunctions/parseAggregateFunctionParameters.h>
4#include <DataTypes/DataTypesNumber.h>
5#include <Interpreters/Context.h>
6#include <Processors/Merges/Algorithms/Graphite.h>
7#include <base/find_symbols.h>
8#include <base/sort.h>
9
10#include <string_view>
11#include <vector>
12#include <unordered_map>
13
14#include <fmt/format.h>
15
16#include <Poco/Util/AbstractConfiguration.h>
17
18
19using namespace std::literals;
20
/// Error codes raised by this translation unit; only declared here.
namespace DB
{
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int NO_ELEMENTS_IN_CONFIG;
    extern const int NOT_IMPLEMENTED;
    extern const int UNKNOWN_ELEMENT_IN_CONFIG;
}
}
28
29namespace DB::Graphite
30{
31static std::unordered_map<RuleType, const String> ruleTypeMap =
32{
33{ RuleTypeAll, "all" },
34{ RuleTypePlain, "plain" },
35{ RuleTypeTagged, "tagged"},
36{ RuleTypeTagList, "tag_list"}
37};
38
39const String & ruleTypeStr(RuleType rule_type)
40{
41try
42{
43return ruleTypeMap.at(rule_type);
44}
45catch (...)
46{
47throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "invalid rule type: {}", std::to_string(rule_type));
48}
49}
50
51RuleType ruleType(const String & s)
52{
53if (s == "all")
54return RuleTypeAll;
55else if (s == "plain")
56return RuleTypePlain;
57else if (s == "tagged")
58return RuleTypeTagged;
59else if (s == "tag_list")
60return RuleTypeTagList;
61else
62throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "invalid rule type: {}", s);
63}
64
65void Pattern::updateHash(SipHash & hash) const
66{
67hash.update(rule_type);
68hash.update(regexp_str);
69if (function)
70{
71hash.update(function->getName());
72for (const auto & p : function->getParameters())
73hash.update(toString(p));
74}
75for (const auto & r : retentions)
76{
77hash.update(r.age);
78hash.update(r.precision);
79}
80}
81
82static const Graphite::Pattern undef_pattern =
83{ /// empty pattern for selectPatternForPath
84.rule_type = RuleTypeAll,
85.regexp = nullptr,
86.regexp_str = "",
87.function = nullptr,
88.retentions = Graphite::Retentions(),
89.type = Graphite::Pattern::TypeUndef,
90};
91
92inline static const Patterns & selectPatternsForMetricType(const Graphite::Params & params, std::string_view path)
93{
94if (params.patterns_typed)
95{
96std::string_view path_view = path;
97if (path_view.find("?"sv) == std::string::npos)
98return params.patterns_plain;
99else
100return params.patterns_tagged;
101}
102else
103{
104return params.patterns;
105}
106}
107
108Graphite::RollupRule selectPatternForPath(
109const Graphite::Params & params,
110std::string_view path)
111{
112const Graphite::Pattern * first_match = &undef_pattern;
113
114const Patterns & patterns_check = selectPatternsForMetricType(params, path);
115
116for (const auto & pattern : patterns_check)
117{
118if (!pattern.regexp)
119{
120/// Default pattern
121if (first_match->type == Graphite::Pattern::TypeUndef && pattern.type == Graphite::Pattern::TypeAll)
122{
123/// There is only default pattern for both retention and aggregation
124return {&pattern, &pattern};
125}
126if (pattern.type != first_match->type)
127{
128if (first_match->type == Graphite::Pattern::TypeRetention)
129{
130return {first_match, &pattern};
131}
132if (first_match->type == Graphite::Pattern::TypeAggregation)
133{
134return {&pattern, first_match};
135}
136}
137}
138else
139{
140if (pattern.regexp->match(path.data(), path.size()))
141{
142/// General pattern with matched path
143if (pattern.type == Graphite::Pattern::TypeAll)
144{
145/// Only for not default patterns with both function and retention parameters
146return {&pattern, &pattern};
147}
148if (first_match->type == Graphite::Pattern::TypeUndef)
149{
150first_match = &pattern;
151continue;
152}
153if (pattern.type != first_match->type)
154{
155if (first_match->type == Graphite::Pattern::TypeRetention)
156{
157return {first_match, &pattern};
158}
159if (first_match->type == Graphite::Pattern::TypeAggregation)
160{
161return {&pattern, first_match};
162}
163}
164}
165}
166}
167
168return {nullptr, nullptr};
169}
170
171/** Is used to order Graphite::Retentions by age and precision descending.
172* Throws exception if not both age and precision are less or greater then another.
173*/
174static bool compareRetentions(const Retention & a, const Retention & b)
175{
176if (a.age > b.age && a.precision > b.precision)
177{
178return true;
179}
180else if (a.age < b.age && a.precision < b.precision)
181{
182return false;
183}
184throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Age and precision should only grow up: {}:{} vs {}:{}",
185a.age, a.precision, b.age, b.precision);
186}
187
188bool operator==(const Retention & a, const Retention & b)
189{
190return a.age == b.age && a.precision == b.precision;
191}
192
193std::ostream & operator<<(std::ostream & stream, const Retentions & a)
194{
195stream << "{ ";
196for (size_t i = 0; i < a.size(); i++)
197{
198if (i > 0)
199stream << ",";
200stream << " { age = " << a[i].age << ", precision = " << a[i].precision << " }";
201}
202stream << " }";
203
204return stream;
205}
206
207bool operator==(const Pattern & a, const Pattern & b)
208{
209// equal
210// Retentions retentions; /// Must be ordered by 'age' descending.
211if (a.type != b.type || a.regexp_str != b.regexp_str || a.rule_type != b.rule_type)
212return false;
213
214if (a.function == nullptr)
215{
216if (b.function != nullptr)
217return false;
218}
219else if (b.function == nullptr)
220{
221return false;
222}
223else if (a.function->getName() != b.function->getName())
224{
225return false;
226}
227
228return a.retentions == b.retentions;
229}
230
231std::ostream & operator<<(std::ostream & stream, const Pattern & a)
232{
233stream << "{ rule_type = " << ruleTypeStr(a.rule_type);
234if (!a.regexp_str.empty())
235stream << ", regexp = '" << a.regexp_str << "'";
236if (a.function != nullptr)
237stream << ", function = " << a.function->getName();
238if (!a.retentions.empty())
239{
240stream << ",\n retentions = {\n";
241for (size_t i = 0; i < a.retentions.size(); i++)
242{
243stream << " { " << a.retentions[i].age << ", " << a.retentions[i].precision << " }";
244if (i < a.retentions.size() - 1)
245stream << ",";
246stream << "\n";
247}
248stream << " }\n";
249}
250else
251stream << " ";
252
253stream << "}";
254return stream;
255}
256
257std::string buildTaggedRegex(std::string regexp_str)
258{
259/*
260* tags list in format (for name or any value can use regexp, alphabet sorting not needed)
261* spaces are not stiped and used as tag and value part
262* name must be first (if used)
263*
264* tag1=value1; tag2=VALUE2_REGEX;tag3=value3
265* or
266* name;tag1=value1;tag2=VALUE2_REGEX;tag3=value3
267* or for one tag
268* tag1=value1
269*
270* Resulting regex against metric like
271* name?tag1=value1&tag2=value2
272*
273* So,
274*
275* name
276* produce
277* name\?
278*
279* tag2=val2
280* produce
281* [\?&]tag2=val2(&.*)?$
282*
283* nam.* ; tag1=val1 ; tag2=val2
284* produce
285* nam.*\?(.*&)?tag1=val1&(.*&)?tag2=val2(&.*)?$
286*/
287
288std::vector<std::string> tags;
289
290splitInto<';'>(tags, regexp_str);
291/* remove empty elements */
292using namespace std::string_literals;
293std::erase(tags, ""s);
294if (tags[0].find('=') == tags[0].npos)
295{
296if (tags.size() == 1) /* only name */
297return "^" + tags[0] + "\\?";
298/* start with name value */
299regexp_str = "^" + tags[0] + "\\?(.*&)?";
300tags.erase(std::begin(tags));
301}
302else
303regexp_str = "[\\?&]";
304
305::sort(std::begin(tags), std::end(tags)); /* sorted tag keys */
306regexp_str += fmt::format(
307"{}{}",
308fmt::join(tags, "&(.*&)?"),
309"(&.*)?$" /* close regex */
310);
311
312return regexp_str;
313}
314
315/** Read the settings for Graphite rollup from config.
316* Example
317*
318* <graphite_rollup>
319* <path_column_name>Path</path_column_name>
320* <pattern>
321* <regexp>click_cost</regexp>
322* <function>any</function>
323* <retention>
324* <age>0</age>
325* <precision>3600</precision>
326* </retention>
327* <retention>
328* <age>86400</age>
329* <precision>60</precision>
330* </retention>
331* </pattern>
332* <default>
333* <function>max</function>
334* <retention>
335* <age>0</age>
336* <precision>60</precision>
337* </retention>
338* <retention>
339* <age>3600</age>
340* <precision>300</precision>
341* </retention>
342* <retention>
343* <age>86400</age>
344* <precision>3600</precision>
345* </retention>
346* </default>
347* </graphite_rollup>
348*/
349static const Pattern & appendGraphitePattern(
350const Poco::Util::AbstractConfiguration & config,
351const String & config_element, Patterns & patterns,
352bool default_rule,
353ContextPtr context)
354{
355Pattern pattern;
356
357Poco::Util::AbstractConfiguration::Keys keys;
358config.keys(config_element, keys);
359
360for (const auto & key : keys)
361{
362if (key == "regexp")
363{
364pattern.regexp_str = config.getString(config_element + ".regexp");
365}
366else if (key == "function")
367{
368String aggregate_function_name_with_params = config.getString(config_element + ".function");
369String aggregate_function_name;
370Array params_row;
371getAggregateFunctionNameAndParametersArray(
372aggregate_function_name_with_params, aggregate_function_name, params_row, "GraphiteMergeTree storage initialization", context);
373
374/// TODO Not only Float64
375auto action = NullsAction::EMPTY;
376AggregateFunctionProperties properties;
377pattern.function = AggregateFunctionFactory::instance().get(
378aggregate_function_name, action, {std::make_shared<DataTypeFloat64>()}, params_row, properties);
379}
380else if (key == "rule_type")
381{
382String rule_type = config.getString(config_element + ".rule_type");
383pattern.rule_type = ruleType(rule_type);
384}
385else if (startsWith(key, "retention"))
386{
387pattern.retentions.emplace_back(Graphite::Retention{
388.age = config.getUInt(config_element + "." + key + ".age"),
389.precision = config.getUInt(config_element + "." + key + ".precision")});
390}
391else
392throw Exception(DB::ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "Unknown element in config: {}", key);
393}
394
395if (!pattern.regexp_str.empty())
396{
397if (pattern.rule_type == RuleTypeTagList)
398{
399// construct tagged regexp
400pattern.regexp_str = buildTaggedRegex(pattern.regexp_str);
401pattern.rule_type = RuleTypeTagged;
402}
403pattern.regexp = std::make_shared<OptimizedRegularExpression>(pattern.regexp_str);
404}
405
406if (!pattern.function && pattern.retentions.empty())
407throw Exception(DB::ErrorCodes::NO_ELEMENTS_IN_CONFIG,
408"At least one of an aggregate function or retention rules is mandatory for rollup patterns in GraphiteMergeTree");
409
410if (default_rule && pattern.rule_type != RuleTypeAll)
411{
412throw Exception(DB::ErrorCodes::BAD_ARGUMENTS,
413"Default must have rule_type all for rollup patterns in GraphiteMergeTree");
414}
415
416if (!pattern.function)
417{
418pattern.type = Graphite::Pattern::TypeRetention;
419}
420else if (pattern.retentions.empty())
421{
422pattern.type = Graphite::Pattern::TypeAggregation;
423}
424else
425{
426pattern.type = Graphite::Pattern::TypeAll;
427}
428
429if (pattern.type & Graphite::Pattern::TypeAggregation) /// TypeAggregation or TypeAll
430if (pattern.function->allocatesMemoryInArena())
431throw Exception(DB::ErrorCodes::NOT_IMPLEMENTED,
432"Aggregate function {} isn't supported in GraphiteMergeTree", pattern.function->getName());
433
434/// retention should be in descending order of age.
435if (pattern.type & Graphite::Pattern::TypeRetention) /// TypeRetention or TypeAll
436::sort(pattern.retentions.begin(), pattern.retentions.end(), compareRetentions);
437
438patterns.emplace_back(pattern);
439return patterns.back();
440}
441
442void setGraphitePatternsFromConfig(ContextPtr context, const String & config_element, Graphite::Params & params)
443{
444const auto & config = context->getConfigRef();
445
446if (!config.has(config_element))
447throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "No '{}' element in configuration file", config_element);
448
449params.config_name = config_element;
450params.path_column_name = config.getString(config_element + ".path_column_name", "Path");
451params.time_column_name = config.getString(config_element + ".time_column_name", "Time");
452params.value_column_name = config.getString(config_element + ".value_column_name", "Value");
453params.version_column_name = config.getString(config_element + ".version_column_name", "Timestamp");
454
455params.patterns_typed = false;
456
457Poco::Util::AbstractConfiguration::Keys keys;
458config.keys(config_element, keys);
459
460for (const auto & key : keys)
461{
462if (startsWith(key, "pattern"))
463{
464if (appendGraphitePattern(config, config_element + "." + key, params.patterns, false, context).rule_type != RuleTypeAll)
465params.patterns_typed = true;
466}
467else if (key == "default")
468{
469/// See below.
470}
471else if (key == "path_column_name" || key == "time_column_name" || key == "value_column_name" || key == "version_column_name")
472{
473/// See above.
474}
475else
476throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "Unknown element in config: {}", key);
477}
478
479if (config.has(config_element + ".default"))
480appendGraphitePattern(config, config_element + "." + ".default", params.patterns, true, context);
481
482for (const auto & pattern : params.patterns)
483{
484if (pattern.rule_type == RuleTypeAll)
485{
486if (params.patterns_typed)
487{
488params.patterns_plain.push_back(pattern);
489params.patterns_tagged.push_back(pattern);
490}
491}
492else if (pattern.rule_type == RuleTypePlain)
493{
494params.patterns_plain.push_back(pattern);
495}
496else if (pattern.rule_type == RuleTypeTagged)
497{
498params.patterns_tagged.push_back(pattern);
499}
500else
501{
502throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "Unhandled rule_type in config: {}", ruleTypeStr(pattern.rule_type));
503}
504}
505}
506
507}
508