ClickHouse

Форк
0
507 строк · 15.3 Кб
1
#include <AggregateFunctions/AggregateFunctionFactory.h>
2
#include <AggregateFunctions/IAggregateFunction.h>
3
#include <AggregateFunctions/parseAggregateFunctionParameters.h>
4
#include <DataTypes/DataTypesNumber.h>
5
#include <Interpreters/Context.h>
6
#include <Processors/Merges/Algorithms/Graphite.h>
7
#include <base/find_symbols.h>
8
#include <base/sort.h>
9

10
#include <string_view>
11
#include <vector>
12
#include <unordered_map>
13

14
#include <fmt/format.h>
15

16
#include <Poco/Util/AbstractConfiguration.h>
17

18

19
using namespace std::literals;
20

21
/// Error codes thrown from this file. They are only declared here (extern).
namespace DB::ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int NOT_IMPLEMENTED;
    extern const int NO_ELEMENTS_IN_CONFIG;
    extern const int UNKNOWN_ELEMENT_IN_CONFIG;
}
28

29
namespace DB::Graphite
30
{
31
static std::unordered_map<RuleType, const String> ruleTypeMap =
32
{
33
   { RuleTypeAll, "all" },
34
   { RuleTypePlain, "plain" },
35
   { RuleTypeTagged, "tagged"},
36
   { RuleTypeTagList, "tag_list"}
37
};
38

39
/// Returns the name registered in ruleTypeMap for `rule_type`.
/// Throws BAD_ARGUMENTS if the value has no entry in the table.
const String & ruleTypeStr(RuleType rule_type)
{
    const auto found = ruleTypeMap.find(rule_type);
    if (found == ruleTypeMap.end())
        throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "invalid rule type: {}", std::to_string(rule_type));

    return found->second;
}
50

51
/// Parses the textual rule type ("all", "plain", "tagged", "tag_list").
/// Throws BAD_ARGUMENTS for any other string.
RuleType ruleType(const String & s)
{
    if (s == "all")
        return RuleTypeAll;

    if (s == "plain")
        return RuleTypePlain;

    if (s == "tagged")
        return RuleTypeTagged;

    if (s == "tag_list")
        return RuleTypeTagList;

    throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "invalid rule type: {}", s);
}
64

65
/// Feeds the pattern into `hash`: rule type, regexp source, then (if a function is set)
/// the function name with its parameters, and finally every retention's age and precision.
void Pattern::updateHash(SipHash & hash) const
{
    hash.update(rule_type);
    hash.update(regexp_str);

    if (function)
    {
        hash.update(function->getName());

        for (const auto & parameter : function->getParameters())
            hash.update(toString(parameter));
    }

    for (const auto & retention : retentions)
    {
        hash.update(retention.age);
        hash.update(retention.precision);
    }
}
81

82
/// Empty pattern of type TypeUndef; selectPatternForPath starts from it as its "nothing matched yet" state.
static const Graphite::Pattern undef_pattern =
{
    .rule_type = RuleTypeAll,
    .regexp = nullptr,
    .regexp_str = "",
    .function = nullptr,
    .retentions = Graphite::Retentions(),
    .type = Graphite::Pattern::TypeUndef,
};
91

92
/// Picks the pattern list that has to be scanned for `path`.
/// When no typed rules are configured the common list is used; otherwise a path
/// containing '?' is checked against the tagged list and any other path against the plain list.
inline static const Patterns & selectPatternsForMetricType(const Graphite::Params & params, std::string_view path)
{
    if (!params.patterns_typed)
        return params.patterns;

    const bool has_question_mark = path.find('?') != std::string_view::npos;
    return has_question_mark ? params.patterns_tagged : params.patterns_plain;
}
107

