PartsSplitter.cpp
#include <algorithm>
#include <limits>
#include <memory>
#include <numeric>
#include <optional>
#include <queue>
#include <unordered_map>
#include <vector>

#include <base/sort.h>

#include <Core/Field.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeVariant.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Processors/QueryPlan/PartsSplitter.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/FilterSortedStreamByRange.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Common/FieldVisitorsAccurateComparison.h>
#include <Common/logger_useful.h>

namespace DB
{

/// LOGICAL_ERROR is used by helpers in the anonymous namespace below,
/// so its declaration must be visible before that namespace.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

}

using namespace DB;

namespace
{

using Values = std::vector<Field>;

std::string toString(const Values & value)
{
    return fmt::format("({})", fmt::join(value, ", "));
}

/** We rely on FieldVisitorAccurateLess providing a strict weak ordering for any Field values, including
  * NaN, Null and containers (Array, Tuple, Map) that contain NaN or Null. But right now it does not properly
  * support NaN and Nulls inside containers, because it uses Field operator< or accurate::lessOp for comparison,
  * which compare Nulls and NaNs differently than FieldVisitorAccurateLess.
  * TODO: Update Field operator< to compare NaNs and Nulls the same way as FieldVisitorAccurateLess.
  */
bool isSafePrimaryDataKeyType(const IDataType & data_type)
{
    auto type_id = data_type.getTypeId();
    switch (type_id)
    {
        case TypeIndex::Float32:
        case TypeIndex::Float64:
        case TypeIndex::Nullable:
        case TypeIndex::Object:
            return false;
        case TypeIndex::Array:
        {
            const auto & data_type_array = static_cast<const DataTypeArray &>(data_type);
            return isSafePrimaryDataKeyType(*data_type_array.getNestedType());
        }
        case TypeIndex::Tuple:
        {
            const auto & data_type_tuple = static_cast<const DataTypeTuple &>(data_type);
            const auto & data_type_tuple_elements = data_type_tuple.getElements();
            for (const auto & data_type_tuple_element : data_type_tuple_elements)
                if (!isSafePrimaryDataKeyType(*data_type_tuple_element))
                    return false;

            return true;
        }
        case TypeIndex::LowCardinality:
        {
            const auto & data_type_low_cardinality = static_cast<const DataTypeLowCardinality &>(data_type);
            return isSafePrimaryDataKeyType(*data_type_low_cardinality.getDictionaryType());
        }
        case TypeIndex::Map:
        {
            const auto & data_type_map = static_cast<const DataTypeMap &>(data_type);
            return isSafePrimaryDataKeyType(*data_type_map.getKeyType()) && isSafePrimaryDataKeyType(*data_type_map.getValueType());
        }
        case TypeIndex::Variant:
        {
            const auto & data_type_variant = static_cast<const DataTypeVariant &>(data_type);
            const auto & data_type_variant_elements = data_type_variant.getVariants();
            for (const auto & data_type_variant_element : data_type_variant_elements)
                if (!isSafePrimaryDataKeyType(*data_type_variant_element))
                    return false;

            return false;
        }
        default:
        {
            break;
        }
    }

    return true;
}

bool isSafePrimaryKey(const KeyDescription & primary_key)
{
    for (const auto & type : primary_key.data_types)
    {
        if (!isSafePrimaryDataKeyType(*type))
            return false;
    }

    return true;
}

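/// Three-way comparison of primary key tuples using the accurate field visitors.
/// Only the common prefix is compared (a suffix of index columns may not be loaded):
/// returns -1 if lhs < rhs, 0 if the compared prefixes are equal, 1 otherwise.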
int compareValues(const Values & lhs, const Values & rhs)
{
    size_t size = std::min(lhs.size(), rhs.size());

    for (size_t i = 0; i < size; ++i)
    {
        if (applyVisitor(FieldVisitorAccurateLess(), lhs[i], rhs[i]))
            return -1;

        if (!applyVisitor(FieldVisitorAccurateEquals(), lhs[i], rhs[i]))
            return 1;
    }

    return 0;
}

/// Adaptor to access PK values from index.
class IndexAccess
{
public:
    explicit IndexAccess(const RangesInDataParts & parts_) : parts(parts_)
    {
        /// Some suffix of index columns might not be loaded (see `primary_key_ratio_of_unique_prefix_values_to_skip_suffix_columns`)
        /// and we need to use the same set of index columns across all parts.
        for (const auto & part : parts)
            loaded_columns = std::min(loaded_columns, part.data_part->getIndex().size());
    }

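    /// Read the primary key tuple at the given mark, using only the prefix of index columns that is loaded for all parts.
    /// NULL values are replaced with +inf before any comparison.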
    Values getValue(size_t part_idx, size_t mark) const
    {
        const auto & index = parts[part_idx].data_part->getIndex();
        chassert(index.size() >= loaded_columns);
        Values values(loaded_columns);
        for (size_t i = 0; i < loaded_columns; ++i)
        {
            index[i]->get(mark, values[i]);
            if (values[i].isNull())
                values[i] = POSITIVE_INFINITY;
        }
        return values;
    }

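    /// Binary search for the rightmost mark in [range_begin, range_end) whose PK value is strictly less than `value`.
    /// Returns std::nullopt if there is no such mark.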
    std::optional<size_t> findRightmostMarkLessThanValueInRange(size_t part_index, Values value, size_t range_begin, size_t range_end) const
    {
        size_t left = range_begin;
        size_t right = range_end;

        while (left < right)
        {
            size_t middle = left + (right - left) / 2;
            int compare_result = compareValues(getValue(part_index, middle), value);
            if (compare_result != -1)
                right = middle;
            else
                left = middle + 1;
        }

        if (right == range_begin)
            return {};

        return right - 1;
    }

    std::optional<size_t> findRightmostMarkLessThanValueInRange(size_t part_index, Values value, MarkRange mark_range) const
    {
        return findRightmostMarkLessThanValueInRange(part_index, value, mark_range.begin, mark_range.end);
    }

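    /// Binary search for the leftmost mark in [range_begin, range_end) whose PK value is strictly greater than `value`.
    /// Returns std::nullopt if there is no such mark.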
    std::optional<size_t> findLeftmostMarkGreaterThanValueInRange(size_t part_index, Values value, size_t range_begin, size_t range_end) const
    {
        size_t left = range_begin;
        size_t right = range_end;

        while (left < right)
        {
            size_t middle = left + (right - left) / 2;
            int compare_result = compareValues(getValue(part_index, middle), value);
            if (compare_result != 1)
                left = middle + 1;
            else
                right = middle;
        }

        if (left == range_end)
            return {};

        return left;
    }

    std::optional<size_t> findLeftmostMarkGreaterThanValueInRange(size_t part_index, Values value, MarkRange mark_range) const
    {
        return findLeftmostMarkGreaterThanValueInRange(part_index, value, mark_range.begin, mark_range.end);
    }

    size_t getMarkRows(size_t part_idx, size_t mark) const
    {
        return parts[part_idx].data_part->index_granularity.getMarkRows(mark);
    }
private:
    const RangesInDataParts & parts;
    size_t loaded_columns = std::numeric_limits<size_t>::max();
};

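/// Incrementally builds a RangesInDataParts, creating an entry for a part only when the first range for that part is added.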
class RangesInDataPartsBuilder
{
public:
    explicit RangesInDataPartsBuilder(const RangesInDataParts & initial_ranges_in_data_parts_) : initial_ranges_in_data_parts(initial_ranges_in_data_parts_) { }

    void addRange(size_t part_index, MarkRange mark_range)
    {
        auto [it, inserted] = part_index_to_current_ranges_in_data_parts_index.emplace(part_index, ranges_in_data_parts.size());

        if (inserted)
        {
            ranges_in_data_parts.emplace_back(
                initial_ranges_in_data_parts[part_index].data_part,
                initial_ranges_in_data_parts[part_index].alter_conversions,
                initial_ranges_in_data_parts[part_index].part_index_in_query,
                MarkRanges{mark_range});
            part_index_to_initial_ranges_in_data_parts_index[it->second] = part_index;
            return;
        }

        ranges_in_data_parts[it->second].ranges.push_back(mark_range);
    }

    RangesInDataParts & getCurrentRangesInDataParts()
    {
        return ranges_in_data_parts;
    }

private:
    std::unordered_map<size_t, size_t> part_index_to_current_ranges_in_data_parts_index;
    std::unordered_map<size_t, size_t> part_index_to_initial_ranges_in_data_parts_index;
    RangesInDataParts ranges_in_data_parts;
    const RangesInDataParts & initial_ranges_in_data_parts;
};

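/// Sweep-line event: the start or the end of a mark range of some part, keyed by the PK value at the corresponding mark.
/// Events are ordered by value first, then start events before end events, then by part index and mark number.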
struct PartsRangesIterator
{
    enum class EventType : uint8_t
    {
        RangeStart = 0,
        RangeEnd,
    };

    [[maybe_unused]] bool operator<(const PartsRangesIterator & other) const
    {
        int compare_result = compareValues(value, other.value);
        if (compare_result == -1)
            return true;
        else if (compare_result == 1)
            return false;

        if (event == other.event)
        {
            if (part_index == other.part_index)
            {
                /// Within the same part we should process events in order of mark numbers,
                /// because they are already ordered by value and range ends have greater mark numbers than the beginnings.
                /// Otherwise we could get invalid ranges with the right bound that is less than the left bound.
                const auto ev_mark = event == EventType::RangeStart ? range.begin : range.end;
                const auto other_ev_mark = other.event == EventType::RangeStart ? other.range.begin : other.range.end;
                return ev_mark < other_ev_mark;
            }

            return part_index < other.part_index;
        }

        // Start event always comes before the end event
        return event < other.event;
    }

    [[maybe_unused]] bool operator==(const PartsRangesIterator & other) const
    {
        if (value.size() != other.value.size())
            return false;

        for (size_t i = 0; i < value.size(); ++i)
            if (!applyVisitor(FieldVisitorAccurateEquals(), value[i], other.value[i]))
                return false;

        return range == other.range && part_index == other.part_index && event == other.event;
    }

    [[maybe_unused]] bool operator>(const PartsRangesIterator & other) const
    {
        if (operator<(other) || operator==(other))
            return false;

        return true;
    }

    void dump(WriteBuffer & buffer) const
    {
        buffer << "Part index " << part_index;
        buffer << " event " << (event == PartsRangesIterator::EventType::RangeStart ? "Range Start" : "Range End");
        buffer << " range begin " << range.begin;
        buffer << " end " << range.end;
        buffer << " value " << ::toString(value) << '\n';
    }

    [[maybe_unused]] String toString() const
    {
        WriteBufferFromOwnString buffer;
        dump(buffer);
        return buffer.str();
    }

    Values value;
    MarkRange range;
    size_t part_index;
    EventType event;
};

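/// Identifies a concrete (part, mark range) pair; used as a key for tracking ranges that are currently open during the sweep.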
struct PartRangeIndex
{
    explicit PartRangeIndex(PartsRangesIterator & ranges_iterator)
        : part_index(ranges_iterator.part_index)
        , range(ranges_iterator.range)
    {}

    bool operator==(const PartRangeIndex & other) const
    {
        return std::tie(part_index, range.begin, range.end) == std::tie(other.part_index, other.range.begin, other.range.end);
    }

    bool operator<(const PartRangeIndex & other) const
    {
        return std::tie(part_index, range.begin, range.end) < std::tie(other.part_index, other.range.begin, other.range.end);
    }

    size_t part_index;
    MarkRange range;
};

struct PartRangeIndexHash
{
    size_t operator()(const PartRangeIndex & part_range_index) const noexcept
    {
        size_t result = 0;

        boost::hash_combine(result, part_range_index.part_index);
        boost::hash_combine(result, part_range_index.range.begin);
        boost::hash_combine(result, part_range_index.range.end);

        return result;
    }
};

struct SplitPartsRangesResult
{
    RangesInDataParts non_intersecting_parts_ranges;
    RangesInDataParts intersecting_parts_ranges;
};

void dump(const std::vector<PartsRangesIterator> & ranges_iterators, WriteBuffer & buffer)
{
    for (const auto & range_iterator : ranges_iterators)
        range_iterator.dump(buffer);
}

String toString(const std::vector<PartsRangesIterator> & ranges_iterators)
{
    WriteBufferFromOwnString buffer;
    dump(ranges_iterators, buffer);
    return buffer.str();
}

SplitPartsRangesResult splitPartsRanges(RangesInDataParts ranges_in_data_parts, const LoggerPtr & logger)
{
    /** Split ranges in data parts into intersecting ranges in data parts and non intersecting ranges in data parts.
      *
      * For each marks range we will create 2 events (RangeStart, RangeEnd), add these events into an array and sort them by primary key index
      * value at this event.
      *
      * After that we will scan the sorted events and maintain the set of currently intersecting part ranges.
      * If only a single part range is currently intersecting, then for each event (RangeStart, RangeEnd) we can extract a non intersecting range
      * from that single part range.
      *
      * There can be 4 possible cases:
      *
      * 1. RangeStart after RangeStart:
      *
      * Example:
      *
      * range 1 [----            ...
      * range 2      [(value_1)    ...
      *
      * In this scenario we can extract non intersecting part of range 1. This non intersecting part will have start
      * of range 1 and end with rightmost mark from range 1 that contains value less than value_1.
      *
      * 2. RangeStart after RangeEnd:
      *
      * Example:
      *
      * range 1   [              ----              ...
      * range 2   [   (value_1)]
      * range 3                      [(value_2)    ...
      *
      * In this case we can extract non intersecting part of range 1. This non intersecting part will have start
      * of leftmost mark from range 1 that contains value greater than value_1 and end with rightmost mark from range 1
      * that contains value less than value_2.
      *
      * 3. RangeEnd after RangeStart:
      *
      * Example:
      *
      * range 1   [----]
      *
      * In this case we can extract range 1 as non intersecting.
      *
      * 4. RangeEnd after RangeEnd
      *
      * Example:
      *
      * range 1    [    ...              ----]
      * range 2    [    ...    (value_1)]
      *
      * In this case we can extract non intersecting part of range 1. This non intersecting part will have start
      * of leftmost mark from range 1 that contains value greater than value_1 and end with range 1 end.
      *
      * Additional details:
      *
      * 1. If part level is 0, we must process all ranges from this part, because they can contain duplicate primary keys.
      * 2. If a non intersecting range is small, it is better not to add it to the non intersecting ranges, to avoid expensive seeks.
      */

    IndexAccess index_access(ranges_in_data_parts);
    std::vector<PartsRangesIterator> parts_ranges;

    for (size_t part_index = 0; part_index < ranges_in_data_parts.size(); ++part_index)
    {
        for (const auto & range : ranges_in_data_parts[part_index].ranges)
        {
            const auto & index_granularity = ranges_in_data_parts[part_index].data_part->index_granularity;
            parts_ranges.push_back(
                {index_access.getValue(part_index, range.begin), range, part_index, PartsRangesIterator::EventType::RangeStart});

            const bool value_is_defined_at_end_mark = range.end < index_granularity.getMarksCount();
            if (!value_is_defined_at_end_mark)
                continue;

            parts_ranges.push_back(
                {index_access.getValue(part_index, range.end), range, part_index, PartsRangesIterator::EventType::RangeEnd});
        }
    }

    LOG_TEST(logger, "Parts ranges before sort {}", toString(parts_ranges));

    ::sort(parts_ranges.begin(), parts_ranges.end());

    LOG_TEST(logger, "Parts ranges after sort {}", toString(parts_ranges));

    RangesInDataPartsBuilder intersecting_ranges_in_data_parts_builder(ranges_in_data_parts);
    RangesInDataPartsBuilder non_intersecting_ranges_in_data_parts_builder(ranges_in_data_parts);

    static constexpr size_t min_number_of_marks_for_non_intersecting_range = 2;

    auto add_non_intersecting_range = [&](size_t part_index, MarkRange mark_range)
    {
        non_intersecting_ranges_in_data_parts_builder.addRange(part_index, mark_range);
    };

    auto add_intersecting_range = [&](size_t part_index, MarkRange mark_range)
    {
        intersecting_ranges_in_data_parts_builder.addRange(part_index, mark_range);
    };

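    /// Ranges that have started (RangeStart processed) but not yet finished, mapped to the marks that still have to be classified;
    /// the stored begin is advanced whenever a non intersecting prefix is carved out of the range.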
    std::unordered_map<PartRangeIndex, MarkRange, PartRangeIndexHash> part_index_start_to_range;

    chassert(!parts_ranges.empty());
    chassert(parts_ranges[0].event == PartsRangesIterator::EventType::RangeStart);
    part_index_start_to_range[PartRangeIndex(parts_ranges[0])] = parts_ranges[0].range;

    size_t parts_ranges_size = parts_ranges.size();
    for (size_t i = 1; i < parts_ranges_size; ++i)
    {
        auto & previous_part_range = parts_ranges[i - 1];
        PartRangeIndex previous_part_range_index(previous_part_range);
        auto & current_part_range = parts_ranges[i];
        PartRangeIndex current_part_range_index(current_part_range);
        size_t intersecting_parts = part_index_start_to_range.size();
        bool range_start = current_part_range.event == PartsRangesIterator::EventType::RangeStart;

        if (range_start)
        {
            auto [it, inserted] = part_index_start_to_range.emplace(current_part_range_index, current_part_range.range);
            if (!inserted)
                throw Exception(ErrorCodes::LOGICAL_ERROR, "PartsSplitter expected unique range");

            if (intersecting_parts != 1)
                continue;

            if (previous_part_range.event == PartsRangesIterator::EventType::RangeStart)
            {
                /// If part level is 0, we must process whole previous part because it can contain duplicate primary keys
                if (ranges_in_data_parts[previous_part_range.part_index].data_part->info.level == 0)
                    continue;

                /// Case 1 Range Start after Range Start
                size_t begin = previous_part_range.range.begin;
                std::optional<size_t> end_optional = index_access.findRightmostMarkLessThanValueInRange(previous_part_range.part_index,
                    current_part_range.value,
                    previous_part_range.range);

                if (!end_optional)
                    continue;

                size_t end = *end_optional;

                if (end - begin >= min_number_of_marks_for_non_intersecting_range)
                {
                    part_index_start_to_range[previous_part_range_index].begin = end;
                    add_non_intersecting_range(previous_part_range.part_index, MarkRange{begin, end});
                }

                continue;
            }

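            /// Exactly two ranges are open here: the one that has just started (`it`) and a single other one. Find that other range.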
            auto other_interval_it = part_index_start_to_range.begin();
            for (; other_interval_it != part_index_start_to_range.end(); ++other_interval_it)
            {
                if (other_interval_it != it)
                    break;
            }

            if (!(other_interval_it != part_index_start_to_range.end() && other_interval_it != it))
                throw Exception(ErrorCodes::LOGICAL_ERROR, "PartsSplitter expected single other interval");

            size_t other_interval_part_index = other_interval_it->first.part_index;
            MarkRange other_interval_range = other_interval_it->second;

            /// If part level is 0, we must process whole other intersecting part because it can contain duplicate primary keys
            if (ranges_in_data_parts[other_interval_part_index].data_part->info.level == 0)
                continue;

            /// Case 2 Range Start after Range End
            std::optional<size_t> begin_optional = index_access.findLeftmostMarkGreaterThanValueInRange(other_interval_part_index,
                previous_part_range.value,
                other_interval_range);
            if (!begin_optional)
                continue;

            std::optional<size_t> end_optional = index_access.findRightmostMarkLessThanValueInRange(other_interval_part_index,
                current_part_range.value,
                other_interval_range);
            if (!end_optional)
                continue;

            size_t begin = *begin_optional;
            size_t end = *end_optional;

            if (end - begin >= min_number_of_marks_for_non_intersecting_range)
            {
                other_interval_it->second.begin = end;
                add_intersecting_range(other_interval_part_index, MarkRange{other_interval_range.begin, begin});
                add_non_intersecting_range(other_interval_part_index, MarkRange{begin, end});
            }
            continue;
        }

        chassert(current_part_range.event == PartsRangesIterator::EventType::RangeEnd);
        chassert(part_index_start_to_range.contains(current_part_range_index));

        /** If there is more than one part range that we are currently processing,
          * it means that this part range intersects with another range.
          *
          * If part level is 0, we must process the whole part because it can contain duplicate primary keys.
          */
        if (intersecting_parts != 1 || ranges_in_data_parts[current_part_range.part_index].data_part->info.level == 0)
        {
            add_intersecting_range(current_part_range.part_index, part_index_start_to_range[current_part_range_index]);
            part_index_start_to_range.erase(current_part_range_index);
            continue;
        }

        if (previous_part_range.event == PartsRangesIterator::EventType::RangeStart)
        {
            chassert(current_part_range.part_index == previous_part_range.part_index);
            chassert(current_part_range.range == previous_part_range.range);

            /// Case 3 Range End after Range Start
            add_non_intersecting_range(current_part_range.part_index, current_part_range.range);
            part_index_start_to_range.erase(current_part_range_index);
            continue;
        }

        chassert(previous_part_range.event == PartsRangesIterator::EventType::RangeEnd);

        /// Case 4 Range End after Range End
        std::optional<size_t> begin_optional = index_access.findLeftmostMarkGreaterThanValueInRange(current_part_range.part_index,
            previous_part_range.value,
            current_part_range.range);
        size_t end = current_part_range.range.end;

        if (begin_optional && end - *begin_optional >= min_number_of_marks_for_non_intersecting_range)
        {
            size_t begin = *begin_optional;
            add_intersecting_range(current_part_range.part_index, MarkRange{part_index_start_to_range[current_part_range_index].begin, begin});
            add_non_intersecting_range(current_part_range.part_index, MarkRange{begin, end});
        }
        else
        {
            add_intersecting_range(current_part_range.part_index, MarkRange{part_index_start_to_range[current_part_range_index].begin, end});
        }

        part_index_start_to_range.erase(current_part_range_index);
    }

    /// Process parts ranges with undefined value at end mark
    bool is_intersecting = part_index_start_to_range.size() > 1;
    for (const auto & [part_range_index, mark_range] : part_index_start_to_range)
    {
        if (is_intersecting)
            add_intersecting_range(part_range_index.part_index, mark_range);
        else
            add_non_intersecting_range(part_range_index.part_index, mark_range);
    }

    auto && non_intersecting_ranges_in_data_parts = std::move(non_intersecting_ranges_in_data_parts_builder.getCurrentRangesInDataParts());
    auto && intersecting_ranges_in_data_parts = std::move(intersecting_ranges_in_data_parts_builder.getCurrentRangesInDataParts());

    std::stable_sort(
        non_intersecting_ranges_in_data_parts.begin(),
        non_intersecting_ranges_in_data_parts.end(),
        [](const auto & lhs, const auto & rhs) { return lhs.part_index_in_query < rhs.part_index_in_query; });

    std::stable_sort(
        intersecting_ranges_in_data_parts.begin(),
        intersecting_ranges_in_data_parts.end(),
        [](const auto & lhs, const auto & rhs) { return lhs.part_index_in_query < rhs.part_index_in_query; });

    LOG_TEST(logger, "Non intersecting ranges in data parts {}", non_intersecting_ranges_in_data_parts.getDescriptions().describe());
    LOG_TEST(logger, "Intersecting ranges in data parts {}", intersecting_ranges_in_data_parts.getDescriptions().describe());

    return {std::move(non_intersecting_ranges_in_data_parts), std::move(intersecting_ranges_in_data_parts)};
}

std::pair<std::vector<RangesInDataParts>, std::vector<Values>> splitIntersectingPartsRangesIntoLayers(RangesInDataParts intersecting_ranges_in_data_parts,
    size_t max_layers,
    const LoggerPtr & logger)
{
    /** We will advance the iterator pointing to the mark with the smallest PK value until
      * the current layer contains at least rows_per_layer rows (roughly speaking).
      * Then we choose the last observed value as the new border, so the current layer will consist
      * of granules with values greater than the previous border and less than or equal to the new border.
      *
      * We use PartRangeIndex to track the ranges currently being processed, because after sort, RangeStart event is always placed
      * before Range End event and it is possible to encounter overlapping Range Start events for the same part.
      */
    IndexAccess index_access(intersecting_ranges_in_data_parts);

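    /// Min-heap of range events ordered by PK value: the top element is always the not-yet-consumed mark with the smallest value.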
    using PartsRangesIteratorWithIndex = std::pair<PartsRangesIterator, PartRangeIndex>;
    std::priority_queue<PartsRangesIteratorWithIndex, std::vector<PartsRangesIteratorWithIndex>, std::greater<>> parts_ranges_queue;

    for (size_t part_index = 0; part_index < intersecting_ranges_in_data_parts.size(); ++part_index)
    {
        for (const auto & range : intersecting_ranges_in_data_parts[part_index].ranges)
        {
            const auto & index_granularity = intersecting_ranges_in_data_parts[part_index].data_part->index_granularity;
            PartsRangesIterator parts_range_start{index_access.getValue(part_index, range.begin), range, part_index, PartsRangesIterator::EventType::RangeStart};
            PartRangeIndex parts_range_start_index(parts_range_start);
            parts_ranges_queue.push({std::move(parts_range_start), std::move(parts_range_start_index)});

            const bool value_is_defined_at_end_mark = range.end < index_granularity.getMarksCount();
            if (!value_is_defined_at_end_mark)
                continue;

            PartsRangesIterator parts_range_end{index_access.getValue(part_index, range.end), range, part_index, PartsRangesIterator::EventType::RangeEnd};
            PartRangeIndex parts_range_end_index(parts_range_end);
            parts_ranges_queue.push({std::move(parts_range_end), std::move(parts_range_end_index)});
        }
    }

    /// The beginning of currently started (but not yet finished) range of marks of a part in the current layer.
    std::unordered_map<PartRangeIndex, size_t, PartRangeIndexHash> current_part_range_begin;
    /// The current ending of a range of marks of a part in the current layer.
    std::unordered_map<PartRangeIndex, size_t, PartRangeIndexHash> current_part_range_end;

    /// Determine borders between layers.
    std::vector<Values> borders;
    std::vector<RangesInDataParts> result_layers;

    size_t total_intersecting_rows_count = intersecting_ranges_in_data_parts.getRowsCountAllParts();
    const size_t rows_per_layer = std::max<size_t>(total_intersecting_rows_count / max_layers, 1);

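    /// For example, with ~8 million intersecting rows and max_layers = 4, rows_per_layer is ~2 million,
    /// so each layer below is grown until it covers roughly that many rows (plus the overlap granules described below).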
    while (!parts_ranges_queue.empty())
    {
        // New layer should include last granules of still open ranges from the previous layer,
        // because they may already contain values greater than the last border.
        size_t rows_in_current_layer = 0;
        size_t marks_in_current_layer = 0;

        // Intersection between the current and next layers is just the last observed marks of each still open part range. Ratio is empirical.
        auto layers_intersection_is_too_big = [&]()
        {
            const auto intersected_parts = current_part_range_end.size();
            return marks_in_current_layer < intersected_parts * 2;
        };

        RangesInDataPartsBuilder current_layer_builder(intersecting_ranges_in_data_parts);
        result_layers.emplace_back();

        while (rows_in_current_layer < rows_per_layer || layers_intersection_is_too_big() || result_layers.size() == max_layers)
        {
            // We're advancing iterators until a new value shows up.
            Values last_value;
            while (!parts_ranges_queue.empty() && (last_value.empty() || last_value == parts_ranges_queue.top().first.value))
            {
                auto [current, current_range_index] = parts_ranges_queue.top();
                PartRangeIndex current_part_range_index(current);
                parts_ranges_queue.pop();

                const auto part_index = current.part_index;

                if (current.event == PartsRangesIterator::EventType::RangeEnd)
                {
                    current_layer_builder.addRange(part_index, MarkRange{current_part_range_begin[current_range_index], current.range.end});
                    current_part_range_begin.erase(current_range_index);
                    current_part_range_end.erase(current_range_index);
                    continue;
                }

                last_value = std::move(current.value);
                rows_in_current_layer += index_access.getMarkRows(part_index, current.range.begin);
                ++marks_in_current_layer;

                current_part_range_begin.try_emplace(current_range_index, current.range.begin);
                current_part_range_end[current_range_index] = current.range.begin;

                if (current.range.begin + 1 < current.range.end)
                {
                    ++current.range.begin;
                    current.value = index_access.getValue(part_index, current.range.begin);
                    parts_ranges_queue.push({std::move(current), current_range_index});
                }
            }

            if (parts_ranges_queue.empty())
                break;

            if (rows_in_current_layer >= rows_per_layer && !layers_intersection_is_too_big() && result_layers.size() < max_layers)
                borders.push_back(last_value);
        }

        for (const auto & [current_range_index, last_mark] : current_part_range_end)
        {
            current_layer_builder.addRange(current_range_index.part_index, MarkRange{current_part_range_begin[current_range_index], last_mark + 1});
            current_part_range_begin[current_range_index] = current_part_range_end[current_range_index];
        }

        result_layers.back() = std::move(current_layer_builder.getCurrentRangesInDataParts());
    }

    size_t result_layers_size = result_layers.size();
    LOG_TEST(logger, "Split intersecting ranges into {} layers", result_layers_size);

    for (size_t i = 0; i < result_layers_size; ++i)
    {
        auto & layer = result_layers[i];

        LOG_TEST(logger, "Layer {} {} filter values in ({}, {}])",
            i,
            layer.getDescriptions().describe(),
            i ? ::toString(borders[i - 1]) : "-inf", i < borders.size() ? ::toString(borders[i]) : "+inf");

        std::stable_sort(
            layer.begin(),
            layer.end(),
            [](const auto & lhs, const auto & rhs) { return lhs.part_index_in_query < rhs.part_index_in_query; });
    }

    return {std::move(result_layers), std::move(borders)};
}


/// Will return borders.size()+1 filters in total; the i-th filter will accept rows with PK values within the range (borders[i-1], borders[i]].
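/// Example (hypothetical PK (CounterID, EventDate) and a single border (100, '2024-01-01')):
///   filter 0: NOT ((CounterID, EventDate) > (100, '2024-01-01'))
///   filter 1:      (CounterID, EventDate) > (100, '2024-01-01')
/// so consecutive layers read disjoint, half-open PK ranges.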
ASTs buildFilters(const KeyDescription & primary_key, const std::vector<Values> & borders)
{
    auto add_and_condition = [&](ASTPtr & result, const ASTPtr & foo) { result = (!result) ? foo : makeASTFunction("and", result, foo); };

    /// Produces ASTPtr to predicate (pk_col0, pk_col1, ... , pk_colN) > (value[0], value[1], ... , value[N]), possibly with conversions.
    /// For example, if table PK is (a, toDate(d)), where `a` is UInt32 and `d` is DateTime, and PK columns values are (8192, 19160),
    /// it will build the following predicate: greater(tuple(a, toDate(d)), tuple(8192, cast(19160, 'Date'))).
    auto lexicographically_greater = [&](const Values & values) -> ASTPtr
    {
        ASTs pks_ast;
        ASTs values_ast;
        for (size_t i = 0; i < values.size(); ++i)
        {
            const auto & type = primary_key.data_types.at(i);

            // PK may contain functions of the table columns, so we need the actual PK AST with all expressions it contains.
            auto pk_ast = primary_key.expression_list_ast->children.at(i)->clone();

            // If PK is nullable, prepend a null mask column for > comparison.
            // Also transform the AST into assumeNotNull(pk) so that the result type is not-nullable.
            if (type->isNullable())
            {
                pks_ast.push_back(makeASTFunction("isNull", pk_ast));
                values_ast.push_back(std::make_shared<ASTLiteral>(values[i].isNull() ? 1 : 0));
                pk_ast = makeASTFunction("assumeNotNull", pk_ast);
            }

            pks_ast.push_back(pk_ast);

            // If value is null, the comparison is already complete by looking at the null mask column.
            // Here we put the pk_ast as a placeholder: (pk_null_mask, pk_ast_not_null) > (value_is_null?, pk_ast_not_null).
            if (values[i].isNull())
            {
                values_ast.push_back(pk_ast);
            }
            else
            {
                ASTPtr component_ast = std::make_shared<ASTLiteral>(values[i]);
                auto decayed_type = removeNullable(removeLowCardinality(primary_key.data_types.at(i)));

                // Values of some types (e.g. Date, DateTime) are stored in columns as numbers and we get them as just numbers from the index.
                // So we need an explicit Cast for them.
                component_ast = makeASTFunction("cast", std::move(component_ast), std::make_shared<ASTLiteral>(decayed_type->getName()));

                values_ast.push_back(std::move(component_ast));
            }
        }

        ASTPtr pk_columns_as_tuple = makeASTFunction("tuple", pks_ast);
        ASTPtr values_as_tuple = makeASTFunction("tuple", values_ast);

        return makeASTFunction("greater", pk_columns_as_tuple, values_as_tuple);
    };

    ASTs filters(borders.size() + 1);
    for (size_t layer = 0; layer <= borders.size(); ++layer)
    {
        if (layer > 0)
            add_and_condition(filters[layer], lexicographically_greater(borders[layer - 1]));
        if (layer < borders.size())
            add_and_condition(filters[layer], makeASTFunction("not", lexicographically_greater(borders[layer])));
    }
    return filters;
}
}


namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

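/// Rearrange the filter DAG outputs: the filter column goes first, followed by every column of the header
/// (inputs are added for header columns the DAG does not reference yet).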
static void reorderColumns(ActionsDAG & dag, const Block & header, const std::string & filter_column)
{
    std::unordered_map<std::string_view, const ActionsDAG::Node *> inputs_map;
    for (const auto * input : dag.getInputs())
        inputs_map[input->result_name] = input;

    for (const auto & col : header)
    {
        auto & input = inputs_map[col.name];
        if (!input)
            input = &dag.addInput(col);
    }

    ActionsDAG::NodeRawConstPtrs new_outputs;
    new_outputs.reserve(header.columns() + 1);

    new_outputs.push_back(&dag.findInOutputs(filter_column));
    for (const auto & col : header)
    {
        auto & input = inputs_map[col.name];
        new_outputs.push_back(input);
    }

    dag.getOutputs() = std::move(new_outputs);
}

SplitPartsWithRangesByPrimaryKeyResult splitPartsWithRangesByPrimaryKey(
    const KeyDescription & primary_key,
    ExpressionActionsPtr sorting_expr,
    RangesInDataParts parts,
    size_t max_layers,
    ContextPtr context,
    ReadingInOrderStepGetter && in_order_reading_step_getter,
    bool split_parts_ranges_into_intersecting_and_non_intersecting_final,
    bool split_intersecting_parts_ranges_into_layers)
{
    auto logger = getLogger("PartsSplitter");

    SplitPartsWithRangesByPrimaryKeyResult result;

    RangesInDataParts intersecting_parts_ranges = std::move(parts);

    if (!isSafePrimaryKey(primary_key))
    {
        result.merging_pipes.emplace_back(in_order_reading_step_getter(intersecting_parts_ranges));
        return result;
    }

    if (split_parts_ranges_into_intersecting_and_non_intersecting_final)
    {
        SplitPartsRangesResult split_result = splitPartsRanges(intersecting_parts_ranges, logger);
        result.non_intersecting_parts_ranges = std::move(split_result.non_intersecting_parts_ranges);
        intersecting_parts_ranges = std::move(split_result.intersecting_parts_ranges);
    }

    if (!split_intersecting_parts_ranges_into_layers)
    {
        result.merging_pipes.emplace_back(in_order_reading_step_getter(intersecting_parts_ranges));
        return result;
    }

    if (max_layers <= 1)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "max_layers should be greater than 1");

    auto && [layers, borders] = splitIntersectingPartsRangesIntoLayers(intersecting_parts_ranges, max_layers, logger);
    auto filters = buildFilters(primary_key, borders);
    result.merging_pipes.resize(layers.size());

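    /// For each layer: read it with the in-order step, apply the sorting expression so that PK expression columns
    /// are materialized, and, when the layer has a border filter, keep only rows from its (lower, upper] PK range.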
    for (size_t i = 0; i < layers.size(); ++i)
    {
        result.merging_pipes[i] = in_order_reading_step_getter(std::move(layers[i]));
        result.merging_pipes[i].addSimpleTransform([sorting_expr](const Block & header)
                                    { return std::make_shared<ExpressionTransform>(header, sorting_expr); });

        auto & filter_function = filters[i];
        if (!filter_function)
            continue;

        auto syntax_result = TreeRewriter(context).analyze(filter_function, primary_key.expression->getRequiredColumnsWithTypes());
        auto actions = ExpressionAnalyzer(filter_function, syntax_result, context).getActionsDAG(false);
        reorderColumns(*actions, result.merging_pipes[i].getHeader(), filter_function->getColumnName());
        ExpressionActionsPtr expression_actions = std::make_shared<ExpressionActions>(std::move(actions));
        auto description = fmt::format(
            "filter values in ({}, {}]", i ? ::toString(borders[i - 1]) : "-inf", i < borders.size() ? ::toString(borders[i]) : "+inf");
        result.merging_pipes[i].addSimpleTransform(
            [&](const Block & header)
            {
                auto step = std::make_shared<FilterSortedStreamByRange>(header, expression_actions, filter_function->getColumnName(), true);
                step->setDescription(description);
                return step;
            });
    }

    return result;
}

}