// ClickHouse
// 541 lines · 23.2 KB
1#include "MongoDBSource.h"2
3#include <string>4#include <vector>5
6#include <Poco/MongoDB/Array.h>7#include <Poco/MongoDB/Database.h>8#include <Poco/MongoDB/Connection.h>9#include <Poco/MongoDB/Cursor.h>10#include <Poco/MongoDB/OpMsgCursor.h>11#include <Poco/MongoDB/ObjectId.h>12
13#include <Columns/ColumnArray.h>14#include <Columns/ColumnNullable.h>15#include <Columns/ColumnString.h>16#include <Columns/ColumnsNumber.h>17#include <IO/ReadHelpers.h>18#include <Common/assert_cast.h>19#include <Common/quoteString.h>20#include <base/range.h>21#include <Poco/URI.h>22
23#include <DataTypes/DataTypeArray.h>24#include <DataTypes/DataTypeNullable.h>25
// The header below must be included only after the Poco headers because of a naming conflict:
//   Poco/MongoDB/BSONWriter.h:54: void writeCString(const std::string & value);
//   src/IO/WriteHelpers.h:146: #define writeCString(s, buf)
30#include <IO/WriteHelpers.h>31
32namespace DB33{
34
35namespace ErrorCodes36{
37extern const int TYPE_MISMATCH;38extern const int UNKNOWN_TYPE;39extern const int MONGODB_ERROR;40extern const int BAD_ARGUMENTS;41}
42
43namespace
44{
45using ValueType = ExternalResultDescription::ValueType;46using ObjectId = Poco::MongoDB::ObjectId;47using MongoArray = Poco::MongoDB::Array;48
49
50template <typename T>51Field getNumber(const Poco::MongoDB::Element & value, const std::string & name)52{53switch (value.type())54{55case Poco::MongoDB::ElementTraits<Int32>::TypeId:56return static_cast<T>(static_cast<const Poco::MongoDB::ConcreteElement<Int32> &>(value).value());57case Poco::MongoDB::ElementTraits<Poco::Int64>::TypeId:58return static_cast<T>(static_cast<const Poco::MongoDB::ConcreteElement<Poco::Int64> &>(value).value());59case Poco::MongoDB::ElementTraits<Float64>::TypeId:60return static_cast<T>(static_cast<const Poco::MongoDB::ConcreteElement<Float64> &>(value).value());61case Poco::MongoDB::ElementTraits<bool>::TypeId:62return static_cast<T>(static_cast<const Poco::MongoDB::ConcreteElement<bool> &>(value).value());63case Poco::MongoDB::ElementTraits<Poco::MongoDB::NullValue>::TypeId:64return Field();65case Poco::MongoDB::ElementTraits<String>::TypeId:66return parse<T>(static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value());67default:68throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected a number, got type id = {} for column {}",69toString(value.type()), name);70}71}72
73void prepareMongoDBArrayInfo(74std::unordered_map<size_t, MongoDBArrayInfo> & array_info, size_t column_idx, const DataTypePtr data_type)75{76const auto * array_type = assert_cast<const DataTypeArray *>(data_type.get());77auto nested = array_type->getNestedType();78
79size_t count_dimensions = 1;80while (isArray(nested))81{82++count_dimensions;83nested = assert_cast<const DataTypeArray *>(nested.get())->getNestedType();84}85
86Field default_value = nested->getDefault();87if (nested->isNullable())88nested = assert_cast<const DataTypeNullable *>(nested.get())->getNestedType();89
90WhichDataType which(nested);91std::function<Field(const Poco::MongoDB::Element & value, const std::string & name)> parser;92
93if (which.isUInt8())94parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<UInt8>(value, name); };95else if (which.isUInt16())96parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<UInt16>(value, name); };97else if (which.isUInt32())98parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<UInt32>(value, name); };99else if (which.isUInt64())100parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<UInt64>(value, name); };101else if (which.isInt8())102parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<Int8>(value, name); };103else if (which.isInt16())104parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<Int16>(value, name); };105else if (which.isInt32())106parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<Int32>(value, name); };107else if (which.isInt64())108parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<Int64>(value, name); };109else if (which.isFloat32())110parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<Float32>(value, name); };111else if (which.isFloat64())112parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<Float64>(value, name); };113else if (which.isString() || which.isFixedString())114parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field115{116if (value.type() == Poco::MongoDB::ElementTraits<ObjectId::Ptr>::TypeId)117{118String string_id = value.toString();119return Field(string_id.data(), string_id.size());120}121else if (value.type() == Poco::MongoDB::ElementTraits<String>::TypeId)122{123String string = 
static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value();124return Field(string.data(), string.size());125}126
127throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected String, got type id = {} for column {}",128toString(value.type()), name);129};130else if (which.isDate())131parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field132{133if (value.type() != Poco::MongoDB::ElementTraits<Poco::Timestamp>::TypeId)134throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected Timestamp, got type id = {} for column {}",135toString(value.type()), name);136
137return static_cast<UInt16>(DateLUT::instance().toDayNum(138static_cast<const Poco::MongoDB::ConcreteElement<Poco::Timestamp> &>(value).value().epochTime()));139};140else if (which.isDateTime())141parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field142{143if (value.type() != Poco::MongoDB::ElementTraits<Poco::Timestamp>::TypeId)144throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected Timestamp, got type id = {} for column {}",145toString(value.type()), name);146
147return static_cast<UInt32>(static_cast<const Poco::MongoDB::ConcreteElement<Poco::Timestamp> &>(value).value().epochTime());148};149else if (which.isUUID())150parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field151{152if (value.type() != Poco::MongoDB::ElementTraits<String>::TypeId)153throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected String (UUID), got type id = {} for column {}",154toString(value.type()), name);155
156String string = static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value();157return parse<UUID>(string);158};159else160throw Exception(ErrorCodes::BAD_ARGUMENTS, "Type conversion to {} is not supported", nested->getName());161
162array_info[column_idx] = {count_dimensions, default_value, parser};163}164
165template <typename T>166void insertNumber(IColumn & column, const Poco::MongoDB::Element & value, const std::string & name)167{168switch (value.type())169{170case Poco::MongoDB::ElementTraits<Int32>::TypeId:171assert_cast<ColumnVector<T> &>(column).getData().push_back(172static_cast<const Poco::MongoDB::ConcreteElement<Int32> &>(value).value());173break;174case Poco::MongoDB::ElementTraits<Poco::Int64>::TypeId:175assert_cast<ColumnVector<T> &>(column).getData().push_back(176static_cast<T>(static_cast<const Poco::MongoDB::ConcreteElement<Poco::Int64> &>(value).value()));177break;178case Poco::MongoDB::ElementTraits<Float64>::TypeId:179assert_cast<ColumnVector<T> &>(column).getData().push_back(static_cast<T>(180static_cast<const Poco::MongoDB::ConcreteElement<Float64> &>(value).value()));181break;182case Poco::MongoDB::ElementTraits<bool>::TypeId:183assert_cast<ColumnVector<T> &>(column).getData().push_back(184static_cast<const Poco::MongoDB::ConcreteElement<bool> &>(value).value());185break;186case Poco::MongoDB::ElementTraits<Poco::MongoDB::NullValue>::TypeId:187assert_cast<ColumnVector<T> &>(column).getData().emplace_back();188break;189case Poco::MongoDB::ElementTraits<String>::TypeId:190assert_cast<ColumnVector<T> &>(column).getData().push_back(191parse<T>(static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value()));192break;193default:194throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected a number, got type id = {} for column {}",195toString(value.type()), name);196}197}198
199void insertValue(200IColumn & column,201const ValueType type,202const Poco::MongoDB::Element & value,203const std::string & name,204std::unordered_map<size_t, MongoDBArrayInfo> & array_info,205size_t idx)206{207switch (type)208{209case ValueType::vtUInt8:210insertNumber<UInt8>(column, value, name);211break;212case ValueType::vtUInt16:213insertNumber<UInt16>(column, value, name);214break;215case ValueType::vtUInt32:216insertNumber<UInt32>(column, value, name);217break;218case ValueType::vtUInt64:219insertNumber<UInt64>(column, value, name);220break;221case ValueType::vtInt8:222insertNumber<Int8>(column, value, name);223break;224case ValueType::vtInt16:225insertNumber<Int16>(column, value, name);226break;227case ValueType::vtInt32:228insertNumber<Int32>(column, value, name);229break;230case ValueType::vtInt64:231insertNumber<Int64>(column, value, name);232break;233case ValueType::vtFloat32:234insertNumber<Float32>(column, value, name);235break;236case ValueType::vtFloat64:237insertNumber<Float64>(column, value, name);238break;239
240case ValueType::vtEnum8:241case ValueType::vtEnum16:242case ValueType::vtString:243{244if (value.type() == Poco::MongoDB::ElementTraits<ObjectId::Ptr>::TypeId)245{246std::string string_id = value.toString();247assert_cast<ColumnString &>(column).insertData(string_id.data(), string_id.size());248break;249}250else if (value.type() == Poco::MongoDB::ElementTraits<String>::TypeId)251{252String string = static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value();253assert_cast<ColumnString &>(column).insertData(string.data(), string.size());254break;255}256
257throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected String, got type id = {} for column {}",258toString(value.type()), name);259}260
261case ValueType::vtDate:262{263if (value.type() != Poco::MongoDB::ElementTraits<Poco::Timestamp>::TypeId)264throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected Timestamp, got type id = {} for column {}",265toString(value.type()), name);266
267assert_cast<ColumnUInt16 &>(column).getData().push_back(static_cast<UInt16>(DateLUT::instance().toDayNum(268static_cast<const Poco::MongoDB::ConcreteElement<Poco::Timestamp> &>(value).value().epochTime())));269break;270}271
272case ValueType::vtDateTime:273{274if (value.type() != Poco::MongoDB::ElementTraits<Poco::Timestamp>::TypeId)275throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected Timestamp, got type id = {} for column {}",276toString(value.type()), name);277
278assert_cast<ColumnUInt32 &>(column).getData().push_back(279static_cast<UInt32>(static_cast<const Poco::MongoDB::ConcreteElement<Poco::Timestamp> &>(value).value().epochTime()));280break;281}282case ValueType::vtUUID:283{284if (value.type() == Poco::MongoDB::ElementTraits<String>::TypeId)285{286String string = static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value();287assert_cast<ColumnUUID &>(column).getData().push_back(parse<UUID>(string));288}289else290throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected String (UUID), got type id = {} for column {}",291toString(value.type()), name);292break;293}294case ValueType::vtArray:295{296if (value.type() != Poco::MongoDB::ElementTraits<MongoArray::Ptr>::TypeId)297throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected Array, got type id = {} for column {}",298toString(value.type()), name);299
300size_t expected_dimensions = array_info[idx].num_dimensions;301const auto parse_value = array_info[idx].parser;302std::vector<Row> dimensions(expected_dimensions + 1);303
304auto array = static_cast<const Poco::MongoDB::ConcreteElement<MongoArray::Ptr> &>(value).value();305
306std::vector<std::pair<const Poco::MongoDB::Element *, size_t>> arrays;307arrays.emplace_back(&value, 0);308
309while (!arrays.empty())310{311size_t dimension_idx = arrays.size() - 1;312
313if (dimension_idx + 1 > expected_dimensions)314throw Exception(ErrorCodes::BAD_ARGUMENTS, "Got more dimensions than expected");315
316auto [parent_ptr, child_idx] = arrays.back();317auto parent = static_cast<const Poco::MongoDB::ConcreteElement<MongoArray::Ptr> &>(*parent_ptr).value();318
319if (child_idx >= parent->size())320{321arrays.pop_back();322
323if (dimension_idx == 0)324break;325
326dimensions[dimension_idx].emplace_back(Array(dimensions[dimension_idx + 1].begin(), dimensions[dimension_idx + 1].end()));327dimensions[dimension_idx + 1].clear();328
329continue;330}331
332Poco::MongoDB::Element::Ptr child = parent->get(static_cast<int>(child_idx));333arrays.back().second += 1;334
335if (child->type() == Poco::MongoDB::ElementTraits<MongoArray::Ptr>::TypeId)336{337arrays.emplace_back(child.get(), 0);338}339else if (child->type() == Poco::MongoDB::ElementTraits<Poco::MongoDB::NullValue>::TypeId)340{341if (dimension_idx + 1 == expected_dimensions)342dimensions[dimension_idx + 1].emplace_back(array_info[idx].default_value);343else344dimensions[dimension_idx + 1].emplace_back(Array());345}346else if (dimension_idx + 1 == expected_dimensions)347{348dimensions[dimension_idx + 1].emplace_back(parse_value(*child, name));349}350else351{352throw Exception(ErrorCodes::BAD_ARGUMENTS,353"Got less dimensions than expected. ({} instead of {})", dimension_idx + 1, expected_dimensions);354}355}356
357assert_cast<ColumnArray &>(column).insert(Array(dimensions[1].begin(), dimensions[1].end()));358break;359
360}361default:362throw Exception(ErrorCodes::UNKNOWN_TYPE, "Value of unsupported type: {}", column.getName());363}364}365
366void insertDefaultValue(IColumn & column, const IColumn & sample_column) { column.insertFrom(sample_column, 0); }367}
368
369
370bool isMongoDBWireProtocolOld(Poco::MongoDB::Connection & connection_)371{
372Poco::MongoDB::Database db("config");373Poco::MongoDB::Document::Ptr doc = db.queryServerHello(connection_, false);374
375if (doc->exists("maxWireVersion"))376{377auto wire_version = doc->getInteger("maxWireVersion");378return wire_version < Poco::MongoDB::Database::WireVersion::VER_36;379}380
381doc = db.queryServerHello(connection_, true);382if (doc->exists("maxWireVersion"))383{384auto wire_version = doc->getInteger("maxWireVersion");385return wire_version < Poco::MongoDB::Database::WireVersion::VER_36;386}387
388return true;389}
390
391
392MongoDBCursor::MongoDBCursor(393const std::string & database,394const std::string & collection,395const Block & sample_block_to_select,396const Poco::MongoDB::Document & query,397Poco::MongoDB::Connection & connection)398: is_wire_protocol_old(isMongoDBWireProtocolOld(connection))399{
400Poco::MongoDB::Document projection;401
402/// Looks like selecting _id column is implicit by default.403if (!sample_block_to_select.has("_id"))404projection.add("_id", 0);405
406for (const auto & column : sample_block_to_select)407projection.add(column.name, 1);408
409if (is_wire_protocol_old)410{411old_cursor = std::make_unique<Poco::MongoDB::Cursor>(database, collection);412old_cursor->query().selector() = query;413old_cursor->query().returnFieldSelector() = projection;414}415else416{417new_cursor = std::make_unique<Poco::MongoDB::OpMsgCursor>(database, collection);418new_cursor->query().setCommandName(Poco::MongoDB::OpMsgMessage::CMD_FIND);419new_cursor->query().body().addNewDocument("filter") = query;420new_cursor->query().body().addNewDocument("projection") = projection;421}422}
423
424Poco::MongoDB::Document::Vector MongoDBCursor::nextDocuments(Poco::MongoDB::Connection & connection)425{
426if (is_wire_protocol_old)427{428auto response = old_cursor->next(connection);429cursor_id = response.cursorID();430return std::move(response.documents());431}432else433{434auto response = new_cursor->next(connection);435cursor_id = new_cursor->cursorID();436return std::move(response.documents());437}438}
439
440Int64 MongoDBCursor::cursorID() const441{
442return cursor_id;443}
444
445
446MongoDBSource::MongoDBSource(447std::shared_ptr<Poco::MongoDB::Connection> & connection_,448const String & database_name_,449const String & collection_name_,450const Poco::MongoDB::Document & query_,451const Block & sample_block,452UInt64 max_block_size_)453: ISource(sample_block.cloneEmpty())454, connection(connection_)455, cursor(database_name_, collection_name_, sample_block, query_, *connection_)456, max_block_size{max_block_size_}457{
458description.init(sample_block);459
460for (const auto idx : collections::range(0, description.sample_block.columns()))461if (description.types[idx].first == ExternalResultDescription::ValueType::vtArray)462prepareMongoDBArrayInfo(array_info, idx, description.sample_block.getByPosition(idx).type);463}
464
465
466MongoDBSource::~MongoDBSource() = default;467
468Chunk MongoDBSource::generate()469{
470if (all_read)471return {};472
473MutableColumns columns(description.sample_block.columns());474const size_t size = columns.size();475
476for (const auto i : collections::range(0, size))477columns[i] = description.sample_block.getByPosition(i).column->cloneEmpty();478
479size_t num_rows = 0;480while (num_rows < max_block_size)481{482auto documents = cursor.nextDocuments(*connection);483
484for (auto & document : documents)485{486if (document->exists("ok") && document->exists("$err")487&& document->exists("code") && document->getInteger("ok") == 0)488{489auto code = document->getInteger("code");490const Poco::MongoDB::Element::Ptr value = document->get("$err");491auto message = static_cast<const Poco::MongoDB::ConcreteElement<String> &>(*value).value();492throw Exception(ErrorCodes::MONGODB_ERROR, "Got error from MongoDB: {}, code: {}", message, code);493}494++num_rows;495
496for (const auto idx : collections::range(0, size))497{498const auto & name = description.sample_block.getByPosition(idx).name;499
500bool exists_in_current_document = document->exists(name);501if (!exists_in_current_document)502{503insertDefaultValue(*columns[idx], *description.sample_block.getByPosition(idx).column);504continue;505}506
507const Poco::MongoDB::Element::Ptr value = document->get(name);508
509if (value.isNull() || value->type() == Poco::MongoDB::ElementTraits<Poco::MongoDB::NullValue>::TypeId)510{511insertDefaultValue(*columns[idx], *description.sample_block.getByPosition(idx).column);512}513else514{515bool is_nullable = description.types[idx].second;516if (is_nullable)517{518ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[idx]);519insertValue(column_nullable.getNestedColumn(), description.types[idx].first, *value, name, array_info, idx);520column_nullable.getNullMapData().emplace_back(0);521}522else523insertValue(*columns[idx], description.types[idx].first, *value, name, array_info, idx);524}525}526}527
528if (cursor.cursorID() == 0)529{530all_read = true;531break;532}533}534
535if (num_rows == 0)536return {};537
538return Chunk(std::move(columns), num_rows);539}
540
541}
542