ClickHouse
168 строк · 5.0 Кб
1#include <AggregateFunctions/AggregateFunctionFactory.h>
2#include <AggregateFunctions/SingleValueData.h>
3#include <IO/ReadHelpers.h>
4#include <IO/WriteHelpers.h>
5#include <base/defines.h>
6
7
8namespace DB
9{
10struct Settings;
11
12namespace ErrorCodes
13{
14extern const int LOGICAL_ERROR;
15}
16
17namespace
18{
19
20/** Implement 'heavy hitters' algorithm.
21* Selects most frequent value if its frequency is more than 50% in each thread of execution.
22* Otherwise, selects some arbitrary value.
23* http://www.cs.umd.edu/~samir/498/karp.pdf
24*/
25struct AggregateFunctionAnyHeavyData
26{
27using Self = AggregateFunctionAnyHeavyData;
28
29private:
30SingleValueDataBaseMemoryBlock v_data;
31UInt64 counter = 0;
32
33public:
34[[noreturn]] explicit AggregateFunctionAnyHeavyData()
35{
36throw Exception(ErrorCodes::LOGICAL_ERROR, "AggregateFunctionAnyHeavyData initialized empty");
37}
38
39explicit AggregateFunctionAnyHeavyData(TypeIndex value_type) { generateSingleValueFromTypeIndex(value_type, v_data); }
40
41~AggregateFunctionAnyHeavyData() { data().~SingleValueDataBase(); }
42
43SingleValueDataBase & data() { return v_data.get(); }
44const SingleValueDataBase & data() const { return v_data.get(); }
45
46void add(const IColumn & column, size_t row_num, Arena * arena)
47{
48if (data().isEqualTo(column, row_num))
49{
50++counter;
51}
52else if (counter == 0)
53{
54data().set(column, row_num, arena);
55++counter;
56}
57else
58{
59--counter;
60}
61}
62
63void add(const Self & to, Arena * arena)
64{
65if (!to.data().has())
66return;
67
68if (data().isEqualTo(to.data()))
69counter += to.counter;
70else if (!data().has() || counter < to.counter)
71data().set(to.data(), arena);
72else
73counter -= to.counter;
74}
75
76void addManyDefaults(const IColumn & column, size_t length, Arena * arena)
77{
78for (size_t i = 0; i < length; ++i)
79add(column, 0, arena);
80}
81
82void write(WriteBuffer & buf, const ISerialization & serialization) const
83{
84data().write(buf, serialization);
85writeBinaryLittleEndian(counter, buf);
86}
87
88void read(ReadBuffer & buf, const ISerialization & serialization, Arena * arena)
89{
90data().read(buf, serialization, arena);
91readBinaryLittleEndian(counter, buf);
92}
93
94void insertResultInto(IColumn & to) const { data().insertResultInto(to); }
95};
96
97
98class AggregateFunctionAnyHeavy final : public IAggregateFunctionDataHelper<AggregateFunctionAnyHeavyData, AggregateFunctionAnyHeavy>
99{
100private:
101SerializationPtr serialization;
102const TypeIndex value_type_index;
103
104public:
105explicit AggregateFunctionAnyHeavy(const DataTypePtr & type)
106: IAggregateFunctionDataHelper<AggregateFunctionAnyHeavyData, AggregateFunctionAnyHeavy>({type}, {}, type)
107, serialization(type->getDefaultSerialization())
108, value_type_index(WhichDataType(type).idx)
109{
110}
111
112void create(AggregateDataPtr __restrict place) const override { new (place) AggregateFunctionAnyHeavyData(value_type_index); }
113
114String getName() const override { return "anyHeavy"; }
115
116void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
117{
118data(place).add(*columns[0], row_num, arena);
119}
120
121void addManyDefaults(AggregateDataPtr __restrict place, const IColumn ** columns, size_t, Arena * arena) const override
122{
123data(place).addManyDefaults(*columns[0], 0, arena);
124}
125
126void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
127{
128data(place).add(data(rhs), arena);
129}
130
131void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
132{
133data(place).write(buf, *serialization);
134}
135
136void deserialize(AggregateDataPtr place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena * arena) const override
137{
138data(place).read(buf, *serialization, arena);
139}
140
141bool allocatesMemoryInArena() const override { return singleValueTypeAllocatesMemoryInArena(value_type_index); }
142
143void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
144{
145data(place).insertResultInto(to);
146}
147};
148
149
150AggregateFunctionPtr
151createAggregateFunctionAnyHeavy(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
152{
153assertNoParameters(name, parameters);
154assertUnary(name, argument_types);
155
156const DataTypePtr & res_type = argument_types[0];
157return AggregateFunctionPtr(new AggregateFunctionAnyHeavy(res_type));
158}
159
160}
161
162void registerAggregateFunctionAnyHeavy(AggregateFunctionFactory & factory)
163{
164AggregateFunctionProperties default_properties = {.returns_default_when_only_null = false, .is_order_dependent = true};
165factory.registerFunction("anyHeavy", {createAggregateFunctionAnyHeavy, default_properties});
166}
167
168}
169