ClickHouse
285 строк · 11.1 Кб
1#include <AggregateFunctions/AggregateFunctionFactory.h>2#include <AggregateFunctions/FactoryHelpers.h>3#include <DataTypes/DataTypeAggregateFunction.h>4
5#include <AggregateFunctions/IAggregateFunction.h>6#include <Columns/ColumnAggregateFunction.h>7#include <Columns/ColumnVector.h>8#include <DataTypes/DataTypesNumber.h>9#include <Common/assert_cast.h>10
11// TODO include this last because of a broken roaring header. See the comment inside.
12#include <AggregateFunctions/AggregateFunctionGroupBitmapData.h>13
14
15namespace DB16{
17struct Settings;18
19namespace ErrorCodes20{
21extern const int ILLEGAL_TYPE_OF_ARGUMENT;22}
23
24namespace
25{
26
27/// Counts bitmap operation on numbers.
28template <typename T, typename Data>29class AggregateFunctionBitmap final : public IAggregateFunctionDataHelper<Data, AggregateFunctionBitmap<T, Data>>30{
31public:32explicit AggregateFunctionBitmap(const DataTypePtr & type)33: IAggregateFunctionDataHelper<Data, AggregateFunctionBitmap<T, Data>>({type}, {}, createResultType())34{35}36
37String getName() const override { return Data::name(); }38
39static DataTypePtr createResultType() { return std::make_shared<DataTypeNumber<T>>(); }40
41bool allocatesMemoryInArena() const override { return false; }42
43void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override44{45this->data(place).roaring_bitmap_with_small_set.add(assert_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num]);46}47
48void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override49{50this->data(place).roaring_bitmap_with_small_set.merge(this->data(rhs).roaring_bitmap_with_small_set);51}52
53void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override54{55this->data(place).roaring_bitmap_with_small_set.write(buf);56}57
58void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena *) const override59{60this->data(place).roaring_bitmap_with_small_set.read(buf);61}62
63void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override64{65assert_cast<ColumnVector<T> &>(to).getData().push_back(66static_cast<T>(this->data(place).roaring_bitmap_with_small_set.size()));67}68};69
70
71/// This aggregate function takes the states of AggregateFunctionBitmap as its argument.
72template <typename T, typename Data, typename Policy>73class AggregateFunctionBitmapL2 final : public IAggregateFunctionDataHelper<Data, AggregateFunctionBitmapL2<T, Data, Policy>>74{
75private:76static constexpr size_t STATE_VERSION_1_MIN_REVISION = 54455;77public:78explicit AggregateFunctionBitmapL2(const DataTypePtr & type)79: IAggregateFunctionDataHelper<Data, AggregateFunctionBitmapL2<T, Data, Policy>>({type}, {}, createResultType())80{81}82
83String getName() const override { return Policy::name; }84
85static DataTypePtr createResultType() { return std::make_shared<DataTypeNumber<T>>(); }86
87bool allocatesMemoryInArena() const override { return false; }88
89DataTypePtr getStateType() const override90{91return this->argument_types.at(0);92}93
94void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override95{96Data & data_lhs = this->data(place);97const Data & data_rhs = this->data(assert_cast<const ColumnAggregateFunction &>(*columns[0]).getData()[row_num]);98if (!data_lhs.init)99{100data_lhs.init = true;101data_lhs.roaring_bitmap_with_small_set.merge(data_rhs.roaring_bitmap_with_small_set);102}103else104{105Policy::apply(data_lhs, data_rhs);106}107}108
109void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override110{111Data & data_lhs = this->data(place);112const Data & data_rhs = this->data(rhs);113
114if (!data_rhs.init)115return;116
117if (!data_lhs.init)118{119data_lhs.init = true;120data_lhs.roaring_bitmap_with_small_set.merge(data_rhs.roaring_bitmap_with_small_set);121}122else123{124Policy::apply(data_lhs, data_rhs);125}126}127
128bool isVersioned() const override { return true; }129
130size_t getDefaultVersion() const override { return 1; }131
132size_t getVersionFromRevision(size_t revision) const override133{134if (revision >= STATE_VERSION_1_MIN_REVISION)135return 1;136else137return 0;138}139
140void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> version) const override141{142if (!version)143version = getDefaultVersion();144
145if (*version >= 1)146DB::writeBoolText(this->data(place).init, buf);147
148this->data(place).roaring_bitmap_with_small_set.write(buf);149}150
151void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> version, Arena *) const override152{153if (!version)154version = getDefaultVersion();155
156if (*version >= 1)157DB::readBoolText(this->data(place).init, buf);158this->data(place).roaring_bitmap_with_small_set.read(buf);159}160
161void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override162{163assert_cast<ColumnVector<T> &>(to).getData().push_back(164static_cast<T>(this->data(place).roaring_bitmap_with_small_set.size()));165}166};167
168
/// Policy for AggregateFunctionBitmapL2: combines two states with `rb_and`.
/// `Data` must expose a member `roaring_bitmap_with_small_set` that provides `rb_and`.
template <typename Data>
class BitmapAndPolicy
{
public:
    /// Name under which the resulting aggregate function is reported.
    static constexpr auto name = "groupBitmapAnd";

    /// Updates the bitmap of `lhs` in place; `rhs` is only read.
    static void apply(Data & lhs, const Data & rhs)
    {
        lhs.roaring_bitmap_with_small_set.rb_and(rhs.roaring_bitmap_with_small_set);
    }
};
/// Policy for AggregateFunctionBitmapL2: combines two states with `rb_or`.
/// `Data` must expose a member `roaring_bitmap_with_small_set` that provides `rb_or`.
template <typename Data>
class BitmapOrPolicy
{
public:
    /// Name under which the resulting aggregate function is reported.
    static constexpr auto name = "groupBitmapOr";