108
/// Finds the rollup rule for `path` as a pair {retention pattern, aggregation pattern}.
///
/// Patterns are scanned in order. A pattern without a regexp is a default pattern and
/// applies to every path; a pattern with a regexp applies only when the regexp matches.
/// A pattern of TypeAll supplies both halves at once. Otherwise the first applicable
/// regexp pattern is remembered, and the rule is completed by the next applicable pattern
/// of a different type. Returns {nullptr, nullptr} when the scan ends without a complete rule.
Graphite::RollupRule selectPatternForPath(
        const Graphite::Params & params,
        std::string_view path)
{
    /// Partial match found so far (TypeUndef while nothing has matched).
    const Graphite::Pattern * partial = &undef_pattern;

    for (const auto & candidate : selectPatternsForMetricType(params, path))
    {
        const bool is_default = !candidate.regexp;

        /// General pattern that does not match the path: not applicable.
        if (!is_default && !candidate.regexp->match(path.data(), path.size()))
            continue;

        if (is_default)
        {
            /// There is only a default pattern for both retention and aggregation.
            if (partial->type == Graphite::Pattern::TypeUndef && candidate.type == Graphite::Pattern::TypeAll)
                return {&candidate, &candidate};
        }
        else
        {
            /// Matched general pattern carrying both function and retention parameters.
            if (candidate.type == Graphite::Pattern::TypeAll)
                return {&candidate, &candidate};

            /// First partial match: remember it and keep looking for the other half.
            if (partial->type == Graphite::Pattern::TypeUndef)
            {
                partial = &candidate;
                continue;
            }
        }

        /// Complete the remembered half with a pattern of a different type.
        if (candidate.type != partial->type)
        {
            if (partial->type == Graphite::Pattern::TypeRetention)
                return {partial, &candidate};

            if (partial->type == Graphite::Pattern::TypeAggregation)
                return {&candidate, partial};
        }
    }

    return {nullptr, nullptr};
}
170

171
/** Is used to order Graphite::Retentions by age and precision descending.
172
  * Throws exception if not both age and precision are less or greater then another.
173
  */
174
static bool compareRetentions(const Retention & a, const Retention & b)
175
{
176
    if (a.age > b.age && a.precision > b.precision)
177
    {
178
        return true;
179
    }
180
    else if (a.age < b.age && a.precision < b.precision)
181
    {
182
        return false;
183
    }
184
    throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Age and precision should only grow up: {}:{} vs {}:{}",
185
                    a.age, a.precision, b.age, b.precision);
186
}
187

188
bool operator==(const Retention & a, const Retention & b)
189
{
190
    return a.age == b.age && a.precision == b.precision;
191
}
192

193
std::ostream & operator<<(std::ostream & stream, const Retentions & a)
194
{
195
    stream << "{ ";
196
    for (size_t i = 0; i < a.size(); i++)
197
    {
198
        if (i > 0)
199
            stream << ",";
200
        stream << " { age = " << a[i].age << ", precision = " << a[i].precision << " }";
201
    }
202
    stream << " }";
203

204
    return stream;
205
}
206

207
bool operator==(const Pattern & a, const Pattern & b)
208
{
209
    // equal
210
    // Retentions retentions;    /// Must be ordered by 'age' descending.
211
    if (a.type != b.type || a.regexp_str != b.regexp_str || a.rule_type != b.rule_type)
212
        return false;
213

214
    if (a.function == nullptr)
215
    {
216
        if (b.function != nullptr)
217
            return false;
218
    }
219
    else if (b.function == nullptr)
220
    {
221
        return false;
222
    }
223
    else if (a.function->getName() != b.function->getName())
224
    {
225
        return false;
226
    }
227

228
    return a.retentions == b.retentions;
229
}
230

231
std::ostream & operator<<(std::ostream & stream, const Pattern & a)
232
{
233
    stream << "{ rule_type = " << ruleTypeStr(a.rule_type);
234
    if (!a.regexp_str.empty())
235
        stream << ", regexp = '" << a.regexp_str << "'";
236
    if (a.function != nullptr)
237
        stream << ", function = " << a.function->getName();
238
    if (!a.retentions.empty())
239
    {
240
        stream << ",\n  retentions = {\n";
241
        for (size_t i = 0; i < a.retentions.size(); i++)
242
        {
243
            stream << "    { " << a.retentions[i].age << ", " << a.retentions[i].precision << " }";
244
            if (i < a.retentions.size() - 1)
245
                stream << ",";
246
            stream << "\n";
247
        }
248
        stream << "  }\n";
249
    }
250
    else
251
        stream << " ";
252

253
    stream << "}";
254
    return stream;
255
}
256

