ClickHouse
269 строк · 11.7 Кб
1#include "insertPostgreSQLValue.h"2
3#if USE_LIBPQXX4#include <Columns/ColumnNullable.h>5#include <Columns/ColumnString.h>6#include <Columns/ColumnArray.h>7#include <Columns/ColumnsNumber.h>8#include <Columns/ColumnDecimal.h>9#include <DataTypes/IDataType.h>10#include <DataTypes/DataTypeNullable.h>11#include <DataTypes/DataTypeArray.h>12#include <DataTypes/DataTypeDateTime64.h>13#include <DataTypes/DataTypesDecimal.h>14#include <Interpreters/convertFieldToType.h>15#include <IO/ReadHelpers.h>16#include <IO/ReadBufferFromString.h>17#include <Common/assert_cast.h>18#include <pqxx/pqxx>19
20
21namespace DB22{
23
/// Error codes thrown by the functions in this file.
/// They are only declared here (`extern`); the definitions are provided elsewhere.
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int NOT_IMPLEMENTED;
}
29
30
31void insertDefaultPostgreSQLValue(IColumn & column, const IColumn & sample_column)32{
33column.insertFrom(sample_column, 0);34}
35
36
37void insertPostgreSQLValue(38IColumn & column, std::string_view value,39ExternalResultDescription::ValueType type, DataTypePtr data_type,40const std::unordered_map<size_t, PostgreSQLArrayInfo> & array_info, size_t idx)41{
42switch (type)43{44
45case ExternalResultDescription::ValueType::vtUInt8:46{47if (value == "t")48assert_cast<ColumnUInt8 &>(column).insertValue(1);49else if (value == "f")50assert_cast<ColumnUInt8 &>(column).insertValue(0);51else52assert_cast<ColumnUInt8 &>(column).insertValue(pqxx::from_string<uint16_t>(value));53break;54}55case ExternalResultDescription::ValueType::vtUInt16:56assert_cast<ColumnUInt16 &>(column).insertValue(pqxx::from_string<uint16_t>(value));57break;58case ExternalResultDescription::ValueType::vtUInt32:59assert_cast<ColumnUInt32 &>(column).insertValue(pqxx::from_string<uint32_t>(value));60break;61case ExternalResultDescription::ValueType::vtUInt64:62assert_cast<ColumnUInt64 &>(column).insertValue(pqxx::from_string<uint64_t>(value));63break;64case ExternalResultDescription::ValueType::vtInt8:65assert_cast<ColumnInt8 &>(column).insertValue(pqxx::from_string<int16_t>(value));66break;67case ExternalResultDescription::ValueType::vtInt16:68assert_cast<ColumnInt16 &>(column).insertValue(pqxx::from_string<int16_t>(value));69break;70case ExternalResultDescription::ValueType::vtInt32:71assert_cast<ColumnInt32 &>(column).insertValue(pqxx::from_string<int32_t>(value));72break;73case ExternalResultDescription::ValueType::vtInt64:74assert_cast<ColumnInt64 &>(column).insertValue(pqxx::from_string<int64_t>(value));75break;76case ExternalResultDescription::ValueType::vtFloat32:77assert_cast<ColumnFloat32 &>(column).insertValue(pqxx::from_string<float>(value));78break;79case ExternalResultDescription::ValueType::vtFloat64:80assert_cast<ColumnFloat64 &>(column).insertValue(pqxx::from_string<double>(value));81break;82case ExternalResultDescription::ValueType::vtEnum8:83case ExternalResultDescription::ValueType::vtEnum16:84case ExternalResultDescription::ValueType::vtFixedString:85case ExternalResultDescription::ValueType::vtString:86assert_cast<ColumnString &>(column).insertData(value.data(), value.size());87break;88case 
ExternalResultDescription::ValueType::vtUUID:89assert_cast<ColumnUUID &>(column).insertValue(parse<UUID>(value.data(), value.size()));90break;91case ExternalResultDescription::ValueType::vtDate:92assert_cast<ColumnUInt16 &>(column).insertValue(UInt16{LocalDate{std::string(value)}.getDayNum()});93break;94case ExternalResultDescription::ValueType::vtDate32:95assert_cast<ColumnInt32 &>(column).insertValue(Int32{LocalDate{std::string(value)}.getExtenedDayNum()});96break;97case ExternalResultDescription::ValueType::vtDateTime:98{99ReadBufferFromString in(value);100time_t time = 0;101readDateTimeText(time, in, assert_cast<const DataTypeDateTime *>(data_type.get())->getTimeZone());102if (time < 0)103time = 0;104assert_cast<ColumnUInt32 &>(column).insertValue(static_cast<UInt32>(time));105break;106}107case ExternalResultDescription::ValueType::vtDateTime64:108{109ReadBufferFromString in(value);110DateTime64 time = 0;111readDateTime64Text(time, 6, in, assert_cast<const DataTypeDateTime64 *>(data_type.get())->getTimeZone());112assert_cast<DataTypeDateTime64::ColumnType &>(column).insertValue(time);113break;114}115case ExternalResultDescription::ValueType::vtDecimal32: [[fallthrough]];116case ExternalResultDescription::ValueType::vtDecimal64: [[fallthrough]];117case ExternalResultDescription::ValueType::vtDecimal128: [[fallthrough]];118case ExternalResultDescription::ValueType::vtDecimal256:119{120ReadBufferFromString istr(value);121data_type->getDefaultSerialization()->deserializeWholeText(column, istr, FormatSettings{});122break;123}124case ExternalResultDescription::ValueType::vtArray:125{126pqxx::array_parser parser{value};127std::pair<pqxx::array_parser::juncture, std::string> parsed = parser.get_next();128
129size_t dimension = 0, max_dimension = 0, expected_dimensions = array_info.at(idx).num_dimensions;130const auto parse_value = array_info.at(idx).pqxx_parser;131std::vector<Row> dimensions(expected_dimensions + 1);132
133while (parsed.first != pqxx::array_parser::juncture::done)134{135if ((parsed.first == pqxx::array_parser::juncture::row_start) && (++dimension > expected_dimensions))136throw Exception(ErrorCodes::BAD_ARGUMENTS, "Got more dimensions than expected");137
138else if (parsed.first == pqxx::array_parser::juncture::string_value)139dimensions[dimension].emplace_back(parse_value(parsed.second));140
141else if (parsed.first == pqxx::array_parser::juncture::null_value)142dimensions[dimension].emplace_back(array_info.at(idx).default_value);143
144else if (parsed.first == pqxx::array_parser::juncture::row_end)145{146max_dimension = std::max(max_dimension, dimension);147
148--dimension;149if (dimension == 0)150break;151
152dimensions[dimension].emplace_back(Array(dimensions[dimension + 1].begin(), dimensions[dimension + 1].end()));153dimensions[dimension + 1].clear();154}155
156parsed = parser.get_next();157}158
159if (max_dimension < expected_dimensions)160throw Exception(ErrorCodes::BAD_ARGUMENTS,161"Got less dimensions than expected. ({} instead of {})", max_dimension, expected_dimensions);162
163assert_cast<ColumnArray &>(column).insert(Array(dimensions[1].begin(), dimensions[1].end()));164break;165}166default:167throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported value type");168}169}
170
171
172void preparePostgreSQLArrayInfo(173std::unordered_map<size_t, PostgreSQLArrayInfo> & array_info, size_t column_idx, DataTypePtr data_type)174{
175const auto * array_type = typeid_cast<const DataTypeArray *>(data_type.get());176auto nested = array_type->getNestedType();177
178size_t count_dimensions = 1;179while (isArray(nested))180{181++count_dimensions;182nested = typeid_cast<const DataTypeArray *>(nested.get())->getNestedType();183}184
185Field default_value = nested->getDefault();186if (nested->isNullable())187nested = static_cast<const DataTypeNullable *>(nested.get())->getNestedType();188
189WhichDataType which(nested);190std::function<Field(std::string & fields)> parser;191
192if (which.isUInt8() || which.isUInt16())193parser = [](std::string & field) -> Field { return pqxx::from_string<uint16_t>(field); };194else if (which.isInt8() || which.isInt16())195parser = [](std::string & field) -> Field { return pqxx::from_string<int16_t>(field); };196else if (which.isUInt32())197parser = [](std::string & field) -> Field { return pqxx::from_string<uint32_t>(field); };198else if (which.isInt32())199parser = [](std::string & field) -> Field { return pqxx::from_string<int32_t>(field); };200else if (which.isUInt64())201parser = [](std::string & field) -> Field { return pqxx::from_string<uint64_t>(field); };202else if (which.isInt64())203parser = [](std::string & field) -> Field { return pqxx::from_string<int64_t>(field); };204else if (which.isFloat32())205parser = [](std::string & field) -> Field { return pqxx::from_string<float>(field); };206else if (which.isFloat64())207parser = [](std::string & field) -> Field { return pqxx::from_string<double>(field); };208else if (which.isUUID())209parser = [](std::string & field) -> Field { return parse<UUID>(field); };210else if (which.isString() || which.isFixedString())211parser = [](std::string & field) -> Field { return field; };212else if (which.isDate())213parser = [](std::string & field) -> Field { return UInt16{LocalDate{field}.getDayNum()}; };214else if (which.isDateTime())215parser = [nested](std::string & field) -> Field216{217ReadBufferFromString in(field);218time_t time = 0;219readDateTimeText(time, in, assert_cast<const DataTypeDateTime *>(nested.get())->getTimeZone());220if (time < 0)221time = 0;222return time;223};224else if (which.isDateTime64())225parser = [nested](std::string & field) -> Field226{227ReadBufferFromString in(field);228DateTime64 time = 0;229readDateTime64Text(time, 6, in, assert_cast<const DataTypeDateTime64 *>(nested.get())->getTimeZone());230if (time < 0)231time = 0;232return time;233};234else if (which.isDecimal32())235parser = [nested](std::string & field) -> 
Field236{237const auto & type = typeid_cast<const DataTypeDecimal<Decimal32> *>(nested.get());238DataTypeDecimal<Decimal32> res(getDecimalPrecision(*type), getDecimalScale(*type));239return convertFieldToType(field, res);240};241else if (which.isDecimal64())242parser = [nested](std::string & field) -> Field243{244const auto & type = typeid_cast<const DataTypeDecimal<Decimal64> *>(nested.get());245DataTypeDecimal<Decimal64> res(getDecimalPrecision(*type), getDecimalScale(*type));246return convertFieldToType(field, res);247};248else if (which.isDecimal128())249parser = [nested](std::string & field) -> Field250{251const auto & type = typeid_cast<const DataTypeDecimal<Decimal128> *>(nested.get());252DataTypeDecimal<Decimal128> res(getDecimalPrecision(*type), getDecimalScale(*type));253return convertFieldToType(field, res);254};255else if (which.isDecimal256())256parser = [nested](std::string & field) -> Field257{258const auto & type = typeid_cast<const DataTypeDecimal<Decimal256> *>(nested.get());259DataTypeDecimal<Decimal256> res(getDecimalPrecision(*type), getDecimalScale(*type));260return convertFieldToType(field, res);261};262else263throw Exception(ErrorCodes::BAD_ARGUMENTS, "Type conversion to {} is not supported", nested->getName());264
265array_info[column_idx] = {count_dimensions, default_value, parser};266}
267}
268
269#endif270