/// ClickHouse — quantileGK / quantilesGK aggregate functions (551 lines, 20.0 KB).
1#include <AggregateFunctions/AggregateFunctionQuantile.h>
2#include <AggregateFunctions/AggregateFunctionFactory.h>
3#include <AggregateFunctions/Helpers.h>
4#include <DataTypes/DataTypeDate.h>
5#include <DataTypes/DataTypeDateTime.h>
6#include <Core/Field.h>
7#include <cmath>
8#include <Common/RadixSort.h>
9#include <IO/WriteBuffer.h>
10#include <IO/ReadBuffer.h>
11#include <IO/WriteHelpers.h>
12#include <IO/ReadHelpers.h>
13
14namespace DB
15{
16
17namespace ErrorCodes
18{
19extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
20extern const int ILLEGAL_TYPE_OF_ARGUMENT;
21extern const int INCORRECT_DATA;
22extern const int LOGICAL_ERROR;
23extern const int NOT_IMPLEMENTED;
24}
25
26namespace
27{
28
/// Approximate quantile summary based on the Greenwald-Khanna (GK) algorithm
/// (the rank bookkeeping is described in detail in the comment inside merge()).
/// New values are buffered in `head_sampled` and folded into the compressed
/// summary `sampled` in batches; queries require a compressed state.
template <typename T>
class ApproxSampler
{
public:
    /// One entry of the GK summary: a sampled value together with its rank bounds.
    struct Stats
    {
        T value;     // The sampled value
        Int64 g;     // The minimum rank jump from the previous value's minimum rank
        Int64 delta; // The maximum span of the rank

        Stats() = default;
        Stats(T value_, Int64 g_, Int64 delta_) : value(value_), g(g_), delta(delta_) { }
    };

    /// Result of one quantile lookup: position in `sampled`, the minimum rank at
    /// that position (so a later query for a higher percentile can resume there),
    /// and the answer value itself.
    struct QueryResult
    {
        size_t index;
        Int64 rank;
        T value;

        QueryResult(size_t index_, Int64 rank_, T value_) : index(index_), rank(rank_), value(value_) { }
    };

    ApproxSampler() = default;

    /// Deep copy: PODArray has no copy constructor, so each buffer is copied via
    /// its iterator-range constructor.
    ApproxSampler(const ApproxSampler & other)
        : relative_error(other.relative_error)
        , compress_threshold(other.compress_threshold)
        , count(other.count)
        , compressed(other.compressed)
        , sampled(other.sampled.begin(), other.sampled.end())
        , backup_sampled(other.backup_sampled.begin(), other.backup_sampled.end())
        , head_sampled(other.head_sampled.begin(), other.head_sampled.end())
    {
    }

    /// relative_error_ is the target bound on rank error (1 / accuracy at the call site).
    explicit ApproxSampler(double relative_error_)
        : relative_error(relative_error_), compress_threshold(default_compress_threshold), count(0), compressed(false)
    {
    }

    bool isCompressed() const { return compressed; }
    void setCompressed() { compressed = true; }

    /// Buffer one value. When the head buffer fills up it is merged into the
    /// summary, which in turn is compressed once it grows past the threshold.
    void insert(T x)
    {
        head_sampled.push_back(x);
        compressed = false;
        if (head_sampled.size() >= default_head_size)
        {
            withHeadBufferInserted();
            if (sampled.size() >= compress_threshold)
                compress();
        }
    }

    /// Answer `size` percentile queries in one pass over the summary.
    /// `indices` must order the percentiles ascending so that the scan position
    /// (`index`, `min_rank`) can be carried forward between queries; results are
    /// written at result[indices[i]]. The caller must have compressed first.
    void query(const Float64 * percentiles, const size_t * indices, size_t size, T * result) const
    {
        if (!head_sampled.empty())
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot operate on an uncompressed summary, call compress() first");

        if (sampled.empty())
        {
            // Empty summary: return default-constructed values.
            for (size_t i = 0; i < size; ++i)
                result[i] = T();
            return;
        }

        // Half of the worst-case rank uncertainty over all samples is the error
        // budget allowed when matching a target rank below.
        Int64 current_max = std::numeric_limits<Int64>::min();
        for (const auto & stats : sampled)
            current_max = std::max(stats.delta + stats.g, current_max);
        Int64 target_error = current_max / 2;

        size_t index = 0;
        auto min_rank = sampled[0].g;
        for (size_t i = 0; i < size; ++i)
        {
            double percentile = percentiles[indices[i]];
            // Percentiles within relative_error of the ends are answered by the
            // extreme samples directly.
            if (percentile <= relative_error)
            {
                result[indices[i]] = sampled.front().value;
            }
            else if (percentile >= 1 - relative_error)
            {
                result[indices[i]] = sampled.back().value;
            }
            else
            {
                QueryResult res = findApproxQuantile(index, min_rank, target_error, percentile);
                index = res.index;
                min_rank = res.rank;
                result[indices[i]] = res.value;
            }
        }
    }

    /// Flush the head buffer and shrink the summary to the guaranteed error bound.
    void compress()
    {
        if (compressed)
            return;

        withHeadBufferInserted();

        doCompress(2 * relative_error * count);
        compressed = true;
    }


    /// Merge another (already compressed) summary into this one.
    void merge(const ApproxSampler & other)
    {
        if (other.count == 0)
            return;
        else if (count == 0)
        {
            // This side is empty: become a copy of the other summary.
            compress_threshold = other.compress_threshold;
            relative_error = other.relative_error;
            count = other.count;
            compressed = other.compressed;

            sampled.resize_exact(other.sampled.size());
            memcpy(sampled.data(), other.sampled.data(), sizeof(Stats) * other.sampled.size());
            return;
        }
        else
        {
            // Merge the two buffers.
            // The GK algorithm is a bit unclear about it, but we need to adjust the statistics during the
            // merging. The main idea is that samples that come from one side will suffer from the lack of
            // precision of the other.
            // As a concrete example, take two QuantileSummaries whose samples (value, g, delta) are:
            // `a = [(0, 1, 0), (20, 99, 0)]` and `b = [(10, 1, 0), (30, 49, 0)]`
            // This means `a` has 100 values, whose minimum is 0 and maximum is 20,
            // while `b` has 50 values, between 10 and 30.
            // The resulting samples of the merge will be:
            // a+b = [(0, 1, 0), (10, 1, ??), (20, 99, ??), (30, 49, 0)]
            // The values of `g` do not change, as they represent the minimum number of values between two
            // consecutive samples. The values of `delta` should be adjusted, however.
            // Take the case of the sample `10` from `b`. In the original stream, it could have appeared
            // right after `0` (as expressed by `g=1`) or right before `20`, so `delta=99+0-1=98`.
            // In the GK algorithm's style of working in terms of maximum bounds, one can observe that the
            // maximum additional uncertainty over samples coming from `b` is `max(g_a + delta_a) =
            // floor(2 * eps_a * n_a)`. Likewise, additional uncertainty over samples from `a` is
            // `floor(2 * eps_b * n_b)`.
            // Only samples that interleave the other side are affected. That means that samples from
            // one side that are lesser (or greater) than all samples from the other side are just copied
            // unmodified.
            // If the merging instances have different `relativeError`, the resulting instance will carry
            // the largest one: `eps_ab = max(eps_a, eps_b)`.
            // The main invariant of the GK algorithm is kept:
            // `max(g_ab + delta_ab) <= floor(2 * eps_ab * (n_a + n_b))` since
            // `max(g_ab + delta_ab) <= floor(2 * eps_a * n_a) + floor(2 * eps_b * n_b)`
            // Finally, one can see how the `insert(x)` operation can be expressed as `merge([(x, 1, 0])`
            compress();

            backup_sampled.clear();
            backup_sampled.reserve_exact(sampled.size() + other.sampled.size());
            double merged_relative_error = std::max(relative_error, other.relative_error);
            size_t merged_count = count + other.count;
            Int64 additional_self_delta = static_cast<Int64>(std::floor(2 * other.relative_error * other.count));
            Int64 additional_other_delta = static_cast<Int64>(std::floor(2 * relative_error * count));

            // Do a merge of two sorted lists until one of the lists is fully consumed
            size_t self_idx = 0;
            size_t other_idx = 0;
            while (self_idx < sampled.size() && other_idx < other.sampled.size())
            {
                const Stats & self_sample = sampled[self_idx];
                const Stats & other_sample = other.sampled[other_idx];

                // Detect next sample
                Stats next_sample;
                Int64 additional_delta = 0;
                if (self_sample.value < other_sample.value)
                {
                    ++self_idx;
                    next_sample = self_sample;
                    // Extra delta only for samples that interleave the other side
                    // (i.e. are not before all of the other side's samples).
                    additional_delta = other_idx > 0 ? additional_self_delta : 0;
                }
                else
                {
                    ++other_idx;
                    next_sample = other_sample;
                    additional_delta = self_idx > 0 ? additional_other_delta : 0;
                }

                // Insert it
                next_sample.delta += additional_delta;
                backup_sampled.emplace_back(std::move(next_sample));
            }

            // Copy the remaining samples from the other list
            // (by construction, at most one `while` loop will run)
            while (self_idx < sampled.size())
            {
                backup_sampled.emplace_back(sampled[self_idx]);
                ++self_idx;
            }
            while (other_idx < other.sampled.size())
            {
                backup_sampled.emplace_back(other.sampled[other_idx]);
                ++other_idx;
            }

            std::swap(sampled, backup_sampled);
            relative_error = merged_relative_error;
            count = merged_count;
            compress_threshold = other.compress_threshold;

            doCompress(2 * merged_relative_error * merged_count);
            compressed = true;
        }
    }

    /// Serialize the summary (caller ensures it is compressed first).
    /// Layout: threshold, error, count, number of samples, then flat (value, g, delta) triples.
    void write(WriteBuffer & buf) const
    {
        writeBinaryLittleEndian(compress_threshold, buf);
        writeBinaryLittleEndian(relative_error, buf);
        writeBinaryLittleEndian(count, buf);
        writeBinaryLittleEndian(sampled.size(), buf);

        for (const auto & stats : sampled)
        {
            writeBinaryLittleEndian(stats.value, buf);
            writeBinaryLittleEndian(stats.g, buf);
            writeBinaryLittleEndian(stats.delta, buf);
        }
    }

    /// Deserialize; mirrors write(). The threshold is validated against the
    /// compile-time default to reject incompatible/corrupted states.
    void read(ReadBuffer & buf)
    {
        readBinaryLittleEndian(compress_threshold, buf);
        if (compress_threshold != default_compress_threshold)
            throw Exception(
                ErrorCodes::INCORRECT_DATA,
                "The compress threshold {} isn't the expected one {}",
                compress_threshold,
                default_compress_threshold);

        readBinaryLittleEndian(relative_error, buf);
        readBinaryLittleEndian(count, buf);

        size_t sampled_len = 0;
        readBinaryLittleEndian(sampled_len, buf);
        sampled.resize_exact(sampled_len);

        for (size_t i = 0; i < sampled_len; ++i)
        {
            auto & stats = sampled[i];
            readBinaryLittleEndian(stats.value, buf);
            readBinaryLittleEndian(stats.g, buf);
            readBinaryLittleEndian(stats.delta, buf);
        }
    }

private:
    /// Scan forward from (index, min_rank_at_index) for the first sample whose
    /// rank bounds contain the target rank within target_error; falls back to the
    /// last sample if none matches.
    QueryResult findApproxQuantile(size_t index, Int64 min_rank_at_index, double target_error, double percentile) const
    {
        Stats curr_sample = sampled[index];
        Int64 rank = static_cast<Int64>(std::ceil(percentile * count));
        size_t i = index;
        Int64 min_rank = min_rank_at_index;
        while (i < sampled.size() - 1)
        {
            Int64 max_rank = min_rank + curr_sample.delta;
            if (max_rank - target_error <= rank && rank <= min_rank + target_error)
                return {i, min_rank, curr_sample.value};
            else
            {
                ++i;
                curr_sample = sampled[i];
                min_rank += curr_sample.g;
            }
        }
        return {sampled.size() - 1, 0, sampled.back().value};
    }

    /// Sort the head buffer and merge it into `sampled` (sorted-merge into
    /// backup_sampled, then swap). Updates `count`.
    void withHeadBufferInserted()
    {
        if (head_sampled.empty())
            return;

        // Radix sort pays off only for larger buffers and only for plain
        // fixed-width numeric types.
        bool use_radix_sort = head_sampled.size() >= 256 && (is_arithmetic_v<T> && !is_big_int_v<T>);
        if (use_radix_sort)
            RadixSort<RadixSortNumTraits<T>>::executeLSD(head_sampled.data(), head_sampled.size());
        else
            ::sort(head_sampled.begin(), head_sampled.end());

        backup_sampled.clear();
        backup_sampled.reserve_exact(sampled.size() + head_sampled.size());

        size_t sample_idx = 0;
        size_t ops_idx = 0;
        size_t current_count = count;
        for (; ops_idx < head_sampled.size(); ++ops_idx)
        {
            T current_sample = head_sampled[ops_idx];

            // Add all the samples before the next observation.
            while (sample_idx < sampled.size() && sampled[sample_idx].value <= current_sample)
            {
                backup_sampled.emplace_back(sampled[sample_idx]);
                ++sample_idx;
            }

            // If it is the first one to insert, of if it is the last one
            // (the extremes carry exact ranks, so delta = 0).
            ++current_count;
            Int64 delta;
            if (backup_sampled.empty() || (sample_idx == sampled.size() && ops_idx == (head_sampled.size() - 1)))
                delta = 0;
            else
                delta = static_cast<Int64>(std::floor(2 * relative_error * current_count));

            backup_sampled.emplace_back(current_sample, 1, delta);
        }

        // Add all the remaining existing samples
        for (; sample_idx < sampled.size(); ++sample_idx)
            backup_sampled.emplace_back(sampled[sample_idx]);

        std::swap(sampled, backup_sampled);
        head_sampled.clear();
        count = current_count;
    }


    /// Drop samples whose merged rank span stays below merge_threshold,
    /// preserving the GK invariant. Scans right-to-left into backup_sampled,
    /// then reverses and swaps.
    void doCompress(double merge_threshold)
    {
        if (sampled.empty())
            return;

        backup_sampled.clear();
        // Start for the last element, which is always part of the set.
        // The head contains the current new head, that may be merged with the current element.
        Stats head = sampled.back();
        ssize_t i = sampled.size() - 2;

        // Do not compress the last element
        while (i >= 1)
        {
            // The current sample:
            const auto & sample1 = sampled[i];
            // Do we need to compress?
            if (sample1.g + head.g + head.delta < merge_threshold)
            {
                // Do not insert yet, just merge the current element into the head.
                head.g += sample1.g;
            }
            else
            {
                // Prepend the current head, and keep the current sample as target for merging.
                backup_sampled.push_back(head);
                head = sample1;
            }
            --i;
        }

        backup_sampled.push_back(head);
        // If necessary, add the minimum element:
        auto curr_head = sampled.front();

        // don't add the minimum element if `currentSamples` has only one element (both `currHead` and
        // `head` point to the same element)
        if (curr_head.value <= head.value && sampled.size() > 1)
            backup_sampled.emplace_back(sampled.front());

        std::reverse(backup_sampled.begin(), backup_sampled.end());
        std::swap(sampled, backup_sampled);
    }

    double relative_error;       // target relative rank error (eps)
    size_t compress_threshold;   // summary size that triggers compression
    size_t count;                // total number of inserted values
    bool compressed;             // true when head buffer is flushed and summary compacted

    PaddedPODArray<Stats> sampled;        // the compressed summary, sorted by value
    PaddedPODArray<Stats> backup_sampled; // scratch buffer reused by merge/compress
    PaddedPODArray<T> head_sampled;       // raw, unsorted buffer of recent inserts

    static constexpr size_t default_compress_threshold = 10000;
    static constexpr size_t default_head_size = 50000;
};
410
411template <typename Value>
412class QuantileGK
413{
414private:
415using Data = ApproxSampler<Value>;
416Data data;
417
418public:
419QuantileGK() = default;
420
421explicit QuantileGK(size_t accuracy) : data(1.0 / static_cast<double>(accuracy)) { }
422
423void add(const Value & x) { data.insert(x); }
424
425template <typename Weight>
426void add(const Value &, const Weight &)
427{
428throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method add with weight is not implemented for GKSampler");
429}
430
431void merge(const QuantileGK & rhs)
432{
433if (!data.isCompressed())
434data.compress();
435
436if (rhs.data.isCompressed())
437data.merge(rhs.data);
438else
439{
440/// We can't modify rhs, so copy it and compress
441Data rhs_data_copy(rhs.data);
442rhs_data_copy.compress();
443data.merge(rhs_data_copy);
444}
445}
446
447void serialize(WriteBuffer & buf) const
448{
449if (data.isCompressed())
450data.write(buf);
451else
452{
453/// We can't modify rhs, so copy it and compress
454Data data_copy(data);
455data_copy.compress();
456data_copy.write(buf);
457}
458}
459
460void deserialize(ReadBuffer & buf)
461{
462data.read(buf);
463/// Serialized data is always compressed
464data.setCompressed();
465}
466
467/// Get the value of the `level` quantile. The level must be between 0 and 1.
468Value get(Float64 level)
469{
470if (!data.isCompressed())
471data.compress();
472
473Value res;
474size_t indice = 0;
475data.query(&level, &indice, 1, &res);
476return res;
477}
478
479/// Get the `size` values of `levels` quantiles. Write `size` results starting with `result` address.
480/// indices - an array of index levels such that the corresponding elements will go in ascending order.
481void getMany(const Float64 * levels, const size_t * indices, size_t size, Value * result)
482{
483if (!data.isCompressed())
484data.compress();
485
486data.query(levels, indices, size, result);
487}
488
489Float64 getFloat64(Float64 /*level*/)
490{
491throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getFloat64 is not implemented for GKSampler");
492}
493
494void getManyFloat(const Float64 * /*levels*/, const size_t * /*indices*/, size_t /*size*/, Float64 * /*result*/)
495{
496throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getManyFloat is not implemented for GKSampler");
497}
498};
499
/// Concrete aggregate-function types: quantileGK returns a single quantile,
/// quantilesGK returns several (registered with array-result properties below).
/// NOTE(review): the trailing template booleans presumably select the
/// returns-many and GK-specific behavior — confirm against AggregateFunctionQuantile.h.
template <typename Value, bool _> using FuncQuantileGK = AggregateFunctionQuantile<Value, QuantileGK<Value>, NameQuantileGK, false, void, false, true>;
template <typename Value, bool _> using FuncQuantilesGK = AggregateFunctionQuantile<Value, QuantileGK<Value>, NameQuantilesGK, false, void, true, true>;
502
503template <template <typename, bool> class Function>
504AggregateFunctionPtr createAggregateFunctionQuantile(
505const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *)
506{
507if (argument_types.empty())
508throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Aggregate function {} requires at least one argument", name);
509
510const DataTypePtr & argument_type = argument_types[0];
511WhichDataType which(argument_type);
512
513#define DISPATCH(TYPE) \
514if (which.idx == TypeIndex::TYPE) \
515return std::make_shared<Function<TYPE, true>>(argument_types, params);
516FOR_BASIC_NUMERIC_TYPES(DISPATCH)
517#undef DISPATCH
518
519if (which.idx == TypeIndex::Date) return std::make_shared<Function<DataTypeDate::FieldType, false>>(argument_types, params);
520if (which.idx == TypeIndex::DateTime) return std::make_shared<Function<DataTypeDateTime::FieldType, false>>(argument_types, params);
521
522if (which.idx == TypeIndex::Decimal32) return std::make_shared<Function<Decimal32, false>>(argument_types, params);
523if (which.idx == TypeIndex::Decimal64) return std::make_shared<Function<Decimal64, false>>(argument_types, params);
524if (which.idx == TypeIndex::Decimal128) return std::make_shared<Function<Decimal128, false>>(argument_types, params);
525if (which.idx == TypeIndex::Decimal256) return std::make_shared<Function<Decimal256, false>>(argument_types, params);
526if (which.idx == TypeIndex::DateTime64) return std::make_shared<Function<DateTime64, false>>(argument_types, params);
527
528if (which.idx == TypeIndex::Int128) return std::make_shared<Function<Int128, true>>(argument_types, params);
529if (which.idx == TypeIndex::UInt128) return std::make_shared<Function<UInt128, true>>(argument_types, params);
530if (which.idx == TypeIndex::Int256) return std::make_shared<Function<Int256, true>>(argument_types, params);
531if (which.idx == TypeIndex::UInt256) return std::make_shared<Function<UInt256, true>>(argument_types, params);
532
533throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument for aggregate function {}",
534argument_type->getName(), name);
535}
536
537}
538
539void registerAggregateFunctionsQuantileApprox(AggregateFunctionFactory & factory)
540{
541/// For aggregate functions returning array we cannot return NULL on empty set.
542AggregateFunctionProperties properties = { .returns_default_when_only_null = true };
543
544factory.registerFunction(NameQuantileGK::name, createAggregateFunctionQuantile<FuncQuantileGK>);
545factory.registerFunction(NameQuantilesGK::name, {createAggregateFunctionQuantile<FuncQuantilesGK>, properties});
546
547/// 'median' is an alias for 'quantile'
548factory.registerAlias("medianGK", NameQuantileGK::name);
549}
550
551}
552