ClickHouse

Форк
0
/
AggregateFunctionAnyHeavy.cpp 
168 строк · 5.0 Кб
1
#include <AggregateFunctions/AggregateFunctionFactory.h>
2
#include <AggregateFunctions/SingleValueData.h>
3
#include <IO/ReadHelpers.h>
4
#include <IO/WriteHelpers.h>
5
#include <base/defines.h>
6

7

8
namespace DB
9
{
10
struct Settings;
11

12
namespace ErrorCodes
13
{
14
extern const int LOGICAL_ERROR;
15
}
16

17
namespace
18
{
19

20
/** Implement 'heavy hitters' algorithm.
21
  * Selects most frequent value if its frequency is more than 50% in each thread of execution.
22
  * Otherwise, selects some arbitrary value.
23
  * http://www.cs.umd.edu/~samir/498/karp.pdf
24
  */
25
struct AggregateFunctionAnyHeavyData
26
{
27
    using Self = AggregateFunctionAnyHeavyData;
28

29
private:
30
    SingleValueDataBaseMemoryBlock v_data;
31
    UInt64 counter = 0;
32

33
public:
34
    [[noreturn]] explicit AggregateFunctionAnyHeavyData()
35
    {
36
        throw Exception(ErrorCodes::LOGICAL_ERROR, "AggregateFunctionAnyHeavyData initialized empty");
37
    }
38

39
    explicit AggregateFunctionAnyHeavyData(TypeIndex value_type) { generateSingleValueFromTypeIndex(value_type, v_data); }
40

41
    ~AggregateFunctionAnyHeavyData() { data().~SingleValueDataBase(); }
42

43
    SingleValueDataBase & data() { return v_data.get(); }
44
    const SingleValueDataBase & data() const { return v_data.get(); }
45

46
    void add(const IColumn & column, size_t row_num, Arena * arena)
47
    {
48
        if (data().isEqualTo(column, row_num))
49
        {
50
            ++counter;
51
        }
52
        else if (counter == 0)
53
        {
54
            data().set(column, row_num, arena);
55
            ++counter;
56
        }
57
        else
58
        {
59
            --counter;
60
        }
61
    }
62

63
    void add(const Self & to, Arena * arena)
64
    {
65
        if (!to.data().has())
66
            return;
67

68
        if (data().isEqualTo(to.data()))
69
            counter += to.counter;
70
        else if (!data().has() || counter < to.counter)
71
            data().set(to.data(), arena);
72
        else
73
            counter -= to.counter;
74
    }
75

76
    void addManyDefaults(const IColumn & column, size_t length, Arena * arena)
77
    {
78
        for (size_t i = 0; i < length; ++i)
79
            add(column, 0, arena);
80
    }
81

82
    void write(WriteBuffer & buf, const ISerialization & serialization) const
83
    {
84
        data().write(buf, serialization);
85
        writeBinaryLittleEndian(counter, buf);
86
    }
87

88
    void read(ReadBuffer & buf, const ISerialization & serialization, Arena * arena)
89
    {
90
        data().read(buf, serialization, arena);
91
        readBinaryLittleEndian(counter, buf);
92
    }
93

94
    void insertResultInto(IColumn & to) const { data().insertResultInto(to); }
95
};
96

97

98
/// Aggregate function `anyHeavy`: a thin adapter that forwards the IAggregateFunction
/// interface to AggregateFunctionAnyHeavyData. It takes one argument and returns a value
/// of the same type (the argument type is passed to the base class as the result type).
class AggregateFunctionAnyHeavy final : public IAggregateFunctionDataHelper<AggregateFunctionAnyHeavyData, AggregateFunctionAnyHeavy>
{
private:
    /// Default serialization of the argument type, used to (de)serialize the stored value.
    SerializationPtr serialization;
    /// Type index of the argument; selects the value representation when a state is created.
    const TypeIndex value_type_index;

public:
    explicit AggregateFunctionAnyHeavy(const DataTypePtr & type)
        : IAggregateFunctionDataHelper<AggregateFunctionAnyHeavyData, AggregateFunctionAnyHeavy>({type}, {}, type)
        , serialization(type->getDefaultSerialization())
        , value_type_index(WhichDataType(type).idx)
    {
    }

    /// Constructs the state in place. The type index must be passed explicitly because
    /// the default constructor of the state always throws.
    void create(AggregateDataPtr __restrict place) const override { new (place) AggregateFunctionAnyHeavyData(value_type_index); }

    String getName() const override { return "anyHeavy"; }

    /// Feeds row `row_num` of the single argument column into the state.
    void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
    {
        data(place).add(*columns[0], row_num, arena);
    }

    /// Feeds `length` default rows into the state.
    /// The length used to be dropped and a literal 0 forwarded instead, which made this call a no-op:
    /// default rows were never counted.
    void addManyDefaults(AggregateDataPtr __restrict place, const IColumn ** columns, size_t length, Arena * arena) const override
    {
        data(place).addManyDefaults(*columns[0], length, arena);
    }

    /// Merges the state at `rhs` into the state at `place`.
    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
    {
        data(place).add(data(rhs), arena);
    }

    /// The version argument is ignored.
    void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
    {
        data(place).write(buf, *serialization);
    }

    /// The version argument is ignored.
    void deserialize(AggregateDataPtr place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena * arena) const override
    {
        data(place).read(buf, *serialization, arena);
    }

    /// Whether the arena is needed depends only on the value type.
    bool allocatesMemoryInArena() const override { return singleValueTypeAllocatesMemoryInArena(value_type_index); }

    /// Appends the selected value to the result column; the arena is not used.
    void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
    {
        data(place).insertResultInto(to);
    }
};
148

149

150
/// Creator callback for `anyHeavy`.
/// Checks that the call has no parameters and exactly one argument, then builds the function
/// for the type of that argument. The settings pointer is not used.
AggregateFunctionPtr
createAggregateFunctionAnyHeavy(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
    /// Validation order matters for which error is reported first: parameters, then arguments.
    assertNoParameters(name, parameters);
    assertUnary(name, argument_types);

    /// The single argument type is passed straight to the function's constructor.
    return AggregateFunctionPtr(new AggregateFunctionAnyHeavy(argument_types[0]));
}
159

160
}
161

162
/// Registers `anyHeavy` in the aggregate function factory.
void registerAggregateFunctionAnyHeavy(AggregateFunctionFactory & factory)
{
    /// The function is registered as order dependent and as not returning a default
    /// when the input consists only of NULLs.
    AggregateFunctionProperties any_heavy_properties = {.returns_default_when_only_null = false, .is_order_dependent = true};

    factory.registerFunction("anyHeavy", {createAggregateFunctionAnyHeavy, any_heavy_properties});
}
167

168
}
169

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.