ClickHouse

Форк
0
/
MySQLSource.cpp 
420 строк · 15.8 Кб
1
#include "config.h"
2

3
#if USE_MYSQL
4
#include <vector>
5
#include <Core/MySQL/MySQLReplication.h>
6
#include <Columns/ColumnNullable.h>
7
#include <Columns/ColumnString.h>
8
#include <Columns/ColumnsNumber.h>
9
#include <Columns/ColumnDecimal.h>
10
#include <Columns/ColumnFixedString.h>
11
#include <Columns/ColumnTuple.h>
12
#include <DataTypes/IDataType.h>
13
#include <DataTypes/DataTypeEnum.h>
14
#include <DataTypes/DataTypeNullable.h>
15
#include <DataTypes/DataTypeDateTime.h>
16
#include <IO/ReadBufferFromString.h>
17
#include <IO/ReadHelpers.h>
18
#include <IO/WriteHelpers.h>
19
#include <IO/Operators.h>
20
#include <Common/assert_cast.h>
21
#include <base/range.h>
22
#include <Common/logger_useful.h>
23
#include <Processors/Sources/MySQLSource.h>
24
#include <boost/algorithm/string.hpp>
25

26

27
namespace DB
28
{
29

30
namespace ErrorCodes
{
    /// Error codes thrown from this file; they are only declared here (extern).
    extern const int NOT_IMPLEMENTED;
    extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
}
35

36
/// Builds the per-stream read limits and behaviour flags.
///
/// The row limit is `external_storage_max_read_rows` when that setting evaluates to true,
/// otherwise it falls back to `max_block_size`. The byte limit is taken from
/// `external_storage_max_read_bytes` as is. The remaining arguments are stored unchanged.
StreamSettings::StreamSettings(
    const Settings & settings,
    bool auto_close_,
    bool fetch_by_name_,
    size_t max_retry_)
    : max_read_mysql_row_nums(
          settings.external_storage_max_read_rows ? settings.external_storage_max_read_rows : settings.max_block_size)
    , max_read_mysql_bytes_size(settings.external_storage_max_read_bytes)
    , auto_close(auto_close_)
    , fetch_by_name(fetch_by_name_)
    , default_num_tries_on_connection_loss(max_retry_)
{
}
44

45
/// Keeps the pool entry, creates a query on it from `query_str` and starts reading
/// its result via `use()`. The members are initialised in this order because each
/// one is built from the previous one.
MySQLSource::Connection::Connection(const mysqlxx::PoolWithFailover::Entry & entry_, const std::string & query_str)
    : entry(entry_)
    , query(entry->query(query_str))
    , result(query.use())
{
}
53

54
/// Used in MaterializedMySQL and in doInvalidateQuery for dictionary source.
55
MySQLSource::MySQLSource(
56
    const mysqlxx::PoolWithFailover::Entry & entry,
57
    const std::string & query_str,
58
    const Block & sample_block,
59
    const StreamSettings & settings_)
60
    : ISource(sample_block.cloneEmpty())
61
    , log(getLogger("MySQLSource"))
62
    , connection{std::make_unique<Connection>(entry, query_str)}
63
    , settings{std::make_unique<StreamSettings>(settings_)}
64
{
65
    description.init(sample_block);
66
    initPositionMappingFromQueryResultStructure();
67
}
68

69
/// For descendant MySQLWithFailoverSource
70
MySQLSource::MySQLSource(const Block &sample_block_, const StreamSettings & settings_)
71
    : ISource(sample_block_.cloneEmpty())
72
    , log(getLogger("MySQLSource"))
73
    , settings(std::make_unique<StreamSettings>(settings_))
74
{
75
    description.init(sample_block_);
76
}
77

78
/// Used by MySQL storage / table function and dictionary source.
79
MySQLWithFailoverSource::MySQLWithFailoverSource(
80
    mysqlxx::PoolWithFailoverPtr pool_,
81
    const std::string & query_str_,
82
    const Block & sample_block_,
83
    const StreamSettings & settings_)
84
    : MySQLSource(sample_block_, settings_)
85
    , pool(pool_)
86
    , query_str(query_str_)
87
{
88
}
89

90
void MySQLWithFailoverSource::onStart()
91
{
92
    size_t count_connect_attempts = 0;
93

94
    /// For recovering from "Lost connection to MySQL server during query" errors
95
    while (true)
96
    {
97
        try
98
        {
99
            connection = std::make_unique<Connection>(pool->get(), query_str);
100
            break;
101
        }
102
        catch (const mysqlxx::ConnectionLost & ecl)  /// There are two retriable failures: CR_SERVER_GONE_ERROR, CR_SERVER_LOST
103
        {
104
            LOG_WARNING(log, "Failed connection ({}/{}). Trying to reconnect... (Info: {})", count_connect_attempts, settings->default_num_tries_on_connection_loss, ecl.displayText());
105

106
            if (++count_connect_attempts > settings->default_num_tries_on_connection_loss)
107
            {
108
                LOG_ERROR(log, "Failed to create connection to MySQL. ({}/{})", count_connect_attempts, settings->default_num_tries_on_connection_loss);
109
                throw;
110
            }
111
        }
112
        catch (const mysqlxx::BadQuery & e)
113
        {
114
            LOG_ERROR(log, "Error processing query '{}': {}", query_str, e.displayText());
115
            throw;
116
        }
117
    }
118

119
    initPositionMappingFromQueryResultStructure();
120
}
121

122
/// Produces the next chunk, calling onStart() once before the first read.
Chunk MySQLWithFailoverSource::generate()
{
    if (is_initialized)
        return MySQLSource::generate();

    /// First call: open the connection. The flag is only set if onStart() did not throw.
    onStart();
    is_initialized = true;

    return MySQLSource::generate();
}
132

133

134
namespace
135
{
136
    using ValueType = ExternalResultDescription::ValueType;
137

138
    void insertValue(const IDataType & data_type, IColumn & column, const ValueType type, const mysqlxx::Value & value, size_t & read_bytes_size, enum enum_field_types mysql_type)
139
    {
140
        switch (type)
141
        {
142
            case ValueType::vtUInt8:
143
                assert_cast<ColumnUInt8 &>(column).insertValue(value.getUInt());
144
                read_bytes_size += 1;
145
                break;
146
            case ValueType::vtUInt16:
147
                assert_cast<ColumnUInt16 &>(column).insertValue(value.getUInt());
148
                read_bytes_size += 2;
149
                break;
150
            case ValueType::vtUInt32:
151
                assert_cast<ColumnUInt32 &>(column).insertValue(static_cast<UInt32>(value.getUInt()));
152
                read_bytes_size += 4;
153
                break;
154
            case ValueType::vtUInt64:
155
            {
156
                if (mysql_type == enum_field_types::MYSQL_TYPE_BIT)
157
                {
158
                    size_t n = value.size();
159
                    UInt64 val = 0UL;
160
                    ReadBufferFromMemory payload(const_cast<char *>(value.data()), n);
161
                    MySQLReplication::readBigEndianStrict(payload, reinterpret_cast<char *>(&val), n);
162
                    assert_cast<ColumnUInt64 &>(column).insertValue(val);
163
                    read_bytes_size += n;
164
                }
165
                else
166
                {
167
                    assert_cast<ColumnUInt64 &>(column).insertValue(value.getUInt());
168
                    read_bytes_size += 8;
169
                }
170
                break;
171
            }
172
            case ValueType::vtInt8:
173
                assert_cast<ColumnInt8 &>(column).insertValue(value.getInt());
174
                read_bytes_size += 1;
175
                break;
176
            case ValueType::vtInt16:
177
                assert_cast<ColumnInt16 &>(column).insertValue(value.getInt());
178
                read_bytes_size += 2;
179
                break;
180
            case ValueType::vtInt32:
181
                assert_cast<ColumnInt32 &>(column).insertValue(static_cast<Int32>(value.getInt()));
182
                read_bytes_size += 4;
183
                break;
184
            case ValueType::vtInt64:
185
            {
186
                if (mysql_type == enum_field_types::MYSQL_TYPE_TIME)
187
                {
188
                    String time_str(value.data(), value.size());
189
                    bool negative = time_str.starts_with("-");
190
                    if (negative) time_str = time_str.substr(1);
191
                    std::vector<String> hhmmss;
192
                    boost::split(hhmmss, time_str, [](char c) { return c == ':'; });
193
                    Int64 v = 0;
194

195
                    if (hhmmss.size() == 3)
196
                        v = static_cast<Int64>((std::stoi(hhmmss[0]) * 3600 + std::stoi(hhmmss[1]) * 60 + std::stold(hhmmss[2])) * 1000000);
197
                    else
198
                        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported value format");
199

200
                    if (negative) v = -v;
201
                    assert_cast<ColumnInt64 &>(column).insertValue(v);
202
                    read_bytes_size += value.size();
203
                }
204
                else
205
                {
206
                    assert_cast<ColumnInt64 &>(column).insertValue(value.getInt());
207
                    read_bytes_size += 8;
208
                }
209
                break;
210
            }
211
            case ValueType::vtFloat32:
212
                assert_cast<ColumnFloat32 &>(column).insertValue(static_cast<Float32>(value.getDouble()));
213
                read_bytes_size += 4;
214
                break;
215
            case ValueType::vtFloat64:
216
                assert_cast<ColumnFloat64 &>(column).insertValue(value.getDouble());
217
                read_bytes_size += 8;
218
                break;
219
            case ValueType::vtEnum8:
220
                assert_cast<ColumnInt8 &>(column).insertValue(assert_cast<const DataTypeEnum<Int8> &>(data_type).castToValue(value.data()).get<Int8>());
221
                read_bytes_size += assert_cast<ColumnInt8 &>(column).byteSize();
222
                break;
223
            case ValueType::vtEnum16:
224
                assert_cast<ColumnInt16 &>(column).insertValue(assert_cast<const DataTypeEnum<Int16> &>(data_type).castToValue(value.data()).get<Int16>());
225
                read_bytes_size += assert_cast<ColumnInt16 &>(column).byteSize();
226
                break;
227
            case ValueType::vtString:
228
                assert_cast<ColumnString &>(column).insertData(value.data(), value.size());
229
                read_bytes_size += assert_cast<ColumnString &>(column).byteSize();
230
                break;
231
            case ValueType::vtDate:
232
                assert_cast<ColumnUInt16 &>(column).insertValue(UInt16(value.getDate().getDayNum()));
233
                read_bytes_size += 2;
234
                break;
235
            case ValueType::vtDate32:
236
                assert_cast<ColumnInt32 &>(column).insertValue(Int32(value.getDate().getExtenedDayNum()));
237
                read_bytes_size += 4;
238
                break;
239
            case ValueType::vtDateTime:
240
            {
241
                ReadBufferFromString in(value);
242
                time_t time = 0;
243
                readDateTimeText(time, in, assert_cast<const DataTypeDateTime &>(data_type).getTimeZone());
244
                if (time < 0)
245
                    time = 0;
246
                assert_cast<ColumnUInt32 &>(column).insertValue(static_cast<UInt32>(time));
247
                read_bytes_size += 4;
248
                break;
249
            }
250
            case ValueType::vtUUID:
251
                assert_cast<ColumnUUID &>(column).insert(parse<UUID>(value.data(), value.size()));
252
                read_bytes_size += assert_cast<ColumnUUID &>(column).byteSize();
253
                break;
254
            case ValueType::vtDateTime64:[[fallthrough]];
255
            case ValueType::vtDecimal32: [[fallthrough]];
256
            case ValueType::vtDecimal64: [[fallthrough]];
257
            case ValueType::vtDecimal128:[[fallthrough]];
258
            case ValueType::vtDecimal256:
259
            {
260
                ReadBuffer buffer(const_cast<char *>(value.data()), value.size(), 0);
261
                data_type.getDefaultSerialization()->deserializeWholeText(column, buffer, FormatSettings{});
262
                read_bytes_size += column.sizeOfValueIfFixed();
263
                break;
264
            }
265
            case ValueType::vtFixedString:
266
                assert_cast<ColumnFixedString &>(column).insertData(value.data(), value.size());
267
                read_bytes_size += column.sizeOfValueIfFixed();
268
                break;
269
            case ValueType::vtPoint:
270
            {
271
                /// The value is 25 bytes:
272
                /// 4 bytes for integer SRID (0)
273
                /// 1 byte for integer byte order (1 = little-endian)
274
                /// 4 bytes for integer type information (1 = Point)
275
                /// 8 bytes for double-precision X coordinate
276
                /// 8 bytes for double-precision Y coordinate
277
                ReadBufferFromMemory payload(value.data(), value.size());
278
                String val;
279
                payload.ignore(4);
280

281
                UInt8 endian;
282
                readBinary(endian, payload);
283

284
                Int32 point_type;
285
                readBinary(point_type, payload);
286
                if (point_type != 1)
287
                    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Only Point data type is supported");
288

289
                Float64 x, y;
290
                if (endian == 1)
291
                {
292
                    readBinaryLittleEndian(x, payload);
293
                    readBinaryLittleEndian(y, payload);
294
                }
295
                else
296
                {
297
                    readBinaryBigEndian(x, payload);
298
                    readBinaryBigEndian(y, payload);
299
                }
300

301
                assert_cast<ColumnTuple &>(column).insert(Tuple({Field(x), Field(y)}));
302
                read_bytes_size += value.size();
303
                break;
304
            }
305
            default:
306
                throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported value type");
307
        }
308
    }
309

310
    void insertDefaultValue(IColumn & column, const IColumn & sample_column) { column.insertFrom(sample_column, 0); }
311
}
312

313

314
/// Reads rows from the query result and packs them into one chunk.
///
/// Returns an empty chunk when the result has no more rows; in that case the
/// connection is also closed if `auto_close` is set. Otherwise rows are read until
/// the result ends, the row limit is reached, or the (non-zero) byte limit is reached.
Chunk MySQLSource::generate()
{
    auto row = connection->result.fetch();
    if (!row)
    {
        if (settings->auto_close)
           connection->entry.disconnect();

        return {};
    }

    /// Start from empty columns with the same structure as the sample block.
    MutableColumns columns(description.sample_block.columns());
    for (size_t pos = 0; pos < columns.size(); ++pos)
        columns[pos] = description.sample_block.getByPosition(pos).column->cloneEmpty();

    size_t num_rows = 0;
    size_t read_bytes_size = 0;

    /// A `break` below leaves the loop without fetching another row.
    for (; row; row = connection->result.fetch())
    {
        for (size_t index = 0; index < position_mapping.size(); ++index)
        {
            const auto field_pos = position_mapping[index];
            const auto value = row[field_pos];
            const auto & sample = description.sample_block.getByPosition(index);
            const auto & type_info = description.types[index];
            const bool is_type_nullable = type_info.second;
            IColumn & target = *columns[index];

            if (value.isNull())
            {
                /// NULL: take the default row from the sample column and, for Nullable
                /// columns, flag the row that was just added as NULL.
                insertDefaultValue(target, *sample.column);
                if (is_type_nullable)
                    assert_cast<ColumnNullable &>(target).getNullMapData().back() = true;
                continue;
            }

            const auto mysql_type = row.getFieldType(field_pos);

            if (!is_type_nullable)
            {
                insertValue(*sample.type, target, type_info.first, value, read_bytes_size, mysql_type);
                continue;
            }

            /// Nullable: write into the nested column and record a non-NULL flag for the row.
            auto & nullable_column = assert_cast<ColumnNullable &>(target);
            const auto & nullable_type = assert_cast<const DataTypeNullable &>(*sample.type);
            insertValue(*nullable_type.getNestedType(), nullable_column.getNestedColumn(), type_info.first, value, read_bytes_size, mysql_type);
            nullable_column.getNullMapData().emplace_back(false);
        }

        ++num_rows;

        const bool row_limit_reached = num_rows == settings->max_read_mysql_row_nums;
        const bool byte_limit_reached
            = settings->max_read_mysql_bytes_size && read_bytes_size >= settings->max_read_mysql_bytes_size;
        if (row_limit_reached || byte_limit_reached)
            break;
    }

    return Chunk(std::move(columns), num_rows);
}
375

376
void MySQLSource::initPositionMappingFromQueryResultStructure()
377
{
378
    position_mapping.resize(description.sample_block.columns());
379

380
    if (!settings->fetch_by_name)
381
    {
382
        if (description.sample_block.columns() != connection->result.getNumFields())
383
            throw Exception(
384
                ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH,
385
                "mysqlxx::UseQueryResult contains {} columns while {} expected",
386
                connection->result.getNumFields(),
387
                description.sample_block.columns());
388

389
        for (const auto idx : collections::range(0, connection->result.getNumFields()))
390
            position_mapping[idx] = idx;
391
    }
392
    else
393
    {
394
        const auto & sample_names = description.sample_block.getNames();
395
        std::unordered_set<std::string> missing_names(sample_names.begin(), sample_names.end());
396

397
        size_t fields_size = connection->result.getNumFields();
398

399
        for (const size_t & idx : collections::range(0, fields_size))
400
        {
401
            const auto & field_name = connection->result.getFieldName(idx);
402
            if (description.sample_block.has(field_name))
403
            {
404
                const auto & position = description.sample_block.getPositionByName(field_name);
405
                position_mapping[position] = idx;
406
                missing_names.erase(field_name);
407
            }
408
        }
409

410
        if (!missing_names.empty())
411
            throw Exception(
412
                ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH,
413
                "mysqlxx::UseQueryResult must contain columns: {}",
414
                fmt::join(missing_names, ", "));
415
    }
416
}
417

418
}
419

420
#endif
421

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.