ClickHouse
203 строки · 5.4 Кб
1#include <AggregateFunctions/AggregateFunctionFactory.h>
2#include <AggregateFunctions/FactoryHelpers.h>
3
4#include <DataTypes/DataTypesNumber.h>
5#include <Columns/ColumnsNumber.h>
6#include <IO/ReadHelpers.h>
7#include <IO/WriteHelpers.h>
8#include <AggregateFunctions/IAggregateFunction.h>
9#include <Common/assert_cast.h>
10#include <Common/transformEndianness.h>
11
12
13namespace DB
14{
15struct Settings;
16
17namespace ErrorCodes
18{
19extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
20extern const int BAD_ARGUMENTS;
21}
22
23namespace
24{
25
26/** Tracks the leftmost and rightmost (x, y) data points.
27*/
28struct AggregateFunctionBoundingRatioData
29{
30struct Point
31{
32Float64 x;
33Float64 y;
34};
35
36bool empty = true;
37Point left;
38Point right;
39
40void add(Float64 x, Float64 y)
41{
42Point point{x, y};
43
44if (empty)
45{
46left = point;
47right = point;
48empty = false;
49}
50else if (point.x < left.x)
51{
52left = point;
53}
54else if (point.x > right.x)
55{
56right = point;
57}
58}
59
60void merge(const AggregateFunctionBoundingRatioData & other)
61{
62if (empty)
63{
64*this = other;
65}
66else
67{
68if (other.left.x < left.x)
69left = other.left;
70if (other.right.x > right.x)
71right = other.right;
72}
73}
74
75void serialize(WriteBuffer & buf) const;
76void deserialize(ReadBuffer & buf);
77};
78
79template <std::endian endian>
80inline void transformEndianness(AggregateFunctionBoundingRatioData::Point & p)
81{
82DB::transformEndianness<endian>(p.x);
83DB::transformEndianness<endian>(p.y);
84}
85
86void AggregateFunctionBoundingRatioData::serialize(WriteBuffer & buf) const
87{
88writeBinaryLittleEndian(empty, buf);
89
90if (!empty)
91{
92writeBinaryLittleEndian(left, buf);
93writeBinaryLittleEndian(right, buf);
94}
95}
96
97void AggregateFunctionBoundingRatioData::deserialize(ReadBuffer & buf)
98{
99readBinaryLittleEndian(empty, buf);
100
101if (!empty)
102{
103readBinaryLittleEndian(left, buf);
104readBinaryLittleEndian(right, buf);
105}
106}
107
108inline void writeBinary(const AggregateFunctionBoundingRatioData::Point & p, WriteBuffer & buf)
109{
110writePODBinary(p, buf);
111}
112
113inline void readBinary(AggregateFunctionBoundingRatioData::Point & p, ReadBuffer & buf)
114{
115readPODBinary(p, buf);
116}
117
118
119class AggregateFunctionBoundingRatio final : public IAggregateFunctionDataHelper<AggregateFunctionBoundingRatioData, AggregateFunctionBoundingRatio>
120{
121private:
122/** Calculates the slope of a line between leftmost and rightmost data points.
123* (y2 - y1) / (x2 - x1)
124*/
125static Float64 NO_SANITIZE_UNDEFINED getBoundingRatio(const AggregateFunctionBoundingRatioData & data)
126{
127if (data.empty)
128return std::numeric_limits<Float64>::quiet_NaN();
129
130return (data.right.y - data.left.y) / (data.right.x - data.left.x);
131}
132
133public:
134String getName() const override
135{
136return "boundingRatio";
137}
138
139explicit AggregateFunctionBoundingRatio(const DataTypes & arguments)
140: IAggregateFunctionDataHelper<AggregateFunctionBoundingRatioData, AggregateFunctionBoundingRatio>(arguments, {}, std::make_shared<DataTypeFloat64>())
141{
142const auto * x_arg = arguments.at(0).get();
143const auto * y_arg = arguments.at(1).get();
144
145if (!x_arg->isValueRepresentedByNumber() || !y_arg->isValueRepresentedByNumber())
146throw Exception(ErrorCodes::BAD_ARGUMENTS,
147"Illegal types of arguments of aggregate function {}, must have number representation.",
148getName());
149}
150
151bool allocatesMemoryInArena() const override { return false; }
152
153void add(AggregateDataPtr __restrict place, const IColumn ** columns, const size_t row_num, Arena *) const override
154{
155/// NOTE Slightly inefficient.
156const auto x = columns[0]->getFloat64(row_num);
157const auto y = columns[1]->getFloat64(row_num);
158data(place).add(x, y);
159}
160
161void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
162{
163data(place).merge(data(rhs));
164}
165
166void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
167{
168data(place).serialize(buf);
169}
170
171void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena *) const override
172{
173data(place).deserialize(buf);
174}
175
176void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
177{
178assert_cast<ColumnFloat64 &>(to).getData().push_back(getBoundingRatio(data(place)));
179}
180};
181
182
183AggregateFunctionPtr createAggregateFunctionRate(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
184{
185assertNoParameters(name, parameters);
186assertBinary(name, argument_types);
187
188if (argument_types.size() < 2)
189throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
190"Aggregate function {} requires at least two arguments",
191name);
192
193return std::make_shared<AggregateFunctionBoundingRatio>(argument_types);
194}
195
196}
197
198void registerAggregateFunctionRate(AggregateFunctionFactory & factory)
199{
200factory.registerFunction("boundingRatio", createAggregateFunctionRate);
201}
202
203}
204