ClickHouse
420 строк · 15.8 Кб
1#include "config.h"2
3#if USE_MYSQL4#include <vector>5#include <Core/MySQL/MySQLReplication.h>6#include <Columns/ColumnNullable.h>7#include <Columns/ColumnString.h>8#include <Columns/ColumnsNumber.h>9#include <Columns/ColumnDecimal.h>10#include <Columns/ColumnFixedString.h>11#include <Columns/ColumnTuple.h>12#include <DataTypes/IDataType.h>13#include <DataTypes/DataTypeEnum.h>14#include <DataTypes/DataTypeNullable.h>15#include <DataTypes/DataTypeDateTime.h>16#include <IO/ReadBufferFromString.h>17#include <IO/ReadHelpers.h>18#include <IO/WriteHelpers.h>19#include <IO/Operators.h>20#include <Common/assert_cast.h>21#include <base/range.h>22#include <Common/logger_useful.h>23#include <Processors/Sources/MySQLSource.h>24#include <boost/algorithm/string.hpp>25
26
27namespace DB28{
29
/// Error codes thrown from this translation unit.
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
}
35
36StreamSettings::StreamSettings(const Settings & settings, bool auto_close_, bool fetch_by_name_, size_t max_retry_)37: max_read_mysql_row_nums((settings.external_storage_max_read_rows) ? settings.external_storage_max_read_rows : settings.max_block_size)38, max_read_mysql_bytes_size(settings.external_storage_max_read_bytes)39, auto_close(auto_close_)40, fetch_by_name(fetch_by_name_)41, default_num_tries_on_connection_loss(max_retry_)42{
43}
44
45MySQLSource::Connection::Connection(46const mysqlxx::PoolWithFailover::Entry & entry_,47const std::string & query_str)48: entry(entry_)49, query{entry->query(query_str)}50, result{query.use()}51{
52}
53
54/// Used in MaterializedMySQL and in doInvalidateQuery for dictionary source.
55MySQLSource::MySQLSource(56const mysqlxx::PoolWithFailover::Entry & entry,57const std::string & query_str,58const Block & sample_block,59const StreamSettings & settings_)60: ISource(sample_block.cloneEmpty())61, log(getLogger("MySQLSource"))62, connection{std::make_unique<Connection>(entry, query_str)}63, settings{std::make_unique<StreamSettings>(settings_)}64{
65description.init(sample_block);66initPositionMappingFromQueryResultStructure();67}
68
69/// For descendant MySQLWithFailoverSource
70MySQLSource::MySQLSource(const Block &sample_block_, const StreamSettings & settings_)71: ISource(sample_block_.cloneEmpty())72, log(getLogger("MySQLSource"))73, settings(std::make_unique<StreamSettings>(settings_))74{
75description.init(sample_block_);76}
77
78/// Used by MySQL storage / table function and dictionary source.
79MySQLWithFailoverSource::MySQLWithFailoverSource(80mysqlxx::PoolWithFailoverPtr pool_,81const std::string & query_str_,82const Block & sample_block_,83const StreamSettings & settings_)84: MySQLSource(sample_block_, settings_)85, pool(pool_)86, query_str(query_str_)87{
88}
89
90void MySQLWithFailoverSource::onStart()91{
92size_t count_connect_attempts = 0;93
94/// For recovering from "Lost connection to MySQL server during query" errors95while (true)96{97try98{99connection = std::make_unique<Connection>(pool->get(), query_str);100break;101}102catch (const mysqlxx::ConnectionLost & ecl) /// There are two retriable failures: CR_SERVER_GONE_ERROR, CR_SERVER_LOST103{104LOG_WARNING(log, "Failed connection ({}/{}). Trying to reconnect... (Info: {})", count_connect_attempts, settings->default_num_tries_on_connection_loss, ecl.displayText());105
106if (++count_connect_attempts > settings->default_num_tries_on_connection_loss)107{108LOG_ERROR(log, "Failed to create connection to MySQL. ({}/{})", count_connect_attempts, settings->default_num_tries_on_connection_loss);109throw;110}111}112catch (const mysqlxx::BadQuery & e)113{114LOG_ERROR(log, "Error processing query '{}': {}", query_str, e.displayText());115throw;116}117}118
119initPositionMappingFromQueryResultStructure();120}
121
122Chunk MySQLWithFailoverSource::generate()123{
124if (!is_initialized)125{126onStart();127is_initialized = true;128}129
130return MySQLSource::generate();131}
132
133
134namespace
135{
136using ValueType = ExternalResultDescription::ValueType;137
138void insertValue(const IDataType & data_type, IColumn & column, const ValueType type, const mysqlxx::Value & value, size_t & read_bytes_size, enum enum_field_types mysql_type)139{140switch (type)141{142case ValueType::vtUInt8:143assert_cast<ColumnUInt8 &>(column).insertValue(value.getUInt());144read_bytes_size += 1;145break;146case ValueType::vtUInt16:147assert_cast<ColumnUInt16 &>(column).insertValue(value.getUInt());148read_bytes_size += 2;149break;150case ValueType::vtUInt32:151assert_cast<ColumnUInt32 &>(column).insertValue(static_cast<UInt32>(value.getUInt()));152read_bytes_size += 4;153break;154case ValueType::vtUInt64:155{156if (mysql_type == enum_field_types::MYSQL_TYPE_BIT)157{158size_t n = value.size();159UInt64 val = 0UL;160ReadBufferFromMemory payload(const_cast<char *>(value.data()), n);161MySQLReplication::readBigEndianStrict(payload, reinterpret_cast<char *>(&val), n);162assert_cast<ColumnUInt64 &>(column).insertValue(val);163read_bytes_size += n;164}165else166{167assert_cast<ColumnUInt64 &>(column).insertValue(value.getUInt());168read_bytes_size += 8;169}170break;171}172case ValueType::vtInt8:173assert_cast<ColumnInt8 &>(column).insertValue(value.getInt());174read_bytes_size += 1;175break;176case ValueType::vtInt16:177assert_cast<ColumnInt16 &>(column).insertValue(value.getInt());178read_bytes_size += 2;179break;180case ValueType::vtInt32:181assert_cast<ColumnInt32 &>(column).insertValue(static_cast<Int32>(value.getInt()));182read_bytes_size += 4;183break;184case ValueType::vtInt64:185{186if (mysql_type == enum_field_types::MYSQL_TYPE_TIME)187{188String time_str(value.data(), value.size());189bool negative = time_str.starts_with("-");190if (negative) time_str = time_str.substr(1);191std::vector<String> hhmmss;192boost::split(hhmmss, time_str, [](char c) { return c == ':'; });193Int64 v = 0;194
195if (hhmmss.size() == 3)196v = static_cast<Int64>((std::stoi(hhmmss[0]) * 3600 + std::stoi(hhmmss[1]) * 60 + std::stold(hhmmss[2])) * 1000000);197else198throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported value format");199
200if (negative) v = -v;201assert_cast<ColumnInt64 &>(column).insertValue(v);202read_bytes_size += value.size();203}204else205{206assert_cast<ColumnInt64 &>(column).insertValue(value.getInt());207read_bytes_size += 8;208}209break;210}211case ValueType::vtFloat32:212assert_cast<ColumnFloat32 &>(column).insertValue(static_cast<Float32>(value.getDouble()));213read_bytes_size += 4;214break;215case ValueType::vtFloat64:216assert_cast<ColumnFloat64 &>(column).insertValue(value.getDouble());217read_bytes_size += 8;218break;219case ValueType::vtEnum8:220assert_cast<ColumnInt8 &>(column).insertValue(assert_cast<const DataTypeEnum<Int8> &>(data_type).castToValue(value.data()).get<Int8>());221read_bytes_size += assert_cast<ColumnInt8 &>(column).byteSize();222break;223case ValueType::vtEnum16:224assert_cast<ColumnInt16 &>(column).insertValue(assert_cast<const DataTypeEnum<Int16> &>(data_type).castToValue(value.data()).get<Int16>());225read_bytes_size += assert_cast<ColumnInt16 &>(column).byteSize();226break;227case ValueType::vtString:228assert_cast<ColumnString &>(column).insertData(value.data(), value.size());229read_bytes_size += assert_cast<ColumnString &>(column).byteSize();230break;231case ValueType::vtDate:232assert_cast<ColumnUInt16 &>(column).insertValue(UInt16(value.getDate().getDayNum()));233read_bytes_size += 2;234break;235case ValueType::vtDate32:236assert_cast<ColumnInt32 &>(column).insertValue(Int32(value.getDate().getExtenedDayNum()));237read_bytes_size += 4;238break;239case ValueType::vtDateTime:240{241ReadBufferFromString in(value);242time_t time = 0;243readDateTimeText(time, in, assert_cast<const DataTypeDateTime &>(data_type).getTimeZone());244if (time < 0)245time = 0;246assert_cast<ColumnUInt32 &>(column).insertValue(static_cast<UInt32>(time));247read_bytes_size += 4;248break;249}250case ValueType::vtUUID:251assert_cast<ColumnUUID &>(column).insert(parse<UUID>(value.data(), value.size()));252read_bytes_size += assert_cast<ColumnUUID 
&>(column).byteSize();253break;254case ValueType::vtDateTime64:[[fallthrough]];255case ValueType::vtDecimal32: [[fallthrough]];256case ValueType::vtDecimal64: [[fallthrough]];257case ValueType::vtDecimal128:[[fallthrough]];258case ValueType::vtDecimal256:259{260ReadBuffer buffer(const_cast<char *>(value.data()), value.size(), 0);261data_type.getDefaultSerialization()->deserializeWholeText(column, buffer, FormatSettings{});262read_bytes_size += column.sizeOfValueIfFixed();263break;264}265case ValueType::vtFixedString:266assert_cast<ColumnFixedString &>(column).insertData(value.data(), value.size());267read_bytes_size += column.sizeOfValueIfFixed();268break;269case ValueType::vtPoint:270{271/// The value is 25 bytes:272/// 4 bytes for integer SRID (0)273/// 1 byte for integer byte order (1 = little-endian)274/// 4 bytes for integer type information (1 = Point)275/// 8 bytes for double-precision X coordinate276/// 8 bytes for double-precision Y coordinate277ReadBufferFromMemory payload(value.data(), value.size());278String val;279payload.ignore(4);280
281UInt8 endian;282readBinary(endian, payload);283
284Int32 point_type;285readBinary(point_type, payload);286if (point_type != 1)287throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Only Point data type is supported");288
289Float64 x, y;290if (endian == 1)291{292readBinaryLittleEndian(x, payload);293readBinaryLittleEndian(y, payload);294}295else296{297readBinaryBigEndian(x, payload);298readBinaryBigEndian(y, payload);299}300
301assert_cast<ColumnTuple &>(column).insert(Tuple({Field(x), Field(y)}));302read_bytes_size += value.size();303break;304}305default:306throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported value type");307}308}309
310void insertDefaultValue(IColumn & column, const IColumn & sample_column) { column.insertFrom(sample_column, 0); }311}
312
313
314Chunk MySQLSource::generate()315{
316auto row = connection->result.fetch();317if (!row)318{319if (settings->auto_close)320connection->entry.disconnect();321
322return {};323}324
325MutableColumns columns(description.sample_block.columns());326for (const auto i : collections::range(0, columns.size()))327columns[i] = description.sample_block.getByPosition(i).column->cloneEmpty();328
329size_t num_rows = 0;330size_t read_bytes_size = 0;331
332while (row)333{334for (size_t index = 0; index < position_mapping.size(); ++index)335{336const auto value = row[position_mapping[index]];337const auto & sample = description.sample_block.getByPosition(index);338
339bool is_type_nullable = description.types[index].second;340
341if (!value.isNull())342{343if (is_type_nullable)344{345ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[index]);346const auto & data_type = assert_cast<const DataTypeNullable &>(*sample.type);347insertValue(*data_type.getNestedType(), column_nullable.getNestedColumn(), description.types[index].first, value, read_bytes_size, row.getFieldType(position_mapping[index]));348column_nullable.getNullMapData().emplace_back(false);349}350else351{352insertValue(*sample.type, *columns[index], description.types[index].first, value, read_bytes_size, row.getFieldType(position_mapping[index]));353}354}355else356{357insertDefaultValue(*columns[index], *sample.column);358
359if (is_type_nullable)360{361ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[index]);362column_nullable.getNullMapData().back() = true;363}364}365}366
367++num_rows;368if (num_rows == settings->max_read_mysql_row_nums || (settings->max_read_mysql_bytes_size && read_bytes_size >= settings->max_read_mysql_bytes_size))369break;370
371row = connection->result.fetch();372}373return Chunk(std::move(columns), num_rows);374}
375
376void MySQLSource::initPositionMappingFromQueryResultStructure()377{
378position_mapping.resize(description.sample_block.columns());379
380if (!settings->fetch_by_name)381{382if (description.sample_block.columns() != connection->result.getNumFields())383throw Exception(384ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH,385"mysqlxx::UseQueryResult contains {} columns while {} expected",386connection->result.getNumFields(),387description.sample_block.columns());388
389for (const auto idx : collections::range(0, connection->result.getNumFields()))390position_mapping[idx] = idx;391}392else393{394const auto & sample_names = description.sample_block.getNames();395std::unordered_set<std::string> missing_names(sample_names.begin(), sample_names.end());396
397size_t fields_size = connection->result.getNumFields();398
399for (const size_t & idx : collections::range(0, fields_size))400{401const auto & field_name = connection->result.getFieldName(idx);402if (description.sample_block.has(field_name))403{404const auto & position = description.sample_block.getPositionByName(field_name);405position_mapping[position] = idx;406missing_names.erase(field_name);407}408}409
410if (!missing_names.empty())411throw Exception(412ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH,413"mysqlxx::UseQueryResult must contain columns: {}",414fmt::join(missing_names, ", "));415}416}
417
418}
419
420#endif421