ClickHouse

Форк
0
/
snowflake.cpp 
272 строки · 10.5 Кб
1
#include <Functions/FunctionFactory.h>
2
#include <Functions/extractTimeZoneFromFunctionArguments.h>
3
#include <Functions/IFunction.h>
4
#include <Functions/FunctionHelpers.h>
5
#include <DataTypes/DataTypeDateTime64.h>
6
#include <DataTypes/DataTypesDecimal.h>
7
#include <DataTypes/DataTypesNumber.h>
8
#include <Columns/ColumnConst.h>
9
#include <Columns/ColumnsNumber.h>
10
#include <Core/DecimalFunctions.h>
11
#include <Interpreters/Context.h>
12

13

14
namespace DB
15
{
16

17
/// Error codes used in this translation unit. They are only declared here (`extern`);
/// the definitions live in another translation unit.
namespace ErrorCodes
{
    /// Thrown by the snowflakeTo* functions when the id column is neither ColumnInt64 nor ColumnConst.
    extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
21

22
namespace
23
{
24

25
/** Layout of a Snowflake ID, as described in Twitter's announcement: the low 22 bits hold a 10-bit
 * machine ID and a 12-bit sequence number, and the bits above them hold a millisecond timestamp
 * counted from the Twitter epoch. So `id >> time_shift` gives milliseconds since the Twitter epoch,
 * and adding `snowflake_epoch` turns that into milliseconds since the Unix epoch.
 * https://en.wikipedia.org/wiki/Snowflake_ID
 * https://blog.twitter.com/engineering/en_us/a/2010/announcing-snowflake
 * https://ws-dl.blogspot.com/2019/08/2019-08-03-tweetedat-finding-tweet.html
 *
 * NOTE: snowflake_epoch is unsigned (size_t). By the usual arithmetic conversions, any expression that
 * combines it with an Int64 is evaluated in unsigned 64-bit (wrapping) arithmetic.
 */
/// Twitter epoch (2010-11-04 01:42:54.657 UTC) in milliseconds since the Unix epoch.
constexpr size_t snowflake_epoch = 1'288'834'974'657;
/// Number of low bits (machine ID + sequence number) below the timestamp field.
constexpr int time_shift = 22;
33

34
class FunctionDateTimeToSnowflake : public IFunction
35
{
36
private:
37
    const char * name;
38

39
public:
40
    explicit FunctionDateTimeToSnowflake(const char * name_) : name(name_) { }
41

42
    String getName() const override { return name; }
43
    size_t getNumberOfArguments() const override { return 1; }
44
    bool useDefaultImplementationForConstants() const override { return true; }
45
    bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
46

47
    DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
48
    {
49
        FunctionArgumentDescriptors args{
50
            {"value", static_cast<FunctionArgumentDescriptor::TypeValidator>(&isDateTime), nullptr, "DateTime"}
51
        };
52
        validateFunctionArgumentTypes(*this, arguments, args);
53

54
        return std::make_shared<DataTypeInt64>();
55
    }
56

57
    ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
58
    {
59
        const auto & src = arguments[0];
60
        const auto & src_column = *src.column;
61

62
        auto res_column = ColumnInt64::create(input_rows_count);
63
        auto & res_data = res_column->getData();
64

65
        const auto & src_data = typeid_cast<const ColumnUInt32 &>(src_column).getData();
66
        for (size_t i = 0; i < input_rows_count; ++i)
67
            res_data[i] = (Int64(src_data[i]) * 1000 - snowflake_epoch) << time_shift;
68

69
        return res_column;
70
    }
71
};
72

73
class FunctionSnowflakeToDateTime : public IFunction
74
{
75
private:
76
    const char * name;
77
    const bool allow_nonconst_timezone_arguments;
78

79
public:
80
    explicit FunctionSnowflakeToDateTime(const char * name_, ContextPtr context)
81
        : name(name_)
82
        , allow_nonconst_timezone_arguments(context->getSettings().allow_nonconst_timezone_arguments)
83
    {}
84

85
    String getName() const override { return name; }
86
    size_t getNumberOfArguments() const override { return 0; }
87
    bool isVariadic() const override { return true; }
88
    bool useDefaultImplementationForConstants() const override { return true; }
89
    bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
90

91
    DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
92
    {
93
        FunctionArgumentDescriptors mandatory_args{
94
            {"value", static_cast<FunctionArgumentDescriptor::TypeValidator>(&isInt64), nullptr, "Int64"}
95
        };
96
        FunctionArgumentDescriptors optional_args{
97
            {"time_zone", static_cast<FunctionArgumentDescriptor::TypeValidator>(&isString), nullptr, "String"}
98
        };
99
        validateFunctionArgumentTypes(*this, arguments, mandatory_args, optional_args);
100

101
        String timezone;
102
        if (arguments.size() == 2)
103
            timezone = extractTimeZoneNameFromFunctionArguments(arguments, 1, 0, allow_nonconst_timezone_arguments);
104

105
        return std::make_shared<DataTypeDateTime>(timezone);
106
    }
107

108
    ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
109
    {
110
        const auto & src = arguments[0];
111
        const auto & src_column = *src.column;
112

113
        auto res_column = ColumnUInt32::create(input_rows_count);
114
        auto & res_data = res_column->getData();
115

116
        if (const auto * src_column_non_const = typeid_cast<const ColumnInt64 *>(&src_column))
117
        {
118
            const auto & src_data = src_column_non_const->getData();
119
            for (size_t i = 0; i < input_rows_count; ++i)
120
                res_data[i] = static_cast<UInt32>(
121
                    ((src_data[i] >> time_shift) + snowflake_epoch) / 1000);
122
        }
123
        else if (const auto * src_column_const = typeid_cast<const ColumnConst *>(&src_column))
124
        {
125
            Int64 src_val = src_column_const->getValue<Int64>();
126
            for (size_t i = 0; i < input_rows_count; ++i)
127
                res_data[i] = static_cast<UInt32>(
128
                    ((src_val >> time_shift) + snowflake_epoch) / 1000);
129
        }
130
        else
131
            throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal argument for function {}", name);
132

133
        return res_column;
134
    }
135
};
136

137

138
class FunctionDateTime64ToSnowflake : public IFunction
139
{
140
private:
141
    const char * name;
142

143
public:
144
    explicit FunctionDateTime64ToSnowflake(const char * name_) : name(name_) { }
145

146
    String getName() const override { return name; }
147
    size_t getNumberOfArguments() const override { return 1; }
148
    bool useDefaultImplementationForConstants() const override { return true; }
149
    bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
150

151
    DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
152
    {
153
        FunctionArgumentDescriptors args{
154
            {"value", static_cast<FunctionArgumentDescriptor::TypeValidator>(&isDateTime64), nullptr, "DateTime64"}
155
        };
156
        validateFunctionArgumentTypes(*this, arguments, args);
157

158
        return std::make_shared<DataTypeInt64>();
159
    }
160

161
    ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
162
    {
163
        const auto & src = arguments[0];
164

165
        const auto & src_column = *src.column;
166
        auto res_column = ColumnInt64::create(input_rows_count);
167
        auto & res_data = res_column->getData();
168

169
        const auto & src_data = typeid_cast<const ColumnDecimal<DateTime64> &>(src_column).getData();
170

171
        /// timestamps in snowflake-ids are millisecond-based, convert input to milliseconds
172
        UInt32 src_scale = getDecimalScale(*arguments[0].type);
173
        Int64 multiplier_msec = DecimalUtils::scaleMultiplier<DateTime64>(3);
174
        Int64 multiplier_src = DecimalUtils::scaleMultiplier<DateTime64>(src_scale);
175
        auto factor = multiplier_msec / static_cast<double>(multiplier_src);
176

177
        for (size_t i = 0; i < input_rows_count; ++i)
178
            res_data[i] = static_cast<Int64>(src_data[i] * factor - snowflake_epoch) << time_shift;
179

180
        return res_column;
181
    }
182
};
183

184

185
class FunctionSnowflakeToDateTime64 : public IFunction
186
{
187
private:
188
    const char * name;
189
    const bool allow_nonconst_timezone_arguments;
190

191
public:
192
    explicit FunctionSnowflakeToDateTime64(const char * name_, ContextPtr context)
193
        : name(name_)
194
        , allow_nonconst_timezone_arguments(context->getSettings().allow_nonconst_timezone_arguments)
195
    {}
196

197
    String getName() const override { return name; }
198
    size_t getNumberOfArguments() const override { return 0; }
199
    bool isVariadic() const override { return true; }
200
    bool useDefaultImplementationForConstants() const override { return true; }
201
    bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
202

203
    DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
204
    {
205
        FunctionArgumentDescriptors mandatory_args{
206
            {"value", static_cast<FunctionArgumentDescriptor::TypeValidator>(&isInt64), nullptr, "Int64"}
207
        };
208
        FunctionArgumentDescriptors optional_args{
209
            {"time_zone", static_cast<FunctionArgumentDescriptor::TypeValidator>(&isString), nullptr, "String"}
210
        };
211
        validateFunctionArgumentTypes(*this, arguments, mandatory_args, optional_args);
212

213
        String timezone;
214
        if (arguments.size() == 2)
215
            timezone = extractTimeZoneNameFromFunctionArguments(arguments, 1, 0, allow_nonconst_timezone_arguments);
216

217
        return std::make_shared<DataTypeDateTime64>(3, timezone);
218
    }
219

220
    ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
221
    {
222
        const auto & src = arguments[0];
223
        const auto & src_column = *src.column;
224

225
        auto res_column = ColumnDecimal<DateTime64>::create(input_rows_count, 3);
226
        auto & res_data = res_column->getData();
227

228
        if (const auto * src_column_non_const = typeid_cast<const ColumnInt64 *>(&src_column))
229
        {
230
            const auto & src_data = src_column_non_const->getData();
231
            for (size_t i = 0; i < input_rows_count; ++i)
232
                res_data[i] = (src_data[i] >> time_shift) + snowflake_epoch;
233
        }
234
        else if (const auto * src_column_const = typeid_cast<const ColumnConst *>(&src_column))
235
        {
236
            Int64 src_val = src_column_const->getValue<Int64>();
237
            for (size_t i = 0; i < input_rows_count; ++i)
238
                res_data[i] = (src_val >> time_shift) + snowflake_epoch;
239
        }
240
        else
241
            throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal argument for function {}", name);
242

243
        return res_column;
244
    }
245
};
246

247
}
248

249
/// Factory registrations. Each public name is written once and shared by the registration and the
/// instance's getName(). Only the snowflakeTo* creators use the context, to read
/// allow_nonconst_timezone_arguments.
REGISTER_FUNCTION(DateTimeToSnowflake)
{
    static constexpr auto function_name = "dateTimeToSnowflake";
    factory.registerFunction(function_name,
        [](ContextPtr) { return std::make_shared<FunctionDateTimeToSnowflake>(function_name); });
}

REGISTER_FUNCTION(DateTime64ToSnowflake)
{
    static constexpr auto function_name = "dateTime64ToSnowflake";
    factory.registerFunction(function_name,
        [](ContextPtr) { return std::make_shared<FunctionDateTime64ToSnowflake>(function_name); });
}

REGISTER_FUNCTION(SnowflakeToDateTime)
{
    static constexpr auto function_name = "snowflakeToDateTime";
    factory.registerFunction(function_name,
        [](ContextPtr context) { return std::make_shared<FunctionSnowflakeToDateTime>(function_name, context); });
}

REGISTER_FUNCTION(SnowflakeToDateTime64)
{
    static constexpr auto function_name = "snowflakeToDateTime64";
    factory.registerFunction(function_name,
        [](ContextPtr context) { return std::make_shared<FunctionSnowflakeToDateTime64>(function_name, context); });
}
271

272
}
273

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.