ClickHouse

Форк
0
/
MongoDBSource.cpp 
541 строка · 23.2 Кб
1
#include "MongoDBSource.h"
2

3
#include <string>
4
#include <vector>
5

6
#include <Poco/MongoDB/Array.h>
7
#include <Poco/MongoDB/Database.h>
8
#include <Poco/MongoDB/Connection.h>
9
#include <Poco/MongoDB/Cursor.h>
10
#include <Poco/MongoDB/OpMsgCursor.h>
11
#include <Poco/MongoDB/ObjectId.h>
12

13
#include <Columns/ColumnArray.h>
14
#include <Columns/ColumnNullable.h>
15
#include <Columns/ColumnString.h>
16
#include <Columns/ColumnsNumber.h>
17
#include <IO/ReadHelpers.h>
18
#include <Common/assert_cast.h>
19
#include <Common/quoteString.h>
20
#include <base/range.h>
21
#include <Poco/URI.h>
22

23
#include <DataTypes/DataTypeArray.h>
24
#include <DataTypes/DataTypeNullable.h>
25

26
// Must be included only after the Poco headers because of a naming conflict:
// Poco/MongoDB/BSONWriter.h:54: void writeCString(const std::string & value);
// src/IO/WriteHelpers.h:146 #define writeCString(s, buf)
30
#include <IO/WriteHelpers.h>
31

32
namespace DB
33
{
34

35
namespace ErrorCodes
{
    /// Error codes thrown from this file; their definitions live elsewhere in the project.
    extern const int BAD_ARGUMENTS;
    extern const int MONGODB_ERROR;
    extern const int TYPE_MISMATCH;
    extern const int UNKNOWN_TYPE;
}
42

43
namespace
44
{
45
    using ValueType = ExternalResultDescription::ValueType;
46
    using ObjectId = Poco::MongoDB::ObjectId;
47
    using MongoArray = Poco::MongoDB::Array;
48

49

50
    template <typename T>
51
    Field getNumber(const Poco::MongoDB::Element & value, const std::string & name)
52
    {
53
        switch (value.type())
54
        {
55
            case Poco::MongoDB::ElementTraits<Int32>::TypeId:
56
                return static_cast<T>(static_cast<const Poco::MongoDB::ConcreteElement<Int32> &>(value).value());
57
            case Poco::MongoDB::ElementTraits<Poco::Int64>::TypeId:
58
                return static_cast<T>(static_cast<const Poco::MongoDB::ConcreteElement<Poco::Int64> &>(value).value());
59
            case Poco::MongoDB::ElementTraits<Float64>::TypeId:
60
                return static_cast<T>(static_cast<const Poco::MongoDB::ConcreteElement<Float64> &>(value).value());
61
            case Poco::MongoDB::ElementTraits<bool>::TypeId:
62
                return static_cast<T>(static_cast<const Poco::MongoDB::ConcreteElement<bool> &>(value).value());
63
            case Poco::MongoDB::ElementTraits<Poco::MongoDB::NullValue>::TypeId:
64
                return Field();
65
            case Poco::MongoDB::ElementTraits<String>::TypeId:
66
                return parse<T>(static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value());
67
            default:
68
                throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected a number, got type id = {} for column {}",
69
                    toString(value.type()), name);
70
        }
71
    }
72

73
    void prepareMongoDBArrayInfo(
74
        std::unordered_map<size_t, MongoDBArrayInfo> & array_info, size_t column_idx, const DataTypePtr data_type)
75
    {
76
        const auto * array_type = assert_cast<const DataTypeArray *>(data_type.get());
77
        auto nested = array_type->getNestedType();
78

79
        size_t count_dimensions = 1;
80
        while (isArray(nested))
81
        {
82
            ++count_dimensions;
83
            nested = assert_cast<const DataTypeArray *>(nested.get())->getNestedType();
84
        }
85

86
        Field default_value = nested->getDefault();
87
        if (nested->isNullable())
88
            nested = assert_cast<const DataTypeNullable *>(nested.get())->getNestedType();
89

90
        WhichDataType which(nested);
91
        std::function<Field(const Poco::MongoDB::Element & value, const std::string & name)> parser;
92

93
        if (which.isUInt8())
94
            parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<UInt8>(value, name); };
95
        else if (which.isUInt16())
96
            parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<UInt16>(value, name); };
