ClickHouse

Форк
0
/
AggregateFunctionAny.cpp 
369 строк · 12.4 Кб
1
#include <AggregateFunctions/AggregateFunctionFactory.h>
2
#include <AggregateFunctions/SingleValueData.h>
3
#include <IO/ReadHelpers.h>
4
#include <IO/WriteHelpers.h>
5
#include <base/defines.h>
6

7

8
namespace DB
9
{
10
struct Settings;
11

12
namespace ErrorCodes
13
{
14
extern const int NOT_IMPLEMENTED;
15
}
16

17
namespace
18
{
19

20
template <typename Data>
21
class AggregateFunctionAny final : public IAggregateFunctionDataHelper<Data, AggregateFunctionAny<Data>>
22
{
23
private:
24
    SerializationPtr serialization;
25

26
public:
27
    explicit AggregateFunctionAny(const DataTypes & argument_types_)
28
        : IAggregateFunctionDataHelper<Data, AggregateFunctionAny<Data>>(argument_types_, {}, argument_types_[0])
29
        , serialization(this->result_type->getDefaultSerialization())
30
    {
31
    }
32

33
    String getName() const override { return "any"; }
34

35
    void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
36
    {
37
        if (!this->data(place).has())
38
            this->data(place).set(*columns[0], row_num, arena);
39
    }
40

41
    void addBatchSinglePlace(
42
        size_t row_begin,
43
        size_t row_end,
44
        AggregateDataPtr __restrict place,
45
        const IColumn ** __restrict columns,
46
        Arena * arena,
47
        ssize_t if_argument_pos) const override
48
    {
49
        if (this->data(place).has() || row_begin >= row_end)
50
            return;
51

52
        if (if_argument_pos >= 0)
53
        {
54
            const auto & if_map = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
55
            for (size_t i = row_begin; i < row_end; i++)
56
            {
57
                if (if_map.data()[i] != 0)
58
                {
59
                    this->data(place).set(*columns[0], i, arena);
60
                    return;
61
                }
62
            }
63
        }
64
        else
65
        {
66
            this->data(place).set(*columns[0], row_begin, arena);
67
        }
68
    }
69

70
    void addBatchSinglePlaceNotNull(
71
        size_t row_begin,
72
        size_t row_end,
73
        AggregateDataPtr __restrict place,
74
        const IColumn ** __restrict columns,
75
        const UInt8 * __restrict null_map,
76
        Arena * arena,
77
        ssize_t if_argument_pos) const override
78
    {
79
        if (this->data(place).has() || row_begin >= row_end)
80
            return;
81

82
        if (if_argument_pos >= 0)
83
        {
84
            const auto & if_map = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
85
            for (size_t i = row_begin; i < row_end; i++)
86
            {
87
                if (if_map.data()[i] != 0 && null_map[i] == 0)
88
                {
89
                    this->data(place).set(*columns[0], i, arena);
90
                    return;
91
                }
92
            }
93
        }
94
        else
95
        {
96
            for (size_t i = row_begin; i < row_end; i++)
97
            {
98
                if (null_map[i] == 0)
99
                {
100
                    this->data(place).set(*columns[0], i, arena);
101
                    return;
102
                }
103
            }
104
        }
105
    }
106

107
    void addManyDefaults(AggregateDataPtr __restrict place, const IColumn ** columns, size_t, Arena * arena) const override
108
    {
109
        if (!this->data(place).has())
110
            this->data(place).set(*columns[0], 0, arena);
111
    }
112

113
    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
114
    {
115
        if (!this->data(place).has())
116
            this->data(place).set(this->data(rhs), arena);
117
    }
118

119
    void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
120
    {
121
        this->data(place).write(buf, *serialization);
122
    }
123

124
    void deserialize(AggregateDataPtr place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena * arena) const override
125
    {
126
        this->data(place).read(buf, *serialization, arena);
127
    }
128

129
    bool allocatesMemoryInArena() const override { return Data::allocatesMemoryInArena(); }
130

131
    void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
132
    {
133
        this->data(place).insertResultInto(to);
134
    }
135

136
#if USE_EMBEDDED_COMPILER
137
    bool isCompilable() const override
138
    {
139
        if constexpr (!Data::is_compilable)
140
            return false;
141
        else
142
            return Data::isCompilable(*this->argument_types[0]);
143
    }
144

145
    void compileCreate(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override
146
    {
147
        if constexpr (Data::is_compilable)
148
            Data::compileCreate(builder, aggregate_data_ptr);
149
        else
150
            throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName());
151
    }
152

153
    void compileAdd(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, const ValuesWithType & arguments) const override
154
    {
155
        if constexpr (Data::is_compilable)
156
            Data::compileAny(builder, aggregate_data_ptr, arguments[0].value);
157
        else
158
            throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName());
159
    }
160

161
    void
162
    compileMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) const override
163
    {
164
        if constexpr (Data::is_compilable)
165
            Data::compileAnyMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr);
166
        else
167
            throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName());
168
    }
169

170
    llvm::Value * compileGetResult(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override
171
    {
172
        if constexpr (Data::is_compilable)
173
            return Data::compileGetResult(builder, aggregate_data_ptr);
174
        else
175
            throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName());
176
    }
177
#endif
178
};
179