257
std::string buildTaggedRegex(std::string regexp_str)
258
{
259
    /*
260
    * tags list in format (for name or any value can use regexp, alphabet sorting not needed)
261
    * spaces are not stiped and used as tag and value part
262
    * name must be first (if used)
263
    *
264
    * tag1=value1; tag2=VALUE2_REGEX;tag3=value3
265
    * or
266
    * name;tag1=value1;tag2=VALUE2_REGEX;tag3=value3
267
    * or for one tag
268
    * tag1=value1
269
    *
270
    * Resulting regex against metric like
271
    * name?tag1=value1&tag2=value2
272
    *
273
    * So,
274
    *
275
    * name
276
    * produce
277
    * name\?
278
    *
279
    * tag2=val2
280
    * produce
281
    * [\?&]tag2=val2(&.*)?$
282
    *
283
    * nam.* ; tag1=val1 ; tag2=val2
284
    * produce
285
    * nam.*\?(.*&)?tag1=val1&(.*&)?tag2=val2(&.*)?$
286
    */
287

288
    std::vector<std::string> tags;
289

290
    splitInto<';'>(tags, regexp_str);
291
    /* remove empty elements */
292
    using namespace std::string_literals;
293
    std::erase(tags, ""s);
294
    if (tags[0].find('=') == tags[0].npos)
295
    {
296
        if (tags.size() == 1) /* only name */
297
            return "^" + tags[0] + "\\?";
298
        /* start with name value */
299
        regexp_str = "^" + tags[0] + "\\?(.*&)?";
300
        tags.erase(std::begin(tags));
301
    }
302
    else
303
        regexp_str = "[\\?&]";
304

305
    ::sort(std::begin(tags), std::end(tags)); /* sorted tag keys */
306
    regexp_str += fmt::format(
307
        "{}{}",
308
        fmt::join(tags, "&(.*&)?"),
309
        "(&.*)?$"  /* close regex */
310
    );
311

312
    return regexp_str;
313
}
314

315
/** Read the settings for Graphite rollup from config.
316
  * Example
317
  *
318
  * <graphite_rollup>
319
  *     <path_column_name>Path</path_column_name>
320
  *     <pattern>
321
  *         <regexp>click_cost</regexp>
322
  *         <function>any</function>
323
  *         <retention>
324
  *             <age>0</age>
325
  *             <precision>3600</precision>
326
  *         </retention>
327
  *         <retention>
328
  *             <age>86400</age>
329
  *             <precision>60</precision>
330
  *         </retention>
331
  *     </pattern>
332
  *     <default>
333
  *         <function>max</function>
334
  *         <retention>
335
  *             <age>0</age>
336
  *             <precision>60</precision>
337
  *         </retention>
338
  *         <retention>
339
  *             <age>3600</age>
340
  *             <precision>300</precision>
341
  *         </retention>
342
  *         <retention>
343
  *             <age>86400</age>
344
  *             <precision>3600</precision>
345
  *         </retention>
346
  *     </default>
347
  * </graphite_rollup>
348
  */
349
static const Pattern & appendGraphitePattern(
350
    const Poco::Util::AbstractConfiguration & config,
351
    const String & config_element, Patterns & patterns,
352
    bool default_rule,
353
    ContextPtr context)
354
{
355
    Pattern pattern;
356

357
    Poco::Util::AbstractConfiguration::Keys keys;
358
    config.keys(config_element, keys);
359

360
    for (const auto & key : keys)
361
    {
362
        if (key == "regexp")
363
        {
364
            pattern.regexp_str = config.getString(config_element + ".regexp");
365
        }
366
        else if (key == "function")
367
        {
368
            String aggregate_function_name_with_params = config.getString(config_element + ".function");
369
            String aggregate_function_name;
370
            Array params_row;
371
            getAggregateFunctionNameAndParametersArray(
372
                aggregate_function_name_with_params, aggregate_function_name, params_row, "GraphiteMergeTree storage initialization", context);
373

374
            /// TODO Not only Float64
375
            auto action = NullsAction::EMPTY;
376
            AggregateFunctionProperties properties;
377
            pattern.function = AggregateFunctionFactory::instance().get(
378
                aggregate_function_name, action, {std::make_shared<DataTypeFloat64>()}, params_row, properties);
379
        }
380
        else if (key == "rule_type")
381
        {
382
            String rule_type = config.getString(config_element + ".rule_type");
383
            pattern.rule_type = ruleType(rule_type);
384
        }
385
        else if (startsWith(key, "retention"))
386
        {
387
            pattern.retentions.emplace_back(Graphite::Retention{
388
                .age = config.getUInt(config_element + "." + key + ".age"),
389
                .precision = config.getUInt(config_element + "." + key + ".precision")});
390
        }
391
        else
392
            throw Exception(DB::ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "Unknown element in config: {}", key);
393
    }
394

395
    if (!pattern.regexp_str.empty())
396
    {
397
        if (pattern.rule_type == RuleTypeTagList)
398
        {
399
            // construct tagged regexp
400
            pattern.regexp_str = buildTaggedRegex(pattern.regexp_str);
401
            pattern.rule_type = RuleTypeTagged;
402
        }
403
        pattern.regexp = std::make_shared<OptimizedRegularExpression>(pattern.regexp_str);
404
    }
405

406
    if (!pattern.function && pattern.retentions.empty())
407
        throw Exception(DB::ErrorCodes::NO_ELEMENTS_IN_CONFIG,
408
            "At least one of an aggregate function or retention rules is mandatory for rollup patterns in GraphiteMergeTree");
409

410
    if (default_rule && pattern.rule_type != RuleTypeAll)
411
    {
412
        throw Exception(DB::ErrorCodes::BAD_ARGUMENTS,
413
            "Default must have rule_type all for rollup patterns in GraphiteMergeTree");
414
    }
415

416
    if (!pattern.function)
417
    {
418
        pattern.type = Graphite::Pattern::TypeRetention;
419
    }
420
    else if (pattern.retentions.empty())
421
    {
422
        pattern.type = Graphite::Pattern::TypeAggregation;
423
    }
424
    else
425
    {
426
        pattern.type = Graphite::Pattern::TypeAll;
427
    }
428

429
    if (pattern.type & Graphite::Pattern::TypeAggregation) /// TypeAggregation or TypeAll
430
        if (pattern.function->allocatesMemoryInArena())
431
            throw Exception(DB::ErrorCodes::NOT_IMPLEMENTED,
432
                            "Aggregate function {} isn't supported in GraphiteMergeTree", pattern.function->getName());
433

434
    /// retention should be in descending order of age.
435
    if (pattern.type & Graphite::Pattern::TypeRetention) /// TypeRetention or TypeAll
436
        ::sort(pattern.retentions.begin(), pattern.retentions.end(), compareRetentions);
437

438
    patterns.emplace_back(pattern);
439
    return patterns.back();
440
}
441