97
        else if (which.isUInt32())
98
            parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<UInt32>(value, name); };
99
        else if (which.isUInt64())
100
            parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<UInt64>(value, name); };
101
        else if (which.isInt8())
102
            parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<Int8>(value, name); };
103
        else if (which.isInt16())
104
            parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<Int16>(value, name); };
105
        else if (which.isInt32())
106
            parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<Int32>(value, name); };
107
        else if (which.isInt64())
108
            parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<Int64>(value, name); };
109
        else if (which.isFloat32())
110
            parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<Float32>(value, name); };
111
        else if (which.isFloat64())
112
            parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<Float64>(value, name); };
113
        else if (which.isString() || which.isFixedString())
114
            parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field
115
            {
116
                if (value.type() == Poco::MongoDB::ElementTraits<ObjectId::Ptr>::TypeId)
117
                {
118
                    String string_id = value.toString();
119
                    return Field(string_id.data(), string_id.size());
120
                }
121
                else if (value.type() == Poco::MongoDB::ElementTraits<String>::TypeId)
122
                {
123
                    String string = static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value();
124
                    return Field(string.data(), string.size());
125
                }
126

127
                throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected String, got type id = {} for column {}",
128
                                toString(value.type()), name);
129
            };
130
        else if (which.isDate())
131
            parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field
132
            {
133
                if (value.type() != Poco::MongoDB::ElementTraits<Poco::Timestamp>::TypeId)
134
                    throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected Timestamp, got type id = {} for column {}",
135
                                    toString(value.type()), name);
136

137
                return static_cast<UInt16>(DateLUT::instance().toDayNum(
138
                    static_cast<const Poco::MongoDB::ConcreteElement<Poco::Timestamp> &>(value).value().epochTime()));
139
            };
140
        else if (which.isDateTime())
141
            parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field
142
            {
143
                if (value.type() != Poco::MongoDB::ElementTraits<Poco::Timestamp>::TypeId)
144
                    throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected Timestamp, got type id = {} for column {}",
145
                                    toString(value.type()), name);
146

147
                return static_cast<UInt32>(static_cast<const Poco::MongoDB::ConcreteElement<Poco::Timestamp> &>(value).value().epochTime());
148
            };
149
        else if (which.isUUID())
150
            parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field
151
            {
152
                if (value.type() != Poco::MongoDB::ElementTraits<String>::TypeId)
153
                    throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected String (UUID), got type id = {} for column {}",
154
                                        toString(value.type()), name);
155

156
                String string = static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value();
157
                return parse<UUID>(string);
158
            };
159
        else
160
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Type conversion to {} is not supported", nested->getName());
161

162
        array_info[column_idx] = {count_dimensions, default_value, parser};
163
    }
164

165
    template <typename T>
166
    void insertNumber(IColumn & column, const Poco::MongoDB::Element & value, const std::string & name)
167
    {
168
        switch (value.type())
169
        {
170
            case Poco::MongoDB::ElementTraits<Int32>::TypeId:
171
                assert_cast<ColumnVector<T> &>(column).getData().push_back(
172
                    static_cast<const Poco::MongoDB::ConcreteElement<Int32> &>(value).value());
173
                break;
174
            case Poco::MongoDB::ElementTraits<Poco::Int64>::TypeId:
175
                assert_cast<ColumnVector<T> &>(column).getData().push_back(
176
                    static_cast<T>(static_cast<const Poco::MongoDB::ConcreteElement<Poco::Int64> &>(value).value()));
177
                break;
178
            case Poco::MongoDB::ElementTraits<Float64>::TypeId:
179
                assert_cast<ColumnVector<T> &>(column).getData().push_back(static_cast<T>(
180
                    static_cast<const Poco::MongoDB::ConcreteElement<Float64> &>(value).value()));
181
                break;
182
            case Poco::MongoDB::ElementTraits<bool>::TypeId:
183
                assert_cast<ColumnVector<T> &>(column).getData().push_back(
184
                    static_cast<const Poco::MongoDB::ConcreteElement<bool> &>(value).value());
185
                break;
186
            case Poco::MongoDB::ElementTraits<Poco::MongoDB::NullValue>::TypeId:
187
                assert_cast<ColumnVector<T> &>(column).getData().emplace_back();
188
                break;
189
            case Poco::MongoDB::ElementTraits<String>::TypeId:
190
                assert_cast<ColumnVector<T> &>(column).getData().push_back(
191
                    parse<T>(static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value()));
192
                break;
193
            default:
194
                throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected a number, got type id = {} for column {}",
195
                    toString(value.type()), name);
196
        }
