ClickHouse

Форк
0
/
AggregateFunctionQuantile.cpp 
162 строки · 6.4 Кб
1
#include <AggregateFunctions/AggregateFunctionQuantile.h>
2
#include <AggregateFunctions/ReservoirSampler.h>
3
#include <AggregateFunctions/AggregateFunctionFactory.h>
4
#include <AggregateFunctions/Helpers.h>
5
#include <DataTypes/DataTypeDate.h>
6
#include <DataTypes/DataTypeDateTime.h>
7
#include <Core/Field.h>
8

9

10
namespace DB
11
{
12

13
/// Forward declaration: only referenced as `const Settings *` in the factory signature below.
struct Settings;

/// Error codes thrown from this file; their definitions live elsewhere in the project.
namespace ErrorCodes
{
    extern const int ILLEGAL_TYPE_OF_ARGUMENT;
    extern const int NOT_IMPLEMENTED;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
21

22
namespace
23
{
24

25
/** Quantile calculation with "reservoir sample" algorithm.
26
  * It collects pseudorandom subset of limited size from a stream of values,
27
  *  and approximate quantile from it.
28
  * The result is non-deterministic. Also look at QuantileReservoirSamplerDeterministic.
29
  *
30
  * This algorithm is quite inefficient in terms of precision for memory usage,
31
  *  but very efficient in CPU (though less efficient than QuantileTiming and than QuantileExact for small sets).
32
  */
33
template <typename Value>
34
struct QuantileReservoirSampler
35
{
36
    using Data = ReservoirSampler<Value, ReservoirSamplerOnEmpty::RETURN_NAN_OR_ZERO>;
37
    Data data;
38

39
    void add(const Value & x)
40
    {
41
        data.insert(x);
42
    }
43

44
    template <typename Weight>
45
    void add(const Value &, const Weight &)
46
    {
47
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method add with weight is not implemented for ReservoirSampler");
48
    }
49

50
    void merge(const QuantileReservoirSampler & rhs)
51
    {
52
        data.merge(rhs.data);
53
    }
54

55
    void serialize(WriteBuffer & buf) const
56
    {
57
        data.write(buf);
58
    }
59

60
    void deserialize(ReadBuffer & buf)
61
    {
62
        data.read(buf);
63
    }
64

65
    /// Get the value of the `level` quantile. The level must be between 0 and 1.
66
    Value get(Float64 level)
67
    {
68
        if (data.empty())
69
            return {};
70

71
        if constexpr (is_decimal<Value>)
72
            return Value(static_cast<typename Value::NativeType>(data.quantileInterpolated(level)));
73
        else
74
            return static_cast<Value>(data.quantileInterpolated(level));
75
    }
76

77
    /// Get the `size` values of `levels` quantiles. Write `size` results starting with `result` address.
78
    /// indices - an array of index levels such that the corresponding elements will go in ascending order.
79
    void getMany(const Float64 * levels, const size_t * indices, size_t size, Value * result)
80
    {
81
        bool is_empty = data.empty();
82

83
        for (size_t i = 0; i < size; ++i)
84
        {
85
            if (is_empty)
86
            {
87
                result[i] = Value{};
88
            }
89
            else
90
            {
91
                if constexpr (is_decimal<Value>)
92
                    result[indices[i]] = Value(static_cast<typename Value::NativeType>(data.quantileInterpolated(levels[indices[i]])));
93
                else
94
                    result[indices[i]] = Value(data.quantileInterpolated(levels[indices[i]]));
95
            }
96
        }
97
    }
98

99
    /// The same, but in the case of an empty state, NaN is returned.
100
    Float64 getFloat(Float64 level)
101
    {
102
        return data.quantileInterpolated(level);
103
    }
104

105
    void getManyFloat(const Float64 * levels, const size_t * indices, size_t size, Float64 * result)
106
    {
107
        for (size_t i = 0; i < size; ++i)
108
            result[indices[i]] = data.quantileInterpolated(levels[indices[i]]);
109
    }
110
};
111

112

113
/// Factory instantiations built on QuantileReservoirSampler. The two aliases differ only in the name
/// type (NameQuantile vs NameQuantiles) and in the sixth template argument (false vs true).
/// `float_return` picks Float64 (true) or void (false) for the fifth template argument.
/// NOTE(review): the meaning of each positional argument is defined in AggregateFunctionQuantile.h — confirm there.
template <typename Value, bool float_return>
using FuncQuantile = AggregateFunctionQuantile<
    Value,
    QuantileReservoirSampler<Value>,
    NameQuantile,
    false,
    std::conditional_t<float_return, Float64, void>,
    false,
    false>;

template <typename Value, bool float_return>
using FuncQuantiles = AggregateFunctionQuantile<
    Value,
    QuantileReservoirSampler<Value>,
    NameQuantiles,
    false,
    std::conditional_t<float_return, Float64, void>,
    true,
    false>;
115

116
template <template <typename, bool> class Function>
117
AggregateFunctionPtr createAggregateFunctionQuantile(
118
    const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *)
119
{
120
    if (argument_types.empty())
121
        throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Aggregate function {} requires at least one argument", name);
122

123
    const DataTypePtr & argument_type = argument_types[0];
124
    WhichDataType which(argument_type);
125

126
#define DISPATCH(TYPE) \
127
    if (which.idx == TypeIndex::TYPE) return std::make_shared<Function<TYPE, true>>(argument_types, params);
128
    FOR_BASIC_NUMERIC_TYPES(DISPATCH)
129
#undef DISPATCH
130
    if (which.idx == TypeIndex::Date) return std::make_shared<Function<DataTypeDate::FieldType, false>>(argument_types, params);
131
    if (which.idx == TypeIndex::DateTime) return std::make_shared<Function<DataTypeDateTime::FieldType, false>>(argument_types, params);
132

133
    if (which.idx == TypeIndex::Decimal32) return std::make_shared<Function<Decimal32, false>>(argument_types, params);
134
    if (which.idx == TypeIndex::Decimal64) return std::make_shared<Function<Decimal64, false>>(argument_types, params);
135
    if (which.idx == TypeIndex::Decimal128) return std::make_shared<Function<Decimal128, false>>(argument_types, params);
136
    if (which.idx == TypeIndex::Decimal256) return std::make_shared<Function<Decimal256, false>>(argument_types, params);
137
    if (which.idx == TypeIndex::DateTime64) return std::make_shared<Function<DateTime64, false>>(argument_types, params);
138

139
    if (which.idx == TypeIndex::Int128) return std::make_shared<Function<Int128, true>>(argument_types, params);
140
    if (which.idx == TypeIndex::UInt128) return std::make_shared<Function<UInt128, true>>(argument_types, params);
141
    if (which.idx == TypeIndex::Int256) return std::make_shared<Function<Int256, true>>(argument_types, params);
142
    if (which.idx == TypeIndex::UInt256) return std::make_shared<Function<UInt256, true>>(argument_types, params);
143

144
    throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument for aggregate function {}",
145
                    argument_type->getName(), name);
146
}
147

148
}
149

150
void registerAggregateFunctionsQuantile(AggregateFunctionFactory & factory)
{
    /// Scalar variant: registered with the factory's default properties.
    factory.registerFunction(NameQuantile::name, createAggregateFunctionQuantile<FuncQuantile>);

    /// Array-returning variant: an array result cannot be NULL on an empty set, so this registration
    /// sets returns_default_when_only_null.
    AggregateFunctionProperties array_result_properties = { .returns_default_when_only_null = true };
    factory.registerFunction(NameQuantiles::name, { createAggregateFunctionQuantile<FuncQuantiles>, array_result_properties });

    /// `median` is registered as an alias of `quantile` (after `quantile` itself is registered).
    factory.registerAlias("median", NameQuantile::name);
}
161

162
}
163

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.