442
/// Fills `params` with the Graphite rollup settings found in the `config_element` section
/// of the config obtained from `context`.
///
/// Reads the column names (with defaults "Path", "Time", "Value", "Timestamp"), appends every
/// child whose key starts with "pattern" and then the "default" rule (if present) to
/// `params.patterns`, and finally distributes the patterns into `patterns_plain` and
/// `patterns_tagged`.
///
/// Throws NO_ELEMENTS_IN_CONFIG when the section is missing and UNKNOWN_ELEMENT_IN_CONFIG
/// for an unknown child key or a rule type that cannot be distributed.
void setGraphitePatternsFromConfig(ContextPtr context, const String & config_element, Graphite::Params & params)
{
    const auto & config = context->getConfigRef();

    if (!config.has(config_element))
        throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "No '{}' element in configuration file", config_element);

    params.config_name = config_element;
    params.path_column_name = config.getString(config_element + ".path_column_name", "Path");
    params.time_column_name = config.getString(config_element + ".time_column_name", "Time");
    params.value_column_name = config.getString(config_element + ".value_column_name", "Value");
    params.version_column_name = config.getString(config_element + ".version_column_name", "Timestamp");

    params.patterns_typed = false;

    Poco::Util::AbstractConfiguration::Keys keys;
    config.keys(config_element, keys);

    for (const auto & key : keys)
    {
        if (startsWith(key, "pattern"))
        {
            /// Any pattern with a rule type other than "all" switches the params to typed mode.
            if (appendGraphitePattern(config, config_element + "." + key, params.patterns, false, context).rule_type != RuleTypeAll)
                params.patterns_typed = true;
        }
        else if (key == "default")
        {
            /// Handled after the loop, so the default rule is appended after all patterns.
        }
        else if (key == "path_column_name" || key == "time_column_name" || key == "value_column_name" || key == "version_column_name")
        {
            /// Already read above.
        }
        else
            throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "Unknown element in config: {}", key);
    }

    /// Read the default rule through exactly the key that is checked with has().
    const String default_element = config_element + ".default";
    if (config.has(default_element))
        appendGraphitePattern(config, default_element, params.patterns, true, context);

    for (const auto & pattern : params.patterns)
    {
        if (pattern.rule_type == RuleTypeAll)
        {
            /// In typed mode a rule of type "all" takes part in both the plain and the tagged lookup.
            if (params.patterns_typed)
            {
                params.patterns_plain.push_back(pattern);
                params.patterns_tagged.push_back(pattern);
            }
        }
        else if (pattern.rule_type == RuleTypePlain)
        {
            params.patterns_plain.push_back(pattern);
        }
        else if (pattern.rule_type == RuleTypeTagged)
        {
            params.patterns_tagged.push_back(pattern);
        }
        else
        {
            throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "Unhandled rule_type in config: {}", ruleTypeStr(pattern.rule_type));
        }
    }
}
506

507
}
508

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.