197
    }
198

199
    void insertValue(
200
        IColumn & column,
201
        const ValueType type,
202
        const Poco::MongoDB::Element & value,
203
        const std::string & name,
204
        std::unordered_map<size_t, MongoDBArrayInfo> & array_info,
205
        size_t idx)
206
    {
207
        switch (type)
208
        {
209
            case ValueType::vtUInt8:
210
                insertNumber<UInt8>(column, value, name);
211
                break;
212
            case ValueType::vtUInt16:
213
                insertNumber<UInt16>(column, value, name);
214
                break;
215
            case ValueType::vtUInt32:
216
                insertNumber<UInt32>(column, value, name);
217
                break;
218
            case ValueType::vtUInt64:
219
                insertNumber<UInt64>(column, value, name);
220
                break;
221
            case ValueType::vtInt8:
222
                insertNumber<Int8>(column, value, name);
223
                break;
224
            case ValueType::vtInt16:
225
                insertNumber<Int16>(column, value, name);
226
                break;
227
            case ValueType::vtInt32:
228
                insertNumber<Int32>(column, value, name);
229
                break;
230
            case ValueType::vtInt64:
231
                insertNumber<Int64>(column, value, name);
232
                break;
233
            case ValueType::vtFloat32:
234
                insertNumber<Float32>(column, value, name);
235
                break;
236
            case ValueType::vtFloat64:
237
                insertNumber<Float64>(column, value, name);
238
                break;
239

240
            case ValueType::vtEnum8:
241
            case ValueType::vtEnum16:
242
            case ValueType::vtString:
243
            {
244
                if (value.type() == Poco::MongoDB::ElementTraits<ObjectId::Ptr>::TypeId)
245
                {
246
                    std::string string_id = value.toString();
247
                    assert_cast<ColumnString &>(column).insertData(string_id.data(), string_id.size());
248
                    break;
249
                }
250
                else if (value.type() == Poco::MongoDB::ElementTraits<String>::TypeId)
251
                {
252
                    String string = static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value();
253
                    assert_cast<ColumnString &>(column).insertData(string.data(), string.size());
254
                    break;
255
                }
256

257
                throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected String, got type id = {} for column {}",
258
                                toString(value.type()), name);
259
            }
260

261
            case ValueType::vtDate:
262
            {
263
                if (value.type() != Poco::MongoDB::ElementTraits<Poco::Timestamp>::TypeId)
264
                    throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected Timestamp, got type id = {} for column {}",
265
                                    toString(value.type()), name);
266

267
                assert_cast<ColumnUInt16 &>(column).getData().push_back(static_cast<UInt16>(DateLUT::instance().toDayNum(
268
                    static_cast<const Poco::MongoDB::ConcreteElement<Poco::Timestamp> &>(value).value().epochTime())));
269
                break;
270
            }
271

272
            case ValueType::vtDateTime:
273
            {
274
                if (value.type() != Poco::MongoDB::ElementTraits<Poco::Timestamp>::TypeId)
275
                    throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected Timestamp, got type id = {} for column {}",
276
                                    toString(value.type()), name);
277

