ClickHouse
426 lines · 13.1 KB
1#include <AggregateFunctions/AggregateFunctionFactory.h>
2#include <AggregateFunctions/FactoryHelpers.h>
3#include <AggregateFunctions/Helpers.h>
4#include <Common/FieldVisitorConvertToNumber.h>
5
6#include <Common/NaNUtils.h>
7
8#include <Columns/ColumnVector.h>
9#include <Columns/ColumnTuple.h>
10#include <Columns/ColumnArray.h>
11#include <Common/assert_cast.h>
12
13#include <DataTypes/DataTypesNumber.h>
14#include <DataTypes/DataTypeArray.h>
15#include <DataTypes/DataTypeTuple.h>
16
17#include <IO/WriteBuffer.h>
18#include <IO/ReadBuffer.h>
19#include <IO/WriteHelpers.h>
20#include <IO/ReadHelpers.h>
21#include <IO/VarInt.h>
22
23#include <AggregateFunctions/IAggregateFunction.h>
24
25#include <queue>
26#include <cmath>
27#include <cstddef>
28
29
30namespace DB
31{
struct Settings;

/// Error codes (defined elsewhere) that this translation unit throws.
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int INCORRECT_DATA;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int PARAMETER_OUT_OF_BOUND;
extern const int TOO_LARGE_ARRAY_SIZE;
extern const int UNSUPPORTED_PARAMETER;
}
44
45
46namespace
47{
48
49/** distance compression algorithm implementation
50* http://jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf
51*/
52class AggregateFunctionHistogramData
53{
54public:
55using Mean = Float64;
56using Weight = Float64;
57
58constexpr static size_t bins_count_limit = 250;
59
60private:
61struct WeightedValue
62{
63Mean mean;
64Weight weight;
65
66WeightedValue operator+(const WeightedValue & other) const
67{
68return {mean + other.weight * (other.mean - mean) / (other.weight + weight), other.weight + weight};
69}
70};
71
72// quantity of stored weighted-values
73UInt32 size;
74
75// calculated lower and upper bounds of seen points
76Mean lower_bound;
77Mean upper_bound;
78
79// Weighted values representation of histogram.
80WeightedValue points[0];
81
82void sort()
83{
84::sort(points, points + size,
85[](const WeightedValue & first, const WeightedValue & second)
86{
87return first.mean < second.mean;
88});
89}
90
91template <typename T>
92struct PriorityQueueStorage
93{
94size_t size = 0;
95T * data_ptr;
96
97explicit PriorityQueueStorage(T * value)
98: data_ptr(value)
99{
100}
101
102void push_back(T val) /// NOLINT
103{
104data_ptr[size] = std::move(val);
105++size;
106}
107
108void pop_back() { --size; } /// NOLINT
109T * begin() { return data_ptr; }
110T * end() const { return data_ptr + size; }
111bool empty() const { return size == 0; }
112T & front() { return *data_ptr; }
113const T & front() const { return *data_ptr; }
114
115using value_type = T;
116using reference = T&;
117using const_reference = const T&;
118using size_type = size_t;
119};
120
121/**
122* Repeatedly fuse most close values until max_bins bins left
123*/
124void compress(UInt32 max_bins)
125{
126sort();
127auto new_size = size;
128if (size <= max_bins)
129return;
130
131// Maintain doubly-linked list of "active" points
132// and store neighbour pairs in priority queue by distance
133UInt32 previous[size + 1];
134UInt32 next[size + 1];
135bool active[size + 1];
136std::fill(active, active + size, true);
137active[size] = false;
138
139auto delete_node = [&](UInt32 i)
140{
141previous[next[i]] = previous[i];
142next[previous[i]] = next[i];
143active[i] = false;
144};
145
146for (size_t i = 0; i <= size; ++i)
147{
148previous[i] = static_cast<UInt32>(i - 1);
149next[i] = static_cast<UInt32>(i + 1);
150}
151
152next[size] = 0;
153previous[0] = size;
154
155using QueueItem = std::pair<Mean, UInt32>;
156
157QueueItem storage[2 * size - max_bins];
158
159std::priority_queue<
160QueueItem,
161PriorityQueueStorage<QueueItem>,
162std::greater<>>
163queue{std::greater<>(),
164PriorityQueueStorage<QueueItem>(storage)};
165
166auto quality = [&](UInt32 i) { return points[next[i]].mean - points[i].mean; };
167
168for (size_t i = 0; i + 1 < size; ++i)
169queue.push({quality(static_cast<UInt32>(i)), i});
170
171while (new_size > max_bins && !queue.empty())
172{
173auto min_item = queue.top();
174queue.pop();
175auto left = min_item.second;
176auto right = next[left];
177
178if (!active[left] || !active[right] || quality(left) > min_item.first)
179continue;
180
181points[left] = points[left] + points[right];
182
183delete_node(right);
184if (active[next[left]])
185queue.push({quality(left), left});
186if (active[previous[left]])
187queue.push({quality(previous[left]), previous[left]});
188
189--new_size;
190}
191
192size_t left = 0;
193for (size_t right = 0; right < size; ++right)
194{
195if (active[right])
196{
197points[left] = points[right];
198++left;
199}
200}
201size = new_size;
202}
203
204/***
205* Delete too close points from histogram.
206* Assumes that points are sorted.
207*/
208void unique()
209{
210if (size == 0)
211return;
212
213size_t left = 0;
214
215for (auto right = left + 1; right < size; ++right)
216{
217// Fuse points if their text representations differ only in last digit
218auto min_diff = 10 * (points[left].mean + points[right].mean) * std::numeric_limits<Mean>::epsilon();
219if (points[left].mean + std::fabs(min_diff) >= points[right].mean)
220{
221points[left] = points[left] + points[right];
222}
223else
224{
225++left;
226points[left] = points[right];
227}
228}
229size = static_cast<UInt32>(left + 1);
230}
231
232public:
233AggregateFunctionHistogramData()
234: size(0)
235, lower_bound(std::numeric_limits<Mean>::max())
236, upper_bound(std::numeric_limits<Mean>::lowest())
237{
238static_assert(offsetof(AggregateFunctionHistogramData, points) == sizeof(AggregateFunctionHistogramData), "points should be last member");
239}
240
241static size_t structSize(size_t max_bins)
242{
243return sizeof(AggregateFunctionHistogramData) + max_bins * 2 * sizeof(WeightedValue);
244}
245
246void insertResultInto(ColumnVector<Mean> & to_lower, ColumnVector<Mean> & to_upper, ColumnVector<Weight> & to_weights, UInt32 max_bins)
247{
248compress(max_bins);
249unique();
250
251for (size_t i = 0; i < size; ++i)
252{
253to_lower.insertValue((i == 0) ? lower_bound : (points[i].mean + points[i - 1].mean) / 2);
254to_upper.insertValue((i + 1 == size) ? upper_bound : (points[i].mean + points[i + 1].mean) / 2);
255
256// linear density approximation
257Weight lower_weight = (i == 0) ? points[i].weight : ((points[i - 1].weight) + points[i].weight * 3) / 4;
258Weight upper_weight = (i + 1 == size) ? points[i].weight : (points[i + 1].weight + points[i].weight * 3) / 4;
259to_weights.insertValue((lower_weight + upper_weight) / 2);
260}
261}
262
263void add(Mean value, Weight weight, UInt32 max_bins)
264{
265// nans break sort and compression
266// infs don't fit in bins partition method
267if (!isFinite(value))
268throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid value (inf or nan) for aggregation by 'histogram' function");
269
270points[size] = {value, weight};
271++size;
272lower_bound = std::min(lower_bound, value);
273upper_bound = std::max(upper_bound, value);
274
275if (size >= max_bins * 2)
276compress(max_bins);
277}
278
279void merge(const AggregateFunctionHistogramData & other, UInt32 max_bins)
280{
281lower_bound = std::min(lower_bound, other.lower_bound);
282upper_bound = std::max(upper_bound, other.upper_bound);
283for (size_t i = 0; i < other.size; ++i)
284add(other.points[i].mean, other.points[i].weight, max_bins);
285}
286
287void write(WriteBuffer & buf) const
288{
289writeBinary(lower_bound, buf);
290writeBinary(upper_bound, buf);
291
292writeVarUInt(size, buf);
293buf.write(reinterpret_cast<const char *>(points), size * sizeof(WeightedValue));
294}
295
296void read(ReadBuffer & buf, UInt32 max_bins)
297{
298readBinary(lower_bound, buf);
299readBinary(upper_bound, buf);
300
301readVarUInt(size, buf);
302if (size > max_bins * 2)
303throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Too many bins");
304static constexpr size_t max_size = 1_GiB;
305if (size > max_size)
306throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE,
307"Too large array size in histogram (maximum: {})", max_size);
308
309buf.readStrict(reinterpret_cast<char *>(points), size * sizeof(WeightedValue));
310}
311};
312
313template <typename T>
314class AggregateFunctionHistogram final: public IAggregateFunctionDataHelper<AggregateFunctionHistogramData, AggregateFunctionHistogram<T>>
315{
316private:
317using Data = AggregateFunctionHistogramData;
318
319const UInt32 max_bins;
320
321public:
322AggregateFunctionHistogram(UInt32 max_bins_, const DataTypes & arguments, const Array & params)
323: IAggregateFunctionDataHelper<AggregateFunctionHistogramData, AggregateFunctionHistogram<T>>(arguments, params, createResultType())
324, max_bins(max_bins_)
325{
326}
327
328size_t sizeOfData() const override
329{
330return Data::structSize(max_bins);
331}
332static DataTypePtr createResultType()
333{
334DataTypes types;
335auto mean = std::make_shared<DataTypeNumber<Data::Mean>>();
336auto weight = std::make_shared<DataTypeNumber<Data::Weight>>();
337
338// lower bound
339types.emplace_back(mean);
340// upper bound
341types.emplace_back(mean);
342// weight
343types.emplace_back(weight);
344
345auto tuple = std::make_shared<DataTypeTuple>(types);
346return std::make_shared<DataTypeArray>(tuple);
347}
348
349bool allocatesMemoryInArena() const override { return false; }
350
351void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
352{
353auto val = assert_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num];
354this->data(place).add(static_cast<Data::Mean>(val), 1, max_bins);
355}
356
357void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
358{
359this->data(place).merge(this->data(rhs), max_bins);
360}
361
362void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
363{
364this->data(place).write(buf);
365}
366
367void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena *) const override
368{
369this->data(place).read(buf, max_bins);
370}
371
372void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
373{
374auto & data = this->data(place);
375
376auto & to_array = assert_cast<ColumnArray &>(to);
377ColumnArray::Offsets & offsets_to = to_array.getOffsets();
378auto & to_tuple = assert_cast<ColumnTuple &>(to_array.getData());
379
380auto & to_lower = assert_cast<ColumnVector<Data::Mean> &>(to_tuple.getColumn(0));
381auto & to_upper = assert_cast<ColumnVector<Data::Mean> &>(to_tuple.getColumn(1));
382auto & to_weights = assert_cast<ColumnVector<Data::Weight> &>(to_tuple.getColumn(2));
383data.insertResultInto(to_lower, to_upper, to_weights, max_bins);
384
385offsets_to.push_back(to_tuple.size());
386}
387
388String getName() const override { return "histogram"; }
389};
390
391
392AggregateFunctionPtr createAggregateFunctionHistogram(const std::string & name, const DataTypes & arguments, const Array & params, const Settings *)
393{
394if (params.size() != 1)
395throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} requires single parameter: bins count", name);
396
397if (params[0].getType() != Field::Types::UInt64)
398throw Exception(ErrorCodes::UNSUPPORTED_PARAMETER, "Invalid type for bins count");
399
400UInt32 bins_count = applyVisitor(FieldVisitorConvertToNumber<UInt32>(), params[0]);
401
402auto limit = AggregateFunctionHistogramData::bins_count_limit;
403if (bins_count > limit)
404throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND, "Unsupported bins count. Should not be greater than {}", limit);
405
406if (bins_count == 0)
407throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bin count should be positive");
408
409assertUnary(name, arguments);
410AggregateFunctionPtr res(createWithNumericType<AggregateFunctionHistogram>(*arguments[0], bins_count, arguments, params));
411
412if (!res)
413throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
414"Illegal type {} of argument for aggregate function {}", arguments[0]->getName(), name);
415
416return res;
417}
418
419}
420
421void registerAggregateFunctionHistogram(AggregateFunctionFactory & factory)
422{
423factory.registerFunction("histogram", createAggregateFunctionHistogram);
424}
425
426}
427