ClickHouse / ReadFromMergeTree.cpp (2339 lines, 95.4 KB)
#include <Processors/QueryPlan/ReadFromMergeTree.h>

#include <IO/Operators.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/TreeRewriter.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTSelectQuery.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/Merges/AggregatingSortedTransform.h>
#include <Processors/Merges/CollapsingSortedTransform.h>
#include <Processors/Merges/GraphiteRollupSortedTransform.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Merges/ReplacingSortedTransform.h>
#include <Processors/Merges/SummingSortedTransform.h>
#include <Processors/Merges/VersionedCollapsingTransform.h>
#include <Processors/QueryPlan/PartsSplitter.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ReverseTransform.h>
#include <Processors/Transforms/SelectByIndicesTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeIndexAnnoy.h>
#include <Storages/MergeTree/MergeTreeIndexUSearch.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreePrefetchedReadPool.h>
#include <Storages/MergeTree/MergeTreeReadPoolInOrder.h>
#include <Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h>
#include <Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h>
#include <Storages/MergeTree/MergeTreeSource.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/RequestResponse.h>
#include <Storages/VirtualColumnUtils.h>
#include <base/sort.h>
#include <Poco/Logger.h>
#include <Common/JSONBuilder.h>
#include <Common/isLocalAddress.h>
#include <Common/logger_useful.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Parsers/parseIdentifierOrStringLiteral.h>
#include <Parsers/ExpressionListParsers.h>
#include <Storages/MergeTree/MergeTreeIndexMinMax.h>

#include <algorithm>
#include <iterator>
#include <memory>
#include <unordered_map>

using namespace DB;

namespace
{
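/// Note: this counts runs of equal partition ids, so it assumes the input parts are grouped
/// by partition (as they are after part selection); otherwise the result is an overestimate.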
template <typename Container, typename Getter>
size_t countPartitions(const Container & parts, Getter get_partition_id)
{
    if (parts.empty())
        return 0;

    String cur_partition_id = get_partition_id(parts[0]);
    size_t unique_partitions = 1;
    for (size_t i = 1; i < parts.size(); ++i)
    {
        if (get_partition_id(parts[i]) != cur_partition_id)
        {
            ++unique_partitions;
            cur_partition_id = get_partition_id(parts[i]);
        }
    }
    return unique_partitions;
}

size_t countPartitions(const RangesInDataParts & parts_with_ranges)
{
    auto get_partition_id = [](const RangesInDataPart & rng) { return rng.data_part->info.partition_id; };
    return countPartitions(parts_with_ranges, get_partition_id);
}

size_t countPartitions(const MergeTreeData::DataPartsVector & prepared_parts)
{
    auto get_partition_id = [](const MergeTreeData::DataPartPtr data_part) { return data_part->info.partition_id; };
    return countPartitions(prepared_parts, get_partition_id);
}

bool restoreDAGInputs(ActionsDAG & dag, const NameSet & inputs)
{
    std::unordered_set<const ActionsDAG::Node *> outputs(dag.getOutputs().begin(), dag.getOutputs().end());
    bool added = false;
    for (const auto * input : dag.getInputs())
    {
        if (inputs.contains(input->result_name) && !outputs.contains(input))
        {
            dag.getOutputs().push_back(input);
            added = true;
        }
    }

    return added;
}

bool restorePrewhereInputs(PrewhereInfo & info, const NameSet & inputs)
{
    bool added = false;
    if (info.row_level_filter)
        added = added || restoreDAGInputs(*info.row_level_filter, inputs);

    if (info.prewhere_actions)
        added = added || restoreDAGInputs(*info.prewhere_actions, inputs);

    return added;
}

}

namespace ProfileEvents
{
    extern const Event SelectedParts;
    extern const Event SelectedRanges;
    extern const Event SelectedMarks;
}

namespace DB
{

namespace ErrorCodes
{
    extern const int INDEX_NOT_USED;
    extern const int LOGICAL_ERROR;
    extern const int TOO_MANY_ROWS;
    extern const int CANNOT_PARSE_TEXT;
    extern const int PARAMETER_OUT_OF_BOUND;
}

static MergeTreeReaderSettings getMergeTreeReaderSettings(
    const ContextPtr & context, const SelectQueryInfo & query_info)
{
    const auto & settings = context->getSettingsRef();
    return
    {
        .read_settings = context->getReadSettings(),
        .save_marks_in_cache = true,
        .checksum_on_read = settings.checksum_on_read,
        .read_in_order = query_info.input_order_info != nullptr,
        .apply_deleted_mask = settings.apply_deleted_mask,
        .use_asynchronous_read_from_pool = settings.allow_asynchronous_read_from_io_pool_for_merge_tree
            && (settings.max_streams_to_max_threads_ratio > 1 || settings.max_streams_for_merge_tree_reading > 1),
        .enable_multiple_prewhere_read_steps = settings.enable_multiple_prewhere_read_steps,
    };
}

static bool checkAllPartsOnRemoteFS(const RangesInDataParts & parts)
{
    for (const auto & part : parts)
    {
        if (!part.data_part->isStoredOnRemoteDisk())
            return false;
    }
    return true;
}

/// Build the sort description for the output stream.
static void updateSortDescriptionForOutputStream(
    DataStream & output_stream, const Names & sorting_key_columns, const int sort_direction, InputOrderInfoPtr input_order_info, PrewhereInfoPtr prewhere_info, bool enable_vertical_final)
{
    /// Updating the sort description can be done after PREWHERE actions are applied to the header.
    /// After PREWHERE actions are applied, column names in the header can differ from storage column names due to aliases.
    /// To mitigate this, we try to rebuild the original header and use it to deduce the sort description.
    /// TODO: this approach is fragile; it would be more robust to update the sort description for the whole plan during plan optimization.
    Block original_header = output_stream.header.cloneEmpty();
    if (prewhere_info)
    {
        if (prewhere_info->prewhere_actions)
        {
            FindOriginalNodeForOutputName original_column_finder(prewhere_info->prewhere_actions);
            for (auto & column : original_header)
            {
                const auto * original_node = original_column_finder.find(column.name);
                if (original_node)
                    column.name = original_node->result_name;
            }
        }

        if (prewhere_info->row_level_filter)
        {
            FindOriginalNodeForOutputName original_column_finder(prewhere_info->row_level_filter);
            for (auto & column : original_header)
            {
                const auto * original_node = original_column_finder.find(column.name);
                if (original_node)
                    column.name = original_node->result_name;
            }
        }
    }

    SortDescription sort_description;
    const Block & header = output_stream.header;
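    /// Only the prefix of the sorting key that is still present in the (possibly renamed) header can be used:
    /// stop at the first sorting key column that cannot be found there.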
    for (const auto & sorting_key : sorting_key_columns)
    {
        const auto it = std::find_if(
            original_header.begin(), original_header.end(), [&sorting_key](const auto & column) { return column.name == sorting_key; });
        if (it == original_header.end())
            break;

        const size_t column_pos = std::distance(original_header.begin(), it);
        sort_description.emplace_back((header.begin() + column_pos)->name, sort_direction);
    }

    if (!sort_description.empty())
    {
        if (input_order_info && !enable_vertical_final)
        {
            output_stream.sort_scope = DataStream::SortScope::Stream;
            const size_t used_prefix_of_sorting_key_size = input_order_info->used_prefix_of_sorting_key_size;
            if (sort_description.size() > used_prefix_of_sorting_key_size)
                sort_description.resize(used_prefix_of_sorting_key_size);
        }
        else
            output_stream.sort_scope = DataStream::SortScope::Chunk;
    }

    output_stream.sort_description = std::move(sort_description);
}

void ReadFromMergeTree::AnalysisResult::checkLimits(const Settings & settings, const SelectQueryInfo & query_info_) const
{

    /// Do not check the number of read rows if we are reading
    /// in the order of the sorting key with a limit.
    /// In the general case, when there is a WHERE clause,
    /// it's impossible to estimate the number of rows precisely,
    /// because we can stop reading at any time.

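    /// Limits left default-constructed (max_rows == 0) are treated as disabled by the check below;
    /// they are armed only for OverflowMode::THROW with a non-zero max_rows setting.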
    SizeLimits limits;
    if (settings.read_overflow_mode == OverflowMode::THROW
        && settings.max_rows_to_read
        && !query_info_.input_order_info)
        limits = SizeLimits(settings.max_rows_to_read, 0, settings.read_overflow_mode);

    SizeLimits leaf_limits;
    if (settings.read_overflow_mode_leaf == OverflowMode::THROW
        && settings.max_rows_to_read_leaf
        && !query_info_.input_order_info)
        leaf_limits = SizeLimits(settings.max_rows_to_read_leaf, 0, settings.read_overflow_mode_leaf);

    if (limits.max_rows || leaf_limits.max_rows)
    {
        /// Fail fast if the estimated number of rows to read exceeds the limit.
        size_t total_rows_estimate = selected_rows;
        if (query_info_.limit > 0 && total_rows_estimate > query_info_.limit)
        {
            total_rows_estimate = query_info_.limit;
        }
        limits.check(total_rows_estimate, 0, "rows (controlled by 'max_rows_to_read' setting)", ErrorCodes::TOO_MANY_ROWS);
        leaf_limits.check(
            total_rows_estimate, 0, "rows (controlled by 'max_rows_to_read_leaf' setting)", ErrorCodes::TOO_MANY_ROWS);
    }
}

ReadFromMergeTree::ReadFromMergeTree(
    MergeTreeData::DataPartsVector parts_,
    std::vector<AlterConversionsPtr> alter_conversions_,
    Names all_column_names_,
    const MergeTreeData & data_,
    const SelectQueryInfo & query_info_,
    const StorageSnapshotPtr & storage_snapshot_,
    const ContextPtr & context_,
    size_t max_block_size_,
    size_t num_streams_,
    std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read_,
    LoggerPtr log_,
    AnalysisResultPtr analyzed_result_ptr_,
    bool enable_parallel_reading)
    : SourceStepWithFilter(DataStream{.header = MergeTreeSelectProcessor::transformHeader(
        storage_snapshot_->getSampleBlockForColumns(all_column_names_),
        query_info_.prewhere_info)}, all_column_names_, query_info_, storage_snapshot_, context_)
    , reader_settings(getMergeTreeReaderSettings(context_, query_info_))
    , prepared_parts(std::move(parts_))
    , alter_conversions_for_parts(std::move(alter_conversions_))
    , all_column_names(std::move(all_column_names_))
    , data(data_)
    , actions_settings(ExpressionActionsSettings::fromContext(context_))
    , metadata_for_reading(storage_snapshot->getMetadataForQuery())
    , block_size{
        .max_block_size_rows = max_block_size_,
        .preferred_block_size_bytes = context->getSettingsRef().preferred_block_size_bytes,
        .preferred_max_column_in_block_size_bytes = context->getSettingsRef().preferred_max_column_in_block_size_bytes}
    , requested_num_streams(num_streams_)
    , max_block_numbers_to_read(std::move(max_block_numbers_to_read_))
    , log(std::move(log_))
    , analyzed_result_ptr(analyzed_result_ptr_)
    , is_parallel_reading_from_replicas(enable_parallel_reading)
{
    if (is_parallel_reading_from_replicas)
    {
        all_ranges_callback = context->getMergeTreeAllRangesCallback();
        read_task_callback = context->getMergeTreeReadTaskCallback();
    }

    const auto & settings = context->getSettingsRef();
    if (settings.max_streams_for_merge_tree_reading)
    {
        if (settings.allow_asynchronous_read_from_io_pool_for_merge_tree)
        {
            /// When async reading is enabled, allow reading with more streams.
            /// A resize to output_streams_limit will be added to reduce memory usage.
            output_streams_limit = std::min<size_t>(requested_num_streams, settings.max_streams_for_merge_tree_reading);
            /// We intentionally set `max_streams` to 1 in InterpreterSelectQuery in the case of a small limit.
            /// Changing it here to `max_streams_for_merge_tree_reading` has proven to be harmful for performance.
            if (requested_num_streams != 1)
                requested_num_streams = std::max<size_t>(requested_num_streams, settings.max_streams_for_merge_tree_reading);
        }
        else
            /// Otherwise, just limit requested_num_streams.
            requested_num_streams = std::min<size_t>(requested_num_streams, settings.max_streams_for_merge_tree_reading);
    }

    /// Add explicit description.
    setStepDescription(data.getStorageID().getFullNameNotQuoted());
    enable_vertical_final = query_info.isFinal() && context->getSettingsRef().enable_vertical_final && data.merging_params.mode == MergeTreeData::MergingParams::Replacing;

    updateSortDescriptionForOutputStream(
        *output_stream,
        storage_snapshot->getMetadataForQuery()->getSortingKeyColumns(),
        getSortDirection(),
        query_info.input_order_info,
        prewhere_info,
        enable_vertical_final);
}


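/// Builds a pipe over a shared MergeTreeReadPoolParallelReplicas: pool_settings.threads streams pull
/// tasks from the pool, and the set of marks actually read here is coordinated with the initiator replica.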
Pipe ReadFromMergeTree::readFromPoolParallelReplicas(
    RangesInDataParts parts_with_range,
    Names required_columns,
    PoolSettings pool_settings)
{
    const auto & client_info = context->getClientInfo();

    auto extension = ParallelReadingExtension
    {
        .all_callback = all_ranges_callback.value(),
        .callback = read_task_callback.value(),
        .count_participating_replicas = client_info.count_participating_replicas,
        .number_of_current_replica = client_info.number_of_current_replica,
        .columns_to_read = required_columns,
    };

    /// We have special logic for the local replica: it has to read less data, because in some cases it also
    /// has to merge aggregate function states or do other important work besides reading from disk.
    const auto multiplier = context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier;
    if (auto result = pool_settings.min_marks_for_concurrent_read * multiplier; canConvertTo<size_t>(result))
        pool_settings.min_marks_for_concurrent_read = static_cast<size_t>(result);
    else
        throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND,
            "Exceeded limit for the number of marks per a single task for parallel replicas. "
            "Make sure that `parallel_replicas_single_task_marks_count_multiplier` is in some reasonable boundaries, current value is: {}",
            multiplier);

    auto pool = std::make_shared<MergeTreeReadPoolParallelReplicas>(
        std::move(extension),
        std::move(parts_with_range),
        shared_virtual_fields,
        storage_snapshot,
        prewhere_info,
        actions_settings,
        reader_settings,
        required_columns,
        pool_settings,
        context);

    auto block_size_copy = block_size;
    block_size_copy.min_marks_to_read = pool_settings.min_marks_for_concurrent_read;

    Pipes pipes;

    for (size_t i = 0; i < pool_settings.threads; ++i)
    {
        auto algorithm = std::make_unique<MergeTreeThreadSelectAlgorithm>(i);

        auto processor = std::make_unique<MergeTreeSelectProcessor>(
            pool, std::move(algorithm), storage_snapshot, prewhere_info,
            actions_settings, block_size_copy, reader_settings);

        auto source = std::make_shared<MergeTreeSource>(std::move(processor));
        pipes.emplace_back(std::move(source));
    }

    return Pipe::unitePipes(std::move(pipes));
}


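/// Builds a pipe over a shared read pool (MergeTreePrefetchedReadPool or plain MergeTreeReadPool):
/// the threads take mark-range tasks from the pool dynamically, so parts do not have to be pre-split between streams.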
Pipe ReadFromMergeTree::readFromPool(
    RangesInDataParts parts_with_range,
    Names required_columns,
    PoolSettings pool_settings)
{
    size_t total_rows = parts_with_range.getRowsCountAllParts();

    if (query_info.limit > 0 && query_info.limit < total_rows)
        total_rows = query_info.limit;

    const auto & settings = context->getSettingsRef();

    /// Round min_marks_for_concurrent_read up to the nearest multiple of the block size expressed in marks.
    /// If the granularity is adaptive, this doesn't make sense.
    /// Maybe it will make sense to add a setting `max_block_size_bytes`.
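    /// For example (illustrative numbers): with a fixed index_granularity of 8192 rows and
    /// max_block_size_rows = 65536, this rounds min_marks_for_concurrent_read up to a multiple of 8 marks.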
    if (block_size.max_block_size_rows && !data.canUseAdaptiveGranularity())
    {
        size_t fixed_index_granularity = data.getSettings()->index_granularity;
        pool_settings.min_marks_for_concurrent_read = (pool_settings.min_marks_for_concurrent_read * fixed_index_granularity + block_size.max_block_size_rows - 1)
            / block_size.max_block_size_rows * block_size.max_block_size_rows / fixed_index_granularity;
    }

    bool all_parts_are_remote = true;
    bool all_parts_are_local = true;
    for (const auto & part : parts_with_range)
    {
        const bool is_remote = part.data_part->isStoredOnRemoteDisk();
        all_parts_are_local &= !is_remote;
        all_parts_are_remote &= is_remote;
    }

    MergeTreeReadPoolPtr pool;

    bool allow_prefetched_remote = all_parts_are_remote
        && settings.allow_prefetched_read_pool_for_remote_filesystem
        && MergeTreePrefetchedReadPool::checkReadMethodAllowed(reader_settings.read_settings.remote_fs_method);

    bool allow_prefetched_local = all_parts_are_local
        && settings.allow_prefetched_read_pool_for_local_filesystem
        && MergeTreePrefetchedReadPool::checkReadMethodAllowed(reader_settings.read_settings.local_fs_method);

    /** Do not use the prefetched read pool if the query is a trivial limit query,
      * because the time spent filling per-thread tasks can be greater than the whole query
      * execution time for big tables with a small limit.
      */
    bool use_prefetched_read_pool = query_info.limit == 0 && (allow_prefetched_remote || allow_prefetched_local);

    if (use_prefetched_read_pool)
    {
        pool = std::make_shared<MergeTreePrefetchedReadPool>(
            std::move(parts_with_range),
            shared_virtual_fields,
            storage_snapshot,
            prewhere_info,
            actions_settings,
            reader_settings,
            required_columns,
            pool_settings,
            context);
    }
    else
    {
        pool = std::make_shared<MergeTreeReadPool>(
            std::move(parts_with_range),
            shared_virtual_fields,
            storage_snapshot,
            prewhere_info,
            actions_settings,
            reader_settings,
            required_columns,
            pool_settings,
            context);
    }

    LOG_DEBUG(log, "Reading approx. {} rows with {} streams", total_rows, pool_settings.threads);

    /// We change this setting because MergeTreeReadPool takes the full task,
    /// ignoring the min_marks_to_read setting, in the case of a remote disk (see MergeTreeReadPool::getTask).
    /// In this case, we won't limit the number of rows to read based on adaptive granularity settings.
    auto block_size_copy = block_size;
    block_size_copy.min_marks_to_read = pool_settings.min_marks_for_concurrent_read;

    Pipes pipes;
    for (size_t i = 0; i < pool_settings.threads; ++i)
    {
        auto algorithm = std::make_unique<MergeTreeThreadSelectAlgorithm>(i);

        auto processor = std::make_unique<MergeTreeSelectProcessor>(
            pool, std::move(algorithm), storage_snapshot, prewhere_info,
            actions_settings, block_size_copy, reader_settings);

        auto source = std::make_shared<MergeTreeSource>(std::move(processor));

        if (i == 0)
            source->addTotalRowsApprox(total_rows);

        pipes.emplace_back(std::move(source));
    }

    auto pipe = Pipe::unitePipes(std::move(pipes));
    if (output_streams_limit && output_streams_limit < pipe.numOutputPorts())
        pipe.resize(output_streams_limit);
    return pipe;
}

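/// Builds one source per part; each source reads its ranges in primary key order within the part,
/// and for reverse reads a ReverseTransform is appended to flip the row order of each chunk.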
Pipe ReadFromMergeTree::readInOrder(
    RangesInDataParts parts_with_ranges,
    Names required_columns,
    PoolSettings pool_settings,
    ReadType read_type,
    UInt64 limit)
{
    /// For reading in order it makes sense to read only
    /// one range per task to reduce the number of rows read.
    bool has_limit_below_one_block = read_type != ReadType::Default && limit && limit < block_size.max_block_size_rows;
    MergeTreeReadPoolPtr pool;

    if (is_parallel_reading_from_replicas)
    {
        const auto & client_info = context->getClientInfo();
        ParallelReadingExtension extension
        {
            .all_callback = all_ranges_callback.value(),
            .callback = read_task_callback.value(),
            .count_participating_replicas = client_info.count_participating_replicas,
            .number_of_current_replica = client_info.number_of_current_replica,
            .columns_to_read = required_columns,
        };

        const auto multiplier = context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier;
        if (auto result = pool_settings.min_marks_for_concurrent_read * multiplier; canConvertTo<size_t>(result))
            pool_settings.min_marks_for_concurrent_read = static_cast<size_t>(result);
        else
            throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND,
                "Exceeded limit for the number of marks per a single task for parallel replicas. "
                "Make sure that `parallel_replicas_single_task_marks_count_multiplier` is in some reasonable boundaries, current value is: {}",
                multiplier);

        CoordinationMode mode = read_type == ReadType::InOrder
            ? CoordinationMode::WithOrder
            : CoordinationMode::ReverseOrder;

        pool = std::make_shared<MergeTreeReadPoolParallelReplicasInOrder>(
            std::move(extension),
            mode,
            parts_with_ranges,
            shared_virtual_fields,
            storage_snapshot,
            prewhere_info,
            actions_settings,
            reader_settings,
            required_columns,
            pool_settings,
            context);
    }
    else
    {
        pool = std::make_shared<MergeTreeReadPoolInOrder>(
            has_limit_below_one_block,
            read_type,
            parts_with_ranges,
            shared_virtual_fields,
            storage_snapshot,
            prewhere_info,
            actions_settings,
            reader_settings,
            required_columns,
            pool_settings,
            context);
    }

    /// is_parallel_reading_from_replicas means that parallel reading from replicas is enabled
    /// and we have to collaborate with the initiator.
    /// In this case we won't set the approximate number of rows, because it would be counted multiple times.
    /// Also, do not report the number of read rows when reading in the order of the sorting key,
    /// because we don't know the actual number of read rows when a limit is set.
    bool set_rows_approx = !is_parallel_reading_from_replicas && !reader_settings.read_in_order;

    Pipes pipes;
    for (size_t i = 0; i < parts_with_ranges.size(); ++i)
    {
        const auto & part_with_ranges = parts_with_ranges[i];

        UInt64 total_rows = part_with_ranges.getRowsCount();
        if (query_info.limit > 0 && query_info.limit < total_rows)
            total_rows = query_info.limit;

        LOG_TRACE(log, "Reading {} ranges in{}order from part {}, approx. {} rows starting from {}",
            part_with_ranges.ranges.size(),
            read_type == ReadType::InReverseOrder ? " reverse " : " ",
            part_with_ranges.data_part->name, total_rows,
            part_with_ranges.data_part->index_granularity.getMarkStartingRow(part_with_ranges.ranges.front().begin));

        MergeTreeSelectAlgorithmPtr algorithm;
        if (read_type == ReadType::InReverseOrder)
            algorithm = std::make_unique<MergeTreeInReverseOrderSelectAlgorithm>(i);
        else
            algorithm = std::make_unique<MergeTreeInOrderSelectAlgorithm>(i);

        auto processor = std::make_unique<MergeTreeSelectProcessor>(
            pool, std::move(algorithm), storage_snapshot, prewhere_info,
            actions_settings, block_size, reader_settings);

        processor->addPartLevelToChunk(isQueryWithFinal());

        auto source = std::make_shared<MergeTreeSource>(std::move(processor));
        if (set_rows_approx)
            source->addTotalRowsApprox(total_rows);

        pipes.emplace_back(std::move(source));
    }

    auto pipe = Pipe::unitePipes(std::move(pipes));

    if (read_type == ReadType::InReverseOrder)
    {
        pipe.addSimpleTransform([&](const Block & header)
        {
            return std::make_shared<ReverseTransform>(header);
        });
    }

    return pipe;
}

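/// Entry point used by the spread* methods below: dispatches to the pool-based or in-order implementation
/// depending on read_type, the number of streams and whether all parts live on remote disks.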
Pipe ReadFromMergeTree::read(
    RangesInDataParts parts_with_range,
    Names required_columns,
    ReadType read_type,
    size_t max_streams,
    size_t min_marks_for_concurrent_read,
    bool use_uncompressed_cache)
{
    const auto & settings = context->getSettingsRef();
    size_t sum_marks = parts_with_range.getMarksCountAllParts();

    PoolSettings pool_settings
    {
        .threads = max_streams,
        .sum_marks = sum_marks,
        .min_marks_for_concurrent_read = min_marks_for_concurrent_read,
        .preferred_block_size_bytes = settings.preferred_block_size_bytes,
        .use_uncompressed_cache = use_uncompressed_cache,
        .use_const_size_tasks_for_remote_reading = settings.merge_tree_use_const_size_tasks_for_remote_reading,
    };

    if (read_type == ReadType::ParallelReplicas)
        return readFromPoolParallelReplicas(std::move(parts_with_range), std::move(required_columns), std::move(pool_settings));

    /// Reading from the default thread pool is beneficial for remote storage because of the new prefetches.
    if (read_type == ReadType::Default && (max_streams > 1 || checkAllPartsOnRemoteFS(parts_with_range)))
        return readFromPool(std::move(parts_with_range), std::move(required_columns), std::move(pool_settings));

    auto pipe = readInOrder(parts_with_range, required_columns, pool_settings, read_type, /*limit=*/ 0);

    /// Use ConcatProcessor to concatenate the sources.
    /// It is needed to read in part order (and so in PK order) when a single thread is used.
    if (read_type == ReadType::Default && pipe.numOutputPorts() > 1)
        pipe.addTransform(std::make_shared<ConcatProcessor>(pipe.getHeader(), pipe.numOutputPorts()));

    return pipe;
}

namespace
{

struct PartRangesReadInfo
{
    std::vector<size_t> sum_marks_in_parts;

    size_t sum_marks = 0;
    size_t total_rows = 0;
    size_t adaptive_parts = 0;
    size_t index_granularity_bytes = 0;
    size_t max_marks_to_use_cache = 0;
    size_t min_marks_for_concurrent_read = 0;
    bool use_uncompressed_cache = false;

    PartRangesReadInfo(
        const RangesInDataParts & parts,
        const Settings & settings,
        const MergeTreeSettings & data_settings)
    {
        /// Count marks for each part.
        sum_marks_in_parts.resize(parts.size());

        for (size_t i = 0; i < parts.size(); ++i)
        {
            total_rows += parts[i].getRowsCount();
            sum_marks_in_parts[i] = parts[i].getMarksCount();
            sum_marks += sum_marks_in_parts[i];

            if (parts[i].data_part->index_granularity_info.mark_type.adaptive)
                ++adaptive_parts;
        }

        if (adaptive_parts > parts.size() / 2)
            index_granularity_bytes = data_settings.index_granularity_bytes;

        max_marks_to_use_cache = MergeTreeDataSelectExecutor::roundRowsOrBytesToMarks(
            settings.merge_tree_max_rows_to_use_cache,
            settings.merge_tree_max_bytes_to_use_cache,
            data_settings.index_granularity,
            index_granularity_bytes);

        auto all_parts_on_remote_disk = checkAllPartsOnRemoteFS(parts);

        size_t min_rows_for_concurrent_read;
        size_t min_bytes_for_concurrent_read;
        if (all_parts_on_remote_disk)
        {
            min_rows_for_concurrent_read = settings.merge_tree_min_rows_for_concurrent_read_for_remote_filesystem;
            min_bytes_for_concurrent_read = settings.merge_tree_min_bytes_for_concurrent_read_for_remote_filesystem;
        }
        else
        {
            min_rows_for_concurrent_read = settings.merge_tree_min_rows_for_concurrent_read;
            min_bytes_for_concurrent_read = settings.merge_tree_min_bytes_for_concurrent_read;
        }

        min_marks_for_concurrent_read = MergeTreeDataSelectExecutor::minMarksForConcurrentRead(
            min_rows_for_concurrent_read, min_bytes_for_concurrent_read,
            data_settings.index_granularity, index_granularity_bytes, sum_marks);

        use_uncompressed_cache = settings.use_uncompressed_cache;
        if (sum_marks > max_marks_to_use_cache)
            use_uncompressed_cache = false;
    }
};

}

Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(RangesInDataParts && parts_with_ranges, size_t num_streams, const Names & column_names)
{
    const auto & settings = context->getSettingsRef();
    const auto data_settings = data.getSettings();

    LOG_TRACE(log, "Spreading mark ranges among streams (default reading)");

    PartRangesReadInfo info(parts_with_ranges, settings, *data_settings);

    if (0 == info.sum_marks)
        return {};

    if (num_streams > 1)
    {
        /// Reduce num_streams if the data is small.
        if (info.sum_marks < num_streams * info.min_marks_for_concurrent_read && parts_with_ranges.size() < num_streams)
        {
            /*
            If the data is fragmented, allocate one stream per part. If the data is not fragmented, take the system cores
            into account in addition to sum_marks and min_marks_for_concurrent_read: increase num_streams and decrease
            min_marks_for_concurrent_read when the data is small but the system has plenty of cores. This significantly
            improves the parallel performance of `MergeTreeRead`.
            Make sure the new num_streams `num_streams * increase_num_streams_ratio` does not exceed the previously
            calculated prev_num_streams, and the new info.min_marks_for_concurrent_read
            `info.min_marks_for_concurrent_read / increase_num_streams_ratio` stays larger than 8.
            https://github.com/ClickHouse/ClickHouse/pull/53867
            */
            if ((info.sum_marks + info.min_marks_for_concurrent_read - 1) / info.min_marks_for_concurrent_read > parts_with_ranges.size())
            {
                const size_t prev_num_streams = num_streams;
                num_streams = (info.sum_marks + info.min_marks_for_concurrent_read - 1) / info.min_marks_for_concurrent_read;
                const size_t increase_num_streams_ratio = std::min(prev_num_streams / num_streams, info.min_marks_for_concurrent_read / 8);
                if (increase_num_streams_ratio > 1)
                {
                    num_streams = num_streams * increase_num_streams_ratio;
                    info.min_marks_for_concurrent_read = (info.sum_marks + num_streams - 1) / num_streams;
                }
            }
            else
                num_streams = parts_with_ranges.size();
        }
    }

    auto read_type = is_parallel_reading_from_replicas ? ReadType::ParallelReplicas : ReadType::Default;

    double read_split_ranges_into_intersecting_and_non_intersecting_injection_probability = settings.merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability;
    std::bernoulli_distribution fault(read_split_ranges_into_intersecting_and_non_intersecting_injection_probability);

    if (read_type != ReadType::ParallelReplicas &&
        num_streams > 1 &&
        read_split_ranges_into_intersecting_and_non_intersecting_injection_probability > 0.0 &&
        fault(thread_local_rng) &&
        !isQueryWithFinal() &&
        data.merging_params.is_deleted_column.empty() &&
        !prewhere_info)
    {
        NameSet column_names_set(column_names.begin(), column_names.end());
        Names in_order_column_names_to_read(column_names);

        /// Add columns needed to calculate the sorting expression.
        for (const auto & column_name : metadata_for_reading->getColumnsRequiredForSortingKey())
        {
            if (column_names_set.contains(column_name))
                continue;

            in_order_column_names_to_read.push_back(column_name);
            column_names_set.insert(column_name);
        }

        auto in_order_reading_step_getter = [this, &in_order_column_names_to_read, &info](auto parts)
        {
            return this->read(
                std::move(parts),
                in_order_column_names_to_read,
                ReadType::InOrder,
                1 /* num_streams */,
                0 /* min_marks_for_concurrent_read */,
                info.use_uncompressed_cache);
        };

        auto sorting_expr = std::make_shared<ExpressionActions>(metadata_for_reading->getSortingKey().expression->getActionsDAG().clone());

        SplitPartsWithRangesByPrimaryKeyResult split_ranges_result = splitPartsWithRangesByPrimaryKey(
            metadata_for_reading->getPrimaryKey(),
            std::move(sorting_expr),
            std::move(parts_with_ranges),
            num_streams,
            context,
            std::move(in_order_reading_step_getter),
            true /*split_parts_ranges_into_intersecting_and_non_intersecting_final*/,
            true /*split_intersecting_parts_ranges_into_layers*/);

        auto merging_pipes = std::move(split_ranges_result.merging_pipes);
        auto non_intersecting_parts_ranges_read_pipe = read(std::move(split_ranges_result.non_intersecting_parts_ranges),
            column_names,
            read_type,
            num_streams,
            info.min_marks_for_concurrent_read,
            info.use_uncompressed_cache);

        if (merging_pipes.empty())
            return non_intersecting_parts_ranges_read_pipe;

        Pipes pipes;
        pipes.resize(2);
        pipes[0] = Pipe::unitePipes(std::move(merging_pipes));
        pipes[1] = std::move(non_intersecting_parts_ranges_read_pipe);

        auto conversion_action = ActionsDAG::makeConvertingActions(
            pipes[0].getHeader().getColumnsWithTypeAndName(),
            pipes[1].getHeader().getColumnsWithTypeAndName(),
            ActionsDAG::MatchColumnsMode::Name);
        pipes[0].addSimpleTransform(
            [conversion_action](const Block & header)
            {
                auto converting_expr = std::make_shared<ExpressionActions>(conversion_action);
                return std::make_shared<ExpressionTransform>(header, converting_expr);
            });
        return Pipe::unitePipes(std::move(pipes));
    }

    return read(std::move(parts_with_ranges),
        column_names,
        read_type,
        num_streams,
        info.min_marks_for_concurrent_read,
        info.use_uncompressed_cache);
}

static ActionsDAGPtr createProjection(const Block & header)
{
    auto projection = std::make_shared<ActionsDAG>(header.getNamesAndTypesList());
    projection->removeUnusedActions(header.getNames());
    projection->projectInput();
    return projection;
}

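/// Used when reading in the order of the sorting key (e.g. for the read-in-order optimization or aggregation in order):
/// whole parts or their tail subranges are assigned to streams, and streams are merged by the used
/// prefix of the sorting key when a preliminary merge is required.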
Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
    RangesInDataParts && parts_with_ranges,
    size_t num_streams,
    const Names & column_names,
    ActionsDAGPtr & out_projection,
    const InputOrderInfoPtr & input_order_info)
{
    const auto & settings = context->getSettingsRef();
    const auto data_settings = data.getSettings();

    LOG_TRACE(log, "Spreading ranges among streams with order");

    PartRangesReadInfo info(parts_with_ranges, settings, *data_settings);

    Pipes res;

    if (info.sum_marks == 0)
        return {};

    /// PREWHERE actions can remove some input columns (which are needed only for the prewhere condition).
    /// In the case of read-in-order, PREWHERE is executed before sorting, but the removed columns could be needed for the sorting key.
    /// To fix this, we prohibit removing any input in prewhere actions. Instead, projection actions will be added after sorting.
    /// See 02354_read_in_order_prewhere.sql as an example.
    bool have_input_columns_removed_after_prewhere = false;
    if (prewhere_info)
    {
        NameSet sorting_columns;
        for (const auto & column : metadata_for_reading->getSortingKey().expression->getRequiredColumnsWithTypes())
            sorting_columns.insert(column.name);

        have_input_columns_removed_after_prewhere = restorePrewhereInputs(*prewhere_info, sorting_columns);
    }

    /// Let's split ranges to avoid reading much data.
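    /// The ranges handed to each stream grow geometrically (1, 2, 4, ... marks), capped at roughly one block:
    /// for example, with an 8192-row granularity and a 65536-row max block size the cap is 8 marks
    /// (illustrative numbers).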
    auto split_ranges
        = [rows_granularity = data_settings->index_granularity, my_max_block_size = block_size.max_block_size_rows]
        (const auto & ranges, int direction)
    {
        MarkRanges new_ranges;
        const size_t max_marks_in_range = (my_max_block_size + rows_granularity - 1) / rows_granularity;
        size_t marks_in_range = 1;

        if (direction == 1)
        {
            /// Split the first few ranges to avoid reading much data.
            bool split = false;
            for (auto range : ranges)
            {
                while (!split && range.begin + marks_in_range < range.end)
                {
                    new_ranges.emplace_back(range.begin, range.begin + marks_in_range);
                    range.begin += marks_in_range;
                    marks_in_range *= 2;

                    if (marks_in_range > max_marks_in_range)
                        split = true;
                }
                new_ranges.emplace_back(range.begin, range.end);
            }
        }
        else
        {
            /// Split all ranges to avoid reading much data, because we have to
            ///  store the whole range in memory to reverse it.
            for (auto it = ranges.rbegin(); it != ranges.rend(); ++it)
            {
                auto range = *it;
                while (range.begin + marks_in_range < range.end)
                {
                    new_ranges.emplace_front(range.end - marks_in_range, range.end);
                    range.end -= marks_in_range;
                    marks_in_range = std::min(marks_in_range * 2, max_marks_in_range);
                }
                new_ranges.emplace_front(range.begin, range.end);
            }
        }

        return new_ranges;
    };

    const size_t min_marks_per_stream = (info.sum_marks - 1) / num_streams + 1;
    bool need_preliminary_merge = (parts_with_ranges.size() > settings.read_in_order_two_level_merge_threshold);

    const auto read_type = input_order_info->direction == 1 ? ReadType::InOrder : ReadType::InReverseOrder;

    PoolSettings pool_settings
    {
        .min_marks_for_concurrent_read = info.min_marks_for_concurrent_read,
        .preferred_block_size_bytes = settings.preferred_block_size_bytes,
        .use_uncompressed_cache = info.use_uncompressed_cache,
    };

    Pipes pipes;
    /// For parallel replicas the split will be performed on the initiator side.
    if (is_parallel_reading_from_replicas)
    {
        pipes.emplace_back(readInOrder(std::move(parts_with_ranges), column_names, pool_settings, read_type, input_order_info->limit));
    }
    else
    {
        std::vector<RangesInDataParts> splitted_parts_and_ranges;
        splitted_parts_and_ranges.reserve(num_streams);

        for (size_t i = 0; i < num_streams && !parts_with_ranges.empty(); ++i)
        {
            size_t need_marks = min_marks_per_stream;
            RangesInDataParts new_parts;

            /// Loop over parts.
            /// We will iteratively take a part or some subrange of a part from the back
            ///  and assign a stream to read from it.
            while (need_marks > 0 && !parts_with_ranges.empty())
            {
                RangesInDataPart part = parts_with_ranges.back();
                parts_with_ranges.pop_back();
                size_t & marks_in_part = info.sum_marks_in_parts.back();

                /// We will not take too few rows from a part.
                if (marks_in_part >= info.min_marks_for_concurrent_read && need_marks < info.min_marks_for_concurrent_read)
                    need_marks = info.min_marks_for_concurrent_read;

                /// Do not leave too few rows in the part.
                if (marks_in_part > need_marks && marks_in_part - need_marks < info.min_marks_for_concurrent_read)
                    need_marks = marks_in_part;

                MarkRanges ranges_to_get_from_part;

                /// We take the full part if it contains enough marks or
                /// if we know the limit and the part contains fewer than 'limit' rows.
                bool take_full_part = marks_in_part <= need_marks || (input_order_info->limit && input_order_info->limit < part.getRowsCount());

                /// We take the whole part if it is small enough.
                if (take_full_part)
                {
                    ranges_to_get_from_part = part.ranges;

                    need_marks -= marks_in_part;
                    info.sum_marks_in_parts.pop_back();
                }
                else
                {
                    /// Loop through ranges in the part. Take enough ranges to cover "need_marks".
                    while (need_marks > 0)
                    {
                        if (part.ranges.empty())
                            throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected end of ranges while spreading marks among streams");

                        MarkRange & range = part.ranges.front();

                        const size_t marks_in_range = range.end - range.begin;
                        const size_t marks_to_get_from_range = std::min(marks_in_range, need_marks);

                        ranges_to_get_from_part.emplace_back(range.begin, range.begin + marks_to_get_from_range);
                        range.begin += marks_to_get_from_range;
                        marks_in_part -= marks_to_get_from_range;
                        need_marks -= marks_to_get_from_range;
                        if (range.begin == range.end)
                            part.ranges.pop_front();
                    }
                    parts_with_ranges.emplace_back(part);
                }

                ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, input_order_info->direction);
                new_parts.emplace_back(part.data_part, part.alter_conversions, part.part_index_in_query, std::move(ranges_to_get_from_part));
            }

            splitted_parts_and_ranges.emplace_back(std::move(new_parts));
        }

        for (auto && item : splitted_parts_and_ranges)
            pipes.emplace_back(readInOrder(std::move(item), column_names, pool_settings, read_type, input_order_info->limit));
    }

    Block pipe_header;
    if (!pipes.empty())
        pipe_header = pipes.front().getHeader();

    if (need_preliminary_merge || output_each_partition_through_separate_port)
    {
        size_t prefix_size = input_order_info->used_prefix_of_sorting_key_size;
        auto order_key_prefix_ast = metadata_for_reading->getSortingKey().expression_list_ast->clone();
        order_key_prefix_ast->children.resize(prefix_size);

        auto syntax_result = TreeRewriter(context).analyze(order_key_prefix_ast, metadata_for_reading->getColumns().getAllPhysical());
        auto sorting_key_prefix_expr = ExpressionAnalyzer(order_key_prefix_ast, syntax_result, context).getActionsDAG(false);
        const auto & sorting_columns = metadata_for_reading->getSortingKey().column_names;

        SortDescription sort_description;
        sort_description.compile_sort_description = settings.compile_sort_description;
        sort_description.min_count_to_compile_sort_description = settings.min_count_to_compile_sort_description;

        for (size_t j = 0; j < prefix_size; ++j)
            sort_description.emplace_back(sorting_columns[j], input_order_info->direction);

        auto sorting_key_expr = std::make_shared<ExpressionActions>(sorting_key_prefix_expr);

        auto merge_streams = [&](Pipe & pipe)
        {
            pipe.addSimpleTransform([sorting_key_expr](const Block & header)
                                    { return std::make_shared<ExpressionTransform>(header, sorting_key_expr); });

            if (pipe.numOutputPorts() > 1)
            {
                auto transform = std::make_shared<MergingSortedTransform>(
                    pipe.getHeader(), pipe.numOutputPorts(), sort_description, block_size.max_block_size_rows, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch);

                pipe.addTransform(std::move(transform));
            }
        };

        if (!pipes.empty() && output_each_partition_through_separate_port)
        {
            /// In contrast with usual aggregation in order, which allocates a separate AggregatingTransform for each data part,
            /// aggregation of partitioned data uses the same AggregatingTransform for all parts of the same partition.
            /// Thus we need to merge all partition parts into a single sorted stream.
            Pipe pipe = Pipe::unitePipes(std::move(pipes));
            merge_streams(pipe);
            return pipe;
        }

        for (auto & pipe : pipes)
            merge_streams(pipe);
    }

    if (!pipes.empty() && (need_preliminary_merge || have_input_columns_removed_after_prewhere))
        /// Drop temporary columns added by 'sorting_key_prefix_expr'.
        out_projection = createProjection(pipe_header);

    return Pipe::unitePipes(std::move(pipes));
}

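/// Appends the merging transform that matches the table's merging mode (Ordinary, Collapsing, Summing,
/// Aggregating, Replacing, VersionedCollapsing or Graphite), so FINAL semantics are applied on the fly.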
static void addMergingFinal(
    Pipe & pipe,
    const SortDescription & sort_description,
    MergeTreeData::MergingParams merging_params,
    Names partition_key_columns,
    size_t max_block_size_rows,
    bool enable_vertical_final,
    bool can_merge_final_indices_to_next_step_filter)
{
    const auto & header = pipe.getHeader();
    size_t num_outputs = pipe.numOutputPorts();

    auto now = time(nullptr);

    auto get_merging_processor = [&]() -> MergingTransformPtr
    {
        switch (merging_params.mode)
        {
            case MergeTreeData::MergingParams::Ordinary:
                return std::make_shared<MergingSortedTransform>(header, num_outputs,
                            sort_description, max_block_size_rows, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch);

            case MergeTreeData::MergingParams::Collapsing:
                return std::make_shared<CollapsingSortedTransform>(header, num_outputs,
                            sort_description, merging_params.sign_column, true, max_block_size_rows, /*max_block_size_bytes=*/0);

            case MergeTreeData::MergingParams::Summing:
                return std::make_shared<SummingSortedTransform>(header, num_outputs,
                            sort_description, merging_params.columns_to_sum, partition_key_columns, max_block_size_rows, /*max_block_size_bytes=*/0);

            case MergeTreeData::MergingParams::Aggregating:
                return std::make_shared<AggregatingSortedTransform>(header, num_outputs,
                            sort_description, max_block_size_rows, /*max_block_size_bytes=*/0);

            case MergeTreeData::MergingParams::Replacing:
                return std::make_shared<ReplacingSortedTransform>(header, num_outputs,
                            sort_description, merging_params.is_deleted_column, merging_params.version_column, max_block_size_rows, /*max_block_size_bytes=*/0, /*out_row_sources_buf_*/ nullptr, /*use_average_block_sizes*/ false, /*cleanup*/ !merging_params.is_deleted_column.empty(), enable_vertical_final);

            case MergeTreeData::MergingParams::VersionedCollapsing:
                return std::make_shared<VersionedCollapsingTransform>(header, num_outputs,
                            sort_description, merging_params.sign_column, max_block_size_rows, /*max_block_size_bytes=*/0);

            case MergeTreeData::MergingParams::Graphite:
                return std::make_shared<GraphiteRollupSortedTransform>(header, num_outputs,
                            sort_description, max_block_size_rows, /*max_block_size_bytes=*/0, merging_params.graphite_params, now);
        }

        UNREACHABLE();
    };

    pipe.addTransform(get_merging_processor());
    if (enable_vertical_final && !can_merge_final_indices_to_next_step_filter)
        pipe.addSimpleTransform([](const Block & header_)
                                { return std::make_shared<SelectByIndicesTransform>(header_); });
}

bool ReadFromMergeTree::doNotMergePartsAcrossPartitionsFinal() const
{
    const auto & settings = context->getSettingsRef();

    /// If the setting do_not_merge_across_partitions_select_final is set explicitly, always prefer it.
    if (settings.do_not_merge_across_partitions_select_final.changed)
        return settings.do_not_merge_across_partitions_select_final;

    if (!metadata_for_reading->hasPrimaryKey() || !metadata_for_reading->hasPartitionKey())
        return false;

    /** To avoid merging parts across partitions, we want the result of the partition key expression
      * to be the same for rows with the same primary key.
      *
      * If the partition key expression is deterministic and contains only columns that are included
      * in the primary key, then for the same primary key column values the result of the partition
      * key expression will be the same.
      */
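    /// Example (illustrative): PARTITION BY toYYYYMM(date) with PRIMARY KEY (date, id) qualifies,
    /// since the partition key expression is deterministic and uses only primary key columns.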
    const auto & partition_key_expression = metadata_for_reading->getPartitionKey().expression;
1167
    if (partition_key_expression->getActionsDAG().hasNonDeterministic())
1168
        return false;
1169

1170
    const auto & primary_key_columns = metadata_for_reading->getPrimaryKey().column_names;
1171
    NameSet primary_key_columns_set(primary_key_columns.begin(), primary_key_columns.end());
1172

1173
    const auto & partition_key_required_columns = partition_key_expression->getRequiredColumns();
1174
    for (const auto & partition_key_required_column : partition_key_required_columns)
1175
        if (!primary_key_columns_set.contains(partition_key_required_column))
1176
            return false;
1177

1178
    return true;
1179
}
1180

1181
Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
1182
    RangesInDataParts && parts_with_ranges, size_t num_streams, const Names & origin_column_names, const Names & column_names, ActionsDAGPtr & out_projection)
1183
{
1184
    const auto & settings = context->getSettingsRef();
1185
    const auto & data_settings = data.getSettings();
1186
    PartRangesReadInfo info(parts_with_ranges, settings, *data_settings);
1187

1188
    assert(num_streams == requested_num_streams);
1189
    if (num_streams > settings.max_final_threads)
1190
        num_streams = settings.max_final_threads;
1191

1192
    /// If setting do_not_merge_across_partitions_select_final is true than we won't merge parts from different partitions.
1193
    /// We have all parts in parts vector, where parts with same partition are nearby.
1194
    /// So we will store iterators pointed to the beginning of each partition range (and parts.end()),
1195
    /// then we will create a pipe for each partition that will run selecting processor and merging processor
1196
    /// for the parts with this partition. In the end we will unite all the pipes.
1197
    std::vector<RangesInDataParts::iterator> parts_to_merge_ranges;
1198
    auto it = parts_with_ranges.begin();
1199
    parts_to_merge_ranges.push_back(it);
1200

1201
    bool do_not_merge_across_partitions_select_final = doNotMergePartsAcrossPartitionsFinal();
1202
    if (do_not_merge_across_partitions_select_final)
1203
    {
1204
        while (it != parts_with_ranges.end())
1205
        {
1206
            it = std::find_if(
1207
                it, parts_with_ranges.end(), [&it](auto & part) { return it->data_part->info.partition_id != part.data_part->info.partition_id; });
1208
            parts_to_merge_ranges.push_back(it);
1209
        }
1210
    }
1211
    else
1212
    {
1213
        /// If do_not_merge_across_partitions_select_final is false we just merge all the parts.
1214
        parts_to_merge_ranges.push_back(parts_with_ranges.end());
1215
    }
1216

1217
    Pipes merging_pipes;
1218
    Pipes no_merging_pipes;
1219

1220
    /// If do_not_merge_across_partitions_select_final is true and num_streams > 1
1221
    /// we will store lonely parts with level > 0 to use parallel select on them.
1222
    RangesInDataParts non_intersecting_parts_by_primary_key;
1223

1224
    auto sorting_expr = std::make_shared<ExpressionActions>(metadata_for_reading->getSortingKey().expression->getActionsDAG().clone());
1225

1226
    if (prewhere_info)
1227
    {
1228
        NameSet sorting_columns;
1229
        for (const auto & column : metadata_for_reading->getSortingKey().expression->getRequiredColumnsWithTypes())
1230
            sorting_columns.insert(column.name);
1231
        restorePrewhereInputs(*prewhere_info, sorting_columns);
1232
    }
1233

1234
    for (size_t range_index = 0; range_index < parts_to_merge_ranges.size() - 1; ++range_index)
1235
    {
1236
        /// If do_not_merge_across_partitions_select_final is true and there is only one part in the partition
1237
        /// with level > 0 then we won't post-process this part, and if num_streams > 1 we
1238
        /// can use parallel select on such parts.
1239
        bool no_merging_final = do_not_merge_across_partitions_select_final &&
1240
            std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 &&
1241
            parts_to_merge_ranges[range_index]->data_part->info.level > 0 &&
1242
            data.merging_params.is_deleted_column.empty();
1243

1244
        if (no_merging_final)
1245
        {
1246
            non_intersecting_parts_by_primary_key.push_back(std::move(*parts_to_merge_ranges[range_index]));
1247
            continue;
1248
        }
1249

1250
        Pipes pipes;
1251
        {
1252
            RangesInDataParts new_parts;
1253

1254
            for (auto part_it = parts_to_merge_ranges[range_index]; part_it != parts_to_merge_ranges[range_index + 1]; ++part_it)
1255
                new_parts.emplace_back(part_it->data_part, part_it->alter_conversions, part_it->part_index_in_query, part_it->ranges);
1256

1257
            if (new_parts.empty())
1258
                continue;
1259

1260
            if (num_streams > 1 && metadata_for_reading->hasPrimaryKey())
1261
            {
1262
                // Let's split parts into non-intersecting part ranges and layers to ensure data parallelism of FINAL.
1263
                auto in_order_reading_step_getter = [this, &column_names, &info](auto parts)
1264
                {
1265
                    return this->read(
1266
                        std::move(parts),
1267
                        column_names,
1268
                        ReadType::InOrder,
1269
                        1 /* num_streams */,
1270
                        0 /* min_marks_for_concurrent_read */,
1271
                        info.use_uncompressed_cache);
1272
                };
1273

1274
                /// Parts of non-zero level still may contain duplicate PK values to merge on FINAL if there is an is_deleted column,
1275
                /// so we have to process all ranges. It would be more optimal to remove this flag and add an extra filtering step.
1276
                bool split_parts_ranges_into_intersecting_and_non_intersecting_final = settings.split_parts_ranges_into_intersecting_and_non_intersecting_final &&
1277
                    data.merging_params.is_deleted_column.empty();
1278

1279
                SplitPartsWithRangesByPrimaryKeyResult split_ranges_result = splitPartsWithRangesByPrimaryKey(
1280
                    metadata_for_reading->getPrimaryKey(),
1281
                    sorting_expr,
1282
                    std::move(new_parts),
1283
                    num_streams,
1284
                    context,
1285
                    std::move(in_order_reading_step_getter),
1286
                    split_parts_ranges_into_intersecting_and_non_intersecting_final,
1287
                    settings.split_intersecting_parts_ranges_into_layers_final);
1288

1289
                for (auto && non_intersecting_parts_range : split_ranges_result.non_intersecting_parts_ranges)
1290
                    non_intersecting_parts_by_primary_key.push_back(std::move(non_intersecting_parts_range));
1291

1292
                for (auto && merging_pipe : split_ranges_result.merging_pipes)
1293
                    pipes.push_back(std::move(merging_pipe));
1294
            }
1295
            else
1296
            {
1297
                pipes.emplace_back(read(
1298
                    std::move(new_parts), column_names, ReadType::InOrder, num_streams, 0, info.use_uncompressed_cache));
1299

1300
                pipes.back().addSimpleTransform([sorting_expr](const Block & header)
1301
                                                { return std::make_shared<ExpressionTransform>(header, sorting_expr); });
1302
            }
1303

1304
            /// Drop temporary columns added by the sorting key expression
1305
            if (!out_projection && !pipes.empty())
1306
                out_projection = createProjection(pipes.front().getHeader());
1307
        }
1308

1309
        if (pipes.empty())
1310
            continue;
1311

1312
        Names sort_columns = metadata_for_reading->getSortingKeyColumns();
1313
        SortDescription sort_description;
1314
        sort_description.compile_sort_description = settings.compile_sort_description;
1315
        sort_description.min_count_to_compile_sort_description = settings.min_count_to_compile_sort_description;
1316

1317
        size_t sort_columns_size = sort_columns.size();
1318
        sort_description.reserve(sort_columns_size);
1319

1320
        Names partition_key_columns = metadata_for_reading->getPartitionKey().column_names;
1321

1322
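        /// Ascending sort over the whole sorting key (the two 1s are the sort direction and the NULLs direction).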
        for (size_t i = 0; i < sort_columns_size; ++i)
1323
            sort_description.emplace_back(sort_columns[i], 1, 1);
1324

1325
        for (auto & pipe : pipes)
1326
            addMergingFinal(
1327
                pipe,
1328
                sort_description,
1329
                data.merging_params,
1330
                partition_key_columns,
1331
                block_size.max_block_size_rows,
1332
                enable_vertical_final,
1333
                query_info.has_filters_and_no_array_join_before_filter);
1334

1335
        merging_pipes.emplace_back(Pipe::unitePipes(std::move(pipes)));
1336
    }
1337

1338
    if (!non_intersecting_parts_by_primary_key.empty())
1339
    {
1340
        auto pipe = spreadMarkRangesAmongStreams(std::move(non_intersecting_parts_by_primary_key), num_streams, origin_column_names);
1341
        no_merging_pipes.emplace_back(std::move(pipe));
1342
    }
1343

1344
    if (!merging_pipes.empty() && !no_merging_pipes.empty())
1345
    {
1346
        out_projection = nullptr; /// We do projection here
1347
        Pipes pipes;
1348
        pipes.resize(2);
1349
        pipes[0] = Pipe::unitePipes(std::move(merging_pipes));
1350
        pipes[1] = Pipe::unitePipes(std::move(no_merging_pipes));
1351
        auto conversion_action = ActionsDAG::makeConvertingActions(
1352
            pipes[0].getHeader().getColumnsWithTypeAndName(),
1353
            pipes[1].getHeader().getColumnsWithTypeAndName(),
1354
            ActionsDAG::MatchColumnsMode::Name);
1355
        pipes[0].addSimpleTransform(
1356
            [conversion_action](const Block & header)
1357
            {
1358
                auto converting_expr = std::make_shared<ExpressionActions>(conversion_action);
1359
                return std::make_shared<ExpressionTransform>(header, converting_expr);
1360
            });
1361
        return Pipe::unitePipes(std::move(pipes));
1362
    }
1363
    else
1364
        return merging_pipes.empty() ? Pipe::unitePipes(std::move(no_merging_pipes)) : Pipe::unitePipes(std::move(merging_pipes));
1365
}
1366

1367
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
1368
    MergeTreeData::DataPartsVector parts,
1369
    std::vector<AlterConversionsPtr> alter_conversions) const
1370
{
1371
    return selectRangesToRead(
1372
        std::move(parts),
1373
        std::move(alter_conversions),
1374
        metadata_for_reading,
1375
        query_info,
1376
        context,
1377
        requested_num_streams,
1378
        max_block_numbers_to_read,
1379
        data,
1380
        all_column_names,
1381
        log,
1382
        indexes);
1383
}
1384

1385
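/// Fills the Indexes structure for the given parts and filter DAG: the primary key condition,
/// the min-max and partition pruning conditions (if there is a partition key), the part-values and
/// part-offset conditions, and the list of usable skip indexes (honoring use_skip_indexes,
/// use_skip_indexes_if_final and ignore_data_skipping_indices).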
static void buildIndexes(
1386
    std::optional<ReadFromMergeTree::Indexes> & indexes,
1387
    ActionsDAGPtr filter_actions_dag,
1388
    const MergeTreeData & data,
1389
    const MergeTreeData::DataPartsVector & parts,
1390
    const ContextPtr & context,
1391
    const SelectQueryInfo & query_info,
1392
    const StorageMetadataPtr & metadata_snapshot)
1393
{
1394
    indexes.reset();
1395

1396
    // Build and check if primary key is used when necessary
1397
    const auto & primary_key = metadata_snapshot->getPrimaryKey();
1398
    const Names & primary_key_column_names = primary_key.column_names;
1399

1400
    const auto & settings = context->getSettingsRef();
1401

1402
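    /// The first member is the primary key KeyCondition; the remaining index structures are filled below.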
    indexes.emplace(ReadFromMergeTree::Indexes{{
1403
        filter_actions_dag,
1404
        context,
1405
        primary_key_column_names,
1406
        primary_key.expression}, {}, {}, {}, {}, false, {}});
1407

1408
    if (metadata_snapshot->hasPartitionKey())
1409
    {
1410
        const auto & partition_key = metadata_snapshot->getPartitionKey();
1411
        auto minmax_columns_names = MergeTreeData::getMinMaxColumnsNames(partition_key);
1412
        auto minmax_expression_actions = MergeTreeData::getMinMaxExpr(partition_key, ExpressionActionsSettings::fromContext(context));
1413

1414
        indexes->minmax_idx_condition.emplace(filter_actions_dag, context, minmax_columns_names, minmax_expression_actions);
1415
        indexes->partition_pruner.emplace(metadata_snapshot, filter_actions_dag, context, false /* strict */);
1416
    }
1417

1418
    indexes->part_values
1419
        = MergeTreeDataSelectExecutor::filterPartsByVirtualColumns(metadata_snapshot, data, parts, filter_actions_dag, context);
1420
    MergeTreeDataSelectExecutor::buildKeyConditionFromPartOffset(indexes->part_offset_condition, filter_actions_dag, context);
1421

1422
    indexes->use_skip_indexes = settings.use_skip_indexes;
1423
    bool final = query_info.isFinal();
1424

1425
    if (final && !settings.use_skip_indexes_if_final)
1426
        indexes->use_skip_indexes = false;
1427

1428
    if (!indexes->use_skip_indexes)
1429
        return;
1430

1431
    std::unordered_set<std::string> ignored_index_names;
1432

1433
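    /// The setting is parsed as a comma-separated list of identifiers or string literals,
    /// e.g. (illustrative) SETTINGS ignore_data_skipping_indices = 'idx_a, idx_b'.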
    if (settings.ignore_data_skipping_indices.changed)
1434
    {
1435
        const auto & indices = settings.ignore_data_skipping_indices.toString();
1436
        Tokens tokens(indices.data(), indices.data() + indices.size(), settings.max_query_size);
1437
        IParser::Pos pos(tokens, static_cast<unsigned>(settings.max_parser_depth), static_cast<unsigned>(settings.max_parser_backtracks));
1438
        Expected expected;
1439

1440
        /// Collect the names into an unordered set rather than a vector of strings
1441
        auto parse_single_id_or_literal = [&]
1442
        {
1443
            String str;
1444
            if (!parseIdentifierOrStringLiteral(pos, expected, str))
1445
                return false;
1446

1447
            ignored_index_names.insert(std::move(str));
1448
            return true;
1449
        };
1450

1451
        if (!ParserList::parseUtil(pos, expected, parse_single_id_or_literal, false))
1452
            throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Cannot parse ignore_data_skipping_indices ('{}')", indices);
1453
    }
1454

1455
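    /// Collect usable skip indexes: mergeable indexes with the same (type, granularity) are combined
    /// into a single merged condition, vector-search indexes get their special conditions, and
    /// indexes whose condition is always unknown/true are dropped as useless.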
    UsefulSkipIndexes skip_indexes;
1456
    using Key = std::pair<String, size_t>;
1457
    std::map<Key, size_t> merged;
1458

1459
    for (const auto & index : metadata_snapshot->getSecondaryIndices())
1460
    {
1461
        if (!ignored_index_names.contains(index.name))
1462
        {
1463
            auto index_helper = MergeTreeIndexFactory::instance().get(index);
1464
            if (index_helper->isMergeable())
1465
            {
1466
                auto [it, inserted] = merged.emplace(Key{index_helper->index.type, index_helper->getGranularity()}, skip_indexes.merged_indices.size());
1467
                if (inserted)
1468
                {
1469
                    skip_indexes.merged_indices.emplace_back();
1470
                    skip_indexes.merged_indices.back().condition = index_helper->createIndexMergedCondition(query_info, metadata_snapshot);
1471
                }
1472

1473
                skip_indexes.merged_indices[it->second].addIndex(index_helper);
1474
            }
1475
            else
1476
            {
1477
                MergeTreeIndexConditionPtr condition;
1478
                if (index_helper->isVectorSearch())
1479
                {
1480
#ifdef ENABLE_ANNOY
1481
                    if (const auto * annoy = typeid_cast<const MergeTreeIndexAnnoy *>(index_helper.get()))
1482
                        condition = annoy->createIndexCondition(query_info, context);
1483
#endif
1484
#ifdef ENABLE_USEARCH
1485
                    if (const auto * usearch = typeid_cast<const MergeTreeIndexUSearch *>(index_helper.get()))
1486
                        condition = usearch->createIndexCondition(query_info, context);
1487
#endif
1488
                    if (!condition)
1489
                        throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown vector search index {}", index_helper->index.name);
1490
                }
1491
                else
1492
                    condition = index_helper->createIndexCondition(filter_actions_dag, context);
1493

1494
                if (!condition->alwaysUnknownOrTrue())
1495
                    skip_indexes.useful_indices.emplace_back(index_helper, condition);
1496
            }
1497
        }
1498
    }
1499

1500
    // Move minmax indices to the first positions, so they will be applied first as the cheapest ones
1501
    std::stable_sort(begin(skip_indexes.useful_indices), end(skip_indexes.useful_indices), [](const auto & l, const auto & r)
1502
    {
1503
        const bool l_min_max = (typeid_cast<const MergeTreeIndexMinMax *>(l.index.get()));
1504
        const bool r_min_max = (typeid_cast<const MergeTreeIndexMinMax *>(r.index.get()));
1505
        if (l_min_max == r_min_max)
1506
            return false;
1507

1508
        if (l_min_max)
1509
            return true; // left is min max but right is not
1510

1511
        return false; // right is min max but left is not
1512
    });
1513

1514
    indexes->skip_indexes = std::move(skip_indexes);
1515
}
1516

1517
void ReadFromMergeTree::applyFilters(ActionDAGNodes added_filter_nodes)
1518
{
1519
    if (!indexes)
1520
    {
1521
        /// Analyzer generates unique ColumnIdentifiers like __table1.__partition_id in filter nodes,
1522
        /// while key analysis still requires unqualified column names.
1523
        std::unordered_map<std::string, ColumnWithTypeAndName> node_name_to_input_node_column;
1524
        if (query_info.planner_context)
1525
        {
1526
            const auto & table_expression_data = query_info.planner_context->getTableExpressionDataOrThrow(query_info.table_expression);
1527
            const auto & alias_column_expressions = table_expression_data.getAliasColumnExpressions();
1528
            for (const auto & [column_identifier, column_name] : table_expression_data.getColumnIdentifierToColumnName())
1529
            {
1530
                /// ALIAS columns cannot be used in the filter expression without being calculated in ActionsDAG,
1531
                /// so they should not be added to the input nodes.
1532
                if (alias_column_expressions.contains(column_name))
1533
                    continue;
1534
                const auto & column = table_expression_data.getColumnOrThrow(column_name);
1535
                node_name_to_input_node_column.emplace(column_identifier, ColumnWithTypeAndName(column.type, column_name));
1536
            }
1537
        }
1538

1539
        filter_actions_dag = ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes, node_name_to_input_node_column);
1540

1541
        /// NOTE: Currently we store two DAGs for analysis:
1542
        /// (1) SourceStepWithFilter::filter_nodes, (2) query_info.filter_actions_dag. Make sure they are consistent.
1543
        /// TODO: Get rid of filter_actions_dag in query_info after we move analysis of
1544
        /// parallel replicas and unused shards into optimization, similar to projection analysis.
1545
        query_info.filter_actions_dag = filter_actions_dag;
1546

1547
        buildIndexes(
1548
            indexes,
1549
            filter_actions_dag,
1550
            data,
1551
            prepared_parts,
1552
            context,
1553
            query_info,
1554
            metadata_for_reading);
1555
    }
1556
}
1557

1558
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
1559
    MergeTreeData::DataPartsVector parts,
1560
    std::vector<AlterConversionsPtr> alter_conversions,
1561
    const StorageMetadataPtr & metadata_snapshot,
1562
    const SelectQueryInfo & query_info_,
1563
    ContextPtr context_,
1564
    size_t num_streams,
1565
    std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read,
1566
    const MergeTreeData & data,
1567
    const Names & all_column_names,
1568
    LoggerPtr log,
1569
    std::optional<Indexes> & indexes)
1570
{
1571
    return selectRangesToReadImpl(
1572
        std::move(parts),
1573
        std::move(alter_conversions),
1574
        metadata_snapshot,
1575
        query_info_,
1576
        context_,
1577
        num_streams,
1578
        max_block_numbers_to_read,
1579
        data,
1580
        all_column_names,
1581
        log,
1582
        indexes);
1583
}
1584

1585
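/// The actual analysis: prunes parts by partition/minmax conditions and virtual columns, applies sampling,
/// filters mark ranges by the primary key and skip indexes, and records the statistics
/// (selected parts/marks/rows and per-index stats) in the returned AnalysisResult.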
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl(
1586
    MergeTreeData::DataPartsVector parts,
1587
    std::vector<AlterConversionsPtr> alter_conversions,
1588
    const StorageMetadataPtr & metadata_snapshot,
1589
    const SelectQueryInfo & query_info_,
1590
    ContextPtr context_,
1591
    size_t num_streams,
1592
    std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read,
1593
    const MergeTreeData & data,
1594
    const Names & all_column_names,
1595
    LoggerPtr log,
1596
    std::optional<Indexes> & indexes)
1597
{
1598
    AnalysisResult result;
1599
    const auto & settings = context_->getSettingsRef();
1600

1601
    size_t total_parts = parts.size();
1602

1603
    result.column_names_to_read = all_column_names;
1604

1605
    /// If there are only virtual columns in the query, we must request at least one non-virtual column.
1606
    if (result.column_names_to_read.empty())
1607
    {
1608
        NamesAndTypesList available_real_columns = metadata_snapshot->getColumns().getAllPhysical();
1609
        result.column_names_to_read.push_back(ExpressionActions::getSmallestColumn(available_real_columns).name);
1610
    }
1611

1612
    // Build and check if primary key is used when necessary
1613
    const auto & primary_key = metadata_snapshot->getPrimaryKey();
1614
    const Names & primary_key_column_names = primary_key.column_names;
1615

1616
    if (!indexes)
1617
        buildIndexes(indexes, query_info_.filter_actions_dag, data, parts, context_, query_info_, metadata_snapshot);
1618

1619
    if (indexes->part_values && indexes->part_values->empty())
1620
        return std::make_shared<AnalysisResult>(std::move(result));
1621

1622
    if (settings.force_primary_key && indexes->key_condition.alwaysUnknownOrTrue())
1623
    {
1624
        throw Exception(ErrorCodes::INDEX_NOT_USED,
1625
            "Primary key ({}) is not used and setting 'force_primary_key' is set",
1626
            fmt::join(primary_key_column_names, ", "));
1627
    }
1628

1629
    LOG_DEBUG(log, "Key condition: {}", indexes->key_condition.toString());
1630

1631
    if (indexes->part_offset_condition)
1632
        LOG_DEBUG(log, "Part offset condition: {}", indexes->part_offset_condition->toString());
1633

1634
    if (indexes->key_condition.alwaysFalse())
1635
        return std::make_shared<AnalysisResult>(std::move(result));
1636

1637
    size_t total_marks_pk = 0;
1638
    size_t parts_before_pk = 0;
1639

1640
    {
1641
        MergeTreeDataSelectExecutor::filterPartsByPartition(
1642
            indexes->partition_pruner,
1643
            indexes->minmax_idx_condition,
1644
            parts,
1645
            alter_conversions,
1646
            indexes->part_values,
1647
            metadata_snapshot,
1648
            data,
1649
            context_,
1650
            max_block_numbers_to_read.get(),
1651
            log,
1652
            result.index_stats);
1653

1654
        result.sampling = MergeTreeDataSelectExecutor::getSampling(
1655
            query_info_,
1656
            metadata_snapshot->getColumns().getAllPhysical(),
1657
            parts,
1658
            indexes->key_condition,
1659
            data,
1660
            metadata_snapshot,
1661
            context_,
1662
            log);
1663

1664
        if (result.sampling.read_nothing)
1665
            return std::make_shared<AnalysisResult>(std::move(result));
1666

1667
        for (const auto & part : parts)
1668
            total_marks_pk += part->index_granularity.getMarksCountWithoutFinal();
1669
        parts_before_pk = parts.size();
1670

1671
        auto reader_settings = getMergeTreeReaderSettings(context_, query_info_);
1672
        result.parts_with_ranges = MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes(
1673
            std::move(parts),
1674
            std::move(alter_conversions),
1675
            metadata_snapshot,
1676
            context_,
1677
            indexes->key_condition,
1678
            indexes->part_offset_condition,
1679
            indexes->skip_indexes,
1680
            reader_settings,
1681
            log,
1682
            num_streams,
1683
            result.index_stats,
1684
            indexes->use_skip_indexes);
1685
    }
1686

1687
    size_t sum_marks_pk = total_marks_pk;
1688
    for (const auto & stat : result.index_stats)
1689
        if (stat.type == IndexType::PrimaryKey)
1690
            sum_marks_pk = stat.num_granules_after;
1691

1692
    size_t sum_marks = 0;
1693
    size_t sum_ranges = 0;
1694
    size_t sum_rows = 0;
1695

1696
    for (const auto & part : result.parts_with_ranges)
1697
    {
1698
        sum_ranges += part.ranges.size();
1699
        sum_marks += part.getMarksCount();
1700
        sum_rows += part.getRowsCount();
1701
    }
1702

1703
    result.total_parts = total_parts;
1704
    result.parts_before_pk = parts_before_pk;
1705
    result.selected_parts = result.parts_with_ranges.size();
1706
    result.selected_ranges = sum_ranges;
1707
    result.selected_marks = sum_marks;
1708
    result.selected_marks_pk = sum_marks_pk;
1709
    result.total_marks_pk = total_marks_pk;
1710
    result.selected_rows = sum_rows;
1711

1712
    if (query_info_.input_order_info)
1713
        result.read_type = (query_info_.input_order_info->direction > 0)
1714
            ? ReadType::InOrder
1715
            : ReadType::InReverseOrder;
1716

1717
    return std::make_shared<AnalysisResult>(std::move(result));
1718
}
1719

1720
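/// Switches this step to reading in order by a prefix of the sorting key with the given direction and limit.
/// Returns false (and changes nothing) for reverse order with FINAL; otherwise it updates the output stream's
/// sort description and disables vertical FINAL, which would break the sortedness assumption.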
bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction, size_t limit)
1721
{
1722
    /// If direction is not set, use the current one
1723
    if (!direction)
1724
        direction = getSortDirection();
1725

1726
    /// Disable read-in-order optimization for reverse order with final.
1727
    /// Otherwise, it can lead to incorrect FINAL behavior because the implementation may rely on reading in direct order.
1728
    if (direction != 1 && query_info.isFinal())
1729
        return false;
1730

1731
    query_info.input_order_info = std::make_shared<InputOrderInfo>(SortDescription{}, prefix_size, direction, limit);
1732
    reader_settings.read_in_order = true;
1733

1734
    /// In case of read-in-order, don't create too many reading streams.
1735
    /// Almost always we are reading from a single stream at a time because of merge sort.
1736
    if (output_streams_limit)
1737
        requested_num_streams = output_streams_limit;
1738

1739
    /// update sort info for output stream
1740
    SortDescription sort_description;
1741
    const Names & sorting_key_columns = metadata_for_reading->getSortingKeyColumns();
1742
    const Block & header = output_stream->header;
1743
    const int sort_direction = getSortDirection();
1744
    for (const auto & column_name : sorting_key_columns)
1745
    {
1746
        if (std::find_if(header.begin(), header.end(), [&](ColumnWithTypeAndName const & col) { return col.name == column_name; })
1747
            == header.end())
1748
            break;
1749
        sort_description.emplace_back(column_name, sort_direction);
1750
    }
1751
    if (!sort_description.empty())
1752
    {
1753
        const size_t used_prefix_of_sorting_key_size = query_info.input_order_info->used_prefix_of_sorting_key_size;
1754
        if (sort_description.size() > used_prefix_of_sorting_key_size)
1755
            sort_description.resize(used_prefix_of_sorting_key_size);
1756
        output_stream->sort_description = std::move(sort_description);
1757
        output_stream->sort_scope = DataStream::SortScope::Stream;
1758
    }
1759

1760
    /// All *InOrder optimizations rely on the assumption that the output stream is sorted, but vertical FINAL breaks this rule.
1761
    /// Let's prefer in-order optimization over vertical FINAL for now.
1762
    enable_vertical_final = false;
1763

1764
    return true;
1765
}
1766

1767
bool ReadFromMergeTree::readsInOrder() const
1768
{
1769
    return reader_settings.read_in_order;
1770
}
1771

1772
void ReadFromMergeTree::updatePrewhereInfo(const PrewhereInfoPtr & prewhere_info_value)
1773
{
1774
    query_info.prewhere_info = prewhere_info_value;
1775
    prewhere_info = prewhere_info_value;
1776

1777
    output_stream = DataStream{.header = MergeTreeSelectProcessor::transformHeader(
1778
        storage_snapshot->getSampleBlockForColumns(all_column_names),
1779
        prewhere_info_value)};
1780

1781
    updateSortDescriptionForOutputStream(
1782
        *output_stream,
1783
        storage_snapshot->getMetadataForQuery()->getSortingKeyColumns(),
1784
        getSortDirection(),
1785
        query_info.input_order_info,
1786
        prewhere_info,
1787
        enable_vertical_final);
1788
}
1789

1790
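/// Decides whether to aggregate each partition independently (one output port per partition).
/// Never used for queries with FINAL. Unless force_aggregate_partitions_independently is set,
/// it also requires enough partitions (more than one and at least max_threads / 2), not more than
/// max_number_of_partitions_for_independent_aggregation, and no heavy row-count skew between partitions.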
bool ReadFromMergeTree::requestOutputEachPartitionThroughSeparatePort()
1791
{
1792
    if (isQueryWithFinal())
1793
        return false;
1794

1795
    const auto & settings = context->getSettingsRef();
1796

1797
    const auto partitions_cnt = countPartitions(prepared_parts);
1798
    if (!settings.force_aggregate_partitions_independently && (partitions_cnt == 1 || partitions_cnt < settings.max_threads / 2))
1799
    {
1800
        LOG_TRACE(
1801
            log,
1802
            "Independent aggregation by partitions won't be used because there are too few of them: {}. You can set "
1803
            "force_aggregate_partitions_independently to suppress this check",
1804
            partitions_cnt);
1805
        return false;
1806
    }
1807

1808
    if (!settings.force_aggregate_partitions_independently
1809
        && (partitions_cnt > settings.max_number_of_partitions_for_independent_aggregation))
1810
    {
1811
        LOG_TRACE(
1812
            log,
1813
            "Independent aggregation by partitions won't be used because there are too many of them: {}. You can increase "
1814
            "max_number_of_partitions_for_independent_aggregation (current value is {}) or set "
1815
            "force_aggregate_partitions_independently to suppress this check",
1816
            partitions_cnt,
1817
            settings.max_number_of_partitions_for_independent_aggregation);
1818
        return false;
1819
    }
1820

1821
    if (!settings.force_aggregate_partitions_independently)
1822
    {
1823
        std::unordered_map<String, size_t> partition_rows;
1824
        for (const auto & part : prepared_parts)
1825
            partition_rows[part->info.partition_id] += part->rows_count;
1826
        size_t sum_rows = 0;
1827
        size_t max_rows = 0;
1828
        for (const auto & [_, rows] : partition_rows)
1829
        {
1830
            sum_rows += rows;
1831
            max_rows = std::max(max_rows, rows);
1832
        }
1833

1834
        /// Merging shouldn't take more time than preaggregation in normal cases. And exec time is proportional to the amount of data.
1835
        /// We assume that exec time of independent aggr is proportional to the maximum of sizes and
1836
        /// exec time of ordinary aggr is proportional to sum of sizes divided by number of threads and multiplied by two (preaggregation + merging).
1837
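        /// Illustrative numbers: with sum_rows = 1'000'000 and max_threads = 8, the per-thread average is 125'000,
        /// so a partition with more than 250'000 rows disables independent aggregation.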
        const size_t avg_rows_in_partition = sum_rows / settings.max_threads;
1838
        if (max_rows > avg_rows_in_partition * 2)
1839
        {
1840
            LOG_TRACE(
1841
                log,
1842
                "Independent aggregation by partitions won't be used because there are too big skew in the number of rows between "
1843
                "partitions. You can set force_aggregate_partitions_independently to suppress this check");
1844
            return false;
1845
        }
1846
    }
1847

1848
    return output_each_partition_through_separate_port = true;
1849
}
1850

1851
ReadFromMergeTree::AnalysisResult ReadFromMergeTree::getAnalysisResult() const
1852
{
1853
    auto result_ptr = analyzed_result_ptr
1854
        ? analyzed_result_ptr
1855
        : selectRangesToRead(prepared_parts, alter_conversions_for_parts);
1856

1857
    return *result_ptr;
1858
}
1859

1860
bool ReadFromMergeTree::isQueryWithSampling() const
1861
{
1862
    if (context->getSettingsRef().parallel_replicas_count > 1 && data.supportsSampling())
1863
        return true;
1864

1865
    const auto & select = query_info.query->as<ASTSelectQuery &>();
1866
    if (query_info.table_expression_modifiers)
1867
        return query_info.table_expression_modifiers->getSampleSizeRatio() != std::nullopt;
1868
    else
1869
        return select.sampleSize() != nullptr;
1870
}
1871

1872
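/// Dispatches to the FINAL, in-order or default spreading strategy. For sampling, the columns of the sampling
/// filter are added to the read set; for FINAL, the sorting key columns and the engine's sign/version/is_deleted
/// columns are added as well, since they are required by the merging transforms.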
Pipe ReadFromMergeTree::spreadMarkRanges(
1873
    RangesInDataParts && parts_with_ranges, size_t num_streams, AnalysisResult & result, ActionsDAGPtr & result_projection)
1874
{
1875
    const bool final = isQueryWithFinal();
1876
    Names column_names_to_read = result.column_names_to_read;
1877
    NameSet names(column_names_to_read.begin(), column_names_to_read.end());
1878

1879
    if (!final && result.sampling.use_sampling)
1880
    {
1881
        NameSet sampling_columns;
1882

1883
        /// Add columns needed for `sample_by_ast` to `column_names_to_read`.
1884
        /// Skip this if final was used, because such columns were already added from PK.
1885
        for (const auto & column : result.sampling.filter_expression->getRequiredColumns().getNames())
1886
        {
1887
            if (!names.contains(column))
1888
                column_names_to_read.push_back(column);
1889

1890
            sampling_columns.insert(column);
1891
        }
1892

1893
        if (prewhere_info)
1894
            restorePrewhereInputs(*prewhere_info, sampling_columns);
1895
    }
1896

1897
    if (final)
1898
    {
1899
        chassert(!is_parallel_reading_from_replicas);
1900

1901
        if (output_each_partition_through_separate_port)
1902
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Optimization isn't supposed to be used for queries with final");
1903

1904
        /// Add columns needed to calculate the sorting expression and the sign.
1905
        for (const auto & column : metadata_for_reading->getColumnsRequiredForSortingKey())
1906
        {
1907
            if (!names.contains(column))
1908
            {
1909
                column_names_to_read.push_back(column);
1910
                names.insert(column);
1911
            }
1912
        }
1913

1914
        if (!data.merging_params.is_deleted_column.empty() && !names.contains(data.merging_params.is_deleted_column))
1915
            column_names_to_read.push_back(data.merging_params.is_deleted_column);
1916
        if (!data.merging_params.sign_column.empty() && !names.contains(data.merging_params.sign_column))
1917
            column_names_to_read.push_back(data.merging_params.sign_column);
1918
        if (!data.merging_params.version_column.empty() && !names.contains(data.merging_params.version_column))
1919
            column_names_to_read.push_back(data.merging_params.version_column);
1920

1921
        return spreadMarkRangesAmongStreamsFinal(std::move(parts_with_ranges), num_streams, result.column_names_to_read, column_names_to_read, result_projection);
1922
    }
1923
    else if (query_info.input_order_info)
1924
    {
1925
        return spreadMarkRangesAmongStreamsWithOrder(
1926
            std::move(parts_with_ranges), num_streams, column_names_to_read, result_projection, query_info.input_order_info);
1927
    }
1928
    else
1929
    {
1930
        return spreadMarkRangesAmongStreams(std::move(parts_with_ranges), num_streams, column_names_to_read);
1931
    }
1932
}
1933

1934
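/// Builds a separate pipe for each group of consecutive partitions (used when each partition is
/// aggregated independently) and resizes every pipe to a single output port, so that downstream
/// steps see one stream per partition group.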
Pipe ReadFromMergeTree::groupStreamsByPartition(AnalysisResult & result, ActionsDAGPtr & result_projection)
1935
{
1936
    auto && parts_with_ranges = std::move(result.parts_with_ranges);
1937

1938
    if (parts_with_ranges.empty())
1939
        return {};
1940

1941
    const size_t partitions_cnt = std::max<size_t>(countPartitions(parts_with_ranges), 1);
1942
    const size_t partitions_per_stream = std::max<size_t>(1, partitions_cnt / requested_num_streams);
1943
    const size_t num_streams = std::max<size_t>(1, requested_num_streams / partitions_cnt);
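    /// Illustrative numbers: with requested_num_streams = 16 and 4 partitions, each pipe covers
    /// max(1, 4 / 16) = 1 partition and reads with max(1, 16 / 4) = 4 streams before being resized to one port.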
1944

1945
    Pipes pipes;
1946
    for (auto begin = parts_with_ranges.begin(), end = begin; end != parts_with_ranges.end(); begin = end)
1947
    {
1948
        for (size_t i = 0; i < partitions_per_stream; ++i)
1949
            end = std::find_if(
1950
                end,
1951
                parts_with_ranges.end(),
1952
                [&end](const auto & part) { return end->data_part->info.partition_id != part.data_part->info.partition_id; });
1953

1954
        RangesInDataParts partition_parts{std::make_move_iterator(begin), std::make_move_iterator(end)};
1955

1956
        pipes.emplace_back(spreadMarkRanges(std::move(partition_parts), num_streams, result, result_projection));
1957
        if (!pipes.back().empty())
1958
            pipes.back().resize(1);
1959
    }
1960

1961
    return Pipe::unitePipes(std::move(pipes));
1962
}
1963

1964
void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
1965
{
1966
    auto result = getAnalysisResult();
1967

1968
    /// Do not keep data parts in snapshot.
1969
    /// They are stored separately, and some could be released after PK analysis.
1970
    storage_snapshot->data = std::make_unique<MergeTreeData::SnapshotData>();
1971

1972
    result.checkLimits(context->getSettingsRef(), query_info);
1973
    shared_virtual_fields.emplace("_sample_factor", result.sampling.used_sample_factor);
1974

1975
    LOG_DEBUG(
1976
        log,
1977
        "Selected {}/{} parts by partition key, {} parts by primary key, {}/{} marks by primary key, {} marks to read from {} ranges",
1978
        result.parts_before_pk,
1979
        result.total_parts,
1980
        result.selected_parts,
1981
        result.selected_marks_pk,
1982
        result.total_marks_pk,
1983
        result.selected_marks,
1984
        result.selected_ranges);
1985

1986
    // Adding partition info to QueryAccessInfo.
1987
    if (context->hasQueryContext() && !query_info.is_internal)
1988
    {
1989
        Names partition_names;
1990
        for (const auto & part : result.parts_with_ranges)
1991
        {
1992
            partition_names.emplace_back(
1993
                fmt::format("{}.{}", data.getStorageID().getFullNameNotQuoted(), part.data_part->info.partition_id));
1994
        }
1995
        context->getQueryContext()->addQueryAccessInfo(partition_names);
1996

1997
        if (storage_snapshot->projection)
1998
            context->getQueryContext()->addQueryAccessInfo(
1999
                Context::QualifiedProjectionName{.storage_id = data.getStorageID(), .projection_name = storage_snapshot->projection->name});
2000
    }
2001

2002
    ProfileEvents::increment(ProfileEvents::SelectedParts, result.selected_parts);
2003
    ProfileEvents::increment(ProfileEvents::SelectedRanges, result.selected_ranges);
2004
    ProfileEvents::increment(ProfileEvents::SelectedMarks, result.selected_marks);
2005

2006
    auto query_id_holder = MergeTreeDataSelectExecutor::checkLimits(data, result, context);
2007

2008
    if (result.parts_with_ranges.empty())
2009
    {
2010
        pipeline.init(Pipe(std::make_shared<NullSource>(getOutputStream().header)));
2011
        return;
2012
    }
2013

2014
    selected_marks = result.selected_marks;
2015
    selected_rows = result.selected_rows;
2016
    selected_parts = result.selected_parts;
2017
    /// Projection that is needed to drop columns which have appeared during the execution
2018
    /// of some extra expressions, and to allow executing the same expressions later.
2019
    /// NOTE: It may lead to double computation of expressions.
2020
    ActionsDAGPtr result_projection;
2021

2022
    Pipe pipe = output_each_partition_through_separate_port
2023
        ? groupStreamsByPartition(result, result_projection)
2024
        : spreadMarkRanges(std::move(result.parts_with_ranges), requested_num_streams, result, result_projection);
2025

2026
    for (const auto & processor : pipe.getProcessors())
2027
        processor->setStorageLimits(query_info.storage_limits);
2028

2029
    if (pipe.empty())
2030
    {
2031
        pipeline.init(Pipe(std::make_shared<NullSource>(getOutputStream().header)));
2032
        return;
2033
    }
2034

2035
    if (result.sampling.use_sampling)
2036
    {
2037
        auto sampling_actions = std::make_shared<ExpressionActions>(result.sampling.filter_expression);
2038
        pipe.addSimpleTransform([&](const Block & header)
2039
        {
2040
            return std::make_shared<FilterTransform>(
2041
                header,
2042
                sampling_actions,
2043
                result.sampling.filter_function->getColumnName(),
2044
                false);
2045
        });
2046
    }
2047

2048
    Block cur_header = pipe.getHeader();
2049

2050
    auto append_actions = [&result_projection](ActionsDAGPtr actions)
2051
    {
2052
        if (!result_projection)
2053
            result_projection = std::move(actions);
2054
        else
2055
            result_projection = ActionsDAG::merge(std::move(*result_projection), std::move(*actions));
2056
    };
2057

2058
    if (result_projection)
2059
        cur_header = result_projection->updateHeader(cur_header);
2060

2061
    /// Extra columns may be returned (for example, if sampling is used).
2062
    /// Convert pipe to step header structure.
2063
    if (!isCompatibleHeader(cur_header, getOutputStream().header))
2064
    {
2065
        auto converting = ActionsDAG::makeConvertingActions(
2066
            cur_header.getColumnsWithTypeAndName(),
2067
            getOutputStream().header.getColumnsWithTypeAndName(),
2068
            ActionsDAG::MatchColumnsMode::Name);
2069

2070
        append_actions(std::move(converting));
2071
    }
2072

2073
    if (result_projection)
2074
    {
2075
        auto projection_actions = std::make_shared<ExpressionActions>(result_projection);
2076
        pipe.addSimpleTransform([&](const Block & header)
2077
        {
2078
            return std::make_shared<ExpressionTransform>(header, projection_actions);
2079
        });
2080
    }
2081

2082
    /// Some extra columns could be added by sample/final/in-order/etc
2083
    /// Remove them from header if not needed.
2084
    if (!blocksHaveEqualStructure(pipe.getHeader(), getOutputStream().header))
2085
    {
2086
        auto convert_actions_dag = ActionsDAG::makeConvertingActions(
2087
            pipe.getHeader().getColumnsWithTypeAndName(),
2088
            getOutputStream().header.getColumnsWithTypeAndName(),
2089
            ActionsDAG::MatchColumnsMode::Name,
2090
            true);
2091

2092
        auto converting_dag_expr = std::make_shared<ExpressionActions>(convert_actions_dag);
2093

2094
        pipe.addSimpleTransform([&](const Block & header)
2095
        {
2096
            return std::make_shared<ExpressionTransform>(header, converting_dag_expr);
2097
        });
2098
    }
2099

2100
    for (const auto & processor : pipe.getProcessors())
2101
        processors.emplace_back(processor);
2102

2103
    pipeline.init(std::move(pipe));
2104
    pipeline.addContext(context);
2105
    // Attach QueryIdHolder if needed
2106
    if (query_id_holder)
2107
        pipeline.setQueryIdHolder(std::move(query_id_holder));
2108
}
2109

2110
static const char * indexTypeToString(ReadFromMergeTree::IndexType type)
2111
{
2112
    switch (type)
2113
    {
2114
        case ReadFromMergeTree::IndexType::None:
2115
            return "None";
2116
        case ReadFromMergeTree::IndexType::MinMax:
2117
            return "MinMax";
2118
        case ReadFromMergeTree::IndexType::Partition:
2119
            return "Partition";
2120
        case ReadFromMergeTree::IndexType::PrimaryKey:
2121
            return "PrimaryKey";
2122
        case ReadFromMergeTree::IndexType::Skip:
2123
            return "Skip";
2124
    }
2125

2126
    UNREACHABLE();
2127
}
2128

2129
static const char * readTypeToString(ReadFromMergeTree::ReadType type)
2130
{
2131
    switch (type)
2132
    {
2133
        case ReadFromMergeTree::ReadType::Default:
2134
            return "Default";
2135
        case ReadFromMergeTree::ReadType::InOrder:
2136
            return "InOrder";
2137
        case ReadFromMergeTree::ReadType::InReverseOrder:
2138
            return "InReverseOrder";
2139
        case ReadFromMergeTree::ReadType::ParallelReplicas:
2140
            return "Parallel";
2141
    }
2142

2143
    UNREACHABLE();
2144
}
2145

2146
void ReadFromMergeTree::describeActions(FormatSettings & format_settings) const
2147
{
2148
    auto result = getAnalysisResult();
2149
    std::string prefix(format_settings.offset, format_settings.indent_char);
2150
    format_settings.out << prefix << "ReadType: " << readTypeToString(result.read_type) << '\n';
2151

2152
    if (!result.index_stats.empty())
2153
    {
2154
        format_settings.out << prefix << "Parts: " << result.index_stats.back().num_parts_after << '\n';
2155
        format_settings.out << prefix << "Granules: " << result.index_stats.back().num_granules_after << '\n';
2156
    }
2157

2158
    if (prewhere_info)
2159
    {
2160
        format_settings.out << prefix << "Prewhere info" << '\n';
2161
        format_settings.out << prefix << "Need filter: " << prewhere_info->need_filter << '\n';
2162

2163
        prefix.push_back(format_settings.indent_char);
2164
        prefix.push_back(format_settings.indent_char);
2165

2166
        if (prewhere_info->prewhere_actions)
2167
        {
2168
            format_settings.out << prefix << "Prewhere filter" << '\n';
2169
            format_settings.out << prefix << "Prewhere filter column: " << prewhere_info->prewhere_column_name;
2170
            if (prewhere_info->remove_prewhere_column)
2171
               format_settings.out << " (removed)";
2172
            format_settings.out << '\n';
2173

2174
            auto expression = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions);
2175
            expression->describeActions(format_settings.out, prefix);
2176
        }
2177

2178
        if (prewhere_info->row_level_filter)
2179
        {
2180
            format_settings.out << prefix << "Row level filter" << '\n';
2181
            format_settings.out << prefix << "Row level filter column: " << prewhere_info->row_level_column_name << '\n';
2182

2183
            auto expression = std::make_shared<ExpressionActions>(prewhere_info->row_level_filter);
2184
            expression->describeActions(format_settings.out, prefix);
2185
        }
2186
    }
2187
}
2188

2189
void ReadFromMergeTree::describeActions(JSONBuilder::JSONMap & map) const
2190
{
2191
    auto result = getAnalysisResult();
2192
    map.add("Read Type", readTypeToString(result.read_type));
2193
    if (!result.index_stats.empty())
2194
    {
2195
        map.add("Parts", result.index_stats.back().num_parts_after);
2196
        map.add("Granules", result.index_stats.back().num_granules_after);
2197
    }
2198

2199
    if (prewhere_info)
2200
    {
2201
        std::unique_ptr<JSONBuilder::JSONMap> prewhere_info_map = std::make_unique<JSONBuilder::JSONMap>();
2202
        prewhere_info_map->add("Need filter", prewhere_info->need_filter);
2203

2204
        if (prewhere_info->prewhere_actions)
2205
        {
2206
            std::unique_ptr<JSONBuilder::JSONMap> prewhere_filter_map = std::make_unique<JSONBuilder::JSONMap>();
2207
            prewhere_filter_map->add("Prewhere filter column", prewhere_info->prewhere_column_name);
2208
            prewhere_filter_map->add("Prewhere filter remove filter column", prewhere_info->remove_prewhere_column);
2209
            auto expression = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions);
2210
            prewhere_filter_map->add("Prewhere filter expression", expression->toTree());
2211

2212
            prewhere_info_map->add("Prewhere filter", std::move(prewhere_filter_map));
2213
        }
2214

2215
        if (prewhere_info->row_level_filter)
2216
        {
2217
            std::unique_ptr<JSONBuilder::JSONMap> row_level_filter_map = std::make_unique<JSONBuilder::JSONMap>();
2218
            row_level_filter_map->add("Row level filter column", prewhere_info->row_level_column_name);
2219
            auto expression = std::make_shared<ExpressionActions>(prewhere_info->row_level_filter);
2220
            row_level_filter_map->add("Row level filter expression", expression->toTree());
2221

2222
            prewhere_info_map->add("Row level filter", std::move(row_level_filter_map));
2223
        }
2224

2225
        map.add("Prewhere info", std::move(prewhere_info_map));
2226
    }
2227
}
2228

2229
void ReadFromMergeTree::describeIndexes(FormatSettings & format_settings) const
2230
{
2231
    auto result = getAnalysisResult();
2232
    const auto & index_stats = result.index_stats;
2233

2234
    std::string prefix(format_settings.offset, format_settings.indent_char);
2235
    if (!index_stats.empty())
2236
    {
2237
        /// Do not print anything if no index is applied.
2238
        if (index_stats.size() == 1 && index_stats.front().type == IndexType::None)
2239
            return;
2240

2241
        std::string indent(format_settings.indent, format_settings.indent_char);
2242
        format_settings.out << prefix << "Indexes:\n";
2243

2244
        for (size_t i = 0; i < index_stats.size(); ++i)
2245
        {
2246
            const auto & stat = index_stats[i];
2247
            if (stat.type == IndexType::None)
2248
                continue;
2249

2250
            format_settings.out << prefix << indent << indexTypeToString(stat.type) << '\n';
2251

2252
            if (!stat.name.empty())
2253
                format_settings.out << prefix << indent << indent << "Name: " << stat.name << '\n';
2254

2255
            if (!stat.description.empty())
2256
                format_settings.out << prefix << indent << indent << "Description: " << stat.description << '\n';
2257

2258
            if (!stat.used_keys.empty())
2259
            {
2260
                format_settings.out << prefix << indent << indent << "Keys: " << stat.name << '\n';
2261
                for (const auto & used_key : stat.used_keys)
2262
                    format_settings.out << prefix << indent << indent << indent << used_key << '\n';
2263
            }
2264

2265
            if (!stat.condition.empty())
2266
                format_settings.out << prefix << indent << indent << "Condition: " << stat.condition << '\n';
2267

2268
            format_settings.out << prefix << indent << indent << "Parts: " << stat.num_parts_after;
2269
            if (i)
2270
                format_settings.out << '/' << index_stats[i - 1].num_parts_after;
2271
            format_settings.out << '\n';
2272

2273
            format_settings.out << prefix << indent << indent << "Granules: " << stat.num_granules_after;
2274
            if (i)
2275
                format_settings.out << '/' << index_stats[i - 1].num_granules_after;
2276
            format_settings.out << '\n';
2277
        }
2278
    }
2279
}
2280

2281
void ReadFromMergeTree::describeIndexes(JSONBuilder::JSONMap & map) const
2282
{
2283
    auto result = getAnalysisResult();
2284
    auto index_stats = std::move(result.index_stats);
2285

2286
    if (!index_stats.empty())
2287
    {
2288
        /// Do not print anything if no index is applied.
2289
        if (index_stats.size() == 1 && index_stats.front().type == IndexType::None)
2290
            return;
2291

2292
        auto indexes_array = std::make_unique<JSONBuilder::JSONArray>();
2293

2294
        for (size_t i = 0; i < index_stats.size(); ++i)
2295
        {
2296
            const auto & stat = index_stats[i];
2297
            if (stat.type == IndexType::None)
2298
                continue;
2299

2300
            auto index_map = std::make_unique<JSONBuilder::JSONMap>();
2301

2302
            index_map->add("Type", indexTypeToString(stat.type));
2303

2304
            if (!stat.name.empty())
2305
                index_map->add("Name", stat.name);
2306

2307
            if (!stat.description.empty())
2308
                index_map->add("Description", stat.description);
2309

2310
            if (!stat.used_keys.empty())
2311
            {
2312
                auto keys_array = std::make_unique<JSONBuilder::JSONArray>();
2313

2314
                for (const auto & used_key : stat.used_keys)
2315
                    keys_array->add(used_key);
2316

2317
                index_map->add("Keys", std::move(keys_array));
2318
            }
2319

2320
            if (!stat.condition.empty())
2321
                index_map->add("Condition", stat.condition);
2322

2323
            if (i)
2324
                index_map->add("Initial Parts", index_stats[i - 1].num_parts_after);
2325
            index_map->add("Selected Parts", stat.num_parts_after);
2326

2327
            if (i)
2328
                index_map->add("Initial Granules", index_stats[i - 1].num_granules_after);
2329
            index_map->add("Selected Granules", stat.num_granules_after);
2330

2331
            indexes_array->add(std::move(index_map));
2332
        }
2333

2334
        map.add("Indexes", std::move(indexes_array));
2335
    }
2336
}
2337

2338

2339
}
2340