180
AggregateFunctionPtr
181
createAggregateFunctionAny(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
182
{
183
    return AggregateFunctionPtr(
184
        createAggregateFunctionSingleValue<AggregateFunctionAny, /* unary */ true>(name, argument_types, parameters, settings));
185
}
186

187

188
template <typename Data>
189
class AggregateFunctionAnyLast final : public IAggregateFunctionDataHelper<Data, AggregateFunctionAnyLast<Data>>
190
{
191
private:
192
    SerializationPtr serialization;
193

194
public:
195
    explicit AggregateFunctionAnyLast(const DataTypes & argument_types_)
196
        : IAggregateFunctionDataHelper<Data, AggregateFunctionAnyLast<Data>>(argument_types_, {}, argument_types_[0])
197
        , serialization(this->result_type->getDefaultSerialization())
198
    {
199
    }
200

201
    String getName() const override { return "anyLast"; }
202

203
    void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
204
    {
205
        this->data(place).set(*columns[0], row_num, arena);
206
    }
207

208
    void addBatchSinglePlace(
209
        size_t row_begin,
210
        size_t row_end,
211
        AggregateDataPtr __restrict place,
212
        const IColumn ** __restrict columns,
213
        Arena * arena,
214
        ssize_t if_argument_pos) const override
215
    {
216
        if (row_begin >= row_end)
217
            return;
218

219
        size_t batch_size = row_end - row_begin;
220
        if (if_argument_pos >= 0)
221
        {
222
            const auto & if_map = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
223
            for (size_t i = 0; i < batch_size; i++)
224
            {
225
                size_t pos = (row_end - 1) - i;
226
                if (if_map.data()[pos] != 0)
227
                {
228
                    this->data(place).set(*columns[0], pos, arena);
229
                    return;
230
                }
231
            }
232
        }
233
        else
234
        {
235
            this->data(place).set(*columns[0], row_end - 1, arena);
236
        }
237
    }
238

239
    void addBatchSinglePlaceNotNull(
240
        size_t row_begin,
241
        size_t row_end,
242
        AggregateDataPtr __restrict place,
243
        const IColumn ** __restrict columns,
244
        const UInt8 * __restrict null_map,
245
        Arena * arena,
246
        ssize_t if_argument_pos) const override
247
    {
248
        if (row_begin >= row_end)
249
            return;
250

251
        size_t batch_size = row_end - row_begin;
252
        if (if_argument_pos >= 0)
253
        {
254
            const auto & if_map = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
255
            for (size_t i = 0; i < batch_size; i++)
256
            {
257
                size_t pos = (row_end - 1) - i;
258
                if (if_map.data()[pos] != 0 && null_map[pos] == 0)
259
                {
260
                    this->data(place).set(*columns[0], pos, arena);
261
                    return;
262
                }
263
            }
264
        }
265
        else
266
        {
267
            for (size_t i = 0; i < batch_size; i++)
268
            {
269
                size_t pos = (row_end - 1) - i;
270
                if (null_map[pos] == 0)
271
                {
272
                    this->data(place).set(*columns[0], pos, arena);
273
                    return;
274
                }
275
            }
276
        }
277
    }
278

279
    void addManyDefaults(AggregateDataPtr __restrict place, const IColumn ** columns, size_t, Arena * arena) const override
280
    {
281
        this->data(place).set(*columns[0], 0, arena);
282
    }
283

284
    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
285
    {
286
        this->data(place).set(this->data(rhs), arena);
287
    }
288

289
    void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
290
    {
291
        this->data(place).write(buf, *serialization);
292
    }
293

294
    void deserialize(AggregateDataPtr place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena * arena) const override
295
    {
296
        this->data(place).read(buf, *serialization, arena);
297
    }
298

299
    bool allocatesMemoryInArena() const override { return Data::allocatesMemoryInArena(); }
300

301
    void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
302
    {
303
        this->data(place).insertResultInto(to);
304
    }
305

306
#if USE_EMBEDDED_COMPILER
307
    bool isCompilable() const override
308
    {
309
        if constexpr (!Data::is_compilable)
310
            return false;
311
        else
312
            return Data::isCompilable(*this->argument_types[0]);
313
    }
314

315
    void compileCreate(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override
316
    {
317
        if constexpr (Data::is_compilable)
318
            Data::compileCreate(builder, aggregate_data_ptr);
319
        else
320
            throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName());
321
    }
322

323
    void compileAdd(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, const ValuesWithType & arguments) const override
324
    {
325
        if constexpr (Data::is_compilable)
326
            Data::compileAnyLast(builder, aggregate_data_ptr, arguments[0].value);
327
        else
328
            throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName());
329
    }
330

331
    void
332
    compileMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) const override
333
    {
334
        if constexpr (Data::is_compilable)
335
            Data::compileAnyLastMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr);
336
        else
337
            throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName());
338
    }
339

340
    llvm::Value * compileGetResult(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override
341
    {
342
        if constexpr (Data::is_compilable)
343
            return Data::compileGetResult(builder, aggregate_data_ptr);
344
        else
345
            throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName());
346
    }
347
#endif
348
};
349

350
AggregateFunctionPtr createAggregateFunctionAnyLast(
351
    const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
352
{
353
    return AggregateFunctionPtr(
354
        createAggregateFunctionSingleValue<AggregateFunctionAnyLast, /* unary */ true>(name, argument_types, parameters, settings));
355
}
356

357
}
358

359
void registerAggregateFunctionsAny(AggregateFunctionFactory & factory)
360
{
361
    AggregateFunctionProperties default_properties = {.returns_default_when_only_null = false, .is_order_dependent = true};
362

363
    factory.registerFunction("any", {createAggregateFunctionAny, default_properties});
364
    factory.registerAlias("any_value", "any", AggregateFunctionFactory::CaseInsensitive);
365
    factory.registerAlias("first_value", "any", AggregateFunctionFactory::CaseInsensitive);
366
    factory.registerFunction("anyLast", {createAggregateFunctionAnyLast, default_properties});
367
    factory.registerAlias("last_value", "anyLast", AggregateFunctionFactory::CaseInsensitive);
368
}
369
}
370

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.