278
                assert_cast<ColumnUInt32 &>(column).getData().push_back(
279
                    static_cast<UInt32>(static_cast<const Poco::MongoDB::ConcreteElement<Poco::Timestamp> &>(value).value().epochTime()));
280
                break;
281
            }
282
            case ValueType::vtUUID:
283
            {
284
                if (value.type() == Poco::MongoDB::ElementTraits<String>::TypeId)
285
                {
286
                    String string = static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value();
287
                    assert_cast<ColumnUUID &>(column).getData().push_back(parse<UUID>(string));
288
                }
289
                else
290
                    throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected String (UUID), got type id = {} for column {}",
291
                                        toString(value.type()), name);
292
                break;
293
            }
294
            case ValueType::vtArray:
295
            {
296
                if (value.type() != Poco::MongoDB::ElementTraits<MongoArray::Ptr>::TypeId)
297
                    throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected Array, got type id = {} for column {}",
298
                                    toString(value.type()), name);
299

300
                size_t expected_dimensions = array_info[idx].num_dimensions;
301
                const auto parse_value = array_info[idx].parser;
302
                std::vector<Row> dimensions(expected_dimensions + 1);
303

304
                auto array = static_cast<const Poco::MongoDB::ConcreteElement<MongoArray::Ptr> &>(value).value();
305

306
                std::vector<std::pair<const Poco::MongoDB::Element *, size_t>> arrays;
307
                arrays.emplace_back(&value, 0);
308

309
                while (!arrays.empty())
310
                {
311
                    size_t dimension_idx = arrays.size() - 1;
312

313
                    if (dimension_idx + 1 > expected_dimensions)
314
                        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Got more dimensions than expected");
315

316
                    auto [parent_ptr, child_idx] = arrays.back();
317
                    auto parent = static_cast<const Poco::MongoDB::ConcreteElement<MongoArray::Ptr> &>(*parent_ptr).value();
318

319
                    if (child_idx >= parent->size())
320
                    {
321
                        arrays.pop_back();
322

323
                        if (dimension_idx == 0)
324
                            break;
325

326
                        dimensions[dimension_idx].emplace_back(Array(dimensions[dimension_idx + 1].begin(), dimensions[dimension_idx + 1].end()));
327
                        dimensions[dimension_idx + 1].clear();
328

329
                        continue;
330
                    }
331

332
                    Poco::MongoDB::Element::Ptr child = parent->get(static_cast<int>(child_idx));
333
                    arrays.back().second += 1;
334

335
                    if (child->type() == Poco::MongoDB::ElementTraits<MongoArray::Ptr>::TypeId)
336
                    {
337
                        arrays.emplace_back(child.get(), 0);
338
                    }
339
                    else if (child->type() == Poco::MongoDB::ElementTraits<Poco::MongoDB::NullValue>::TypeId)
340
                    {
341
                        if (dimension_idx + 1 == expected_dimensions)
342
                            dimensions[dimension_idx + 1].emplace_back(array_info[idx].default_value);
343
                        else
344
                            dimensions[dimension_idx + 1].emplace_back(Array());
345
                    }
346
                    else if (dimension_idx + 1 == expected_dimensions)
347
                    {
348
                        dimensions[dimension_idx + 1].emplace_back(parse_value(*child, name));
349
                    }
350
                    else
351
                    {
352
                        throw Exception(ErrorCodes::BAD_ARGUMENTS,
353
                            "Got less dimensions than expected. ({} instead of {})", dimension_idx + 1, expected_dimensions);
354
                    }
355
                }
356

357
                assert_cast<ColumnArray &>(column).insert(Array(dimensions[1].begin(), dimensions[1].end()));
358
                break;
359

360
            }
361
            default:
362
                throw Exception(ErrorCodes::UNKNOWN_TYPE, "Value of unsupported type: {}", column.getName());
363
        }
364
    }
365

366
    /// Appends to `column` a copy of the first row of `sample_column`.
    /// Callers pass the sample-block column, which presumably carries the column's default value.
    void insertDefaultValue(IColumn & column, const IColumn & sample_column)
    {
        constexpr size_t default_row = 0;
        column.insertFrom(sample_column, default_row);
    }
