ClickHouse
958 строк · 38.3 Кб
1#include <algorithm>
2#include <limits>
3#include <memory>
4#include <numeric>
5#include <queue>
6#include <unordered_map>
7#include <vector>
8
9#include <Core/Field.h>
10#include <DataTypes/DataTypeLowCardinality.h>
11#include <DataTypes/DataTypeNullable.h>
12#include <DataTypes/DataTypeArray.h>
13#include <DataTypes/DataTypeMap.h>
14#include <DataTypes/DataTypeTuple.h>
15#include <DataTypes/DataTypeVariant.h>
16#include <Interpreters/ExpressionAnalyzer.h>
17#include <Interpreters/TreeRewriter.h>
18#include <Parsers/ASTFunction.h>
19#include <Parsers/ASTIdentifier.h>
20#include <Processors/QueryPlan/PartsSplitter.h>
21#include <Processors/Transforms/FilterSortedStreamByRange.h>
22#include <Storages/MergeTree/IMergeTreeDataPart.h>
23#include <Storages/MergeTree/RangesInDataPart.h>
24#include <Common/FieldVisitorsAccurateComparison.h>
25
26using namespace DB;
27
28namespace
29{
30
31using Values = std::vector<Field>;
32
/// Render a primary key tuple as "(v0, v1, ...)" for log/debug output.
std::string toString(const Values & value)
{
    return fmt::format("({})", fmt::join(value, ", "));
}
37
38/** We rely that FieldVisitorAccurateLess will have strict weak ordering for any Field values including
39* NaN, Null and containers (Array, Tuple, Map) that contain NaN or Null. But right now it does not properly
40* support NaN and Nulls inside containers, because it uses Field operator< or accurate::lessOp for comparison
41* that compares Nulls and NaNs differently than FieldVisitorAccurateLess.
42* TODO: Update Field operator< to compare NaNs and Nulls the same way as FieldVisitorAccurateLess.
43*/
44bool isSafePrimaryDataKeyType(const IDataType & data_type)
45{
46auto type_id = data_type.getTypeId();
47switch (type_id)
48{
49case TypeIndex::Float32:
50case TypeIndex::Float64:
51case TypeIndex::Nullable:
52case TypeIndex::Object:
53return false;
54case TypeIndex::Array:
55{
56const auto & data_type_array = static_cast<const DataTypeArray &>(data_type);
57return isSafePrimaryDataKeyType(*data_type_array.getNestedType());
58}
59case TypeIndex::Tuple:
60{
61const auto & data_type_tuple = static_cast<const DataTypeTuple &>(data_type);
62const auto & data_type_tuple_elements = data_type_tuple.getElements();
63for (const auto & data_type_tuple_element : data_type_tuple_elements)
64if (!isSafePrimaryDataKeyType(*data_type_tuple_element))
65return false;
66
67return true;
68}
69case TypeIndex::LowCardinality:
70{
71const auto & data_type_low_cardinality = static_cast<const DataTypeLowCardinality &>(data_type);
72return isSafePrimaryDataKeyType(*data_type_low_cardinality.getDictionaryType());
73}
74case TypeIndex::Map:
75{
76const auto & data_type_map = static_cast<const DataTypeMap &>(data_type);
77return isSafePrimaryDataKeyType(*data_type_map.getKeyType()) && isSafePrimaryDataKeyType(*data_type_map.getValueType());
78}
79case TypeIndex::Variant:
80{
81const auto & data_type_variant = static_cast<const DataTypeVariant &>(data_type);
82const auto & data_type_variant_elements = data_type_variant.getVariants();
83for (const auto & data_type_variant_element : data_type_variant_elements)
84if (!isSafePrimaryDataKeyType(*data_type_variant_element))
85return false;
86
87return false;
88}
89default:
90{
91break;
92}
93}
94
95return true;
96}
97
98bool isSafePrimaryKey(const KeyDescription & primary_key)
99{
100for (const auto & type : primary_key.data_types)
101{
102if (!isSafePrimaryDataKeyType(*type))
103return false;
104}
105
106return true;
107}
108
109int compareValues(const Values & lhs, const Values & rhs)
110{
111size_t size = std::min(lhs.size(), rhs.size());
112
113for (size_t i = 0; i < size; ++i)
114{
115if (applyVisitor(FieldVisitorAccurateLess(), lhs[i], rhs[i]))
116return -1;
117
118if (!applyVisitor(FieldVisitorAccurateEquals(), lhs[i], rhs[i]))
119return 1;
120}
121
122return 0;
123}
124
/// Adaptor to access PK values from index.
class IndexAccess
{
public:
    explicit IndexAccess(const RangesInDataParts & parts_) : parts(parts_)
    {
        /// Some suffix of index columns might not be loaded (see `primary_key_ratio_of_unique_prefix_values_to_skip_suffix_columns`)
        /// and we need to use the same set of index columns across all parts.
        for (const auto & part : parts)
            loaded_columns = std::min(loaded_columns, part.data_part->getIndex().size());
    }

    /// Returns the PK tuple at the given mark of the given part, truncated to
    /// the number of index columns loaded in *every* part, so tuples from
    /// different parts are always comparable.
    Values getValue(size_t part_idx, size_t mark) const
    {
        const auto & index = parts[part_idx].data_part->getIndex();
        chassert(index.size() >= loaded_columns);
        Values values(loaded_columns);
        for (size_t i = 0; i < loaded_columns; ++i)
        {
            index[i]->get(mark, values[i]);
            /// Null is mapped to +inf so compareValues() keeps a total order over tuples.
            if (values[i].isNull())
                values[i] = POSITIVE_INFINITY;
        }
        return values;
    }

    /// Binary search over marks [range_begin, range_end): returns the rightmost mark
    /// whose PK value is strictly less than `value`, or nullopt if no such mark exists.
    std::optional<size_t> findRightmostMarkLessThanValueInRange(size_t part_index, Values value, size_t range_begin, size_t range_end) const
    {
        size_t left = range_begin;
        size_t right = range_end;

        while (left < right)
        {
            size_t middle = left + (right - left) / 2;
            int compare_result = compareValues(getValue(part_index, middle), value);
            /// Keep shrinking from the right while mark value >= `value`.
            if (compare_result != -1)
                right = middle;
            else
                left = middle + 1;
        }

        if (right == range_begin)
            return {};

        return right - 1;
    }

    /// Convenience overload taking a MarkRange.
    std::optional<size_t> findRightmostMarkLessThanValueInRange(size_t part_index, Values value, MarkRange mark_range) const
    {
        return findRightmostMarkLessThanValueInRange(part_index, value, mark_range.begin, mark_range.end);
    }

    /// Binary search over marks [range_begin, range_end): returns the leftmost mark
    /// whose PK value is strictly greater than `value`, or nullopt if no such mark exists.
    std::optional<size_t> findLeftmostMarkGreaterThanValueInRange(size_t part_index, Values value, size_t range_begin, size_t range_end) const
    {
        size_t left = range_begin;
        size_t right = range_end;

        while (left < right)
        {
            size_t middle = left + (right - left) / 2;
            int compare_result = compareValues(getValue(part_index, middle), value);
            /// Skip from the left while mark value <= `value`.
            if (compare_result != 1)
                left = middle + 1;
            else
                right = middle;
        }

        if (left == range_end)
            return {};

        return left;
    }

    /// Convenience overload taking a MarkRange.
    std::optional<size_t> findLeftmostMarkGreaterThanValueInRange(size_t part_index, Values value, MarkRange mark_range) const
    {
        return findLeftmostMarkGreaterThanValueInRange(part_index, value, mark_range.begin, mark_range.end);
    }

    /// Number of rows in the granule that starts at `mark`.
    size_t getMarkRows(size_t part_idx, size_t mark) const
    {
        return parts[part_idx].data_part->index_granularity.getMarkRows(mark);
    }
private:
    const RangesInDataParts & parts;
    /// Minimum number of loaded index columns across all parts (see constructor).
    size_t loaded_columns = std::numeric_limits<size_t>::max();
};
211
/// Incrementally builds a RangesInDataParts from (part_index, mark_range) pairs,
/// copying part metadata from the initial RangesInDataParts on first use of a part.
class RangesInDataPartsBuilder
{
public:
    explicit RangesInDataPartsBuilder(const RangesInDataParts & initial_ranges_in_data_parts_) : initial_ranges_in_data_parts(initial_ranges_in_data_parts_) { }

    /// Appends mark_range to the entry for part_index, creating the entry
    /// (with data part, alter conversions and query index copied from the
    /// initial ranges) on the first range added for that part.
    void addRange(size_t part_index, MarkRange mark_range)
    {
        auto [it, inserted] = part_index_to_current_ranges_in_data_parts_index.emplace(part_index, ranges_in_data_parts.size());

        if (inserted)
        {
            ranges_in_data_parts.emplace_back(
                initial_ranges_in_data_parts[part_index].data_part,
                initial_ranges_in_data_parts[part_index].alter_conversions,
                initial_ranges_in_data_parts[part_index].part_index_in_query,
                MarkRanges{mark_range});
            part_index_to_initial_ranges_in_data_parts_index[it->second] = part_index;
            return;
        }

        ranges_in_data_parts[it->second].ranges.push_back(mark_range);
    }

    /// Accumulated result so far (reference into the builder's own storage).
    RangesInDataParts & getCurrentRangesInDataParts()
    {
        return ranges_in_data_parts;
    }

private:
    /// Maps original part index -> position in ranges_in_data_parts.
    std::unordered_map<size_t, size_t> part_index_to_current_ranges_in_data_parts_index;
    /// Reverse mapping: position in ranges_in_data_parts -> original part index.
    std::unordered_map<size_t, size_t> part_index_to_initial_ranges_in_data_parts_index;
    RangesInDataParts ranges_in_data_parts;
    const RangesInDataParts & initial_ranges_in_data_parts;
};
246
/// A sweep-line event: the start or end of a part's mark range, tagged with the
/// PK value at the corresponding mark. Events are sorted by (value, event type,
/// part index, mark number).
struct PartsRangesIterator
{
    enum class EventType : uint8_t
    {
        RangeStart = 0,
        RangeEnd,
    };

    [[maybe_unused]] bool operator<(const PartsRangesIterator & other) const
    {
        /// Primary order: PK value at the event's mark.
        int compare_result = compareValues(value, other.value);
        if (compare_result == -1)
            return true;
        else if (compare_result == 1)
            return false;

        if (event == other.event)
        {
            if (part_index == other.part_index)
            {
                /// Within the same part we should process events in order of mark numbers,
                /// because they already ordered by value and range ends have greater mark numbers than the beginnings.
                /// Otherwise we could get invalid ranges with the right bound that is less than the left bound.
                const auto ev_mark = event == EventType::RangeStart ? range.begin : range.end;
                const auto other_ev_mark = other.event == EventType::RangeStart ? other.range.begin : other.range.end;
                return ev_mark < other_ev_mark;
            }

            return part_index < other.part_index;
        }

        // Start event always before end event
        return event < other.event;
    }

    [[maybe_unused]] bool operator==(const PartsRangesIterator & other) const
    {
        /// Values are compared with the accurate visitor (consistent with operator<),
        /// not with Field::operator==.
        if (value.size() != other.value.size())
            return false;

        for (size_t i = 0; i < value.size(); ++i)
            if (!applyVisitor(FieldVisitorAccurateEquals(), value[i], other.value[i]))
                return false;

        return range == other.range && part_index == other.part_index && event == other.event;
    }

    [[maybe_unused]] bool operator>(const PartsRangesIterator & other) const
    {
        if (operator<(other) || operator==(other))
            return false;

        return true;
    }

    /// Writes a one-line human-readable description of the event into buffer.
    void dump(WriteBuffer & buffer) const
    {
        buffer << "Part index " << part_index;
        buffer << " event " << (event == PartsRangesIterator::EventType::RangeStart ? "Range Start" : "Range End");
        buffer << " range begin " << range.begin;
        buffer << " end " << range.end;
        buffer << " value " << ::toString(value) << '\n';
    }

    [[maybe_unused]] String toString() const
    {
        WriteBufferFromOwnString buffer;
        dump(buffer);
        return buffer.str();
    }

    Values value;      /// PK value at the event's mark.
    MarkRange range;   /// The mark range this event belongs to.
    size_t part_index; /// Index of the part in the input RangesInDataParts.
    EventType event;
};
323
324struct PartRangeIndex
325{
326explicit PartRangeIndex(PartsRangesIterator & ranges_iterator)
327: part_index(ranges_iterator.part_index)
328, range(ranges_iterator.range)
329{}
330
331bool operator==(const PartRangeIndex & other) const
332{
333return std::tie(part_index, range.begin, range.end) == std::tie(other.part_index, other.range.begin, other.range.end);
334}
335
336bool operator<(const PartRangeIndex & other) const
337{
338return std::tie(part_index, range.begin, range.end) < std::tie(other.part_index, other.range.begin, other.range.end);
339}
340
341size_t part_index;
342MarkRange range;
343};
344
345struct PartRangeIndexHash
346{
347size_t operator()(const PartRangeIndex & part_range_index) const noexcept
348{
349size_t result = 0;
350
351boost::hash_combine(result, part_range_index.part_index);
352boost::hash_combine(result, part_range_index.range.begin);
353boost::hash_combine(result, part_range_index.range.end);
354
355return result;
356}
357};
358
/// Result of splitPartsRanges(): ranges proven disjoint by PK value vs. the rest.
struct SplitPartsRangesResult
{
    RangesInDataParts non_intersecting_parts_ranges;
    RangesInDataParts intersecting_parts_ranges;
};
364
/// Dumps every event in the list into buffer, one per line.
void dump(const std::vector<PartsRangesIterator> & ranges_iterators, WriteBuffer & buffer)
{
    for (const auto & range_iterator : ranges_iterators)
        range_iterator.dump(buffer);
}
370
/// Renders a list of sweep events as a multi-line string (for LOG_TEST output).
String toString(const std::vector<PartsRangesIterator> & ranges_iterators)
{
    WriteBufferFromOwnString buffer;
    dump(ranges_iterators, buffer);
    return buffer.str();
}
377
378SplitPartsRangesResult splitPartsRanges(RangesInDataParts ranges_in_data_parts, const LoggerPtr & logger)
379{
380/** Split ranges in data parts into intersecting ranges in data parts and non intersecting ranges in data parts.
381*
382* For each marks range we will create 2 events (RangeStart, RangeEnd), add these events into array and sort them by primary key index
383* value at this event.
384*
385* After that we will scan sorted events and maintain current intersecting parts ranges.
386* If current intersecting parts ranges is 1, for each event (RangeStart, RangeEnd) we can extract non intersecting range
387* from single part range.
388*
389* There can be 4 possible cases:
390*
391* 1. RangeStart after RangeStart:
392*
393* Example:
394*
395* range 1 [---- ...
396* range 2 [(value_1) ...
397*
398* In this scenario we can extract non intersecting part of range 1. This non intersecting part will have start
399* of range 1 and end with rightmost mark from range 1 that contains value less than value_1.
400*
401* 2. RangeStart after RangeEnd:
402*
403* Example:
404*
405* range 1 [ ---- ...
406* range 2 [ (value_1)]
407* range 3 [(value_2) ...
408*
409* In this case we can extract non intersecting part of range 1. This non intersecting part will have start
410* of leftmost mark from range 1 that contains value greater than value_1 and end with rightmost mark from range 1
411* that contains value less than value_2.
412*
413* 3. RangeEnd after RangeStart:
414*
415* Example:
416*
417* range 1 [----]
418*
419* In this case we can extract range 1 as non intersecting.
420*
421* 4. RangeEnd after RangeEnd
422*
423* Example:
424*
425* range 1 [ ... ----]
426* range 2 [ ... (value_1)]
427*
428* In this case we can extract non intersecting part of range 1. This non intersecting part will have start
429* of leftmost mark from range 1 that contains value greater than value_1 and end with range 1 end.
430*
431* Additional details:
432*
433* 1. If part level is 0, we must process all ranges from this part, because they can contain duplicate primary keys.
434* 2. If non intersecting range is small, it is better to not add it to non intersecting ranges, to avoid expensive seeks.
435*/
436
437IndexAccess index_access(ranges_in_data_parts);
438std::vector<PartsRangesIterator> parts_ranges;
439
440for (size_t part_index = 0; part_index < ranges_in_data_parts.size(); ++part_index)
441{
442for (const auto & range : ranges_in_data_parts[part_index].ranges)
443{
444const auto & index_granularity = ranges_in_data_parts[part_index].data_part->index_granularity;
445parts_ranges.push_back(
446{index_access.getValue(part_index, range.begin), range, part_index, PartsRangesIterator::EventType::RangeStart});
447
448const bool value_is_defined_at_end_mark = range.end < index_granularity.getMarksCount();
449if (!value_is_defined_at_end_mark)
450continue;
451
452parts_ranges.push_back(
453{index_access.getValue(part_index, range.end), range, part_index, PartsRangesIterator::EventType::RangeEnd});
454}
455}
456
457LOG_TEST(logger, "Parts ranges before sort {}", toString(parts_ranges));
458
459::sort(parts_ranges.begin(), parts_ranges.end());
460
461LOG_TEST(logger, "Parts ranges after sort {}", toString(parts_ranges));
462
463RangesInDataPartsBuilder intersecting_ranges_in_data_parts_builder(ranges_in_data_parts);
464RangesInDataPartsBuilder non_intersecting_ranges_in_data_parts_builder(ranges_in_data_parts);
465
466static constexpr size_t min_number_of_marks_for_non_intersecting_range = 2;
467
468auto add_non_intersecting_range = [&](size_t part_index, MarkRange mark_range)
469{
470non_intersecting_ranges_in_data_parts_builder.addRange(part_index, mark_range);
471};
472
473auto add_intersecting_range = [&](size_t part_index, MarkRange mark_range)
474{
475intersecting_ranges_in_data_parts_builder.addRange(part_index, mark_range);
476};
477
478std::unordered_map<PartRangeIndex, MarkRange, PartRangeIndexHash> part_index_start_to_range;
479
480chassert(!parts_ranges.empty());
481chassert(parts_ranges[0].event == PartsRangesIterator::EventType::RangeStart);
482part_index_start_to_range[PartRangeIndex(parts_ranges[0])] = parts_ranges[0].range;
483
484size_t parts_ranges_size = parts_ranges.size();
485for (size_t i = 1; i < parts_ranges_size; ++i)
486{
487auto & previous_part_range = parts_ranges[i - 1];
488PartRangeIndex previous_part_range_index(previous_part_range);
489auto & current_part_range = parts_ranges[i];
490PartRangeIndex current_part_range_index(current_part_range);
491size_t intersecting_parts = part_index_start_to_range.size();
492bool range_start = current_part_range.event == PartsRangesIterator::EventType::RangeStart;
493
494if (range_start)
495{
496auto [it, inserted] = part_index_start_to_range.emplace(current_part_range_index, current_part_range.range);
497if (!inserted)
498throw Exception(ErrorCodes::LOGICAL_ERROR, "PartsSplitter expected unique range");
499
500if (intersecting_parts != 1)
501continue;
502
503if (previous_part_range.event == PartsRangesIterator::EventType::RangeStart)
504{
505/// If part level is 0, we must process whole previous part because it can contain duplicate primary keys
506if (ranges_in_data_parts[previous_part_range.part_index].data_part->info.level == 0)
507continue;
508
509/// Case 1 Range Start after Range Start
510size_t begin = previous_part_range.range.begin;
511std::optional<size_t> end_optional = index_access.findRightmostMarkLessThanValueInRange(previous_part_range.part_index,
512current_part_range.value,
513previous_part_range.range);
514
515if (!end_optional)
516continue;
517
518size_t end = *end_optional;
519
520if (end - begin >= min_number_of_marks_for_non_intersecting_range)
521{
522part_index_start_to_range[previous_part_range_index].begin = end;
523add_non_intersecting_range(previous_part_range.part_index, MarkRange{begin, end});
524}
525
526continue;
527}
528
529auto other_interval_it = part_index_start_to_range.begin();
530for (; other_interval_it != part_index_start_to_range.end(); ++other_interval_it)
531{
532if (other_interval_it != it)
533break;
534}
535
536if (!(other_interval_it != part_index_start_to_range.end() && other_interval_it != it))
537throw Exception(ErrorCodes::LOGICAL_ERROR, "PartsSplitter expected single other interval");
538
539size_t other_interval_part_index = other_interval_it->first.part_index;
540MarkRange other_interval_range = other_interval_it->second;
541
542/// If part level is 0, we must process whole other intersecting part because it can contain duplicate primary keys
543if (ranges_in_data_parts[other_interval_part_index].data_part->info.level == 0)
544continue;
545
546/// Case 2 Range Start after Range End
547std::optional<size_t> begin_optional = index_access.findLeftmostMarkGreaterThanValueInRange(other_interval_part_index,
548previous_part_range.value,
549other_interval_range);
550if (!begin_optional)
551continue;
552
553std::optional<size_t> end_optional = index_access.findRightmostMarkLessThanValueInRange(other_interval_part_index,
554current_part_range.value,
555other_interval_range);
556if (!end_optional)
557continue;
558
559size_t begin = *end_optional;
560size_t end = *end_optional;
561
562if (end - begin >= min_number_of_marks_for_non_intersecting_range)
563{
564other_interval_it->second.begin = end;
565add_intersecting_range(other_interval_part_index, MarkRange{other_interval_range.begin, begin});
566add_non_intersecting_range(other_interval_part_index, MarkRange{begin, end});
567}
568continue;
569}
570
571chassert(current_part_range.event == PartsRangesIterator::EventType::RangeEnd);
572chassert(part_index_start_to_range.contains(current_part_range_index));
573
574/** If there are more than 1 part ranges that we are currently processing
575* that means that this part range is intersecting with other range.
576*
577* If part level is 0, we must process whole part because it can contain duplicate primary keys.
578*/
579if (intersecting_parts != 1 || ranges_in_data_parts[current_part_range.part_index].data_part->info.level == 0)
580{
581add_intersecting_range(current_part_range.part_index, part_index_start_to_range[current_part_range_index]);
582part_index_start_to_range.erase(current_part_range_index);
583continue;
584}
585
586if (previous_part_range.event == PartsRangesIterator::EventType::RangeStart)
587{
588chassert(current_part_range.part_index == previous_part_range.part_index);
589chassert(current_part_range.range == previous_part_range.range);
590
591/// Case 3 Range End after Range Start
592add_non_intersecting_range(current_part_range.part_index, current_part_range.range);
593part_index_start_to_range.erase(current_part_range_index);
594continue;
595}
596
597chassert(previous_part_range.event == PartsRangesIterator::EventType::RangeEnd);
598
599/// Case 4 Range End after Range End
600std::optional<size_t> begin_optional = index_access.findLeftmostMarkGreaterThanValueInRange(current_part_range.part_index,
601previous_part_range.value,
602current_part_range.range);
603size_t end = current_part_range.range.end;
604
605if (begin_optional && end - *begin_optional >= min_number_of_marks_for_non_intersecting_range)
606{
607size_t begin = *begin_optional;
608add_intersecting_range(current_part_range.part_index, MarkRange{part_index_start_to_range[current_part_range_index].begin, begin});
609add_non_intersecting_range(current_part_range.part_index, MarkRange{begin, end});
610}
611else
612{
613add_intersecting_range(current_part_range.part_index, MarkRange{part_index_start_to_range[current_part_range_index].begin, end});
614}
615
616part_index_start_to_range.erase(current_part_range_index);
617}
618
619/// Process parts ranges with undefined value at end mark
620bool is_intersecting = part_index_start_to_range.size() > 1;
621for (const auto & [part_range_index, mark_range] : part_index_start_to_range)
622{
623if (is_intersecting)
624add_intersecting_range(part_range_index.part_index, mark_range);
625else
626add_non_intersecting_range(part_range_index.part_index, mark_range);
627}
628
629auto && non_intersecting_ranges_in_data_parts = std::move(non_intersecting_ranges_in_data_parts_builder.getCurrentRangesInDataParts());
630auto && intersecting_ranges_in_data_parts = std::move(intersecting_ranges_in_data_parts_builder.getCurrentRangesInDataParts());
631
632std::stable_sort(
633non_intersecting_ranges_in_data_parts.begin(),
634non_intersecting_ranges_in_data_parts.end(),
635[](const auto & lhs, const auto & rhs) { return lhs.part_index_in_query < rhs.part_index_in_query; });
636
637std::stable_sort(
638intersecting_ranges_in_data_parts.begin(),
639intersecting_ranges_in_data_parts.end(),
640[](const auto & lhs, const auto & rhs) { return lhs.part_index_in_query < rhs.part_index_in_query; });
641
642LOG_TEST(logger, "Non intersecting ranges in data parts {}", non_intersecting_ranges_in_data_parts.getDescriptions().describe());
643LOG_TEST(logger, "Intersecting ranges in data parts {}", intersecting_ranges_in_data_parts.getDescriptions().describe());
644
645return {std::move(non_intersecting_ranges_in_data_parts), std::move(intersecting_ranges_in_data_parts)};
646}
647
/// Splits the intersecting ranges into up to max_layers layers of roughly equal row
/// counts, returning the layers and the PK border values between consecutive layers.
std::pair<std::vector<RangesInDataParts>, std::vector<Values>> splitIntersectingPartsRangesIntoLayers(RangesInDataParts intersecting_ranges_in_data_parts,
    size_t max_layers,
    const LoggerPtr & logger)
{
    /** We will advance the iterator pointing to the mark with the smallest PK value until
      * there will be not less than rows_per_layer rows in the current layer (roughly speaking).
      * Then we choose the last observed value as the new border, so the current layer will consists
      * of granules with values greater than the previous mark and less or equal than the new border.
      *
      * We use PartRangeIndex to track currently processing ranges, because after sort, RangeStart event is always placed
      * before Range End event and it is possible to encounter overlapping Range Start events for the same part.
      */
    IndexAccess index_access(intersecting_ranges_in_data_parts);

    /// Min-heap of sweep events keyed by PK value (std::greater<> flips the max-heap default).
    using PartsRangesIteratorWithIndex = std::pair<PartsRangesIterator, PartRangeIndex>;
    std::priority_queue<PartsRangesIteratorWithIndex, std::vector<PartsRangesIteratorWithIndex>, std::greater<>> parts_ranges_queue;

    /// Seed the queue with a RangeStart per range, plus a RangeEnd when the PK value
    /// at the end mark exists in the index.
    for (size_t part_index = 0; part_index < intersecting_ranges_in_data_parts.size(); ++part_index)
    {
        for (const auto & range : intersecting_ranges_in_data_parts[part_index].ranges)
        {
            const auto & index_granularity = intersecting_ranges_in_data_parts[part_index].data_part->index_granularity;
            PartsRangesIterator parts_range_start{index_access.getValue(part_index, range.begin), range, part_index, PartsRangesIterator::EventType::RangeStart};
            PartRangeIndex parts_range_start_index(parts_range_start);
            parts_ranges_queue.push({std::move(parts_range_start), std::move(parts_range_start_index)});

            const bool value_is_defined_at_end_mark = range.end < index_granularity.getMarksCount();
            if (!value_is_defined_at_end_mark)
                continue;

            PartsRangesIterator parts_range_end{index_access.getValue(part_index, range.end), range, part_index, PartsRangesIterator::EventType::RangeEnd};
            PartRangeIndex parts_range_end_index(parts_range_end);
            parts_ranges_queue.push({std::move(parts_range_end), std::move(parts_range_end_index)});
        }
    }

    /// The beginning of currently started (but not yet finished) range of marks of a part in the current layer.
    std::unordered_map<PartRangeIndex, size_t, PartRangeIndexHash> current_part_range_begin;
    /// The current ending of a range of marks of a part in the current layer.
    std::unordered_map<PartRangeIndex, size_t, PartRangeIndexHash> current_part_range_end;

    /// Determine borders between layers.
    std::vector<Values> borders;
    std::vector<RangesInDataParts> result_layers;

    size_t total_intersecting_rows_count = intersecting_ranges_in_data_parts.getRowsCountAllParts();
    const size_t rows_per_layer = std::max<size_t>(total_intersecting_rows_count / max_layers, 1);

    while (!parts_ranges_queue.empty())
    {
        // New layer should include last granules of still open ranges from the previous layer,
        // because they may already contain values greater than the last border.
        size_t rows_in_current_layer = 0;
        size_t marks_in_current_layer = 0;

        // Intersection between the current and next layers is just the last observed marks of each still open part range. Ratio is empirical.
        auto layers_intersection_is_too_big = [&]()
        {
            const auto intersected_parts = current_part_range_end.size();
            return marks_in_current_layer < intersected_parts * 2;
        };

        RangesInDataPartsBuilder current_layer_builder(intersecting_ranges_in_data_parts);
        result_layers.emplace_back();

        /// Keep filling the layer until it has enough rows; once result_layers.size() == max_layers
        /// this is the last layer and it must absorb everything remaining.
        while (rows_in_current_layer < rows_per_layer || layers_intersection_is_too_big() || result_layers.size() == max_layers)
        {
            // We're advancing iterators until a new value showed up.
            Values last_value;
            while (!parts_ranges_queue.empty() && (last_value.empty() || last_value == parts_ranges_queue.top().first.value))
            {
                auto [current, current_range_index] = parts_ranges_queue.top();
                PartRangeIndex current_part_range_index(current);
                parts_ranges_queue.pop();

                const auto part_index = current.part_index;

                if (current.event == PartsRangesIterator::EventType::RangeEnd)
                {
                    /// Range finished: flush its remaining marks into the current layer.
                    current_layer_builder.addRange(part_index, MarkRange{current_part_range_begin[current_range_index], current.range.end});
                    current_part_range_begin.erase(current_range_index);
                    current_part_range_end.erase(current_range_index);
                    continue;
                }

                last_value = std::move(current.value);
                rows_in_current_layer += index_access.getMarkRows(part_index, current.range.begin);
                ++marks_in_current_layer;

                current_part_range_begin.try_emplace(current_range_index, current.range.begin);
                current_part_range_end[current_range_index] = current.range.begin;

                /// Advance this range's iterator by one mark and re-queue it with the next PK value.
                if (current.range.begin + 1 < current.range.end)
                {
                    ++current.range.begin;
                    current.value = index_access.getValue(part_index, current.range.begin);
                    parts_ranges_queue.push({std::move(current), current_range_index});
                }
            }

            if (parts_ranges_queue.empty())
                break;

            if (rows_in_current_layer >= rows_per_layer && !layers_intersection_is_too_big() && result_layers.size() < max_layers)
                borders.push_back(last_value);
        }

        /// Close the layer: include the last observed mark of every still-open range
        /// (it may contain values beyond the border), and start the next layer from it.
        for (const auto & [current_range_index, last_mark] : current_part_range_end)
        {
            current_layer_builder.addRange(current_range_index.part_index, MarkRange{current_part_range_begin[current_range_index], last_mark + 1});
            current_part_range_begin[current_range_index] = current_part_range_end[current_range_index];
        }

        result_layers.back() = std::move(current_layer_builder.getCurrentRangesInDataParts());
    }

    size_t result_layers_size = result_layers.size();
    LOG_TEST(logger, "Split intersecting ranges into {} layers", result_layers_size);

    for (size_t i = 0; i < result_layers_size; ++i)
    {
        auto & layer = result_layers[i];

        LOG_TEST(logger, "Layer {} {} filter values in ({}, {}])",
            i,
            layer.getDescriptions().describe(),
            i ? ::toString(borders[i - 1]) : "-inf", i < borders.size() ? ::toString(borders[i]) : "+inf");

        /// Restore original part order within each layer.
        std::stable_sort(
            layer.begin(),
            layer.end(),
            [](const auto & lhs, const auto & rhs) { return lhs.part_index_in_query < rhs.part_index_in_query; });
    }

    return {std::move(result_layers), std::move(borders)};
}
784
785
/// Will return borders.size()+1 filters in total, i-th filter will accept rows with PK values within the range (borders[i-1], borders[i]].
ASTs buildFilters(const KeyDescription & primary_key, const std::vector<Values> & borders)
{
    /// Conjoin `foo` onto `result`, treating an empty result as "no condition yet".
    auto add_and_condition = [&](ASTPtr & result, const ASTPtr & foo) { result = (!result) ? foo : makeASTFunction("and", result, foo); };

    /// Produces ASTPtr to predicate (pk_col0, pk_col1, ... , pk_colN) > (value[0], value[1], ... , value[N]), possibly with conversions.
    /// For example, if table PK is (a, toDate(d)), where `a` is UInt32 and `d` is DateTime, and PK columns values are (8192, 19160),
    /// it will build the following predicate: greater(tuple(a, toDate(d)), tuple(8192, cast(19160, 'Date'))).
    auto lexicographically_greater = [&](const Values & values) -> ASTPtr
    {
        ASTs pks_ast;
        ASTs values_ast;
        for (size_t i = 0; i < values.size(); ++i)
        {
            const auto & type = primary_key.data_types.at(i);

            // PK may contain functions of the table columns, so we need the actual PK AST with all expressions it contains.
            auto pk_ast = primary_key.expression_list_ast->children.at(i)->clone();

            // If PK is nullable, prepend a null mask column for > comparison.
            // Also transform the AST into assumeNotNull(pk) so that the result type is not-nullable.
            if (type->isNullable())
            {
                pks_ast.push_back(makeASTFunction("isNull", pk_ast));
                values_ast.push_back(std::make_shared<ASTLiteral>(values[i].isNull() ? 1 : 0));
                pk_ast = makeASTFunction("assumeNotNull", pk_ast);
            }

            pks_ast.push_back(pk_ast);

            // If value is null, the comparison is already complete by looking at the null mask column.
            // Here we put the pk_ast as a placeholder: (pk_null_mask, pk_ast_not_null) > (value_is_null?, pk_ast_not_null).
            if (values[i].isNull())
            {
                values_ast.push_back(pk_ast);
            }
            else
            {
                ASTPtr component_ast = std::make_shared<ASTLiteral>(values[i]);
                auto decayed_type = removeNullable(removeLowCardinality(primary_key.data_types.at(i)));

                // Values of some types (e.g. Date, DateTime) are stored in columns as numbers and we get them as just numbers from the index.
                // So we need an explicit Cast for them.
                component_ast = makeASTFunction("cast", std::move(component_ast), std::make_shared<ASTLiteral>(decayed_type->getName()));

                values_ast.push_back(std::move(component_ast));
            }
        }

        ASTPtr pk_columns_as_tuple = makeASTFunction("tuple", pks_ast);
        ASTPtr values_as_tuple = makeASTFunction("tuple", values_ast);

        return makeASTFunction("greater", pk_columns_as_tuple, values_as_tuple);
    };

    /// Layer i keeps rows with PK in (borders[i-1], borders[i]]:
    /// first layer has no lower bound, last layer has no upper bound.
    ASTs filters(borders.size() + 1);
    for (size_t layer = 0; layer <= borders.size(); ++layer)
    {
        if (layer > 0)
            add_and_condition(filters[layer], lexicographically_greater(borders[layer - 1]));
        if (layer < borders.size())
            add_and_condition(filters[layer], makeASTFunction("not", lexicographically_greater(borders[layer])));
    }
    return filters;
}
851}
852
853
854namespace DB
855{
856
857namespace ErrorCodes
858{
859extern const int LOGICAL_ERROR;
860}
861
/// Rewrites the DAG outputs so that the filter column comes first, followed by the
/// header columns in header order; header columns missing from the DAG inputs are
/// added as pass-through inputs.
static void reorderColumns(ActionsDAG & dag, const Block & header, const std::string & filter_column)
{
    std::unordered_map<std::string_view, const ActionsDAG::Node *> inputs_map;
    for (const auto * input : dag.getInputs())
        inputs_map[input->result_name] = input;

    /// Ensure every header column has a corresponding DAG input.
    for (const auto & col : header)
    {
        auto & input = inputs_map[col.name];
        if (!input)
            input = &dag.addInput(col);
    }

    ActionsDAG::NodeRawConstPtrs new_outputs;
    new_outputs.reserve(header.columns() + 1);

    /// Filter column first, then the header columns in header order.
    new_outputs.push_back(&dag.findInOutputs(filter_column));
    for (const auto & col : header)
    {
        auto & input = inputs_map[col.name];
        new_outputs.push_back(input);
    }

    dag.getOutputs() = std::move(new_outputs);
}
887
/// Entry point: splits parts ranges by primary key into non-intersecting ranges and
/// per-layer merging pipes with range filters, for parallel FINAL-style processing.
SplitPartsWithRangesByPrimaryKeyResult splitPartsWithRangesByPrimaryKey(
    const KeyDescription & primary_key,
    ExpressionActionsPtr sorting_expr,
    RangesInDataParts parts,
    size_t max_layers,
    ContextPtr context,
    ReadingInOrderStepGetter && in_order_reading_step_getter,
    bool split_parts_ranges_into_intersecting_and_non_intersecting_final,
    bool split_intersecting_parts_ranges_into_layers)
{
    auto logger = getLogger("PartsSplitter");

    SplitPartsWithRangesByPrimaryKeyResult result;

    RangesInDataParts intersecting_parts_ranges = std::move(parts);

    /// PK types whose comparison semantics are unsafe (see isSafePrimaryKey):
    /// fall back to a single in-order reading pipe without any splitting.
    if (!isSafePrimaryKey(primary_key))
    {
        result.merging_pipes.emplace_back(in_order_reading_step_getter(intersecting_parts_ranges));
        return result;
    }

    if (split_parts_ranges_into_intersecting_and_non_intersecting_final)
    {
        SplitPartsRangesResult split_result = splitPartsRanges(intersecting_parts_ranges, logger);
        result.non_intersecting_parts_ranges = std::move(split_result.non_intersecting_parts_ranges);
        intersecting_parts_ranges = std::move(split_result.intersecting_parts_ranges);
    }

    if (!split_intersecting_parts_ranges_into_layers)
    {
        result.merging_pipes.emplace_back(in_order_reading_step_getter(intersecting_parts_ranges));
        return result;
    }

    if (max_layers <= 1)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "max_layer should be greater than 1");

    auto && [layers, borders] = splitIntersectingPartsRangesIntoLayers(intersecting_parts_ranges, max_layers, logger);
    auto filters = buildFilters(primary_key, borders);
    result.merging_pipes.resize(layers.size());

    for (size_t i = 0; i < layers.size(); ++i)
    {
        result.merging_pipes[i] = in_order_reading_step_getter(std::move(layers[i]));
        /// Apply the sorting expression so the filter below can reference PK expressions.
        result.merging_pipes[i].addSimpleTransform([sorting_expr](const Block & header)
                                   { return std::make_shared<ExpressionTransform>(header, sorting_expr); });

        auto & filter_function = filters[i];
        if (!filter_function)
            continue;

        /// Build the filter expression actions over the PK columns and attach a
        /// range filter transform that keeps rows within this layer's PK interval.
        auto syntax_result = TreeRewriter(context).analyze(filter_function, primary_key.expression->getRequiredColumnsWithTypes());
        auto actions = ExpressionAnalyzer(filter_function, syntax_result, context).getActionsDAG(false);
        reorderColumns(*actions, result.merging_pipes[i].getHeader(), filter_function->getColumnName());
        ExpressionActionsPtr expression_actions = std::make_shared<ExpressionActions>(std::move(actions));
        auto description = fmt::format(
            "filter values in ({}, {}]", i ? ::toString(borders[i - 1]) : "-inf", i < borders.size() ? ::toString(borders[i]) : "+inf");
        result.merging_pipes[i].addSimpleTransform(
            [&](const Block & header)
            {
                auto step = std::make_shared<FilterSortedStreamByRange>(header, expression_actions, filter_function->getColumnName(), true);
                step->setDescription(description);
                return step;
            });
    }

    return result;
}
957
958}
959