ClickHouse
2339 lines · 95.4 KB
1#include <Processors/QueryPlan/ReadFromMergeTree.h>
2
3#include <IO/Operators.h>
4#include <Interpreters/Context.h>
5#include <Interpreters/ExpressionAnalyzer.h>
6#include <Interpreters/InterpreterSelectQuery.h>
7#include <Interpreters/TreeRewriter.h>
8#include <Parsers/ASTFunction.h>
9#include <Parsers/ASTIdentifier.h>
10#include <Parsers/ASTSelectQuery.h>
11#include <Processors/ConcatProcessor.h>
12#include <Processors/Merges/AggregatingSortedTransform.h>
13#include <Processors/Merges/CollapsingSortedTransform.h>
14#include <Processors/Merges/GraphiteRollupSortedTransform.h>
15#include <Processors/Merges/MergingSortedTransform.h>
16#include <Processors/Merges/ReplacingSortedTransform.h>
17#include <Processors/Merges/SummingSortedTransform.h>
18#include <Processors/Merges/VersionedCollapsingTransform.h>
19#include <Processors/QueryPlan/PartsSplitter.h>
20#include <Processors/Sources/NullSource.h>
21#include <Processors/Transforms/ExpressionTransform.h>
22#include <Processors/Transforms/FilterTransform.h>
23#include <Processors/Transforms/ReverseTransform.h>
24#include <Processors/Transforms/SelectByIndicesTransform.h>
25#include <QueryPipeline/QueryPipelineBuilder.h>
26#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
27#include <Storages/MergeTree/MergeTreeIndexAnnoy.h>
28#include <Storages/MergeTree/MergeTreeIndexUSearch.h>
29#include <Storages/MergeTree/MergeTreeReadPool.h>
30#include <Storages/MergeTree/MergeTreePrefetchedReadPool.h>
31#include <Storages/MergeTree/MergeTreeReadPoolInOrder.h>
32#include <Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h>
33#include <Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h>
34#include <Storages/MergeTree/MergeTreeSource.h>
35#include <Storages/MergeTree/RangesInDataPart.h>
36#include <Storages/MergeTree/RequestResponse.h>
37#include <Storages/VirtualColumnUtils.h>
38#include <base/sort.h>
39#include <Poco/Logger.h>
40#include <Common/JSONBuilder.h>
41#include <Common/isLocalAddress.h>
42#include <Common/logger_useful.h>
43#include <Processors/QueryPlan/IQueryPlanStep.h>
44#include <Parsers/parseIdentifierOrStringLiteral.h>
45#include <Parsers/ExpressionListParsers.h>
46#include <Storages/MergeTree/MergeTreeIndexMinMax.h>
47
48#include <algorithm>
49#include <iterator>
50#include <memory>
51#include <unordered_map>
52
53using namespace DB;
54
55namespace
56{
/// Counts the number of partition-id runs in a sequence of parts.
/// Parts belonging to the same partition are expected to be adjacent;
/// otherwise this counts runs of equal ids, not globally distinct ids.
template <typename Container, typename Getter>
size_t countPartitions(const Container & parts, Getter get_partition_id)
{
    if (parts.empty())
        return 0;

    size_t partitions_count = 1;
    auto previous_partition_id = get_partition_id(parts[0]);
    for (size_t pos = 1; pos < parts.size(); ++pos)
    {
        /// A new run starts whenever the id differs from the previous part's id.
        if (get_partition_id(parts[pos]) != previous_partition_id)
        {
            ++partitions_count;
            previous_partition_id = get_partition_id(parts[pos]);
        }
    }
    return partitions_count;
}
75
76size_t countPartitions(const RangesInDataParts & parts_with_ranges)
77{
78auto get_partition_id = [](const RangesInDataPart & rng) { return rng.data_part->info.partition_id; };
79return countPartitions(parts_with_ranges, get_partition_id);
80}
81
82size_t countPartitions(const MergeTreeData::DataPartsVector & prepared_parts)
83{
84auto get_partition_id = [](const MergeTreeData::DataPartPtr data_part) { return data_part->info.partition_id; };
85return countPartitions(prepared_parts, get_partition_id);
86}
87
88bool restoreDAGInputs(ActionsDAG & dag, const NameSet & inputs)
89{
90std::unordered_set<const ActionsDAG::Node *> outputs(dag.getOutputs().begin(), dag.getOutputs().end());
91bool added = false;
92for (const auto * input : dag.getInputs())
93{
94if (inputs.contains(input->result_name) && !outputs.contains(input))
95{
96dag.getOutputs().push_back(input);
97added = true;
98}
99}
100
101return added;
102}
103
104bool restorePrewhereInputs(PrewhereInfo & info, const NameSet & inputs)
105{
106bool added = false;
107if (info.row_level_filter)
108added = added || restoreDAGInputs(*info.row_level_filter, inputs);
109
110if (info.prewhere_actions)
111added = added || restoreDAGInputs(*info.prewhere_actions, inputs);
112
113return added;
114}
115
116}
117
118namespace ProfileEvents
119{
120extern const Event SelectedParts;
121extern const Event SelectedRanges;
122extern const Event SelectedMarks;
123}
124
125namespace DB
126{
127
128namespace ErrorCodes
129{
130extern const int INDEX_NOT_USED;
131extern const int LOGICAL_ERROR;
132extern const int TOO_MANY_ROWS;
133extern const int CANNOT_PARSE_TEXT;
134extern const int PARAMETER_OUT_OF_BOUND;
135}
136
137static MergeTreeReaderSettings getMergeTreeReaderSettings(
138const ContextPtr & context, const SelectQueryInfo & query_info)
139{
140const auto & settings = context->getSettingsRef();
141return
142{
143.read_settings = context->getReadSettings(),
144.save_marks_in_cache = true,
145.checksum_on_read = settings.checksum_on_read,
146.read_in_order = query_info.input_order_info != nullptr,
147.apply_deleted_mask = settings.apply_deleted_mask,
148.use_asynchronous_read_from_pool = settings.allow_asynchronous_read_from_io_pool_for_merge_tree
149&& (settings.max_streams_to_max_threads_ratio > 1 || settings.max_streams_for_merge_tree_reading > 1),
150.enable_multiple_prewhere_read_steps = settings.enable_multiple_prewhere_read_steps,
151};
152}
153
154static bool checkAllPartsOnRemoteFS(const RangesInDataParts & parts)
155{
156for (const auto & part : parts)
157{
158if (!part.data_part->isStoredOnRemoteDisk())
159return false;
160}
161return true;
162}
163
164/// build sort description for output stream
/// build sort description for output stream
static void updateSortDescriptionForOutputStream(
    DataStream & output_stream, const Names & sorting_key_columns, const int sort_direction, InputOrderInfoPtr input_order_info, PrewhereInfoPtr prewhere_info, bool enable_vertical_final)
{
    /// Updating sort description can be done after PREWHERE actions are applied to the header.
    /// After PREWHERE actions are applied, column names in header can differ from storage column names due to aliases
    /// To mitigate it, we're trying to build original header and use it to deduce sorting description
    /// TODO: this approach is fragile, it'd be more robust to update sorting description for the whole plan during plan optimization
    Block original_header = output_stream.header.cloneEmpty();
    if (prewhere_info)
    {
        if (prewhere_info->prewhere_actions)
        {
            /// Rename each header column back to the storage column it was produced from, when traceable.
            FindOriginalNodeForOutputName original_column_finder(prewhere_info->prewhere_actions);
            for (auto & column : original_header)
            {
                const auto * original_node = original_column_finder.find(column.name);
                if (original_node)
                    column.name = original_node->result_name;
            }
        }

        if (prewhere_info->row_level_filter)
        {
            /// Same renaming through the row-level filter DAG.
            FindOriginalNodeForOutputName original_column_finder(prewhere_info->row_level_filter);
            for (auto & column : original_header)
            {
                const auto * original_node = original_column_finder.find(column.name);
                if (original_node)
                    column.name = original_node->result_name;
            }
        }
    }

    SortDescription sort_description;
    const Block & header = output_stream.header;
    for (const auto & sorting_key : sorting_key_columns)
    {
        /// Look the sorting key up by its original (storage) name...
        const auto it = std::find_if(
            original_header.begin(), original_header.end(), [&sorting_key](const auto & column) { return column.name == sorting_key; });
        if (it == original_header.end())
            break;  /// Only a contiguous prefix of the sorting key can form a sort description.

        /// ...but emit the (possibly aliased) name that sits at the same position in the current header.
        /// original_header was cloned from header, so positions correspond one-to-one.
        const size_t column_pos = std::distance(original_header.begin(), it);
        sort_description.emplace_back((header.begin() + column_pos)->name, sort_direction);
    }

    if (!sort_description.empty())
    {
        if (input_order_info && !enable_vertical_final)
        {
            output_stream.sort_scope = DataStream::SortScope::Stream;
            /// Reading in order only guarantees the prefix of the sorting key that was actually used.
            const size_t used_prefix_of_sorting_key_size = input_order_info->used_prefix_of_sorting_key_size;
            if (sort_description.size() > used_prefix_of_sorting_key_size)
                sort_description.resize(used_prefix_of_sorting_key_size);
        }
        else
            output_stream.sort_scope = DataStream::SortScope::Chunk;
    }

    output_stream.sort_description = std::move(sort_description);
}
226
227void ReadFromMergeTree::AnalysisResult::checkLimits(const Settings & settings, const SelectQueryInfo & query_info_) const
228{
229
230/// Do not check number of read rows if we have reading
231/// in order of sorting key with limit.
232/// In general case, when there exists WHERE clause
233/// it's impossible to estimate number of rows precisely,
234/// because we can stop reading at any time.
235
236SizeLimits limits;
237if (settings.read_overflow_mode == OverflowMode::THROW
238&& settings.max_rows_to_read
239&& !query_info_.input_order_info)
240limits = SizeLimits(settings.max_rows_to_read, 0, settings.read_overflow_mode);
241
242SizeLimits leaf_limits;
243if (settings.read_overflow_mode_leaf == OverflowMode::THROW
244&& settings.max_rows_to_read_leaf
245&& !query_info_.input_order_info)
246leaf_limits = SizeLimits(settings.max_rows_to_read_leaf, 0, settings.read_overflow_mode_leaf);
247
248if (limits.max_rows || leaf_limits.max_rows)
249{
250/// Fail fast if estimated number of rows to read exceeds the limit
251size_t total_rows_estimate = selected_rows;
252if (query_info_.limit > 0 && total_rows_estimate > query_info_.limit)
253{
254total_rows_estimate = query_info_.limit;
255}
256limits.check(total_rows_estimate, 0, "rows (controlled by 'max_rows_to_read' setting)", ErrorCodes::TOO_MANY_ROWS);
257leaf_limits.check(
258total_rows_estimate, 0, "rows (controlled by 'max_rows_to_read_leaf' setting)", ErrorCodes::TOO_MANY_ROWS);
259}
260}
261
ReadFromMergeTree::ReadFromMergeTree(
    MergeTreeData::DataPartsVector parts_,
    std::vector<AlterConversionsPtr> alter_conversions_,
    Names all_column_names_,
    const MergeTreeData & data_,
    const SelectQueryInfo & query_info_,
    const StorageSnapshotPtr & storage_snapshot_,
    const ContextPtr & context_,
    size_t max_block_size_,
    size_t num_streams_,
    std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read_,
    LoggerPtr log_,
    AnalysisResultPtr analyzed_result_ptr_,
    bool enable_parallel_reading)
    /// The step's output header is the sample block for the requested columns with
    /// PREWHERE transformations applied, so it matches what the processors will emit.
    : SourceStepWithFilter(DataStream{.header = MergeTreeSelectProcessor::transformHeader(
        storage_snapshot_->getSampleBlockForColumns(all_column_names_),
        query_info_.prewhere_info)}, all_column_names_, query_info_, storage_snapshot_, context_)
    , reader_settings(getMergeTreeReaderSettings(context_, query_info_))
    , prepared_parts(std::move(parts_))
    , alter_conversions_for_parts(std::move(alter_conversions_))
    , all_column_names(std::move(all_column_names_))
    , data(data_)
    , actions_settings(ExpressionActionsSettings::fromContext(context_))
    , metadata_for_reading(storage_snapshot->getMetadataForQuery())
    , block_size{
        .max_block_size_rows = max_block_size_,
        .preferred_block_size_bytes = context->getSettingsRef().preferred_block_size_bytes,
        .preferred_max_column_in_block_size_bytes = context->getSettingsRef().preferred_max_column_in_block_size_bytes}
    , requested_num_streams(num_streams_)
    , max_block_numbers_to_read(std::move(max_block_numbers_to_read_))
    , log(std::move(log_))
    , analyzed_result_ptr(analyzed_result_ptr_)
    , is_parallel_reading_from_replicas(enable_parallel_reading)
{
    /// Callbacks for coordinating with the parallel-replicas initiator are only
    /// available (and only needed) when parallel reading is enabled.
    if (is_parallel_reading_from_replicas)
    {
        all_ranges_callback = context->getMergeTreeAllRangesCallback();
        read_task_callback = context->getMergeTreeReadTaskCallback();
    }

    const auto & settings = context->getSettingsRef();
    if (settings.max_streams_for_merge_tree_reading)
    {
        if (settings.allow_asynchronous_read_from_io_pool_for_merge_tree)
        {
            /// When async reading is enabled, allow to read using more streams.
            /// Will add resize to output_streams_limit to reduce memory usage.
            output_streams_limit = std::min<size_t>(requested_num_streams, settings.max_streams_for_merge_tree_reading);
            /// We intentionally set `max_streams` to 1 in InterpreterSelectQuery in case of small limit.
            /// Changing it here to `max_streams_for_merge_tree_reading` proven itself as a threat for performance.
            if (requested_num_streams != 1)
                requested_num_streams = std::max<size_t>(requested_num_streams, settings.max_streams_for_merge_tree_reading);
        }
        else
            /// Just limit requested_num_streams otherwise.
            requested_num_streams = std::min<size_t>(requested_num_streams, settings.max_streams_for_merge_tree_reading);
    }

    /// Add explicit description.
    setStepDescription(data.getStorageID().getFullNameNotQuoted());
    /// Vertical FINAL is only applicable to the Replacing merging mode.
    enable_vertical_final = query_info.isFinal() && context->getSettingsRef().enable_vertical_final && data.merging_params.mode == MergeTreeData::MergingParams::Replacing;

    updateSortDescriptionForOutputStream(
        *output_stream,
        storage_snapshot->getMetadataForQuery()->getSortingKeyColumns(),
        getSortDirection(),
        query_info.input_order_info,
        prewhere_info,
        enable_vertical_final);
}
332
333
/// Builds a pipe that reads the given ranges through the parallel-replicas read pool:
/// tasks are obtained via the coordination callbacks, and `pool_settings.threads`
/// sources share the single pool.
Pipe ReadFromMergeTree::readFromPoolParallelReplicas(
    RangesInDataParts parts_with_range,
    Names required_columns,
    PoolSettings pool_settings)
{
    const auto & client_info = context->getClientInfo();

    auto extension = ParallelReadingExtension
    {
        .all_callback = all_ranges_callback.value(),
        .callback = read_task_callback.value(),
        .count_participating_replicas = client_info.count_participating_replicas,
        .number_of_current_replica = client_info.number_of_current_replica,
        .columns_to_read = required_columns,
    };

    /// We have a special logic for local replica. It has to read less data, because in some cases it should
    /// merge states of aggregate functions or do some other important stuff other than reading from Disk.
    const auto multiplier = context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier;
    /// Scale the per-task mark count, rejecting values that do not fit into size_t.
    if (auto result = pool_settings.min_marks_for_concurrent_read * multiplier; canConvertTo<size_t>(result))
        pool_settings.min_marks_for_concurrent_read = static_cast<size_t>(result);
    else
        throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND,
            "Exceeded limit for the number of marks per a single task for parallel replicas. "
            "Make sure that `parallel_replicas_single_task_marks_count_multiplier` is in some reasonable boundaries, current value is: {}",
            multiplier);

    auto pool = std::make_shared<MergeTreeReadPoolParallelReplicas>(
        std::move(extension),
        std::move(parts_with_range),
        shared_virtual_fields,
        storage_snapshot,
        prewhere_info,
        actions_settings,
        reader_settings,
        required_columns,
        pool_settings,
        context);

    /// Propagate the (scaled) per-task mark count into the block size settings of the processors.
    auto block_size_copy = block_size;
    block_size_copy.min_marks_to_read = pool_settings.min_marks_for_concurrent_read;

    Pipes pipes;

    /// One source per requested thread; all of them pull tasks from the shared pool.
    for (size_t i = 0; i < pool_settings.threads; ++i)
    {
        auto algorithm = std::make_unique<MergeTreeThreadSelectAlgorithm>(i);

        auto processor = std::make_unique<MergeTreeSelectProcessor>(
            pool, std::move(algorithm), storage_snapshot, prewhere_info,
            actions_settings, block_size_copy, reader_settings);

        auto source = std::make_shared<MergeTreeSource>(std::move(processor));
        pipes.emplace_back(std::move(source));
    }

    return Pipe::unitePipes(std::move(pipes));
}
392
393
/// Builds a pipe that reads the given ranges through a shared thread pool
/// (prefetched pool when allowed, plain MergeTreeReadPool otherwise), with
/// `pool_settings.threads` sources pulling tasks from it.
Pipe ReadFromMergeTree::readFromPool(
    RangesInDataParts parts_with_range,
    Names required_columns,
    PoolSettings pool_settings)
{
    size_t total_rows = parts_with_range.getRowsCountAllParts();

    /// A query-level LIMIT caps the row estimate used for progress reporting.
    if (query_info.limit > 0 && query_info.limit < total_rows)
        total_rows = query_info.limit;

    const auto & settings = context->getSettingsRef();

    /// round min_marks_to_read up to nearest multiple of block_size expressed in marks
    /// If granularity is adaptive it doesn't make sense
    /// Maybe it will make sense to add settings `max_block_size_bytes`
    if (block_size.max_block_size_rows && !data.canUseAdaptiveGranularity())
    {
        size_t fixed_index_granularity = data.getSettings()->index_granularity;
        pool_settings.min_marks_for_concurrent_read = (pool_settings.min_marks_for_concurrent_read * fixed_index_granularity + block_size.max_block_size_rows - 1)
            / block_size.max_block_size_rows * block_size.max_block_size_rows / fixed_index_granularity;
    }

    /// Classify parts by storage location to choose the pool implementation.
    bool all_parts_are_remote = true;
    bool all_parts_are_local = true;
    for (const auto & part : parts_with_range)
    {
        const bool is_remote = part.data_part->isStoredOnRemoteDisk();
        all_parts_are_local &= !is_remote;
        all_parts_are_remote &= is_remote;
    }

    MergeTreeReadPoolPtr pool;

    bool allow_prefetched_remote = all_parts_are_remote
        && settings.allow_prefetched_read_pool_for_remote_filesystem
        && MergeTreePrefetchedReadPool::checkReadMethodAllowed(reader_settings.read_settings.remote_fs_method);

    bool allow_prefetched_local = all_parts_are_local
        && settings.allow_prefetched_read_pool_for_local_filesystem
        && MergeTreePrefetchedReadPool::checkReadMethodAllowed(reader_settings.read_settings.local_fs_method);

    /** Do not use prefetched read pool if query is trivial limit query.
      * Because time spend during filling per thread tasks can be greater than whole query
      * execution for big tables with small limit.
      */
    bool use_prefetched_read_pool = query_info.limit == 0 && (allow_prefetched_remote || allow_prefetched_local);

    if (use_prefetched_read_pool)
    {
        pool = std::make_shared<MergeTreePrefetchedReadPool>(
            std::move(parts_with_range),
            shared_virtual_fields,
            storage_snapshot,
            prewhere_info,
            actions_settings,
            reader_settings,
            required_columns,
            pool_settings,
            context);
    }
    else
    {
        pool = std::make_shared<MergeTreeReadPool>(
            std::move(parts_with_range),
            shared_virtual_fields,
            storage_snapshot,
            prewhere_info,
            actions_settings,
            reader_settings,
            required_columns,
            pool_settings,
            context);
    }

    LOG_DEBUG(log, "Reading approx. {} rows with {} streams", total_rows, pool_settings.threads);

    /// The reason why we change this setting is because MergeTreeReadPool takes the full task
    /// ignoring min_marks_to_read setting in case of remote disk (see MergeTreeReadPool::getTask).
    /// In this case, we won't limit the number of rows to read based on adaptive granularity settings.
    auto block_size_copy = block_size;
    block_size_copy.min_marks_to_read = pool_settings.min_marks_for_concurrent_read;

    Pipes pipes;
    for (size_t i = 0; i < pool_settings.threads; ++i)
    {
        auto algorithm = std::make_unique<MergeTreeThreadSelectAlgorithm>(i);

        auto processor = std::make_unique<MergeTreeSelectProcessor>(
            pool, std::move(algorithm), storage_snapshot, prewhere_info,
            actions_settings, block_size_copy, reader_settings);

        auto source = std::make_shared<MergeTreeSource>(std::move(processor));

        /// Attach the progress estimate to a single source only, so it is not counted once per stream.
        if (i == 0)
            source->addTotalRowsApprox(total_rows);

        pipes.emplace_back(std::move(source));
    }

    auto pipe = Pipe::unitePipes(std::move(pipes));
    /// See the constructor: with async reading the number of output streams is
    /// reduced here to limit memory usage.
    if (output_streams_limit && output_streams_limit < pipe.numOutputPorts())
        pipe.resize(output_streams_limit);
    return pipe;
}
498
/// Builds a pipe that reads each part with a dedicated source, preserving
/// (forward or reverse) primary-key order within every resulting stream.
/// `limit` is the query limit hint (0 = none) used to shrink per-task ranges.
Pipe ReadFromMergeTree::readInOrder(
    RangesInDataParts parts_with_ranges,
    Names required_columns,
    PoolSettings pool_settings,
    ReadType read_type,
    UInt64 limit)
{
    /// For reading in order it makes sense to read only
    /// one range per task to reduce number of read rows.
    bool has_limit_below_one_block = read_type != ReadType::Default && limit && limit < block_size.max_block_size_rows;
    MergeTreeReadPoolPtr pool;

    if (is_parallel_reading_from_replicas)
    {
        const auto & client_info = context->getClientInfo();
        ParallelReadingExtension extension
        {
            .all_callback = all_ranges_callback.value(),
            .callback = read_task_callback.value(),
            .count_participating_replicas = client_info.count_participating_replicas,
            .number_of_current_replica = client_info.number_of_current_replica,
            .columns_to_read = required_columns,
        };

        /// Scale the per-task mark count, rejecting values that do not fit into size_t
        /// (same logic as in readFromPoolParallelReplicas).
        const auto multiplier = context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier;
        if (auto result = pool_settings.min_marks_for_concurrent_read * multiplier; canConvertTo<size_t>(result))
            pool_settings.min_marks_for_concurrent_read = static_cast<size_t>(result);
        else
            throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND,
                "Exceeded limit for the number of marks per a single task for parallel replicas. "
                "Make sure that `parallel_replicas_single_task_marks_count_multiplier` is in some reasonable boundaries, current value is: {}",
                multiplier);

        /// Tell the coordinator in which key order this replica consumes ranges.
        CoordinationMode mode = read_type == ReadType::InOrder
            ? CoordinationMode::WithOrder
            : CoordinationMode::ReverseOrder;

        pool = std::make_shared<MergeTreeReadPoolParallelReplicasInOrder>(
            std::move(extension),
            mode,
            parts_with_ranges,
            shared_virtual_fields,
            storage_snapshot,
            prewhere_info,
            actions_settings,
            reader_settings,
            required_columns,
            pool_settings,
            context);
    }
    else
    {
        pool = std::make_shared<MergeTreeReadPoolInOrder>(
            has_limit_below_one_block,
            read_type,
            parts_with_ranges,
            shared_virtual_fields,
            storage_snapshot,
            prewhere_info,
            actions_settings,
            reader_settings,
            required_columns,
            pool_settings,
            context);
    }

    /// Actually it means that parallel reading from replicas enabled
    /// and we have to collaborate with initiator.
    /// In this case we won't set approximate rows, because it will be accounted multiple times.
    /// Also do not count amount of read rows if we read in order of sorting key,
    /// because we don't know actual amount of read rows in case when limit is set.
    bool set_rows_approx = !is_parallel_reading_from_replicas && !reader_settings.read_in_order;

    Pipes pipes;
    /// One source per part; the pool hands out that part's ranges to the matching source index.
    for (size_t i = 0; i < parts_with_ranges.size(); ++i)
    {
        const auto & part_with_ranges = parts_with_ranges[i];

        UInt64 total_rows = part_with_ranges.getRowsCount();
        if (query_info.limit > 0 && query_info.limit < total_rows)
            total_rows = query_info.limit;

        LOG_TRACE(log, "Reading {} ranges in{}order from part {}, approx. {} rows starting from {}",
            part_with_ranges.ranges.size(),
            read_type == ReadType::InReverseOrder ? " reverse " : " ",
            part_with_ranges.data_part->name, total_rows,
            part_with_ranges.data_part->index_granularity.getMarkStartingRow(part_with_ranges.ranges.front().begin));

        MergeTreeSelectAlgorithmPtr algorithm;
        if (read_type == ReadType::InReverseOrder)
            algorithm = std::make_unique<MergeTreeInReverseOrderSelectAlgorithm>(i);
        else
            algorithm = std::make_unique<MergeTreeInOrderSelectAlgorithm>(i);

        auto processor = std::make_unique<MergeTreeSelectProcessor>(
            pool, std::move(algorithm), storage_snapshot, prewhere_info,
            actions_settings, block_size, reader_settings);

        processor->addPartLevelToChunk(isQueryWithFinal());

        auto source = std::make_shared<MergeTreeSource>(std::move(processor));
        if (set_rows_approx)
            source->addTotalRowsApprox(total_rows);

        pipes.emplace_back(std::move(source));
    }

    auto pipe = Pipe::unitePipes(std::move(pipes));

    /// Reverse-order reading produces blocks whose rows are still in forward order
    /// inside each block; reverse them to get a fully reversed stream.
    if (read_type == ReadType::InReverseOrder)
    {
        pipe.addSimpleTransform([&](const Block & header)
        {
            return std::make_shared<ReverseTransform>(header);
        });
    }

    return pipe;
}
618
/// Dispatches reading of the given ranges to the appropriate strategy:
/// parallel replicas, shared thread pool, or per-part in-order reading.
Pipe ReadFromMergeTree::read(
    RangesInDataParts parts_with_range,
    Names required_columns,
    ReadType read_type,
    size_t max_streams,
    size_t min_marks_for_concurrent_read,
    bool use_uncompressed_cache)
{
    const auto & settings = context->getSettingsRef();
    size_t sum_marks = parts_with_range.getMarksCountAllParts();

    PoolSettings pool_settings
    {
        .threads = max_streams,
        .sum_marks = sum_marks,
        .min_marks_for_concurrent_read = min_marks_for_concurrent_read,
        .preferred_block_size_bytes = settings.preferred_block_size_bytes,
        .use_uncompressed_cache = use_uncompressed_cache,
        .use_const_size_tasks_for_remote_reading = settings.merge_tree_use_const_size_tasks_for_remote_reading,
    };

    if (read_type == ReadType::ParallelReplicas)
        return readFromPoolParallelReplicas(std::move(parts_with_range), std::move(required_columns), std::move(pool_settings));

    /// Reading from default thread pool is beneficial for remote storage because of new prefetches.
    if (read_type == ReadType::Default && (max_streams > 1 || checkAllPartsOnRemoteFS(parts_with_range)))
        return readFromPool(std::move(parts_with_range), std::move(required_columns), std::move(pool_settings));

    auto pipe = readInOrder(parts_with_range, required_columns, pool_settings, read_type, /*limit=*/ 0);

    /// Use ConcatProcessor to concat sources together.
    /// It is needed to read in parts order (and so in PK order) if single thread is used.
    if (read_type == ReadType::Default && pipe.numOutputPorts() > 1)
        pipe.addTransform(std::make_shared<ConcatProcessor>(pipe.getHeader(), pipe.numOutputPorts()));

    return pipe;
}
656
657namespace
658{
659
660struct PartRangesReadInfo
661{
662std::vector<size_t> sum_marks_in_parts;
663
664size_t sum_marks = 0;
665size_t total_rows = 0;
666size_t adaptive_parts = 0;
667size_t index_granularity_bytes = 0;
668size_t max_marks_to_use_cache = 0;
669size_t min_marks_for_concurrent_read = 0;
670bool use_uncompressed_cache = false;
671
672PartRangesReadInfo(
673const RangesInDataParts & parts,
674const Settings & settings,
675const MergeTreeSettings & data_settings)
676{
677/// Count marks for each part.
678sum_marks_in_parts.resize(parts.size());
679
680for (size_t i = 0; i < parts.size(); ++i)
681{
682total_rows += parts[i].getRowsCount();
683sum_marks_in_parts[i] = parts[i].getMarksCount();
684sum_marks += sum_marks_in_parts[i];
685
686if (parts[i].data_part->index_granularity_info.mark_type.adaptive)
687++adaptive_parts;
688}
689
690if (adaptive_parts > parts.size() / 2)
691index_granularity_bytes = data_settings.index_granularity_bytes;
692
693max_marks_to_use_cache = MergeTreeDataSelectExecutor::roundRowsOrBytesToMarks(
694settings.merge_tree_max_rows_to_use_cache,
695settings.merge_tree_max_bytes_to_use_cache,
696data_settings.index_granularity,
697index_granularity_bytes);
698
699auto all_parts_on_remote_disk = checkAllPartsOnRemoteFS(parts);
700
701size_t min_rows_for_concurrent_read;
702size_t min_bytes_for_concurrent_read;
703if (all_parts_on_remote_disk)
704{
705min_rows_for_concurrent_read = settings.merge_tree_min_rows_for_concurrent_read_for_remote_filesystem;
706min_bytes_for_concurrent_read = settings.merge_tree_min_bytes_for_concurrent_read_for_remote_filesystem;
707}
708else
709{
710min_rows_for_concurrent_read = settings.merge_tree_min_rows_for_concurrent_read;
711min_bytes_for_concurrent_read = settings.merge_tree_min_bytes_for_concurrent_read;
712}
713
714min_marks_for_concurrent_read = MergeTreeDataSelectExecutor::minMarksForConcurrentRead(
715min_rows_for_concurrent_read, min_bytes_for_concurrent_read,
716data_settings.index_granularity, index_granularity_bytes, sum_marks);
717
718use_uncompressed_cache = settings.use_uncompressed_cache;
719if (sum_marks > max_marks_to_use_cache)
720use_uncompressed_cache = false;
721}
722};
723
724}
725
/// Default (unordered) reading: distributes the selected mark ranges among up to
/// `num_streams` streams, optionally exercising the split into intersecting and
/// non-intersecting ranges under a fault-injection setting.
Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(RangesInDataParts && parts_with_ranges, size_t num_streams, const Names & column_names)
{
    const auto & settings = context->getSettingsRef();
    const auto data_settings = data.getSettings();

    LOG_TRACE(log, "Spreading mark ranges among streams (default reading)");

    PartRangesReadInfo info(parts_with_ranges, settings, *data_settings);

    if (0 == info.sum_marks)
        return {};

    if (num_streams > 1)
    {
        /// Reduce the number of num_streams if the data is small.
        if (info.sum_marks < num_streams * info.min_marks_for_concurrent_read && parts_with_ranges.size() < num_streams)
        {
            /*
            If the data is fragmented, then allocate the size of parts to num_streams. If the data is not fragmented, besides the sum_marks and
            min_marks_for_concurrent_read, involve the system cores to get the num_streams. Increase the num_streams and decrease the min_marks_for_concurrent_read
            if the data is small but system has plentiful cores. It helps to improve the parallel performance of `MergeTreeRead` significantly.
            Make sure the new num_streams `num_streams * increase_num_streams_ratio` will not exceed the previous calculated prev_num_streams.
            The new info.min_marks_for_concurrent_read `info.min_marks_for_concurrent_read / increase_num_streams_ratio` should be larger than 8.
            https://github.com/ClickHouse/ClickHouse/pull/53867
            */
            if ((info.sum_marks + info.min_marks_for_concurrent_read - 1) / info.min_marks_for_concurrent_read > parts_with_ranges.size())
            {
                const size_t prev_num_streams = num_streams;
                num_streams = (info.sum_marks + info.min_marks_for_concurrent_read - 1) / info.min_marks_for_concurrent_read;
                const size_t increase_num_streams_ratio = std::min(prev_num_streams / num_streams, info.min_marks_for_concurrent_read / 8);
                if (increase_num_streams_ratio > 1)
                {
                    num_streams = num_streams * increase_num_streams_ratio;
                    info.min_marks_for_concurrent_read = (info.sum_marks + num_streams - 1) / num_streams;
                }
            }
            else
                num_streams = parts_with_ranges.size();
        }
    }

    auto read_type = is_parallel_reading_from_replicas ? ReadType::ParallelReplicas : ReadType::Default;

    /// Fault-injection setting: with the given probability, take the
    /// intersecting/non-intersecting split path (normally used for FINAL)
    /// instead of plain pool reading.
    double read_split_ranges_into_intersecting_and_non_intersecting_injection_probability = settings.merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability;
    std::bernoulli_distribution fault(read_split_ranges_into_intersecting_and_non_intersecting_injection_probability);

    if (read_type != ReadType::ParallelReplicas &&
        num_streams > 1 &&
        read_split_ranges_into_intersecting_and_non_intersecting_injection_probability > 0.0 &&
        fault(thread_local_rng) &&
        !isQueryWithFinal() &&
        data.merging_params.is_deleted_column.empty() &&
        !prewhere_info)
    {
        NameSet column_names_set(column_names.begin(), column_names.end());
        Names in_order_column_names_to_read(column_names);

        /// Add columns needed to calculate the sorting expression
        for (const auto & column_name : metadata_for_reading->getColumnsRequiredForSortingKey())
        {
            if (column_names_set.contains(column_name))
                continue;

            in_order_column_names_to_read.push_back(column_name);
            column_names_set.insert(column_name);
        }

        /// Factory used by the splitter to read each produced layer in PK order.
        auto in_order_reading_step_getter = [this, &in_order_column_names_to_read, &info](auto parts)
        {
            return this->read(
                std::move(parts),
                in_order_column_names_to_read,
                ReadType::InOrder,
                1 /* num_streams */,
                0 /* min_marks_for_concurrent_read */,
                info.use_uncompressed_cache);
        };

        auto sorting_expr = std::make_shared<ExpressionActions>(metadata_for_reading->getSortingKey().expression->getActionsDAG().clone());

        SplitPartsWithRangesByPrimaryKeyResult split_ranges_result = splitPartsWithRangesByPrimaryKey(
            metadata_for_reading->getPrimaryKey(),
            std::move(sorting_expr),
            std::move(parts_with_ranges),
            num_streams,
            context,
            std::move(in_order_reading_step_getter),
            true /*split_parts_ranges_into_intersecting_and_non_intersecting_final*/,
            true /*split_intersecting_parts_ranges_into_layers*/);

        auto merging_pipes = std::move(split_ranges_result.merging_pipes);
        auto non_intersecting_parts_ranges_read_pipe = read(std::move(split_ranges_result.non_intersecting_parts_ranges),
            column_names,
            read_type,
            num_streams,
            info.min_marks_for_concurrent_read,
            info.use_uncompressed_cache);

        if (merging_pipes.empty())
            return non_intersecting_parts_ranges_read_pipe;

        Pipes pipes;
        pipes.resize(2);
        pipes[0] = Pipe::unitePipes(std::move(merging_pipes));
        pipes[1] = std::move(non_intersecting_parts_ranges_read_pipe);

        /// The merging side read extra sorting-key columns; convert its header
        /// to match the non-intersecting side before uniting the pipes.
        auto conversion_action = ActionsDAG::makeConvertingActions(
            pipes[0].getHeader().getColumnsWithTypeAndName(),
            pipes[1].getHeader().getColumnsWithTypeAndName(),
            ActionsDAG::MatchColumnsMode::Name);
        pipes[0].addSimpleTransform(
            [conversion_action](const Block & header)
            {
                auto converting_expr = std::make_shared<ExpressionActions>(conversion_action);
                return std::make_shared<ExpressionTransform>(header, converting_expr);
            });
        return Pipe::unitePipes(std::move(pipes));
    }

    return read(std::move(parts_with_ranges),
        column_names,
        read_type,
        num_streams,
        info.min_marks_for_concurrent_read,
        info.use_uncompressed_cache);
}
852
853static ActionsDAGPtr createProjection(const Block & header)
854{
855auto projection = std::make_shared<ActionsDAG>(header.getNamesAndTypesList());
856projection->removeUnusedActions(header.getNames());
857projection->projectInput();
858return projection;
859}
860
861Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
862RangesInDataParts && parts_with_ranges,
863size_t num_streams,
864const Names & column_names,
865ActionsDAGPtr & out_projection,
866const InputOrderInfoPtr & input_order_info)
867{
868const auto & settings = context->getSettingsRef();
869const auto data_settings = data.getSettings();
870
871LOG_TRACE(log, "Spreading ranges among streams with order");
872
873PartRangesReadInfo info(parts_with_ranges, settings, *data_settings);
874
875Pipes res;
876
877if (info.sum_marks == 0)
878return {};
879
880/// PREWHERE actions can remove some input columns (which are needed only for prewhere condition).
881/// In case of read-in-order, PREWHERE is executed before sorting. But removed columns could be needed for sorting key.
882/// To fix this, we prohibit removing any input in prewhere actions. Instead, projection actions will be added after sorting.
883/// See 02354_read_in_order_prewhere.sql as an example.
884bool have_input_columns_removed_after_prewhere = false;
885if (prewhere_info)
886{
887NameSet sorting_columns;
888for (const auto & column : metadata_for_reading->getSortingKey().expression->getRequiredColumnsWithTypes())
889sorting_columns.insert(column.name);
890
891have_input_columns_removed_after_prewhere = restorePrewhereInputs(*prewhere_info, sorting_columns);
892}
893
894/// Let's split ranges to avoid reading much data.
895auto split_ranges
896= [rows_granularity = data_settings->index_granularity, my_max_block_size = block_size.max_block_size_rows]
897(const auto & ranges, int direction)
898{
899MarkRanges new_ranges;
900const size_t max_marks_in_range = (my_max_block_size + rows_granularity - 1) / rows_granularity;
901size_t marks_in_range = 1;
902
903if (direction == 1)
904{
905/// Split first few ranges to avoid reading much data.
906bool split = false;
907for (auto range : ranges)
908{
909while (!split && range.begin + marks_in_range < range.end)
910{
911new_ranges.emplace_back(range.begin, range.begin + marks_in_range);
912range.begin += marks_in_range;
913marks_in_range *= 2;
914
915if (marks_in_range > max_marks_in_range)
916split = true;
917}
918new_ranges.emplace_back(range.begin, range.end);
919}
920}
921else
922{
923/// Split all ranges to avoid reading much data, because we have to
924/// store whole range in memory to reverse it.
925for (auto it = ranges.rbegin(); it != ranges.rend(); ++it)
926{
927auto range = *it;
928while (range.begin + marks_in_range < range.end)
929{
930new_ranges.emplace_front(range.end - marks_in_range, range.end);
931range.end -= marks_in_range;
932marks_in_range = std::min(marks_in_range * 2, max_marks_in_range);
933}
934new_ranges.emplace_front(range.begin, range.end);
935}
936}
937
938return new_ranges;
939};
940
941const size_t min_marks_per_stream = (info.sum_marks - 1) / num_streams + 1;
942bool need_preliminary_merge = (parts_with_ranges.size() > settings.read_in_order_two_level_merge_threshold);
943
944const auto read_type = input_order_info->direction == 1 ? ReadType::InOrder : ReadType::InReverseOrder;
945
946PoolSettings pool_settings
947{
948.min_marks_for_concurrent_read = info.min_marks_for_concurrent_read,
949.preferred_block_size_bytes = settings.preferred_block_size_bytes,
950.use_uncompressed_cache = info.use_uncompressed_cache,
951};
952
953Pipes pipes;
954/// For parallel replicas the split will be performed on the initiator side.
955if (is_parallel_reading_from_replicas)
956{
957pipes.emplace_back(readInOrder(std::move(parts_with_ranges), column_names, pool_settings, read_type, input_order_info->limit));
958}
959else
960{
961std::vector<RangesInDataParts> splitted_parts_and_ranges;
962splitted_parts_and_ranges.reserve(num_streams);
963
964for (size_t i = 0; i < num_streams && !parts_with_ranges.empty(); ++i)
965{
966size_t need_marks = min_marks_per_stream;
967RangesInDataParts new_parts;
968
969/// Loop over parts.
970/// We will iteratively take part or some subrange of a part from the back
971/// and assign a stream to read from it.
972while (need_marks > 0 && !parts_with_ranges.empty())
973{
974RangesInDataPart part = parts_with_ranges.back();
975parts_with_ranges.pop_back();
976size_t & marks_in_part = info.sum_marks_in_parts.back();
977
978/// We will not take too few rows from a part.
979if (marks_in_part >= info.min_marks_for_concurrent_read && need_marks < info.min_marks_for_concurrent_read)
980need_marks = info.min_marks_for_concurrent_read;
981
982/// Do not leave too few rows in the part.
983if (marks_in_part > need_marks && marks_in_part - need_marks < info.min_marks_for_concurrent_read)
984need_marks = marks_in_part;
985
986MarkRanges ranges_to_get_from_part;
987
988/// We take full part if it contains enough marks or
989/// if we know limit and part contains less than 'limit' rows.
990bool take_full_part = marks_in_part <= need_marks || (input_order_info->limit && input_order_info->limit < part.getRowsCount());
991
992/// We take the whole part if it is small enough.
993if (take_full_part)
994{
995ranges_to_get_from_part = part.ranges;
996
997need_marks -= marks_in_part;
998info.sum_marks_in_parts.pop_back();
999}
1000else
1001{
1002/// Loop through ranges in part. Take enough ranges to cover "need_marks".
1003while (need_marks > 0)
1004{
1005if (part.ranges.empty())
1006throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected end of ranges while spreading marks among streams");
1007
1008MarkRange & range = part.ranges.front();
1009
1010const size_t marks_in_range = range.end - range.begin;
1011const size_t marks_to_get_from_range = std::min(marks_in_range, need_marks);
1012
1013ranges_to_get_from_part.emplace_back(range.begin, range.begin + marks_to_get_from_range);
1014range.begin += marks_to_get_from_range;
1015marks_in_part -= marks_to_get_from_range;
1016need_marks -= marks_to_get_from_range;
1017if (range.begin == range.end)
1018part.ranges.pop_front();
1019}
1020parts_with_ranges.emplace_back(part);
1021}
1022
1023ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, input_order_info->direction);
1024new_parts.emplace_back(part.data_part, part.alter_conversions, part.part_index_in_query, std::move(ranges_to_get_from_part));
1025}
1026
1027splitted_parts_and_ranges.emplace_back(std::move(new_parts));
1028}
1029
1030for (auto && item : splitted_parts_and_ranges)
1031pipes.emplace_back(readInOrder(std::move(item), column_names, pool_settings, read_type, input_order_info->limit));
1032}
1033
1034Block pipe_header;
1035if (!pipes.empty())
1036pipe_header = pipes.front().getHeader();
1037
1038if (need_preliminary_merge || output_each_partition_through_separate_port)
1039{
1040size_t prefix_size = input_order_info->used_prefix_of_sorting_key_size;
1041auto order_key_prefix_ast = metadata_for_reading->getSortingKey().expression_list_ast->clone();
1042order_key_prefix_ast->children.resize(prefix_size);
1043
1044auto syntax_result = TreeRewriter(context).analyze(order_key_prefix_ast, metadata_for_reading->getColumns().getAllPhysical());
1045auto sorting_key_prefix_expr = ExpressionAnalyzer(order_key_prefix_ast, syntax_result, context).getActionsDAG(false);
1046const auto & sorting_columns = metadata_for_reading->getSortingKey().column_names;
1047
1048SortDescription sort_description;
1049sort_description.compile_sort_description = settings.compile_sort_description;
1050sort_description.min_count_to_compile_sort_description = settings.min_count_to_compile_sort_description;
1051
1052for (size_t j = 0; j < prefix_size; ++j)
1053sort_description.emplace_back(sorting_columns[j], input_order_info->direction);
1054
1055auto sorting_key_expr = std::make_shared<ExpressionActions>(sorting_key_prefix_expr);
1056
1057auto merge_streams = [&](Pipe & pipe)
1058{
1059pipe.addSimpleTransform([sorting_key_expr](const Block & header)
1060{ return std::make_shared<ExpressionTransform>(header, sorting_key_expr); });
1061
1062if (pipe.numOutputPorts() > 1)
1063{
1064auto transform = std::make_shared<MergingSortedTransform>(
1065pipe.getHeader(), pipe.numOutputPorts(), sort_description, block_size.max_block_size_rows, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch);
1066
1067pipe.addTransform(std::move(transform));
1068}
1069};
1070
1071if (!pipes.empty() && output_each_partition_through_separate_port)
1072{
1073/// In contrast with usual aggregation in order that allocates separate AggregatingTransform for each data part,
1074/// aggregation of partitioned data uses the same AggregatingTransform for all parts of the same partition.
1075/// Thus we need to merge all partition parts into a single sorted stream.
1076Pipe pipe = Pipe::unitePipes(std::move(pipes));
1077merge_streams(pipe);
1078return pipe;
1079}
1080
1081for (auto & pipe : pipes)
1082merge_streams(pipe);
1083}
1084
1085if (!pipes.empty() && (need_preliminary_merge || have_input_columns_removed_after_prewhere))
1086/// Drop temporary columns, added by 'sorting_key_prefix_expr'
1087out_projection = createProjection(pipe_header);
1088
1089return Pipe::unitePipes(std::move(pipes));
1090}
1091
1092static void addMergingFinal(
1093Pipe & pipe,
1094const SortDescription & sort_description,
1095MergeTreeData::MergingParams merging_params,
1096Names partition_key_columns,
1097size_t max_block_size_rows,
1098bool enable_vertical_final,
1099bool can_merge_final_indices_to_next_step_filter)
1100{
1101const auto & header = pipe.getHeader();
1102size_t num_outputs = pipe.numOutputPorts();
1103
1104auto now = time(nullptr);
1105
1106auto get_merging_processor = [&]() -> MergingTransformPtr
1107{
1108switch (merging_params.mode)
1109{
1110case MergeTreeData::MergingParams::Ordinary:
1111return std::make_shared<MergingSortedTransform>(header, num_outputs,
1112sort_description, max_block_size_rows, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch);
1113
1114case MergeTreeData::MergingParams::Collapsing:
1115return std::make_shared<CollapsingSortedTransform>(header, num_outputs,
1116sort_description, merging_params.sign_column, true, max_block_size_rows, /*max_block_size_bytes=*/0);
1117
1118case MergeTreeData::MergingParams::Summing:
1119return std::make_shared<SummingSortedTransform>(header, num_outputs,
1120sort_description, merging_params.columns_to_sum, partition_key_columns, max_block_size_rows, /*max_block_size_bytes=*/0);
1121
1122case MergeTreeData::MergingParams::Aggregating:
1123return std::make_shared<AggregatingSortedTransform>(header, num_outputs,
1124sort_description, max_block_size_rows, /*max_block_size_bytes=*/0);
1125
1126case MergeTreeData::MergingParams::Replacing:
1127return std::make_shared<ReplacingSortedTransform>(header, num_outputs,
1128sort_description, merging_params.is_deleted_column, merging_params.version_column, max_block_size_rows, /*max_block_size_bytes=*/0, /*out_row_sources_buf_*/ nullptr, /*use_average_block_sizes*/ false, /*cleanup*/ !merging_params.is_deleted_column.empty(), enable_vertical_final);
1129
1130case MergeTreeData::MergingParams::VersionedCollapsing:
1131return std::make_shared<VersionedCollapsingTransform>(header, num_outputs,
1132sort_description, merging_params.sign_column, max_block_size_rows, /*max_block_size_bytes=*/0);
1133
1134case MergeTreeData::MergingParams::Graphite:
1135return std::make_shared<GraphiteRollupSortedTransform>(header, num_outputs,
1136sort_description, max_block_size_rows, /*max_block_size_bytes=*/0, merging_params.graphite_params, now);
1137}
1138
1139UNREACHABLE();
1140};
1141
1142pipe.addTransform(get_merging_processor());
1143if (enable_vertical_final && !can_merge_final_indices_to_next_step_filter)
1144pipe.addSimpleTransform([](const Block & header_)
1145{ return std::make_shared<SelectByIndicesTransform>(header_); });
1146}
1147
1148bool ReadFromMergeTree::doNotMergePartsAcrossPartitionsFinal() const
1149{
1150const auto & settings = context->getSettingsRef();
1151
1152/// If setting do_not_merge_across_partitions_select_final is set always prefer it
1153if (settings.do_not_merge_across_partitions_select_final.changed)
1154return settings.do_not_merge_across_partitions_select_final;
1155
1156if (!metadata_for_reading->hasPrimaryKey() || !metadata_for_reading->hasPartitionKey())
1157return false;
1158
1159/** To avoid merging parts across partitions we want result of partition key expression for
1160* rows with same primary key to be the same.
1161*
1162* If partition key expression is deterministic, and contains only columns that are included
1163* in primary key, then for same primary key column values, result of partition key expression
1164* will be the same.
1165*/
1166const auto & partition_key_expression = metadata_for_reading->getPartitionKey().expression;
1167if (partition_key_expression->getActionsDAG().hasNonDeterministic())
1168return false;
1169
1170const auto & primary_key_columns = metadata_for_reading->getPrimaryKey().column_names;
1171NameSet primary_key_columns_set(primary_key_columns.begin(), primary_key_columns.end());
1172
1173const auto & partition_key_required_columns = partition_key_expression->getRequiredColumns();
1174for (const auto & partition_key_required_column : partition_key_required_columns)
1175if (!primary_key_columns_set.contains(partition_key_required_column))
1176return false;
1177
1178return true;
1179}
1180
/// Build a pipe implementing SELECT ... FINAL over the given parts.
///
/// Parts are optionally grouped by partition (see doNotMergePartsAcrossPartitionsFinal()).
/// Within each group, ranges that cannot intersect by primary key are read without any
/// merging step, while intersecting ranges go through the engine-specific merging
/// transform (addMergingFinal). The two resulting pipe sets are converted to a common
/// header and united at the end.
///
/// - origin_column_names: columns requested by the query (used for the no-merging path).
/// - column_names: columns to read for the merging path (may include sorting-key columns).
/// - out_projection: receives a projection dropping temporary sorting columns; reset to
///   nullptr when the conversion is done here instead.
Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
    RangesInDataParts && parts_with_ranges, size_t num_streams, const Names & origin_column_names, const Names & column_names, ActionsDAGPtr & out_projection)
{
    const auto & settings = context->getSettingsRef();
    const auto & data_settings = data.getSettings();
    PartRangesReadInfo info(parts_with_ranges, settings, *data_settings);

    assert(num_streams == requested_num_streams);
    if (num_streams > settings.max_final_threads)
        num_streams = settings.max_final_threads;

    /// If setting do_not_merge_across_partitions_select_final is true than we won't merge parts from different partitions.
    /// We have all parts in parts vector, where parts with same partition are nearby.
    /// So we will store iterators pointed to the beginning of each partition range (and parts.end()),
    /// then we will create a pipe for each partition that will run selecting processor and merging processor
    /// for the parts with this partition. In the end we will unite all the pipes.
    std::vector<RangesInDataParts::iterator> parts_to_merge_ranges;
    auto it = parts_with_ranges.begin();
    parts_to_merge_ranges.push_back(it);

    bool do_not_merge_across_partitions_select_final = doNotMergePartsAcrossPartitionsFinal();
    if (do_not_merge_across_partitions_select_final)
    {
        /// Record the start of every run of parts sharing a partition_id.
        while (it != parts_with_ranges.end())
        {
            it = std::find_if(
                it, parts_with_ranges.end(), [&it](auto & part) { return it->data_part->info.partition_id != part.data_part->info.partition_id; });
            parts_to_merge_ranges.push_back(it);
        }
    }
    else
    {
        /// If do_not_merge_across_partitions_select_final is false we just merge all the parts.
        parts_to_merge_ranges.push_back(parts_with_ranges.end());
    }

    Pipes merging_pipes;
    Pipes no_merging_pipes;

    /// If do_not_merge_across_partitions_select_final is true and num_streams > 1
    /// we will store lonely parts with level > 0 to use parallel select on them.
    RangesInDataParts non_intersecting_parts_by_primary_key;

    auto sorting_expr = std::make_shared<ExpressionActions>(metadata_for_reading->getSortingKey().expression->getActionsDAG().clone());

    if (prewhere_info)
    {
        /// PREWHERE must not drop sorting-key columns: they are needed by the merge step below.
        NameSet sorting_columns;
        for (const auto & column : metadata_for_reading->getSortingKey().expression->getRequiredColumnsWithTypes())
            sorting_columns.insert(column.name);
        restorePrewhereInputs(*prewhere_info, sorting_columns);
    }

    for (size_t range_index = 0; range_index < parts_to_merge_ranges.size() - 1; ++range_index)
    {
        /// If do_not_merge_across_partitions_select_final is true and there is only one part in partition
        /// with level > 0 then we won't post-process this part, and if num_streams > 1 we
        /// can use parallel select on such parts.
        bool no_merging_final = do_not_merge_across_partitions_select_final &&
            std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 &&
            parts_to_merge_ranges[range_index]->data_part->info.level > 0 &&
            data.merging_params.is_deleted_column.empty();

        if (no_merging_final)
        {
            non_intersecting_parts_by_primary_key.push_back(std::move(*parts_to_merge_ranges[range_index]));
            continue;
        }

        Pipes pipes;
        {
            RangesInDataParts new_parts;

            for (auto part_it = parts_to_merge_ranges[range_index]; part_it != parts_to_merge_ranges[range_index + 1]; ++part_it)
                new_parts.emplace_back(part_it->data_part, part_it->alter_conversions, part_it->part_index_in_query, part_it->ranges);

            if (new_parts.empty())
                continue;

            if (num_streams > 1 && metadata_for_reading->hasPrimaryKey())
            {
                // Let's split parts into non intersecting parts ranges and layers to ensure data parallelism of FINAL.
                auto in_order_reading_step_getter = [this, &column_names, &info](auto parts)
                {
                    return this->read(
                        std::move(parts),
                        column_names,
                        ReadType::InOrder,
                        1 /* num_streams */,
                        0 /* min_marks_for_concurrent_read */,
                        info.use_uncompressed_cache);
                };

                /// Parts of non-zero level still may contain duplicate PK values to merge on FINAL if there's is_deleted column,
                /// so we have to process all ranges. It would be more optimal to remove this flag and add an extra filtering step.
                bool split_parts_ranges_into_intersecting_and_non_intersecting_final = settings.split_parts_ranges_into_intersecting_and_non_intersecting_final &&
                    data.merging_params.is_deleted_column.empty();

                SplitPartsWithRangesByPrimaryKeyResult split_ranges_result = splitPartsWithRangesByPrimaryKey(
                    metadata_for_reading->getPrimaryKey(),
                    sorting_expr,
                    std::move(new_parts),
                    num_streams,
                    context,
                    std::move(in_order_reading_step_getter),
                    split_parts_ranges_into_intersecting_and_non_intersecting_final,
                    settings.split_intersecting_parts_ranges_into_layers_final);

                for (auto && non_intersecting_parts_range : split_ranges_result.non_intersecting_parts_ranges)
                    non_intersecting_parts_by_primary_key.push_back(std::move(non_intersecting_parts_range));

                for (auto && merging_pipe : split_ranges_result.merging_pipes)
                    pipes.push_back(std::move(merging_pipe));
            }
            else
            {
                /// Single-stream (or no-PK) fallback: read in order and apply the sorting
                /// expression so the merging transform sees sorted, complete key columns.
                pipes.emplace_back(read(
                    std::move(new_parts), column_names, ReadType::InOrder, num_streams, 0, info.use_uncompressed_cache));

                pipes.back().addSimpleTransform([sorting_expr](const Block & header)
                                                { return std::make_shared<ExpressionTransform>(header, sorting_expr); });
            }

            /// Drop temporary columns, added by 'sorting_key_expr'
            if (!out_projection && !pipes.empty())
                out_projection = createProjection(pipes.front().getHeader());
        }

        if (pipes.empty())
            continue;

        Names sort_columns = metadata_for_reading->getSortingKeyColumns();
        SortDescription sort_description;
        sort_description.compile_sort_description = settings.compile_sort_description;
        sort_description.min_count_to_compile_sort_description = settings.min_count_to_compile_sort_description;

        size_t sort_columns_size = sort_columns.size();
        sort_description.reserve(sort_columns_size);

        Names partition_key_columns = metadata_for_reading->getPartitionKey().column_names;

        for (size_t i = 0; i < sort_columns_size; ++i)
            sort_description.emplace_back(sort_columns[i], 1, 1);

        for (auto & pipe : pipes)
            addMergingFinal(
                pipe,
                sort_description,
                data.merging_params,
                partition_key_columns,
                block_size.max_block_size_rows,
                enable_vertical_final,
                query_info.has_filters_and_no_array_join_before_filter);

        merging_pipes.emplace_back(Pipe::unitePipes(std::move(pipes)));
    }

    if (!non_intersecting_parts_by_primary_key.empty())
    {
        /// Non-intersecting ranges need no merge: read them with the plain spreading strategy.
        auto pipe = spreadMarkRangesAmongStreams(std::move(non_intersecting_parts_by_primary_key), num_streams, origin_column_names);
        no_merging_pipes.emplace_back(std::move(pipe));
    }

    if (!merging_pipes.empty() && !no_merging_pipes.empty())
    {
        out_projection = nullptr; /// We do projection here
        Pipes pipes;
        pipes.resize(2);
        pipes[0] = Pipe::unitePipes(std::move(merging_pipes));
        pipes[1] = Pipe::unitePipes(std::move(no_merging_pipes));
        /// Convert the merging pipe's header to the no-merging pipe's header so both can be united.
        auto conversion_action = ActionsDAG::makeConvertingActions(
            pipes[0].getHeader().getColumnsWithTypeAndName(),
            pipes[1].getHeader().getColumnsWithTypeAndName(),
            ActionsDAG::MatchColumnsMode::Name);
        pipes[0].addSimpleTransform(
            [conversion_action](const Block & header)
            {
                auto converting_expr = std::make_shared<ExpressionActions>(conversion_action);
                return std::make_shared<ExpressionTransform>(header, converting_expr);
            });
        return Pipe::unitePipes(std::move(pipes));
    }
    else
        return merging_pipes.empty() ? Pipe::unitePipes(std::move(no_merging_pipes)) : Pipe::unitePipes(std::move(merging_pipes));
}
1366
/// Convenience overload: run range selection for this step using its own stored
/// metadata, query info, context and cached index analysis (`indexes`).
/// Forwards to the static selectRangesToRead() overload below.
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
    MergeTreeData::DataPartsVector parts,
    std::vector<AlterConversionsPtr> alter_conversions) const
{
    return selectRangesToRead(
        std::move(parts),
        std::move(alter_conversions),
        metadata_for_reading,
        query_info,
        context,
        requested_num_streams,
        max_block_numbers_to_read,
        data,
        all_column_names,
        log,
        indexes);
}
1384
/// (Re)build all index structures used to prune data for this query:
/// primary-key condition, min-max/partition pruning, part-value and part-offset
/// conditions, and the list of applicable skip (secondary) indexes.
/// `indexes` is reset and re-populated; skip-index collection is short-circuited
/// when skip indexes are disabled by settings (or by FINAL + use_skip_indexes_if_final).
static void buildIndexes(
    std::optional<ReadFromMergeTree::Indexes> & indexes,
    ActionsDAGPtr filter_actions_dag,
    const MergeTreeData & data,
    const MergeTreeData::DataPartsVector & parts,
    const ContextPtr & context,
    const SelectQueryInfo & query_info,
    const StorageMetadataPtr & metadata_snapshot)
{
    indexes.reset();

    // Build and check if primary key is used when necessary
    const auto & primary_key = metadata_snapshot->getPrimaryKey();
    const Names & primary_key_column_names = primary_key.column_names;

    const auto & settings = context->getSettingsRef();

    /// First member is the primary-key KeyCondition; the remaining fields are
    /// default-initialized and filled in below.
    indexes.emplace(ReadFromMergeTree::Indexes{{
        filter_actions_dag,
        context,
        primary_key_column_names,
        primary_key.expression}, {}, {}, {}, {}, false, {}});

    if (metadata_snapshot->hasPartitionKey())
    {
        const auto & partition_key = metadata_snapshot->getPartitionKey();
        auto minmax_columns_names = MergeTreeData::getMinMaxColumnsNames(partition_key);
        auto minmax_expression_actions = MergeTreeData::getMinMaxExpr(partition_key, ExpressionActionsSettings::fromContext(context));

        indexes->minmax_idx_condition.emplace(filter_actions_dag, context, minmax_columns_names, minmax_expression_actions);
        indexes->partition_pruner.emplace(metadata_snapshot, filter_actions_dag, context, false /* strict */);
    }

    /// Filtering by virtual columns (e.g. _part) and by _part_offset, when present in the filter.
    indexes->part_values
        = MergeTreeDataSelectExecutor::filterPartsByVirtualColumns(metadata_snapshot, data, parts, filter_actions_dag, context);
    MergeTreeDataSelectExecutor::buildKeyConditionFromPartOffset(indexes->part_offset_condition, filter_actions_dag, context);

    indexes->use_skip_indexes = settings.use_skip_indexes;
    bool final = query_info.isFinal();

    if (final && !settings.use_skip_indexes_if_final)
        indexes->use_skip_indexes = false;

    if (!indexes->use_skip_indexes)
        return;

    std::unordered_set<std::string> ignored_index_names;

    if (settings.ignore_data_skipping_indices.changed)
    {
        /// Parse the comma-separated ignore_data_skipping_indices setting into a name set.
        const auto & indices = settings.ignore_data_skipping_indices.toString();
        Tokens tokens(indices.data(), indices.data() + indices.size(), settings.max_query_size);
        IParser::Pos pos(tokens, static_cast<unsigned>(settings.max_parser_depth), static_cast<unsigned>(settings.max_parser_backtracks));
        Expected expected;

        /// Use an unordered list rather than string vector
        auto parse_single_id_or_literal = [&]
        {
            String str;
            if (!parseIdentifierOrStringLiteral(pos, expected, str))
                return false;

            ignored_index_names.insert(std::move(str));
            return true;
        };

        if (!ParserList::parseUtil(pos, expected, parse_single_id_or_literal, false))
            throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Cannot parse ignore_data_skipping_indices ('{}')", indices);
    }

    UsefulSkipIndexes skip_indexes;
    /// Mergeable indices with the same (type, granularity) are evaluated together;
    /// `merged` maps that key to a slot in skip_indexes.merged_indices.
    using Key = std::pair<String, size_t>;
    std::map<Key, size_t> merged;

    for (const auto & index : metadata_snapshot->getSecondaryIndices())
    {
        if (!ignored_index_names.contains(index.name))
        {
            auto index_helper = MergeTreeIndexFactory::instance().get(index);
            if (index_helper->isMergeable())
            {
                auto [it, inserted] = merged.emplace(Key{index_helper->index.type, index_helper->getGranularity()}, skip_indexes.merged_indices.size());
                if (inserted)
                {
                    skip_indexes.merged_indices.emplace_back();
                    skip_indexes.merged_indices.back().condition = index_helper->createIndexMergedCondition(query_info, metadata_snapshot);
                }

                skip_indexes.merged_indices[it->second].addIndex(index_helper);
            }
            else
            {
                MergeTreeIndexConditionPtr condition;
                if (index_helper->isVectorSearch())
                {
                    /// Vector-search indexes need the query (not just the filter DAG) to build their condition.
#ifdef ENABLE_ANNOY
                    if (const auto * annoy = typeid_cast<const MergeTreeIndexAnnoy *>(index_helper.get()))
                        condition = annoy->createIndexCondition(query_info, context);
#endif
#ifdef ENABLE_USEARCH
                    if (const auto * usearch = typeid_cast<const MergeTreeIndexUSearch *>(index_helper.get()))
                        condition = usearch->createIndexCondition(query_info, context);
#endif
                    if (!condition)
                        throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown vector search index {}", index_helper->index.name);
                }
                else
                    condition = index_helper->createIndexCondition(filter_actions_dag, context);

                /// Keep only indexes whose condition can actually prune something.
                if (!condition->alwaysUnknownOrTrue())
                    skip_indexes.useful_indices.emplace_back(index_helper, condition);
            }
        }
    }

    // move minmax indices to first positions, so they will be applied first as cheapest ones
    std::stable_sort(begin(skip_indexes.useful_indices), end(skip_indexes.useful_indices), [](const auto & l, const auto & r)
    {
        const bool l_min_max = (typeid_cast<const MergeTreeIndexMinMax *>(l.index.get()));
        const bool r_min_max = (typeid_cast<const MergeTreeIndexMinMax *>(r.index.get()));
        if (l_min_max == r_min_max)
            return false;

        if (l_min_max)
            return true; // left is min max but right is not

        return false; // right is min max but left is not
    });

    indexes->skip_indexes = std::move(skip_indexes);
}
1516
/// Apply the pushed-down filter nodes to this read step: build the combined
/// filter ActionsDAG and (re)build index analysis from it.
/// No-op when `indexes` was already built (analysis is cached across calls).
void ReadFromMergeTree::applyFilters(ActionDAGNodes added_filter_nodes)
{
    if (!indexes)
    {
        /// Analyzer generates unique ColumnIdentifiers like __table1.__partition_id in filter nodes,
        /// while key analysis still requires unqualified column names.
        std::unordered_map<std::string, ColumnWithTypeAndName> node_name_to_input_node_column;
        if (query_info.planner_context)
        {
            const auto & table_expression_data = query_info.planner_context->getTableExpressionDataOrThrow(query_info.table_expression);
            const auto & alias_column_expressions = table_expression_data.getAliasColumnExpressions();
            for (const auto & [column_identifier, column_name] : table_expression_data.getColumnIdentifierToColumnName())
            {
                /// ALIAS columns cannot be used in the filter expression without being calculated in ActionsDAG,
                /// so they should not be added to the input nodes.
                if (alias_column_expressions.contains(column_name))
                    continue;
                const auto & column = table_expression_data.getColumnOrThrow(column_name);
                node_name_to_input_node_column.emplace(column_identifier, ColumnWithTypeAndName(column.type, column_name));
            }
        }

        filter_actions_dag = ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes, node_name_to_input_node_column);

        /// NOTE: Currently we store two DAGs for analysis:
        /// (1) SourceStepWithFilter::filter_nodes, (2) query_info.filter_actions_dag. Make sure they are consistent.
        /// TODO: Get rid of filter_actions_dag in query_info after we move analysis of
        /// parallel replicas and unused shards into optimization, similar to projection analysis.
        query_info.filter_actions_dag = filter_actions_dag;

        buildIndexes(
            indexes,
            filter_actions_dag,
            data,
            prepared_parts,
            context,
            query_info,
            metadata_for_reading);
    }
}
1557
/// Static entry point for range selection: given parts and all analysis inputs
/// explicitly, delegate to selectRangesToReadImpl. `indexes` is built lazily
/// inside the implementation when not yet populated.
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
    MergeTreeData::DataPartsVector parts,
    std::vector<AlterConversionsPtr> alter_conversions,
    const StorageMetadataPtr & metadata_snapshot,
    const SelectQueryInfo & query_info_,
    ContextPtr context_,
    size_t num_streams,
    std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read,
    const MergeTreeData & data,
    const Names & all_column_names,
    LoggerPtr log,
    std::optional<Indexes> & indexes)
{
    return selectRangesToReadImpl(
        std::move(parts),
        std::move(alter_conversions),
        metadata_snapshot,
        query_info_,
        context_,
        num_streams,
        max_block_numbers_to_read,
        data,
        all_column_names,
        log,
        indexes);
}
1584
/// Core of the part/range selection: prunes `parts` down to the set of
/// (part, mark-ranges) pairs that must actually be read, recording statistics
/// for every pruning stage in the returned AnalysisResult.
///
/// Stages (in order): virtual-column fixup, index building (if not supplied by
/// the caller), `_part` value filter, partition/minmax pruning, sampling,
/// primary-key + skip-index range selection.
///
/// `indexes` is an in/out parameter: if empty, it is filled by buildIndexes()
/// so that a caller can reuse the analysis for subsequent calls.
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl(
    MergeTreeData::DataPartsVector parts,
    std::vector<AlterConversionsPtr> alter_conversions,
    const StorageMetadataPtr & metadata_snapshot,
    const SelectQueryInfo & query_info_,
    ContextPtr context_,
    size_t num_streams,
    std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read,
    const MergeTreeData & data,
    const Names & all_column_names,
    LoggerPtr log,
    std::optional<Indexes> & indexes)
{
    AnalysisResult result;
    const auto & settings = context_->getSettingsRef();

    size_t total_parts = parts.size();

    result.column_names_to_read = all_column_names;

    /// If there are only virtual columns in the query, you must request at least one non-virtual one.
    if (result.column_names_to_read.empty())
    {
        NamesAndTypesList available_real_columns = metadata_snapshot->getColumns().getAllPhysical();
        result.column_names_to_read.push_back(ExpressionActions::getSmallestColumn(available_real_columns).name);
    }

    // Build and check if primary key is used when necessary
    const auto & primary_key = metadata_snapshot->getPrimaryKey();
    const Names & primary_key_column_names = primary_key.column_names;

    /// Indexes may be pre-built and passed in by the caller; build them only once.
    if (!indexes)
        buildIndexes(indexes, query_info_.filter_actions_dag, data, parts, context_, query_info_, metadata_snapshot);

    /// A non-null but empty `_part` filter means no part can match: return an empty result.
    if (indexes->part_values && indexes->part_values->empty())
        return std::make_shared<AnalysisResult>(std::move(result));

    if (settings.force_primary_key && indexes->key_condition.alwaysUnknownOrTrue())
    {
        throw Exception(ErrorCodes::INDEX_NOT_USED,
            "Primary key ({}) is not used and setting 'force_primary_key' is set",
            fmt::join(primary_key_column_names, ", "));
    }

    LOG_DEBUG(log, "Key condition: {}", indexes->key_condition.toString());

    if (indexes->part_offset_condition)
        LOG_DEBUG(log, "Part offset condition: {}", indexes->part_offset_condition->toString());

    /// The key condition can never hold: nothing to read at all.
    if (indexes->key_condition.alwaysFalse())
        return std::make_shared<AnalysisResult>(std::move(result));

    size_t total_marks_pk = 0;
    size_t parts_before_pk = 0;

    {
        /// Drop whole parts using the partition key / minmax index / `_part` values
        /// and the max-block-number limits (used by e.g. Distributed reads).
        MergeTreeDataSelectExecutor::filterPartsByPartition(
            indexes->partition_pruner,
            indexes->minmax_idx_condition,
            parts,
            alter_conversions,
            indexes->part_values,
            metadata_snapshot,
            data,
            context_,
            max_block_numbers_to_read.get(),
            log,
            result.index_stats);

        /// Compute the sampling filter (SAMPLE clause) over the remaining parts.
        result.sampling = MergeTreeDataSelectExecutor::getSampling(
            query_info_,
            metadata_snapshot->getColumns().getAllPhysical(),
            parts,
            indexes->key_condition,
            data,
            metadata_snapshot,
            context_,
            log);

        if (result.sampling.read_nothing)
            return std::make_shared<AnalysisResult>(std::move(result));

        /// Count marks before PK analysis to report pruning effectiveness.
        for (const auto & part : parts)
            total_marks_pk += part->index_granularity.getMarksCountWithoutFinal();
        parts_before_pk = parts.size();

        auto reader_settings = getMergeTreeReaderSettings(context_, query_info_);
        /// Select the mark ranges inside each part using the primary key and skip indexes.
        result.parts_with_ranges = MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes(
            std::move(parts),
            std::move(alter_conversions),
            metadata_snapshot,
            context_,
            indexes->key_condition,
            indexes->part_offset_condition,
            indexes->skip_indexes,
            reader_settings,
            log,
            num_streams,
            result.index_stats,
            indexes->use_skip_indexes);
    }

    /// If the PK stage produced a stat entry, take its granule count; otherwise all marks remain.
    size_t sum_marks_pk = total_marks_pk;
    for (const auto & stat : result.index_stats)
        if (stat.type == IndexType::PrimaryKey)
            sum_marks_pk = stat.num_granules_after;

    size_t sum_marks = 0;
    size_t sum_ranges = 0;
    size_t sum_rows = 0;

    for (const auto & part : result.parts_with_ranges)
    {
        sum_ranges += part.ranges.size();
        sum_marks += part.getMarksCount();
        sum_rows += part.getRowsCount();
    }

    result.total_parts = total_parts;
    result.parts_before_pk = parts_before_pk;
    result.selected_parts = result.parts_with_ranges.size();
    result.selected_ranges = sum_ranges;
    result.selected_marks = sum_marks;
    result.selected_marks_pk = sum_marks_pk;
    result.total_marks_pk = total_marks_pk;
    result.selected_rows = sum_rows;

    /// Propagate the requested read order (set by the order-by optimization) into the result.
    if (query_info_.input_order_info)
        result.read_type = (query_info_.input_order_info->direction > 0)
            ? ReadType::InOrder
            : ReadType::InReverseOrder;

    return std::make_shared<AnalysisResult>(std::move(result));
}
1719
/// Switches this step to reading in the order of a sorting-key prefix of
/// `prefix_size` columns, in the given `direction`, with an optional row `limit`.
/// Returns false when the optimization cannot be applied (reverse order with FINAL);
/// on success updates the reader settings, the stream count and the output stream's
/// sort description.
bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction, size_t limit)
{
    /// If direction is not set, use the current one.
    if (!direction)
        direction = getSortDirection();

    /// Disable read-in-order optimization for reverse order with final.
    /// Otherwise, it can lead to incorrect final behavior because the implementation may rely on the reading in direct order.
    if (direction != 1 && query_info.isFinal())
        return false;

    query_info.input_order_info = std::make_shared<InputOrderInfo>(SortDescription{}, prefix_size, direction, limit);
    reader_settings.read_in_order = true;

    /// In case of read-in-order, don't create too many reading streams.
    /// Almost always we are reading from a single stream at a time because of merge sort.
    if (output_streams_limit)
        requested_num_streams = output_streams_limit;

    /// Update sort info for output stream.
    SortDescription sort_description;
    const Names & sorting_key_columns = metadata_for_reading->getSortingKeyColumns();
    const Block & header = output_stream->header;
    const int sort_direction = getSortDirection();
    /// Take the longest prefix of sorting-key columns that is present in the output header.
    for (const auto & column_name : sorting_key_columns)
    {
        if (std::find_if(header.begin(), header.end(), [&](ColumnWithTypeAndName const & col) { return col.name == column_name; })
            == header.end())
            break;
        sort_description.emplace_back(column_name, sort_direction);
    }
    if (!sort_description.empty())
    {
        /// The stream is only guaranteed to be sorted by the prefix that was actually requested.
        const size_t used_prefix_of_sorting_key_size = query_info.input_order_info->used_prefix_of_sorting_key_size;
        if (sort_description.size() > used_prefix_of_sorting_key_size)
            sort_description.resize(used_prefix_of_sorting_key_size);
        output_stream->sort_description = std::move(sort_description);
        output_stream->sort_scope = DataStream::SortScope::Stream;
    }

    /// All *InOrder optimization rely on an assumption that output stream is sorted, but vertical FINAL breaks this rule
    /// Let prefer in-order optimization over vertical FINAL for now
    enable_vertical_final = false;

    return true;
}
1766
1767bool ReadFromMergeTree::readsInOrder() const
1768{
1769return reader_settings.read_in_order;
1770}
1771
/// Replaces the PREWHERE info on both this step and the query info, then
/// recomputes the output header and its sort description to match.
void ReadFromMergeTree::updatePrewhereInfo(const PrewhereInfoPtr & prewhere_info_value)
{
    query_info.prewhere_info = prewhere_info_value;
    prewhere_info = prewhere_info_value;

    /// Recompute the output header taking the new PREWHERE into account.
    output_stream = DataStream{.header = MergeTreeSelectProcessor::transformHeader(
        storage_snapshot->getSampleBlockForColumns(all_column_names),
        prewhere_info_value)};

    /// The freshly created output stream has no sort description yet; restore it.
    updateSortDescriptionForOutputStream(
        *output_stream,
        storage_snapshot->getMetadataForQuery()->getSortingKeyColumns(),
        getSortDirection(),
        query_info.input_order_info,
        prewhere_info,
        enable_vertical_final);
}
1789
1790bool ReadFromMergeTree::requestOutputEachPartitionThroughSeparatePort()
1791{
1792if (isQueryWithFinal())
1793return false;
1794
1795const auto & settings = context->getSettingsRef();
1796
1797const auto partitions_cnt = countPartitions(prepared_parts);
1798if (!settings.force_aggregate_partitions_independently && (partitions_cnt == 1 || partitions_cnt < settings.max_threads / 2))
1799{
1800LOG_TRACE(
1801log,
1802"Independent aggregation by partitions won't be used because there are too few of them: {}. You can set "
1803"force_aggregate_partitions_independently to suppress this check",
1804partitions_cnt);
1805return false;
1806}
1807
1808if (!settings.force_aggregate_partitions_independently
1809&& (partitions_cnt > settings.max_number_of_partitions_for_independent_aggregation))
1810{
1811LOG_TRACE(
1812log,
1813"Independent aggregation by partitions won't be used because there are too many of them: {}. You can increase "
1814"max_number_of_partitions_for_independent_aggregation (current value is {}) or set "
1815"force_aggregate_partitions_independently to suppress this check",
1816partitions_cnt,
1817settings.max_number_of_partitions_for_independent_aggregation);
1818return false;
1819}
1820
1821if (!settings.force_aggregate_partitions_independently)
1822{
1823std::unordered_map<String, size_t> partition_rows;
1824for (const auto & part : prepared_parts)
1825partition_rows[part->info.partition_id] += part->rows_count;
1826size_t sum_rows = 0;
1827size_t max_rows = 0;
1828for (const auto & [_, rows] : partition_rows)
1829{
1830sum_rows += rows;
1831max_rows = std::max(max_rows, rows);
1832}
1833
1834/// Merging shouldn't take more time than preaggregation in normal cases. And exec time is proportional to the amount of data.
1835/// We assume that exec time of independent aggr is proportional to the maximum of sizes and
1836/// exec time of ordinary aggr is proportional to sum of sizes divided by number of threads and multiplied by two (preaggregation + merging).
1837const size_t avg_rows_in_partition = sum_rows / settings.max_threads;
1838if (max_rows > avg_rows_in_partition * 2)
1839{
1840LOG_TRACE(
1841log,
1842"Independent aggregation by partitions won't be used because there are too big skew in the number of rows between "
1843"partitions. You can set force_aggregate_partitions_independently to suppress this check");
1844return false;
1845}
1846}
1847
1848return output_each_partition_through_separate_port = true;
1849}
1850
1851ReadFromMergeTree::AnalysisResult ReadFromMergeTree::getAnalysisResult() const
1852{
1853auto result_ptr = analyzed_result_ptr
1854? analyzed_result_ptr
1855: selectRangesToRead(prepared_parts, alter_conversions_for_parts);
1856
1857return *result_ptr;
1858}
1859
1860bool ReadFromMergeTree::isQueryWithSampling() const
1861{
1862if (context->getSettingsRef().parallel_replicas_count > 1 && data.supportsSampling())
1863return true;
1864
1865const auto & select = query_info.query->as<ASTSelectQuery &>();
1866if (query_info.table_expression_modifiers)
1867return query_info.table_expression_modifiers->getSampleSizeRatio() != std::nullopt;
1868else
1869return select.sampleSize() != nullptr;
1870}
1871
/// Dispatches reading of the given parts/ranges to one of three strategies:
/// FINAL merging, in-order reading, or the default unordered reading.
/// Before dispatching, extends the set of columns to read with whatever the
/// chosen strategy requires (sampling filter inputs, sorting key, sign/version
/// columns for FINAL).
Pipe ReadFromMergeTree::spreadMarkRanges(
    RangesInDataParts && parts_with_ranges, size_t num_streams, AnalysisResult & result, ActionsDAGPtr & result_projection)
{
    const bool final = isQueryWithFinal();
    Names column_names_to_read = result.column_names_to_read;
    NameSet names(column_names_to_read.begin(), column_names_to_read.end());

    if (!final && result.sampling.use_sampling)
    {
        NameSet sampling_columns;

        /// Add columns needed for `sample_by_ast` to `column_names_to_read`.
        /// Skip this if final was used, because such columns were already added from PK.
        for (const auto & column : result.sampling.filter_expression->getRequiredColumns().getNames())
        {
            if (!names.contains(column))
                column_names_to_read.push_back(column);

            sampling_columns.insert(column);
        }

        /// PREWHERE must keep producing the sampling inputs for the later filter step.
        if (prewhere_info)
            restorePrewhereInputs(*prewhere_info, sampling_columns);
    }

    if (final)
    {
        /// FINAL is not expected together with reading from parallel replicas here.
        chassert(!is_parallel_reading_from_replicas);

        if (output_each_partition_through_separate_port)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Optimization isn't supposed to be used for queries with final");

        /// Add columns needed to calculate the sorting expression and the sign.
        for (const auto & column : metadata_for_reading->getColumnsRequiredForSortingKey())
        {
            if (!names.contains(column))
            {
                column_names_to_read.push_back(column);
                names.insert(column);
            }
        }

        /// Engine-specific service columns (is_deleted/sign/version) are needed by the merging transforms.
        if (!data.merging_params.is_deleted_column.empty() && !names.contains(data.merging_params.is_deleted_column))
            column_names_to_read.push_back(data.merging_params.is_deleted_column);
        if (!data.merging_params.sign_column.empty() && !names.contains(data.merging_params.sign_column))
            column_names_to_read.push_back(data.merging_params.sign_column);
        if (!data.merging_params.version_column.empty() && !names.contains(data.merging_params.version_column))
            column_names_to_read.push_back(data.merging_params.version_column);

        return spreadMarkRangesAmongStreamsFinal(std::move(parts_with_ranges), num_streams, result.column_names_to_read, column_names_to_read, result_projection);
    }
    else if (query_info.input_order_info)
    {
        return spreadMarkRangesAmongStreamsWithOrder(
            std::move(parts_with_ranges), num_streams, column_names_to_read, result_projection, query_info.input_order_info);
    }
    else
    {
        return spreadMarkRangesAmongStreams(std::move(parts_with_ranges), num_streams, column_names_to_read);
    }
}
1933
/// Builds a pipe for the "aggregate partitions independently" mode: parts are
/// grouped by partition (several partitions per group if there are more
/// partitions than requested streams), each group is read separately, and each
/// group's pipe is squashed to a single output port.
Pipe ReadFromMergeTree::groupStreamsByPartition(AnalysisResult & result, ActionsDAGPtr & result_projection)
{
    auto && parts_with_ranges = std::move(result.parts_with_ranges);

    if (parts_with_ranges.empty())
        return {};

    /// Either several partitions per stream, or several streams per partition.
    const size_t partitions_cnt = std::max<size_t>(countPartitions(parts_with_ranges), 1);
    const size_t partitions_per_stream = std::max<size_t>(1, partitions_cnt / requested_num_streams);
    const size_t num_streams = std::max<size_t>(1, requested_num_streams / partitions_cnt);

    Pipes pipes;
    for (auto begin = parts_with_ranges.begin(), end = begin; end != parts_with_ranges.end(); begin = end)
    {
        /// Advance `end` past `partitions_per_stream` consecutive partitions.
        /// NOTE: the lambda captures `end` by reference and compares each candidate
        /// against the partition of the part `end` currently points at, i.e. the
        /// first part of the not-yet-consumed range; parts of one partition are
        /// expected to be adjacent in `parts_with_ranges`.
        for (size_t i = 0; i < partitions_per_stream; ++i)
            end = std::find_if(
                end,
                parts_with_ranges.end(),
                [&end](const auto & part) { return end->data_part->info.partition_id != part.data_part->info.partition_id; });

        RangesInDataParts partition_parts{std::make_move_iterator(begin), std::make_move_iterator(end)};

        pipes.emplace_back(spreadMarkRanges(std::move(partition_parts), num_streams, result, result_projection));
        /// Squash the group into a single output port so each partition group
        /// flows through exactly one port downstream.
        if (!pipes.back().empty())
            pipes.back().resize(1);
    }

    return Pipe::unitePipes(std::move(pipes));
}
1963
/// Builds the actual reading pipeline from the analysis result:
/// logs/records selection statistics, creates the reading pipe (per-partition
/// or plain), applies the sampling filter, and converts the pipe's header to
/// this step's declared output header.
void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
    auto result = getAnalysisResult();

    /// Do not keep data parts in snapshot.
    /// They are stored separately, and some could be released after PK analysis.
    storage_snapshot->data = std::make_unique<MergeTreeData::SnapshotData>();

    result.checkLimits(context->getSettingsRef(), query_info);
    shared_virtual_fields.emplace("_sample_factor", result.sampling.used_sample_factor);

    LOG_DEBUG(
        log,
        "Selected {}/{} parts by partition key, {} parts by primary key, {}/{} marks by primary key, {} marks to read from {} ranges",
        result.parts_before_pk,
        result.total_parts,
        result.selected_parts,
        result.selected_marks_pk,
        result.total_marks_pk,
        result.selected_marks,
        result.selected_ranges);

    // Adding partition info to QueryAccessInfo.
    if (context->hasQueryContext() && !query_info.is_internal)
    {
        Names partition_names;
        for (const auto & part : result.parts_with_ranges)
        {
            partition_names.emplace_back(
                fmt::format("{}.{}", data.getStorageID().getFullNameNotQuoted(), part.data_part->info.partition_id));
        }
        context->getQueryContext()->addQueryAccessInfo(partition_names);

        if (storage_snapshot->projection)
            context->getQueryContext()->addQueryAccessInfo(
                Context::QualifiedProjectionName{.storage_id = data.getStorageID(), .projection_name = storage_snapshot->projection->name});
    }

    ProfileEvents::increment(ProfileEvents::SelectedParts, result.selected_parts);
    ProfileEvents::increment(ProfileEvents::SelectedRanges, result.selected_ranges);
    ProfileEvents::increment(ProfileEvents::SelectedMarks, result.selected_marks);

    /// May hold a query-id slot that limits concurrent queries on this table.
    auto query_id_holder = MergeTreeDataSelectExecutor::checkLimits(data, result, context);

    /// Nothing selected: produce an empty source with the right header.
    if (result.parts_with_ranges.empty())
    {
        pipeline.init(Pipe(std::make_shared<NullSource>(getOutputStream().header)));
        return;
    }

    selected_marks = result.selected_marks;
    selected_rows = result.selected_rows;
    selected_parts = result.selected_parts;
    /// Projection, that needed to drop columns, which have appeared by execution
    /// of some extra expressions, and to allow execute the same expressions later.
    /// NOTE: It may lead to double computation of expressions.
    ActionsDAGPtr result_projection;

    Pipe pipe = output_each_partition_through_separate_port
        ? groupStreamsByPartition(result, result_projection)
        : spreadMarkRanges(std::move(result.parts_with_ranges), requested_num_streams, result, result_projection);

    for (const auto & processor : pipe.getProcessors())
        processor->setStorageLimits(query_info.storage_limits);

    if (pipe.empty())
    {
        pipeline.init(Pipe(std::make_shared<NullSource>(getOutputStream().header)));
        return;
    }

    /// Apply the SAMPLE filter on top of the read data.
    if (result.sampling.use_sampling)
    {
        auto sampling_actions = std::make_shared<ExpressionActions>(result.sampling.filter_expression);
        pipe.addSimpleTransform([&](const Block & header)
        {
            return std::make_shared<FilterTransform>(
                header,
                sampling_actions,
                result.sampling.filter_function->getColumnName(),
                false);
        });
    }

    Block cur_header = pipe.getHeader();

    /// Accumulates conversion actions into `result_projection`, merging with what
    /// the reading strategy may have already put there.
    auto append_actions = [&result_projection](ActionsDAGPtr actions)
    {
        if (!result_projection)
            result_projection = std::move(actions);
        else
            result_projection = ActionsDAG::merge(std::move(*result_projection), std::move(*actions));
    };

    if (result_projection)
        cur_header = result_projection->updateHeader(cur_header);

    /// Extra columns may be returned (for example, if sampling is used).
    /// Convert pipe to step header structure.
    if (!isCompatibleHeader(cur_header, getOutputStream().header))
    {
        auto converting = ActionsDAG::makeConvertingActions(
            cur_header.getColumnsWithTypeAndName(),
            getOutputStream().header.getColumnsWithTypeAndName(),
            ActionsDAG::MatchColumnsMode::Name);

        append_actions(std::move(converting));
    }

    /// Execute the accumulated projection/conversion once, at the end of the pipe.
    if (result_projection)
    {
        auto projection_actions = std::make_shared<ExpressionActions>(result_projection);
        pipe.addSimpleTransform([&](const Block & header)
        {
            return std::make_shared<ExpressionTransform>(header, projection_actions);
        });
    }

    /// Some extra columns could be added by sample/final/in-order/etc
    /// Remove them from header if not needed.
    if (!blocksHaveEqualStructure(pipe.getHeader(), getOutputStream().header))
    {
        auto convert_actions_dag = ActionsDAG::makeConvertingActions(
            pipe.getHeader().getColumnsWithTypeAndName(),
            getOutputStream().header.getColumnsWithTypeAndName(),
            ActionsDAG::MatchColumnsMode::Name,
            true);

        auto converting_dag_expr = std::make_shared<ExpressionActions>(convert_actions_dag);

        pipe.addSimpleTransform([&](const Block & header)
        {
            return std::make_shared<ExpressionTransform>(header, converting_dag_expr);
        });
    }

    /// Register the pipe's processors on this step (for EXPLAIN PIPELINE etc.).
    for (const auto & processor : pipe.getProcessors())
        processors.emplace_back(processor);

    pipeline.init(std::move(pipe));
    pipeline.addContext(context);
    // Attach QueryIdHolder if needed
    if (query_id_holder)
        pipeline.setQueryIdHolder(std::move(query_id_holder));
}
2109
2110static const char * indexTypeToString(ReadFromMergeTree::IndexType type)
2111{
2112switch (type)
2113{
2114case ReadFromMergeTree::IndexType::None:
2115return "None";
2116case ReadFromMergeTree::IndexType::MinMax:
2117return "MinMax";
2118case ReadFromMergeTree::IndexType::Partition:
2119return "Partition";
2120case ReadFromMergeTree::IndexType::PrimaryKey:
2121return "PrimaryKey";
2122case ReadFromMergeTree::IndexType::Skip:
2123return "Skip";
2124}
2125
2126UNREACHABLE();
2127}
2128
2129static const char * readTypeToString(ReadFromMergeTree::ReadType type)
2130{
2131switch (type)
2132{
2133case ReadFromMergeTree::ReadType::Default:
2134return "Default";
2135case ReadFromMergeTree::ReadType::InOrder:
2136return "InOrder";
2137case ReadFromMergeTree::ReadType::InReverseOrder:
2138return "InReverseOrder";
2139case ReadFromMergeTree::ReadType::ParallelReplicas:
2140return "Parallel";
2141}
2142
2143UNREACHABLE();
2144}
2145
/// Prints this step's details (read type, part/granule counts, PREWHERE info)
/// in the indented text format used by EXPLAIN.
void ReadFromMergeTree::describeActions(FormatSettings & format_settings) const
{
    auto result = getAnalysisResult();
    std::string prefix(format_settings.offset, format_settings.indent_char);
    format_settings.out << prefix << "ReadType: " << readTypeToString(result.read_type) << '\n';

    /// The last index stat holds the final counts after all pruning stages.
    if (!result.index_stats.empty())
    {
        format_settings.out << prefix << "Parts: " << result.index_stats.back().num_parts_after << '\n';
        format_settings.out << prefix << "Granules: " << result.index_stats.back().num_granules_after << '\n';
    }

    if (prewhere_info)
    {
        format_settings.out << prefix << "Prewhere info" << '\n';
        format_settings.out << prefix << "Need filter: " << prewhere_info->need_filter << '\n';

        /// Nested sections are indented two extra levels.
        prefix.push_back(format_settings.indent_char);
        prefix.push_back(format_settings.indent_char);

        if (prewhere_info->prewhere_actions)
        {
            format_settings.out << prefix << "Prewhere filter" << '\n';
            format_settings.out << prefix << "Prewhere filter column: " << prewhere_info->prewhere_column_name;
            if (prewhere_info->remove_prewhere_column)
                format_settings.out << " (removed)";
            format_settings.out << '\n';

            auto expression = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions);
            expression->describeActions(format_settings.out, prefix);
        }

        if (prewhere_info->row_level_filter)
        {
            format_settings.out << prefix << "Row level filter" << '\n';
            format_settings.out << prefix << "Row level filter column: " << prewhere_info->row_level_column_name << '\n';

            auto expression = std::make_shared<ExpressionActions>(prewhere_info->row_level_filter);
            expression->describeActions(format_settings.out, prefix);
        }
    }
}
2188
2189void ReadFromMergeTree::describeActions(JSONBuilder::JSONMap & map) const
2190{
2191auto result = getAnalysisResult();
2192map.add("Read Type", readTypeToString(result.read_type));
2193if (!result.index_stats.empty())
2194{
2195map.add("Parts", result.index_stats.back().num_parts_after);
2196map.add("Granules", result.index_stats.back().num_granules_after);
2197}
2198
2199if (prewhere_info)
2200{
2201std::unique_ptr<JSONBuilder::JSONMap> prewhere_info_map = std::make_unique<JSONBuilder::JSONMap>();
2202prewhere_info_map->add("Need filter", prewhere_info->need_filter);
2203
2204if (prewhere_info->prewhere_actions)
2205{
2206std::unique_ptr<JSONBuilder::JSONMap> prewhere_filter_map = std::make_unique<JSONBuilder::JSONMap>();
2207prewhere_filter_map->add("Prewhere filter column", prewhere_info->prewhere_column_name);
2208prewhere_filter_map->add("Prewhere filter remove filter column", prewhere_info->remove_prewhere_column);
2209auto expression = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions);
2210prewhere_filter_map->add("Prewhere filter expression", expression->toTree());
2211
2212prewhere_info_map->add("Prewhere filter", std::move(prewhere_filter_map));
2213}
2214
2215if (prewhere_info->row_level_filter)
2216{
2217std::unique_ptr<JSONBuilder::JSONMap> row_level_filter_map = std::make_unique<JSONBuilder::JSONMap>();
2218row_level_filter_map->add("Row level filter column", prewhere_info->row_level_column_name);
2219auto expression = std::make_shared<ExpressionActions>(prewhere_info->row_level_filter);
2220row_level_filter_map->add("Row level filter expression", expression->toTree());
2221
2222prewhere_info_map->add("Row level filter", std::move(row_level_filter_map));
2223}
2224
2225map.add("Prewhere info", std::move(prewhere_info_map));
2226}
2227}
2228
2229void ReadFromMergeTree::describeIndexes(FormatSettings & format_settings) const
2230{
2231auto result = getAnalysisResult();
2232const auto & index_stats = result.index_stats;
2233
2234std::string prefix(format_settings.offset, format_settings.indent_char);
2235if (!index_stats.empty())
2236{
2237/// Do not print anything if no indexes is applied.
2238if (index_stats.size() == 1 && index_stats.front().type == IndexType::None)
2239return;
2240
2241std::string indent(format_settings.indent, format_settings.indent_char);
2242format_settings.out << prefix << "Indexes:\n";
2243
2244for (size_t i = 0; i < index_stats.size(); ++i)
2245{
2246const auto & stat = index_stats[i];
2247if (stat.type == IndexType::None)
2248continue;
2249
2250format_settings.out << prefix << indent << indexTypeToString(stat.type) << '\n';
2251
2252if (!stat.name.empty())
2253format_settings.out << prefix << indent << indent << "Name: " << stat.name << '\n';
2254
2255if (!stat.description.empty())
2256format_settings.out << prefix << indent << indent << "Description: " << stat.description << '\n';
2257
2258if (!stat.used_keys.empty())
2259{
2260format_settings.out << prefix << indent << indent << "Keys: " << stat.name << '\n';
2261for (const auto & used_key : stat.used_keys)
2262format_settings.out << prefix << indent << indent << indent << used_key << '\n';
2263}
2264
2265if (!stat.condition.empty())
2266format_settings.out << prefix << indent << indent << "Condition: " << stat.condition << '\n';
2267
2268format_settings.out << prefix << indent << indent << "Parts: " << stat.num_parts_after;
2269if (i)
2270format_settings.out << '/' << index_stats[i - 1].num_parts_after;
2271format_settings.out << '\n';
2272
2273format_settings.out << prefix << indent << indent << "Granules: " << stat.num_granules_after;
2274if (i)
2275format_settings.out << '/' << index_stats[i - 1].num_granules_after;
2276format_settings.out << '\n';
2277}
2278}
2279}
2280
2281void ReadFromMergeTree::describeIndexes(JSONBuilder::JSONMap & map) const
2282{
2283auto result = getAnalysisResult();
2284auto index_stats = std::move(result.index_stats);
2285
2286if (!index_stats.empty())
2287{
2288/// Do not print anything if no indexes is applied.
2289if (index_stats.size() == 1 && index_stats.front().type == IndexType::None)
2290return;
2291
2292auto indexes_array = std::make_unique<JSONBuilder::JSONArray>();
2293
2294for (size_t i = 0; i < index_stats.size(); ++i)
2295{
2296const auto & stat = index_stats[i];
2297if (stat.type == IndexType::None)
2298continue;
2299
2300auto index_map = std::make_unique<JSONBuilder::JSONMap>();
2301
2302index_map->add("Type", indexTypeToString(stat.type));
2303
2304if (!stat.name.empty())
2305index_map->add("Name", stat.name);
2306
2307if (!stat.description.empty())
2308index_map->add("Description", stat.description);
2309
2310if (!stat.used_keys.empty())
2311{
2312auto keys_array = std::make_unique<JSONBuilder::JSONArray>();
2313
2314for (const auto & used_key : stat.used_keys)
2315keys_array->add(used_key);
2316
2317index_map->add("Keys", std::move(keys_array));
2318}
2319
2320if (!stat.condition.empty())
2321index_map->add("Condition", stat.condition);
2322
2323if (i)
2324index_map->add("Initial Parts", index_stats[i - 1].num_parts_after);
2325index_map->add("Selected Parts", stat.num_parts_after);
2326
2327if (i)
2328index_map->add("Initial Granules", index_stats[i - 1].num_granules_after);
2329index_map->add("Selected Granules", stat.num_granules_after);
2330
2331indexes_array->add(std::move(index_map));
2332}
2333
2334map.add("Indexes", std::move(indexes_array));
2335}
2336}
2337
2338
2339}
2340