367
}
368

369

370
/// Returns true when the server behind `connection_` should be spoken to with the old wire protocol.
///
/// The server is asked for its hello reply twice: first with `false` and then with `true` as the second
/// argument of queryServerHello. NOTE(review): `true` presumably selects the legacy handshake command;
/// confirm against Poco.
/// The first reply that carries `maxWireVersion` decides: anything below VER_36 counts as old.
/// If neither reply has it, the server is treated as old.
bool isMongoDBWireProtocolOld(Poco::MongoDB::Connection & connection_)
{
    Poco::MongoDB::Database db("config");

    for (const bool use_old_hello : {false, true})
    {
        Poco::MongoDB::Document::Ptr reply = db.queryServerHello(connection_, use_old_hello);
        if (!reply->exists("maxWireVersion"))
            continue;

        const auto max_wire_version = reply->getInteger("maxWireVersion");
        return max_wire_version < Poco::MongoDB::Database::WireVersion::VER_36;
    }

    return true;
}
390

391

392
/// Prepares a query over `database`.`collection` that filters with `query` and projects the columns of
/// `sample_block_to_select`.
///
/// The wire protocol is probed once through `connection`. Either the legacy Poco::MongoDB::Cursor or the
/// OpMsgCursor (`find` command) is then set up; no documents are fetched here.
MongoDBCursor::MongoDBCursor(
    const std::string & database,
    const std::string & collection,
    const Block & sample_block_to_select,
    const Poco::MongoDB::Document & query,
    Poco::MongoDB::Connection & connection)
    : is_wire_protocol_old(isMongoDBWireProtocolOld(connection))
{
    /// Every sample-block column is requested with 1. `_id` appears to be returned implicitly, so it is
    /// excluded with 0 unless it was explicitly asked for.
    Poco::MongoDB::Document projection;
    if (!sample_block_to_select.has("_id"))
        projection.add("_id", 0);
    for (const auto & column : sample_block_to_select)
        projection.add(column.name, 1);

    if (!is_wire_protocol_old)
    {
        /// Newer servers: a `find` command carrying `filter` and `projection` sub-documents.
        new_cursor = std::make_unique<Poco::MongoDB::OpMsgCursor>(database, collection);
        auto & find_command = new_cursor->query();
        find_command.setCommandName(Poco::MongoDB::OpMsgMessage::CMD_FIND);
        find_command.body().addNewDocument("filter") = query;
        find_command.body().addNewDocument("projection") = projection;
        return;
    }

    /// Older servers: query request with a selector and a return-field selector.
    old_cursor = std::make_unique<Poco::MongoDB::Cursor>(database, collection);
    auto & legacy_query = old_cursor->query();
    legacy_query.selector() = query;
    legacy_query.returnFieldSelector() = projection;
}
423

424
/// Fetches the next batch of documents through `connection` and records the cursor id that came with it.
/// MongoDBSource::generate stops reading once this id is 0.
///
/// The response is bound with `auto &&` instead of `auto`, so the whole response message (and every
/// document pointer in it) is no longer copied per batch. This works whether next() returns a reference
/// or a value. The documents are then moved out of that message.
/// NOTE(review): this assumes Poco does not read the previous response's documents before clearing them
/// on the following next(); confirm.
Poco::MongoDB::Document::Vector MongoDBCursor::nextDocuments(Poco::MongoDB::Connection & connection)
{
    if (is_wire_protocol_old)
    {
        auto && response = old_cursor->next(connection);
        cursor_id = response.cursorID();
        /// documents() yields a reference into the message, not a local, so an explicit move is needed to avoid a copy.
        return std::move(response.documents());
    }

    auto && response = new_cursor->next(connection);
    cursor_id = new_cursor->cursorID();
    return std::move(response.documents());
}
439

440
/// Cursor id recorded by the most recent nextDocuments() call.
Int64 MongoDBCursor::cursorID() const { return this->cursor_id; }
444

445

