ClickHouse
162 строки · 6.4 Кб
1#include <AggregateFunctions/AggregateFunctionQuantile.h>
2#include <AggregateFunctions/ReservoirSampler.h>
3#include <AggregateFunctions/AggregateFunctionFactory.h>
4#include <AggregateFunctions/Helpers.h>
5#include <DataTypes/DataTypeDate.h>
6#include <DataTypes/DataTypeDateTime.h>
7#include <Core/Field.h>
8
9
10namespace DB
11{
12
13struct Settings;
14
/// Error codes referenced in this translation unit; they are only declared here (extern).
namespace ErrorCodes
{
    /// Thrown by createAggregateFunctionQuantile when no argument types are supplied.
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    /// Thrown by createAggregateFunctionQuantile when the first argument type is not supported.
    extern const int ILLEGAL_TYPE_OF_ARGUMENT;
    /// Thrown by the weighted overload of QuantileReservoirSampler::add.
    extern const int NOT_IMPLEMENTED;
}
21
22namespace
23{
24
25/** Quantile calculation with "reservoir sample" algorithm.
26* It collects pseudorandom subset of limited size from a stream of values,
27* and approximate quantile from it.
28* The result is non-deterministic. Also look at QuantileReservoirSamplerDeterministic.
29*
30* This algorithm is quite inefficient in terms of precision for memory usage,
31* but very efficient in CPU (though less efficient than QuantileTiming and than QuantileExact for small sets).
32*/
33template <typename Value>
34struct QuantileReservoirSampler
35{
36using Data = ReservoirSampler<Value, ReservoirSamplerOnEmpty::RETURN_NAN_OR_ZERO>;
37Data data;
38
39void add(const Value & x)
40{
41data.insert(x);
42}
43
44template <typename Weight>
45void add(const Value &, const Weight &)
46{
47throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method add with weight is not implemented for ReservoirSampler");
48}
49
50void merge(const QuantileReservoirSampler & rhs)
51{
52data.merge(rhs.data);
53}
54
55void serialize(WriteBuffer & buf) const
56{
57data.write(buf);
58}
59
60void deserialize(ReadBuffer & buf)
61{
62data.read(buf);
63}
64
65/// Get the value of the `level` quantile. The level must be between 0 and 1.
66Value get(Float64 level)
67{
68if (data.empty())
69return {};
70
71if constexpr (is_decimal<Value>)
72return Value(static_cast<typename Value::NativeType>(data.quantileInterpolated(level)));
73else
74return static_cast<Value>(data.quantileInterpolated(level));
75}
76
77/// Get the `size` values of `levels` quantiles. Write `size` results starting with `result` address.
78/// indices - an array of index levels such that the corresponding elements will go in ascending order.
79void getMany(const Float64 * levels, const size_t * indices, size_t size, Value * result)
80{
81bool is_empty = data.empty();
82
83for (size_t i = 0; i < size; ++i)
84{
85if (is_empty)
86{
87result[i] = Value{};
88}
89else
90{
91if constexpr (is_decimal<Value>)
92result[indices[i]] = Value(static_cast<typename Value::NativeType>(data.quantileInterpolated(levels[indices[i]])));
93else
94result[indices[i]] = Value(data.quantileInterpolated(levels[indices[i]]));
95}
96}
97}
98
99/// The same, but in the case of an empty state, NaN is returned.
100Float64 getFloat(Float64 level)
101{
102return data.quantileInterpolated(level);
103}
104
105void getManyFloat(const Float64 * levels, const size_t * indices, size_t size, Float64 * result)
106{
107for (size_t i = 0; i < size; ++i)
108result[indices[i]] = data.quantileInterpolated(levels[indices[i]]);
109}
110};
111
112
113template <typename Value, bool float_return> using FuncQuantile = AggregateFunctionQuantile<Value, QuantileReservoirSampler<Value>, NameQuantile, false, std::conditional_t<float_return, Float64, void>, false, false>;
114template <typename Value, bool float_return> using FuncQuantiles = AggregateFunctionQuantile<Value, QuantileReservoirSampler<Value>, NameQuantiles, false, std::conditional_t<float_return, Float64, void>, true, false>;
115
116template <template <typename, bool> class Function>
117AggregateFunctionPtr createAggregateFunctionQuantile(
118const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *)
119{
120if (argument_types.empty())
121throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Aggregate function {} requires at least one argument", name);
122
123const DataTypePtr & argument_type = argument_types[0];
124WhichDataType which(argument_type);
125
126#define DISPATCH(TYPE) \
127if (which.idx == TypeIndex::TYPE) return std::make_shared<Function<TYPE, true>>(argument_types, params);
128FOR_BASIC_NUMERIC_TYPES(DISPATCH)
129#undef DISPATCH
130if (which.idx == TypeIndex::Date) return std::make_shared<Function<DataTypeDate::FieldType, false>>(argument_types, params);
131if (which.idx == TypeIndex::DateTime) return std::make_shared<Function<DataTypeDateTime::FieldType, false>>(argument_types, params);
132
133if (which.idx == TypeIndex::Decimal32) return std::make_shared<Function<Decimal32, false>>(argument_types, params);
134if (which.idx == TypeIndex::Decimal64) return std::make_shared<Function<Decimal64, false>>(argument_types, params);
135if (which.idx == TypeIndex::Decimal128) return std::make_shared<Function<Decimal128, false>>(argument_types, params);
136if (which.idx == TypeIndex::Decimal256) return std::make_shared<Function<Decimal256, false>>(argument_types, params);
137if (which.idx == TypeIndex::DateTime64) return std::make_shared<Function<DateTime64, false>>(argument_types, params);
138
139if (which.idx == TypeIndex::Int128) return std::make_shared<Function<Int128, true>>(argument_types, params);
140if (which.idx == TypeIndex::UInt128) return std::make_shared<Function<UInt128, true>>(argument_types, params);
141if (which.idx == TypeIndex::Int256) return std::make_shared<Function<Int256, true>>(argument_types, params);
142if (which.idx == TypeIndex::UInt256) return std::make_shared<Function<UInt256, true>>(argument_types, params);
143
144throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument for aggregate function {}",
145argument_type->getName(), name);
146}
147
148}
149
150void registerAggregateFunctionsQuantile(AggregateFunctionFactory & factory)
151{
152/// For aggregate functions returning array we cannot return NULL on empty set.
153AggregateFunctionProperties properties = { .returns_default_when_only_null = true };
154
155factory.registerFunction(NameQuantile::name, createAggregateFunctionQuantile<FuncQuantile>);
156factory.registerFunction(NameQuantiles::name, { createAggregateFunctionQuantile<FuncQuantiles>, properties });
157
158/// 'median' is an alias for 'quantile'
159factory.registerAlias("median", NameQuantile::name);
160}
161
162}
163