ClickHouse
369 строк · 12.4 Кб
1#include <AggregateFunctions/AggregateFunctionFactory.h>
2#include <AggregateFunctions/SingleValueData.h>
3#include <IO/ReadHelpers.h>
4#include <IO/WriteHelpers.h>
5#include <base/defines.h>
6
7
8namespace DB
9{
10struct Settings;
11
12namespace ErrorCodes
13{
14extern const int NOT_IMPLEMENTED;
15}
16
17namespace
18{
19
20template <typename Data>
21class AggregateFunctionAny final : public IAggregateFunctionDataHelper<Data, AggregateFunctionAny<Data>>
22{
23private:
24SerializationPtr serialization;
25
26public:
27explicit AggregateFunctionAny(const DataTypes & argument_types_)
28: IAggregateFunctionDataHelper<Data, AggregateFunctionAny<Data>>(argument_types_, {}, argument_types_[0])
29, serialization(this->result_type->getDefaultSerialization())
30{
31}
32
33String getName() const override { return "any"; }
34
35void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
36{
37if (!this->data(place).has())
38this->data(place).set(*columns[0], row_num, arena);
39}
40
41void addBatchSinglePlace(
42size_t row_begin,
43size_t row_end,
44AggregateDataPtr __restrict place,
45const IColumn ** __restrict columns,
46Arena * arena,
47ssize_t if_argument_pos) const override
48{
49if (this->data(place).has() || row_begin >= row_end)
50return;
51
52if (if_argument_pos >= 0)
53{
54const auto & if_map = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
55for (size_t i = row_begin; i < row_end; i++)
56{
57if (if_map.data()[i] != 0)
58{
59this->data(place).set(*columns[0], i, arena);
60return;
61}
62}
63}
64else
65{
66this->data(place).set(*columns[0], row_begin, arena);
67}
68}
69
70void addBatchSinglePlaceNotNull(
71size_t row_begin,
72size_t row_end,
73AggregateDataPtr __restrict place,
74const IColumn ** __restrict columns,
75const UInt8 * __restrict null_map,
76Arena * arena,
77ssize_t if_argument_pos) const override
78{
79if (this->data(place).has() || row_begin >= row_end)
80return;
81
82if (if_argument_pos >= 0)
83{
84const auto & if_map = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
85for (size_t i = row_begin; i < row_end; i++)
86{
87if (if_map.data()[i] != 0 && null_map[i] == 0)
88{
89this->data(place).set(*columns[0], i, arena);
90return;
91}
92}
93}
94else
95{
96for (size_t i = row_begin; i < row_end; i++)
97{
98if (null_map[i] == 0)
99{
100this->data(place).set(*columns[0], i, arena);
101return;
102}
103}
104}
105}
106
107void addManyDefaults(AggregateDataPtr __restrict place, const IColumn ** columns, size_t, Arena * arena) const override
108{
109if (!this->data(place).has())
110this->data(place).set(*columns[0], 0, arena);
111}
112
113void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
114{
115if (!this->data(place).has())
116this->data(place).set(this->data(rhs), arena);
117}
118
119void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
120{
121this->data(place).write(buf, *serialization);
122}
123
124void deserialize(AggregateDataPtr place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena * arena) const override
125{
126this->data(place).read(buf, *serialization, arena);
127}
128
129bool allocatesMemoryInArena() const override { return Data::allocatesMemoryInArena(); }
130
131void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
132{
133this->data(place).insertResultInto(to);
134}
135
136#if USE_EMBEDDED_COMPILER
137bool isCompilable() const override
138{
139if constexpr (!Data::is_compilable)
140return false;
141else
142return Data::isCompilable(*this->argument_types[0]);
143}
144
145void compileCreate(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override
146{
147if constexpr (Data::is_compilable)
148Data::compileCreate(builder, aggregate_data_ptr);
149else
150throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName());
151}
152
153void compileAdd(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, const ValuesWithType & arguments) const override
154{
155if constexpr (Data::is_compilable)
156Data::compileAny(builder, aggregate_data_ptr, arguments[0].value);
157else
158throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName());
159}
160
161void
162compileMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) const override
163{
164if constexpr (Data::is_compilable)
165Data::compileAnyMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr);
166else
167throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName());
168}
169
170llvm::Value * compileGetResult(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override
171{
172if constexpr (Data::is_compilable)
173return Data::compileGetResult(builder, aggregate_data_ptr);
174else
175throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName());
176}
177#endif
178};
179
180AggregateFunctionPtr
181createAggregateFunctionAny(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
182{
183return AggregateFunctionPtr(
184createAggregateFunctionSingleValue<AggregateFunctionAny, /* unary */ true>(name, argument_types, parameters, settings));
185}
186
187
188template <typename Data>
189class AggregateFunctionAnyLast final : public IAggregateFunctionDataHelper<Data, AggregateFunctionAnyLast<Data>>
190{
191private:
192SerializationPtr serialization;
193
194public:
195explicit AggregateFunctionAnyLast(const DataTypes & argument_types_)
196: IAggregateFunctionDataHelper<Data, AggregateFunctionAnyLast<Data>>(argument_types_, {}, argument_types_[0])
197, serialization(this->result_type->getDefaultSerialization())
198{
199}
200
201String getName() const override { return "anyLast"; }
202
203void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
204{
205this->data(place).set(*columns[0], row_num, arena);
206}
207
208void addBatchSinglePlace(
209size_t row_begin,
210size_t row_end,
211AggregateDataPtr __restrict place,
212const IColumn ** __restrict columns,
213Arena * arena,
214ssize_t if_argument_pos) const override
215{
216if (row_begin >= row_end)
217return;
218
219size_t batch_size = row_end - row_begin;
220if (if_argument_pos >= 0)
221{
222const auto & if_map = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
223for (size_t i = 0; i < batch_size; i++)
224{
225size_t pos = (row_end - 1) - i;
226if (if_map.data()[pos] != 0)
227{
228this->data(place).set(*columns[0], pos, arena);
229return;
230}
231}
232}
233else
234{
235this->data(place).set(*columns[0], row_end - 1, arena);
236}
237}
238
239void addBatchSinglePlaceNotNull(
240size_t row_begin,
241size_t row_end,
242AggregateDataPtr __restrict place,
243const IColumn ** __restrict columns,
244const UInt8 * __restrict null_map,
245Arena * arena,
246ssize_t if_argument_pos) const override
247{
248if (row_begin >= row_end)
249return;
250
251size_t batch_size = row_end - row_begin;
252if (if_argument_pos >= 0)
253{
254const auto & if_map = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
255for (size_t i = 0; i < batch_size; i++)
256{
257size_t pos = (row_end - 1) - i;
258if (if_map.data()[pos] != 0 && null_map[pos] == 0)
259{
260this->data(place).set(*columns[0], pos, arena);
261return;
262}
263}
264}
265else
266{
267for (size_t i = 0; i < batch_size; i++)
268{
269size_t pos = (row_end - 1) - i;
270if (null_map[pos] == 0)
271{
272this->data(place).set(*columns[0], pos, arena);
273return;
274}
275}
276}
277}
278
279void addManyDefaults(AggregateDataPtr __restrict place, const IColumn ** columns, size_t, Arena * arena) const override
280{
281this->data(place).set(*columns[0], 0, arena);
282}
283
284void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
285{
286this->data(place).set(this->data(rhs), arena);
287}
288
289void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
290{
291this->data(place).write(buf, *serialization);
292}
293
294void deserialize(AggregateDataPtr place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena * arena) const override
295{
296this->data(place).read(buf, *serialization, arena);
297}
298
299bool allocatesMemoryInArena() const override { return Data::allocatesMemoryInArena(); }
300
301void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
302{
303this->data(place).insertResultInto(to);
304}
305
306#if USE_EMBEDDED_COMPILER
307bool isCompilable() const override
308{
309if constexpr (!Data::is_compilable)
310return false;
311else
312return Data::isCompilable(*this->argument_types[0]);
313}
314
315void compileCreate(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override
316{
317if constexpr (Data::is_compilable)
318Data::compileCreate(builder, aggregate_data_ptr);
319else
320throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName());
321}
322
323void compileAdd(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, const ValuesWithType & arguments) const override
324{
325if constexpr (Data::is_compilable)
326Data::compileAnyLast(builder, aggregate_data_ptr, arguments[0].value);
327else
328throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName());
329}
330
331void
332compileMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) const override
333{
334if constexpr (Data::is_compilable)
335Data::compileAnyLastMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr);
336else
337throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName());
338}
339
340llvm::Value * compileGetResult(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override
341{
342if constexpr (Data::is_compilable)
343return Data::compileGetResult(builder, aggregate_data_ptr);
344else
345throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName());
346}
347#endif
348};
349
350AggregateFunctionPtr createAggregateFunctionAnyLast(
351const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
352{
353return AggregateFunctionPtr(
354createAggregateFunctionSingleValue<AggregateFunctionAnyLast, /* unary */ true>(name, argument_types, parameters, settings));
355}
356
357}
358
359void registerAggregateFunctionsAny(AggregateFunctionFactory & factory)
360{
361AggregateFunctionProperties default_properties = {.returns_default_when_only_null = false, .is_order_dependent = true};
362
363factory.registerFunction("any", {createAggregateFunctionAny, default_properties});
364factory.registerAlias("any_value", "any", AggregateFunctionFactory::CaseInsensitive);
365factory.registerAlias("first_value", "any", AggregateFunctionFactory::CaseInsensitive);
366factory.registerFunction("anyLast", {createAggregateFunctionAnyLast, default_properties});
367factory.registerAlias("last_value", "anyLast", AggregateFunctionFactory::CaseInsensitive);
368}
369}
370