ClickHouse / AggregateFunctionQuantileGK.cpp · 551 lines · 20.0 KB
#include <AggregateFunctions/AggregateFunctionQuantile.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/Helpers.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Core/Field.h>
#include <cmath>
#include <Common/RadixSort.h>
#include <IO/WriteBuffer.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int ILLEGAL_TYPE_OF_ARGUMENT;
    extern const int INCORRECT_DATA;
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
}

namespace
{

template <typename T>
class ApproxSampler
{
public:
    struct Stats
    {
        T value;     // The sampled value
        Int64 g;     // The minimum rank jump from the previous value's minimum rank
        Int64 delta; // The maximum span of the rank

        Stats() = default;
        Stats(T value_, Int64 g_, Int64 delta_) : value(value_), g(g_), delta(delta_) { }
    };
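
    // For the sample at position i, the Greenwald-Khanna rank bounds follow from g and delta:
    //   rmin(i) = g(0) + ... + g(i)
    //   rmax(i) = rmin(i) + delta(i)
    // query() and findApproxQuantile() below accumulate g to maintain rmin while scanning.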

    struct QueryResult
    {
        size_t index;
        Int64 rank;
        T value;

        QueryResult(size_t index_, Int64 rank_, T value_) : index(index_), rank(rank_), value(value_) { }
    };

    ApproxSampler() = default;

    ApproxSampler(const ApproxSampler & other)
        : relative_error(other.relative_error)
        , compress_threshold(other.compress_threshold)
        , count(other.count)
        , compressed(other.compressed)
        , sampled(other.sampled.begin(), other.sampled.end())
        , backup_sampled(other.backup_sampled.begin(), other.backup_sampled.end())
        , head_sampled(other.head_sampled.begin(), other.head_sampled.end())
    {
    }

    explicit ApproxSampler(double relative_error_)
        : relative_error(relative_error_), compress_threshold(default_compress_threshold), count(0), compressed(false)
    {
    }

    bool isCompressed() const { return compressed; }
    void setCompressed() { compressed = true; }

    void insert(T x)
    {
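        // Values are staged in head_sampled and folded into the summary in batches: once the
        // head reaches default_head_size it is sorted and merged in, and the summary itself
        // is re-compressed once it holds at least compress_threshold samples.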
        head_sampled.push_back(x);
        compressed = false;
        if (head_sampled.size() >= default_head_size)
        {
            withHeadBufferInserted();
            if (sampled.size() >= compress_threshold)
                compress();
        }
    }

    void query(const Float64 * percentiles, const size_t * indices, size_t size, T * result) const
    {
        if (!head_sampled.empty())
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot operate on an uncompressed summary, call compress() first");

        if (sampled.empty())
        {
            for (size_t i = 0; i < size; ++i)
                result[i] = T();
            return;
        }

        Int64 current_max = std::numeric_limits<Int64>::min();
        for (const auto & stats : sampled)
            current_max = std::max(stats.delta + stats.g, current_max);
        Int64 target_error = current_max / 2;

        size_t index = 0;
        auto min_rank = sampled[0].g;
        for (size_t i = 0; i < size; ++i)
        {
            double percentile = percentiles[indices[i]];
            if (percentile <= relative_error)
            {
                result[indices[i]] = sampled.front().value;
            }
            else if (percentile >= 1 - relative_error)
            {
                result[indices[i]] = sampled.back().value;
            }
            else
            {
                QueryResult res = findApproxQuantile(index, min_rank, target_error, percentile);
                index = res.index;
                min_rank = res.rank;
                result[indices[i]] = res.value;
            }
        }
    }

    void compress()
    {
        if (compressed)
            return;

        withHeadBufferInserted();

        doCompress(2 * relative_error * count);
        compressed = true;
    }


    void merge(const ApproxSampler & other)
    {
        if (other.count == 0)
            return;
        else if (count == 0)
        {
            compress_threshold = other.compress_threshold;
            relative_error = other.relative_error;
            count = other.count;
            compressed = other.compressed;

            sampled.resize_exact(other.sampled.size());
            memcpy(sampled.data(), other.sampled.data(), sizeof(Stats) * other.sampled.size());
            return;
        }
        else
        {
            // Merge the two buffers.
            // The GK algorithm is a bit unclear about it, but we need to adjust the statistics during the
            // merging. The main idea is that samples that come from one side will suffer from the lack of
            // precision of the other.
            // As a concrete example, take two QuantileSummaries whose samples (value, g, delta) are:
            // `a = [(0, 1, 0), (20, 99, 0)]` and `b = [(10, 1, 0), (30, 49, 0)]`
            // This means `a` has 100 values, whose minimum is 0 and maximum is 20,
            // while `b` has 50 values, between 10 and 30.
            // The resulting samples of the merge will be:
            // a+b = [(0, 1, 0), (10, 1, ??), (20, 99, ??), (30, 49, 0)]
            // The values of `g` do not change, as they represent the minimum number of values between two
            // consecutive samples. The values of `delta` should be adjusted, however.
            // Take the case of the sample `10` from `b`. In the original stream, it could have appeared
            // right after `0` (as expressed by `g=1`) or right before `20`, so `delta=99+0-1=98`.
            // In the GK algorithm's style of working in terms of maximum bounds, one can observe that the
            // maximum additional uncertainty over samples coming from `b` is `max(g_a + delta_a) =
            // floor(2 * eps_a * n_a)`. Likewise, additional uncertainty over samples from `a` is
            // `floor(2 * eps_b * n_b)`.
            // Only samples that interleave the other side are affected. That means that samples from
            // one side that are lesser (or greater) than all samples from the other side are just copied
            // unmodified.
            // If the merging instances have different `relativeError`, the resulting instance will carry
            // the largest one: `eps_ab = max(eps_a, eps_b)`.
            // The main invariant of the GK algorithm is kept:
            // `max(g_ab + delta_ab) <= floor(2 * eps_ab * (n_a + n_b))` since
            // `max(g_ab + delta_ab) <= floor(2 * eps_a * n_a) + floor(2 * eps_b * n_b)`
            // Finally, one can see how the `insert(x)` operation can be expressed as `merge([(x, 1, 0)])`
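            // Numeric check of the example above, assuming eps_a = eps_b = 0.5 (values consistent
            // with the invariant for both inputs): samples from `b` gain floor(2 * eps_a * n_a) = 100,
            // samples from `a` gain floor(2 * eps_b * n_b) = 50, and only the interleaving samples
            // are adjusted, so the loop below yields
            // a+b = [(0, 1, 0), (10, 1, 100), (20, 99, 50), (30, 49, 0)].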
            compress();

            backup_sampled.clear();
            backup_sampled.reserve_exact(sampled.size() + other.sampled.size());
            double merged_relative_error = std::max(relative_error, other.relative_error);
            size_t merged_count = count + other.count;
            Int64 additional_self_delta = static_cast<Int64>(std::floor(2 * other.relative_error * other.count));
            Int64 additional_other_delta = static_cast<Int64>(std::floor(2 * relative_error * count));

            // Do a merge of two sorted lists until one of the lists is fully consumed
            size_t self_idx = 0;
            size_t other_idx = 0;
            while (self_idx < sampled.size() && other_idx < other.sampled.size())
            {
                const Stats & self_sample = sampled[self_idx];
                const Stats & other_sample = other.sampled[other_idx];

                // Detect next sample
                Stats next_sample;
                Int64 additional_delta = 0;
                if (self_sample.value < other_sample.value)
                {
                    ++self_idx;
                    next_sample = self_sample;
                    additional_delta = other_idx > 0 ? additional_self_delta : 0;
                }
                else
                {
                    ++other_idx;
                    next_sample = other_sample;
                    additional_delta = self_idx > 0 ? additional_other_delta : 0;
                }

                // Insert it
                next_sample.delta += additional_delta;
                backup_sampled.emplace_back(std::move(next_sample));
            }

            // Copy the remaining samples from the other list
            // (by construction, at most one `while` loop will run)
            while (self_idx < sampled.size())
            {
                backup_sampled.emplace_back(sampled[self_idx]);
                ++self_idx;
            }
            while (other_idx < other.sampled.size())
            {
                backup_sampled.emplace_back(other.sampled[other_idx]);
                ++other_idx;
            }

            std::swap(sampled, backup_sampled);
            relative_error = merged_relative_error;
            count = merged_count;
            compress_threshold = other.compress_threshold;

            doCompress(2 * merged_relative_error * merged_count);
            compressed = true;
        }
    }

    void write(WriteBuffer & buf) const
    {
        writeBinaryLittleEndian(compress_threshold, buf);
        writeBinaryLittleEndian(relative_error, buf);
        writeBinaryLittleEndian(count, buf);
        writeBinaryLittleEndian(sampled.size(), buf);

        for (const auto & stats : sampled)
        {
            writeBinaryLittleEndian(stats.value, buf);
            writeBinaryLittleEndian(stats.g, buf);
            writeBinaryLittleEndian(stats.delta, buf);
        }
    }

    void read(ReadBuffer & buf)
    {
        readBinaryLittleEndian(compress_threshold, buf);
        if (compress_threshold != default_compress_threshold)
            throw Exception(
                ErrorCodes::INCORRECT_DATA,
                "The compress threshold {} isn't the expected one {}",
                compress_threshold,
                default_compress_threshold);

        readBinaryLittleEndian(relative_error, buf);
        readBinaryLittleEndian(count, buf);

        size_t sampled_len = 0;
        readBinaryLittleEndian(sampled_len, buf);
        sampled.resize_exact(sampled_len);

        for (size_t i = 0; i < sampled_len; ++i)
        {
            auto & stats = sampled[i];
            readBinaryLittleEndian(stats.value, buf);
            readBinaryLittleEndian(stats.g, buf);
            readBinaryLittleEndian(stats.delta, buf);
        }
    }

private:
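    /// Scan right from `index`, accumulating g to track the minimum rank. Return the first
    /// sample whose rank bounds (min_rank and min_rank + delta) both lie within target_error
    /// of the requested rank; fall back to the last sample if none qualifies.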
    QueryResult findApproxQuantile(size_t index, Int64 min_rank_at_index, double target_error, double percentile) const
    {
        Stats curr_sample = sampled[index];
        Int64 rank = static_cast<Int64>(std::ceil(percentile * count));
        size_t i = index;
        Int64 min_rank = min_rank_at_index;
        while (i < sampled.size() - 1)
        {
            Int64 max_rank = min_rank + curr_sample.delta;
            if (max_rank - target_error <= rank && rank <= min_rank + target_error)
                return {i, min_rank, curr_sample.value};
            else
            {
                ++i;
                curr_sample = sampled[i];
                min_rank += curr_sample.g;
            }
        }
        return {sampled.size() - 1, 0, sampled.back().value};
    }

    void withHeadBufferInserted()
    {
        if (head_sampled.empty())
            return;

        bool use_radix_sort = head_sampled.size() >= 256 && (is_arithmetic_v<T> && !is_big_int_v<T>);
        if (use_radix_sort)
            RadixSort<RadixSortNumTraits<T>>::executeLSD(head_sampled.data(), head_sampled.size());
        else
            ::sort(head_sampled.begin(), head_sampled.end());

        backup_sampled.clear();
        backup_sampled.reserve_exact(sampled.size() + head_sampled.size());

        size_t sample_idx = 0;
        size_t ops_idx = 0;
        size_t current_count = count;
        for (; ops_idx < head_sampled.size(); ++ops_idx)
        {
            T current_sample = head_sampled[ops_idx];

            // Add all the samples before the next observation.
            while (sample_idx < sampled.size() && sampled[sample_idx].value <= current_sample)
            {
                backup_sampled.emplace_back(sampled[sample_idx]);
                ++sample_idx;
            }

            // If it is the first one to insert, or if it is the last one, delta is 0
            ++current_count;
            Int64 delta;
            if (backup_sampled.empty() || (sample_idx == sampled.size() && ops_idx == (head_sampled.size() - 1)))
                delta = 0;
            else
                delta = static_cast<Int64>(std::floor(2 * relative_error * current_count));

            backup_sampled.emplace_back(current_sample, 1, delta);
        }

        // Add all the remaining existing samples
        for (; sample_idx < sampled.size(); ++sample_idx)
            backup_sampled.emplace_back(sampled[sample_idx]);

        std::swap(sampled, backup_sampled);
        head_sampled.clear();
        count = current_count;
    }


    void doCompress(double merge_threshold)
    {
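        // Worked example: with merge_threshold = 3 and sampled = [(1,1,0), (2,1,0), (3,1,0)],
        // the backward scan folds (2,1,0) into the head since 1 + 1 + 0 < 3, giving (3,2,0);
        // the scan stops before the first element, which is appended separately below, so after
        // the final reverse the result is [(1,1,0), (3,2,0)].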
        if (sampled.empty())
            return;

        backup_sampled.clear();
        // Start from the last element, which is always part of the set.
        // The head holds the current candidate sample, which may still be merged with the current element.
        Stats head = sampled.back();
        ssize_t i = sampled.size() - 2;

        // Do not compress the last element
        while (i >= 1)
        {
            // The current sample:
            const auto & sample1 = sampled[i];
            // Do we need to compress?
            if (sample1.g + head.g + head.delta < merge_threshold)
            {
                // Do not insert yet, just merge the current element into the head.
                head.g += sample1.g;
            }
            else
            {
                // Prepend the current head, and keep the current sample as target for merging.
                backup_sampled.push_back(head);
                head = sample1;
            }
            --i;
        }

        backup_sampled.push_back(head);
        // If necessary, add the minimum element:
        auto curr_head = sampled.front();

        // Don't add the minimum element if `sampled` has only one element (both `curr_head` and
        // `head` point to the same element).
        if (curr_head.value <= head.value && sampled.size() > 1)
            backup_sampled.emplace_back(sampled.front());

        std::reverse(backup_sampled.begin(), backup_sampled.end());
        std::swap(sampled, backup_sampled);
    }

    double relative_error;
    size_t compress_threshold;
    size_t count;
    bool compressed;

    PaddedPODArray<Stats> sampled;
    PaddedPODArray<Stats> backup_sampled;
    PaddedPODArray<T> head_sampled;

    static constexpr size_t default_compress_threshold = 10000;
    static constexpr size_t default_head_size = 50000;
};

template <typename Value>
class QuantileGK
{
private:
    using Data = ApproxSampler<Value>;
    Data data;

public:
    QuantileGK() = default;

    explicit QuantileGK(size_t accuracy) : data(1.0 / static_cast<double>(accuracy)) { }

    void add(const Value & x) { data.insert(x); }

    template <typename Weight>
    void add(const Value &, const Weight &)
    {
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method add with weight is not implemented for GKSampler");
    }

    void merge(const QuantileGK & rhs)
    {
        if (!data.isCompressed())
            data.compress();

        if (rhs.data.isCompressed())
            data.merge(rhs.data);
        else
        {
            /// We can't modify rhs, so copy it and compress
            Data rhs_data_copy(rhs.data);
            rhs_data_copy.compress();
            data.merge(rhs_data_copy);
        }
    }

    void serialize(WriteBuffer & buf) const
    {
        if (data.isCompressed())
            data.write(buf);
        else
        {
            /// This method is const, so we can't compress `data` in place; copy it and compress the copy
            Data data_copy(data);
            data_copy.compress();
            data_copy.write(buf);
        }
    }

    void deserialize(ReadBuffer & buf)
    {
        data.read(buf);
        /// Serialized data is always compressed
        data.setCompressed();
    }

    /// Get the value of the `level` quantile. The level must be between 0 and 1.
    Value get(Float64 level)
    {
        if (!data.isCompressed())
            data.compress();

        Value res;
        size_t indice = 0;
        data.query(&level, &indice, 1, &res);
        return res;
    }

    /// Get the values of the `size` quantiles given in `levels`, writing `size` results starting at the `result` address.
    /// `indices` is an array of indices into `levels` such that the corresponding levels go in ascending order.
    void getMany(const Float64 * levels, const size_t * indices, size_t size, Value * result)
    {
        if (!data.isCompressed())
            data.compress();

        data.query(levels, indices, size, result);
    }

    Float64 getFloat64(Float64 /*level*/)
    {
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getFloat64 is not implemented for GKSampler");
    }

    void getManyFloat(const Float64 * /*levels*/, const size_t * /*indices*/, size_t /*size*/, Float64 * /*result*/)
    {
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getManyFloat is not implemented for GKSampler");
    }
};
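
// A minimal usage sketch of this wrapper in isolation (in the server it is driven by the
// generic AggregateFunctionQuantile machinery instantiated below):
//
//     QuantileGK<UInt64> q(100);             /// accuracy 100 -> relative_error = 0.01
//     for (UInt64 i = 0; i < 1000; ++i)
//         q.add(i);
//     UInt64 approx_median = q.get(0.5);     /// approximate 0.5 quantile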

template <typename Value, bool _> using FuncQuantileGK = AggregateFunctionQuantile<Value, QuantileGK<Value>, NameQuantileGK, false, void, false, true>;
template <typename Value, bool _> using FuncQuantilesGK = AggregateFunctionQuantile<Value, QuantileGK<Value>, NameQuantilesGK, false, void, true, true>;

template <template <typename, bool> class Function>
AggregateFunctionPtr createAggregateFunctionQuantile(
    const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *)
{
    if (argument_types.empty())
        throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Aggregate function {} requires at least one argument", name);

    const DataTypePtr & argument_type = argument_types[0];
    WhichDataType which(argument_type);

#define DISPATCH(TYPE) \
    if (which.idx == TypeIndex::TYPE) \
        return std::make_shared<Function<TYPE, true>>(argument_types, params);
    FOR_BASIC_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH

    if (which.idx == TypeIndex::Date) return std::make_shared<Function<DataTypeDate::FieldType, false>>(argument_types, params);
    if (which.idx == TypeIndex::DateTime) return std::make_shared<Function<DataTypeDateTime::FieldType, false>>(argument_types, params);

    if (which.idx == TypeIndex::Decimal32) return std::make_shared<Function<Decimal32, false>>(argument_types, params);
    if (which.idx == TypeIndex::Decimal64) return std::make_shared<Function<Decimal64, false>>(argument_types, params);
    if (which.idx == TypeIndex::Decimal128) return std::make_shared<Function<Decimal128, false>>(argument_types, params);
    if (which.idx == TypeIndex::Decimal256) return std::make_shared<Function<Decimal256, false>>(argument_types, params);
    if (which.idx == TypeIndex::DateTime64) return std::make_shared<Function<DateTime64, false>>(argument_types, params);

    if (which.idx == TypeIndex::Int128) return std::make_shared<Function<Int128, true>>(argument_types, params);
    if (which.idx == TypeIndex::UInt128) return std::make_shared<Function<UInt128, true>>(argument_types, params);
    if (which.idx == TypeIndex::Int256) return std::make_shared<Function<Int256, true>>(argument_types, params);
    if (which.idx == TypeIndex::UInt256) return std::make_shared<Function<UInt256, true>>(argument_types, params);

    throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument for aggregate function {}",
                    argument_type->getName(), name);
}

}

void registerAggregateFunctionsQuantileApprox(AggregateFunctionFactory & factory)
{
    /// For aggregate functions returning an array we cannot return NULL on an empty set.
    AggregateFunctionProperties properties = { .returns_default_when_only_null = true };

    factory.registerFunction(NameQuantileGK::name, createAggregateFunctionQuantile<FuncQuantileGK>);
    factory.registerFunction(NameQuantilesGK::name, {createAggregateFunctionQuantile<FuncQuantilesGK>, properties});

    /// 'medianGK' is an alias for 'quantileGK'
    factory.registerAlias("medianGK", NameQuantileGK::name);
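
    /// Typical SQL usage (per the ClickHouse documentation): quantileGK(accuracy, level)(expr), e.g.
    ///     SELECT quantileGK(1000, 0.75)(number) FROM numbers(1000)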
}

}