    /// Updates the bitmap of `lhs` in place; `rhs` is only read.
    static void apply(Data & lhs, const Data & rhs)
    {
        lhs.roaring_bitmap_with_small_set.rb_or(rhs.roaring_bitmap_with_small_set);
    }
};
/// Policy for AggregateFunctionBitmapL2: combines two states with `rb_xor`.
/// `Data` must expose a member `roaring_bitmap_with_small_set` that provides `rb_xor`.
template <typename Data>
class BitmapXorPolicy
{
public:
    /// Name under which the resulting aggregate function is reported.
    static constexpr auto name = "groupBitmapXor";

    /// Updates the bitmap of `lhs` in place; `rhs` is only read.
    static void apply(Data & lhs, const Data & rhs)
    {
        lhs.roaring_bitmap_with_small_set.rb_xor(rhs.roaring_bitmap_with_small_set);
    }
};
193template <typename T, typename Data>194using AggregateFunctionBitmapL2And = AggregateFunctionBitmapL2<T, Data, BitmapAndPolicy<Data>>;195
196template <typename T, typename Data>197using AggregateFunctionBitmapL2Or = AggregateFunctionBitmapL2<T, Data, BitmapOrPolicy<Data>>;198
199template <typename T, typename Data>200using AggregateFunctionBitmapL2Xor = AggregateFunctionBitmapL2<T, Data, BitmapXorPolicy<Data>>;201
202
203template <template <typename, typename> class AggregateFunctionTemplate, template <typename> typename Data, typename... TArgs>204IAggregateFunction * createWithIntegerType(const IDataType & argument_type, TArgs &&... args)205{
206WhichDataType which(argument_type);207if (which.idx == TypeIndex::UInt8) return new AggregateFunctionTemplate<UInt8, Data<UInt8>>(std::forward<TArgs>(args)...);208if (which.idx == TypeIndex::UInt16) return new AggregateFunctionTemplate<UInt16, Data<UInt16>>(std::forward<TArgs>(args)...);209if (which.idx == TypeIndex::UInt32) return new AggregateFunctionTemplate<UInt32, Data<UInt32>>(std::forward<TArgs>(args)...);210if (which.idx == TypeIndex::UInt64) return new AggregateFunctionTemplate<UInt64, Data<UInt64>>(std::forward<TArgs>(args)...);211if (which.idx == TypeIndex::Int8) return new AggregateFunctionTemplate<Int8, Data<Int8>>(std::forward<TArgs>(args)...);212if (which.idx == TypeIndex::Int16) return new AggregateFunctionTemplate<Int16, Data<Int16>>(std::forward<TArgs>(args)...);213if (which.idx == TypeIndex::Int32) return new AggregateFunctionTemplate<Int32, Data<Int32>>(std::forward<TArgs>(args)...);214if (which.idx == TypeIndex::Int64) return new AggregateFunctionTemplate<Int64, Data<Int64>>(std::forward<TArgs>(args)...);215return nullptr;216}
217
218template <template <typename> typename Data>219AggregateFunctionPtr createAggregateFunctionBitmap(220const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)221{
222assertNoParameters(name, parameters);223assertUnary(name, argument_types);224
225if (!argument_types[0]->canBeUsedInBitOperations())226throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,227"The type {} of argument for aggregate function {} "228"is illegal, because it cannot be used in Bitmap operations",229argument_types[0]->getName(), name);230
231AggregateFunctionPtr res(createWithIntegerType<AggregateFunctionBitmap, Data>(*argument_types[0], argument_types[0]));232
233if (!res)234throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument for aggregate function {}",235argument_types[0]->getName(), name);236
237return res;238}
239
240// Additional aggregate functions to manipulate bitmaps.
241template <template <typename, typename> typename AggregateFunctionTemplate>242AggregateFunctionPtr createAggregateFunctionBitmapL2(243const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)244{
245assertNoParameters(name, parameters);246assertUnary(name, argument_types);247
248DataTypePtr argument_type_ptr = argument_types[0];249WhichDataType which(*argument_type_ptr);250if (which.idx != TypeIndex::AggregateFunction)251throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument for aggregate function {}",252argument_types[0]->getName(), name);253
254/// groupBitmap needs to know about the data type that was used to create bitmaps.255/// We need to look inside the type of its argument to obtain it.256const DataTypeAggregateFunction & datatype_aggfunc = dynamic_cast<const DataTypeAggregateFunction &>(*argument_type_ptr);257AggregateFunctionPtr aggfunc = datatype_aggfunc.getFunction();258
259if (aggfunc->getName() != AggregateFunctionGroupBitmapData<UInt8>::name())260throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument for aggregate function {}",261argument_types[0]->getName(), name);262
263DataTypePtr nested_argument_type_ptr = aggfunc->getArgumentTypes()[0];264
265AggregateFunctionPtr res(createWithIntegerType<AggregateFunctionTemplate, AggregateFunctionGroupBitmapData>(266*nested_argument_type_ptr, argument_type_ptr));267
268if (!res)269throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument for aggregate function {}",270argument_types[0]->getName(), name);271
272return res;273}
274
275}
276
277void registerAggregateFunctionsBitmap(AggregateFunctionFactory & factory)278{
279factory.registerFunction("groupBitmap", createAggregateFunctionBitmap<AggregateFunctionGroupBitmapData>);280factory.registerFunction("groupBitmapAnd", createAggregateFunctionBitmapL2<AggregateFunctionBitmapL2And>);281factory.registerFunction("groupBitmapOr", createAggregateFunctionBitmapL2<AggregateFunctionBitmapL2Or>);282factory.registerFunction("groupBitmapXor", createAggregateFunctionBitmapL2<AggregateFunctionBitmapL2Xor>);283}
284
285}
286