446
MongoDBSource::MongoDBSource(
447
    std::shared_ptr<Poco::MongoDB::Connection> & connection_,
448
    const String & database_name_,
449
    const String & collection_name_,
450
    const Poco::MongoDB::Document & query_,
451
    const Block & sample_block,
452
    UInt64 max_block_size_)
453
    : ISource(sample_block.cloneEmpty())
454
    , connection(connection_)
455
    , cursor(database_name_, collection_name_, sample_block, query_, *connection_)
456
    , max_block_size{max_block_size_}
457
{
458
    description.init(sample_block);
459

460
    for (const auto idx : collections::range(0, description.sample_block.columns()))
461
        if (description.types[idx].first == ExternalResultDescription::ValueType::vtArray)
462
            prepareMongoDBArrayInfo(array_info, idx, description.sample_block.getByPosition(idx).type);
463
}
464

465

466
/// Defaulted destructor, defined out of line.
/// NOTE(review): presumably kept in the .cpp so member destructors are instantiated where their types are
/// complete; confirm against the header.
MongoDBSource::~MongoDBSource() = default;
467

468
/// Reads documents from the cursor and converts them into a Chunk shaped like the sample block.
///
/// Batches are consumed whole until at least `max_block_size` rows are collected, so a chunk may exceed it.
/// When the cursor reports id 0, `all_read` is set and every later call returns an empty chunk.
///
/// Per column:
///   - a missing field or a BSON null inserts row 0 of the sample-block column;
///   - any other value goes through insertValue;
///   - Nullable columns receive the value in their nested column plus a 0 null-map entry.
///
/// Throws MONGODB_ERROR when the server returns an error document ({ok: 0, $err: ..., code: ...}).
Chunk MongoDBSource::generate()
{
    if (all_read)
        return {};

    const size_t size = description.sample_block.columns();
    MutableColumns columns(size);

    for (const auto i : collections::range(0, size))
        columns[i] = description.sample_block.getByPosition(i).column->cloneEmpty();

    size_t num_rows = 0;
    while (num_rows < max_block_size)
    {
        auto documents = cursor.nextDocuments(*connection);

        for (auto & document : documents)
        {
            if (document->exists("ok") && document->exists("$err")
                && document->exists("code") && document->getInteger("ok") == 0)
            {
                auto code = document->getInteger("code");
                const Poco::MongoDB::Element::Ptr err_element = document->get("$err");

                /// `$err` is expected to be a string, but it is not blindly static_cast: for any other BSON type that
                /// cast would be undefined behaviour. Fall back to the element's own text form instead.
                String message;
                if (err_element->type() == Poco::MongoDB::ElementTraits<String>::TypeId)
                    message = static_cast<const Poco::MongoDB::ConcreteElement<String> &>(*err_element).value();
                else
                    message = err_element->toString();

                throw Exception(ErrorCodes::MONGODB_ERROR, "Got error from MongoDB: {}, code: {}", message, code);
            }
            ++num_rows;

            for (const auto idx : collections::range(0, size))
            {
                const auto & sample_column = description.sample_block.getByPosition(idx);
                const auto & name = sample_column.name;

                Poco::MongoDB::Element::Ptr value;
                if (document->exists(name))
                    value = document->get(name);

                /// A missing field and an explicit BSON null are handled the same way.
                if (value.isNull() || value->type() == Poco::MongoDB::ElementTraits<Poco::MongoDB::NullValue>::TypeId)
                {
                    insertDefaultValue(*columns[idx], *sample_column.column);
                    continue;
                }

                const auto & [value_type, is_nullable] = description.types[idx];
                if (is_nullable)
                {
                    ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[idx]);
                    insertValue(column_nullable.getNestedColumn(), value_type, *value, name, array_info, idx);
                    column_nullable.getNullMapData().emplace_back(0);
                }
                else
                    insertValue(*columns[idx], value_type, *value, name, array_info, idx);
            }
        }

        if (cursor.cursorID() == 0)
        {
            all_read = true;
            break;
        }
    }

    if (num_rows == 0)
        return {};

    return Chunk(std::move(columns), num_rows);
}
540

541
}
542

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.