ClickHouse

Форк
0
/
Planner.cpp 
1765 строк · 75.6 Кб
1
#include <Planner/Planner.h>
2

3
#include <Columns/ColumnConst.h>
4
#include <Columns/ColumnSet.h>
5
#include <Core/ProtocolDefines.h>
6
#include <Common/ProfileEvents.h>
7
#include <Common/logger_useful.h>
8

9
#include <DataTypes/DataTypeString.h>
10

11
#include <Functions/FunctionFactory.h>
12
#include <Functions/CastOverloadResolver.h>
13
#include <Functions/indexHint.h>
14

15
#include <QueryPipeline/Pipe.h>
16
#include <Processors/Sources/SourceFromSingleChunk.h>
17
#include <Processors/QueryPlan/QueryPlan.h>
18
#include <Processors/QueryPlan/ExpressionStep.h>
19
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
20
#include <Processors/QueryPlan/FilterStep.h>
21
#include <Processors/QueryPlan/UnionStep.h>
22
#include <Processors/QueryPlan/DistinctStep.h>
23
#include <Processors/QueryPlan/IntersectOrExceptStep.h>
24
#include <Processors/QueryPlan/CreatingSetsStep.h>
25
#include <Processors/QueryPlan/AggregatingStep.h>
26
#include <Processors/QueryPlan/MergingAggregatedStep.h>
27
#include <Processors/QueryPlan/SortingStep.h>
28
#include <Processors/QueryPlan/FillingStep.h>
29
#include <Processors/QueryPlan/LimitStep.h>
30
#include <Processors/QueryPlan/OffsetStep.h>
31
#include <Processors/QueryPlan/ExtremesStep.h>
32
#include <Processors/QueryPlan/TotalsHavingStep.h>
33
#include <Processors/QueryPlan/RollupStep.h>
34
#include <Processors/QueryPlan/CubeStep.h>
35
#include <Processors/QueryPlan/LimitByStep.h>
36
#include <Processors/QueryPlan/WindowStep.h>
37
#include <Processors/QueryPlan/ReadNothingStep.h>
38
#include <QueryPipeline/QueryPipelineBuilder.h>
39

40
#include <Interpreters/Context.h>
41
#include <Interpreters/StorageID.h>
42

43
#include <Storages/ColumnsDescription.h>
44
#include <Storages/IStorage.h>
45
#include <Storages/MergeTree/MergeTreeData.h>
46
#include <Storages/SelectQueryInfo.h>
47
#include <Storages/StorageDistributed.h>
48
#include <Storages/StorageDummy.h>
49
#include <Storages/StorageMerge.h>
50

51
#include <Analyzer/Utils.h>
52
#include <Analyzer/ColumnNode.h>
53
#include <Analyzer/ConstantNode.h>
54
#include <Analyzer/FunctionNode.h>
55
#include <Analyzer/SortNode.h>
56
#include <Analyzer/InterpolateNode.h>
57
#include <Analyzer/WindowNode.h>
58
#include <Analyzer/TableNode.h>
59
#include <Analyzer/TableFunctionNode.h>
60
#include <Analyzer/QueryNode.h>
61
#include <Analyzer/UnionNode.h>
62
#include <Analyzer/JoinNode.h>
63
#include <Analyzer/ArrayJoinNode.h>
64
#include <Analyzer/QueryTreeBuilder.h>
65
#include <Analyzer/QueryTreePassManager.h>
66
#include <Analyzer/AggregationUtils.h>
67
#include <Analyzer/WindowFunctionsUtils.h>
68

69
#include <Planner/findQueryForParallelReplicas.h>
70
#include <Planner/Utils.h>
71
#include <Planner/PlannerContext.h>
72
#include <Planner/PlannerActionsVisitor.h>
73
#include <Planner/PlannerJoins.h>
74
#include <Planner/PlannerAggregation.h>
75
#include <Planner/PlannerSorting.h>
76
#include <Planner/PlannerWindowFunctions.h>
77
#include <Planner/CollectSets.h>
78
#include <Planner/CollectTableExpressionData.h>
79
#include <Planner/PlannerJoinTree.h>
80
#include <Planner/PlannerExpressionAnalysis.h>
81
#include <Planner/CollectColumnIdentifiers.h>
82
#include <Planner/PlannerQueryProcessingInfo.h>
83

84
namespace ProfileEvents
85
{
86
    extern const Event SelectQueriesWithSubqueries;
87
    extern const Event QueriesWithSubqueries;
88
}
89

90
namespace DB
91
{
92

93
namespace ErrorCodes
94
{
95
    extern const int UNSUPPORTED_METHOD;
96
    extern const int LOGICAL_ERROR;
97
    extern const int BAD_ARGUMENTS;
98
    extern const int TOO_DEEP_SUBQUERIES;
99
    extern const int NOT_IMPLEMENTED;
100
    extern const int SUPPORT_IS_DISABLED;
101
}
102

103
namespace
104
{
105

106
/** Check that table and table function table expressions from planner context support transactions.
107
  *
108
  * There is precondition that table expression data for table expression nodes is collected in planner context.
109
  */
110
void checkStoragesSupportTransactions(const PlannerContextPtr & planner_context)
111
{
112
    const auto & query_context = planner_context->getQueryContext();
113
    if (!query_context->getSettingsRef().throw_on_unsupported_query_inside_transaction)
114
        return;
115

116
    if (!query_context->getCurrentTransaction())
117
        return;
118

119
    for (const auto & [table_expression, _] : planner_context->getTableExpressionNodeToData())
120
    {
121
        StoragePtr storage;
122
        if (auto * table_node = table_expression->as<TableNode>())
123
            storage = table_node->getStorage();
124
        else if (auto * table_function_node = table_expression->as<TableFunctionNode>())
125
            storage = table_function_node->getStorage();
126

127
        if (storage && !storage->supportsTransactions())
128
            throw Exception(ErrorCodes::NOT_IMPLEMENTED,
129
                "Storage {} (table {}) does not support transactions",
130
                storage->getName(),
131
                storage->getStorageID().getNameForLogs());
132
    }
133
}
134

135
/** Storages can rely that filters that for storage will be available for analysis before
136
  * getQueryProcessingStage method will be called.
137
  *
138
  * StorageDistributed skip unused shards optimization relies on this.
139
  * Parallel replicas estimation relies on this too.
140
  * StorageMerge common header calculation relies on this too.
141
  *
142
  * To collect filters that will be applied to specific table in case we have JOINs requires
143
  * to run query plan optimization pipeline.
144
  *
145
  * Algorithm:
146
  * 1. Replace all table expressions in query tree with dummy tables.
147
  * 2. Build query plan.
148
  * 3. Optimize query plan.
149
  * 4. Extract filters from ReadFromDummy query plan steps from query plan leaf nodes.
150
  */
151

152
FiltersForTableExpressionMap collectFiltersForAnalysis(const QueryTreeNodePtr & query_tree, const QueryTreeNodes & table_nodes, const ContextPtr & query_context)
153
{
154
    bool collect_filters = false;
155
    const auto & settings = query_context->getSettingsRef();
156

157
    bool parallel_replicas_estimation_enabled
158
        = query_context->canUseParallelReplicasOnInitiator() && settings.parallel_replicas_min_number_of_rows_per_replica > 0;
159

160
    for (const auto & table_expression : table_nodes)
161
    {
162
        auto * table_node = table_expression->as<TableNode>();
163
        auto * table_function_node = table_expression->as<TableFunctionNode>();
164
        if (!table_node && !table_function_node)
165
            continue;
166

167
        const auto & storage = table_node ? table_node->getStorage() : table_function_node->getStorage();
168
        if (typeid_cast<const StorageDistributed *>(storage.get()) || typeid_cast<const StorageMerge *>(storage.get())
169
            || (parallel_replicas_estimation_enabled && std::dynamic_pointer_cast<MergeTreeData>(storage)))
170
        {
171
            collect_filters = true;
172
            break;
173
        }
174
    }
175

176
    if (!collect_filters)
177
        return {};
178

179
    ResultReplacementMap replacement_map;
180

181
    auto updated_query_tree = replaceTableExpressionsWithDummyTables(query_tree, table_nodes, query_context, &replacement_map);
182

183
    std::unordered_map<const IStorage *, QueryTreeNodePtr> dummy_storage_to_table;
184

185
    for (auto & [from_table_expression, dummy_table_expression] : replacement_map)
186
    {
187
        auto * dummy_storage = dummy_table_expression->as<TableNode &>().getStorage().get();
188
        dummy_storage_to_table.emplace(dummy_storage, from_table_expression);
189
    }
190

191
    SelectQueryOptions select_query_options;
192
    Planner planner(updated_query_tree, select_query_options);
193
    planner.buildQueryPlanIfNeeded();
194

195
    auto & result_query_plan = planner.getQueryPlan();
196

197
    auto optimization_settings = QueryPlanOptimizationSettings::fromContext(query_context);
198
    result_query_plan.optimize(optimization_settings);
199

200
    FiltersForTableExpressionMap res;
201

202
    std::vector<QueryPlan::Node *> nodes_to_process;
203
    nodes_to_process.push_back(result_query_plan.getRootNode());
204

205
    while (!nodes_to_process.empty())
206
    {
207
        const auto * node_to_process = nodes_to_process.back();
208
        nodes_to_process.pop_back();
209
        nodes_to_process.insert(nodes_to_process.end(), node_to_process->children.begin(), node_to_process->children.end());
210

211
        auto * read_from_dummy = typeid_cast<ReadFromDummy *>(node_to_process->step.get());
212
        if (!read_from_dummy)
213
            continue;
214

215
        auto filter_actions = read_from_dummy->getFilterActionsDAG();
216
        const auto & table_node = dummy_storage_to_table.at(&read_from_dummy->getStorage());
217
        res[table_node] = FiltersForTableExpression{std::move(filter_actions), read_from_dummy->getPrewhereInfo()};
218
    }
219

220
    return res;
221
}
222

223
FiltersForTableExpressionMap collectFiltersForAnalysis(const QueryTreeNodePtr & query_tree_node, SelectQueryOptions & select_query_options)
224
{
225
    if (select_query_options.only_analyze)
226
        return {};
227

228
    auto * query_node = query_tree_node->as<QueryNode>();
229
    auto * union_node = query_tree_node->as<UnionNode>();
230

231
    if (!query_node && !union_node)
232
        throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
233
            "Expected QUERY or UNION node. Actual {}",
234
            query_tree_node->formatASTForErrorMessage());
235

236
    auto context = query_node ? query_node->getContext() : union_node->getContext();
237

238
    auto table_expressions_nodes
239
        = extractTableExpressions(query_tree_node, false /* add_array_join */, true /* recursive */);
240

241
    return collectFiltersForAnalysis(query_tree_node, table_expressions_nodes, context);
242
}
243

244
/// Extend lifetime of query context, storages, and table locks
245
void extendQueryContextAndStoragesLifetime(QueryPlan & query_plan, const PlannerContextPtr & planner_context)
246
{
247
    query_plan.addInterpreterContext(planner_context->getQueryContext());
248

249
    for (const auto & [table_expression, _] : planner_context->getTableExpressionNodeToData())
250
    {
251
        if (auto * table_node = table_expression->as<TableNode>())
252
        {
253
            query_plan.addStorageHolder(table_node->getStorage());
254
            query_plan.addTableLock(table_node->getStorageLock());
255
        }
256
        else if (auto * table_function_node = table_expression->as<TableFunctionNode>())
257
        {
258
            query_plan.addStorageHolder(table_function_node->getStorage());
259
        }
260
    }
261
}
262

263
class QueryAnalysisResult
264
{
265
public:
266
    QueryAnalysisResult(const QueryTreeNodePtr & query_tree,
267
        const PlannerQueryProcessingInfo & query_processing_info,
268
        const PlannerContextPtr & planner_context)
269
    {
270
        const auto & query_node = query_tree->as<QueryNode &>();
271
        const auto & query_context = planner_context->getQueryContext();
272
        const auto & settings = query_context->getSettingsRef();
273

274
        aggregate_overflow_row = query_node.isGroupByWithTotals() && settings.max_rows_to_group_by
275
            && settings.group_by_overflow_mode == OverflowMode::ANY && settings.totals_mode != TotalsMode::AFTER_HAVING_EXCLUSIVE;
276
        aggregate_final = query_processing_info.getToStage() > QueryProcessingStage::WithMergeableState
277
            && !query_node.isGroupByWithTotals() && !query_node.isGroupByWithRollup() && !query_node.isGroupByWithCube();
278
        aggregation_with_rollup_or_cube_or_grouping_sets = query_node.isGroupByWithRollup() || query_node.isGroupByWithCube() ||
279
            query_node.isGroupByWithGroupingSets();
280
        aggregation_should_produce_results_in_order_of_bucket_number
281
            = query_processing_info.getToStage() == QueryProcessingStage::WithMergeableState
282
            && (settings.distributed_aggregation_memory_efficient || settings.enable_memory_bound_merging_of_aggregation_results);
283

284
        query_has_array_join_in_join_tree = queryHasArrayJoinInJoinTree(query_tree);
285
        query_has_with_totals_in_any_subquery_in_join_tree = queryHasWithTotalsInAnySubqueryInJoinTree(query_tree);
286

287
        sort_description = extractSortDescription(query_node.getOrderByNode(), *planner_context);
288

289
        if (query_node.hasLimit())
290
        {
291
            /// Constness of limit is validated during query analysis stage
292
            limit_length = query_node.getLimit()->as<ConstantNode &>().getValue().safeGet<UInt64>();
293

294
            if (query_node.hasOffset() && limit_length)
295
            {
296
                /// Constness of offset is validated during query analysis stage
297
                limit_offset = query_node.getOffset()->as<ConstantNode &>().getValue().safeGet<UInt64>();
298
            }
299
        }
300
        else if (query_node.hasOffset())
301
        {
302
            /// Constness of offset is validated during query analysis stage
303
            limit_offset = query_node.getOffset()->as<ConstantNode &>().getValue().safeGet<UInt64>();
304
        }
305

306
        /// Partial sort can be done if there is LIMIT, but no DISTINCT, LIMIT WITH TIES, LIMIT BY, ARRAY JOIN
307
        if (limit_length != 0 &&
308
            !query_node.isDistinct() &&
309
            !query_node.isLimitWithTies() &&
310
            !query_node.hasLimitBy() &&
311
            !query_has_array_join_in_join_tree &&
312
            limit_length <= std::numeric_limits<UInt64>::max() - limit_offset)
313
        {
314
            partial_sorting_limit = limit_length + limit_offset;
315
        }
316
    }
317

318
    bool aggregate_overflow_row = false;
319
    bool aggregate_final = false;
320
    bool aggregation_with_rollup_or_cube_or_grouping_sets = false;
321
    bool aggregation_should_produce_results_in_order_of_bucket_number = false;
322
    bool query_has_array_join_in_join_tree = false;
323
    bool query_has_with_totals_in_any_subquery_in_join_tree = false;
324
    SortDescription sort_description;
325
    UInt64 limit_length = 0;
326
    UInt64 limit_offset = 0;
327
    UInt64 partial_sorting_limit = 0;
328
};
329

330
void addExpressionStep(QueryPlan & query_plan,
331
    const ActionsDAGPtr & expression_actions,
332
    const std::string & step_description,
333
    std::vector<ActionsDAGPtr> & result_actions_to_execute)
334
{
335
    result_actions_to_execute.push_back(expression_actions);
336
    auto expression_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), expression_actions);
337
    expression_step->setStepDescription(step_description);
338
    query_plan.addStep(std::move(expression_step));
339
}
340

341
void addFilterStep(QueryPlan & query_plan,
342
    const FilterAnalysisResult & filter_analysis_result,
343
    const std::string & step_description,
344
    std::vector<ActionsDAGPtr> & result_actions_to_execute)
345
{
346
    result_actions_to_execute.push_back(filter_analysis_result.filter_actions);
347
    auto where_step = std::make_unique<FilterStep>(query_plan.getCurrentDataStream(),
348
        filter_analysis_result.filter_actions,
349
        filter_analysis_result.filter_column_name,
350
        filter_analysis_result.remove_filter_column);
351
    where_step->setStepDescription(step_description);
352
    query_plan.addStep(std::move(where_step));
353
}
354

355
Aggregator::Params getAggregatorParams(const PlannerContextPtr & planner_context,
356
    const AggregationAnalysisResult & aggregation_analysis_result,
357
    const QueryAnalysisResult & query_analysis_result,
358
    const SelectQueryInfo & select_query_info,
359
    bool aggregate_descriptions_remove_arguments = false)
360
{
361
    const auto & query_context = planner_context->getQueryContext();
362
    const Settings & settings = query_context->getSettingsRef();
363

364
    const auto stats_collecting_params = Aggregator::Params::StatsCollectingParams(
365
        select_query_info.query,
366
        settings.collect_hash_table_stats_during_aggregation,
367
        settings.max_entries_for_hash_table_stats,
368
        settings.max_size_to_preallocate_for_aggregation);
369

370
    auto aggregate_descriptions = aggregation_analysis_result.aggregate_descriptions;
371
    if (aggregate_descriptions_remove_arguments)
372
    {
373
        for (auto & aggregate_description : aggregate_descriptions)
374
            aggregate_description.argument_names.clear();
375
    }
376

377
    Aggregator::Params aggregator_params = Aggregator::Params(
378
        aggregation_analysis_result.aggregation_keys,
379
        aggregate_descriptions,
380
        query_analysis_result.aggregate_overflow_row,
381
        settings.max_rows_to_group_by,
382
        settings.group_by_overflow_mode,
383
        settings.group_by_two_level_threshold,
384
        settings.group_by_two_level_threshold_bytes,
385
        settings.max_bytes_before_external_group_by,
386
        settings.empty_result_for_aggregation_by_empty_set
387
            || (settings.empty_result_for_aggregation_by_constant_keys_on_empty_set && aggregation_analysis_result.aggregation_keys.empty()
388
                && aggregation_analysis_result.group_by_with_constant_keys),
389
        query_context->getTempDataOnDisk(),
390
        settings.max_threads,
391
        settings.min_free_disk_space_for_temporary_data,
392
        settings.compile_aggregate_expressions,
393
        settings.min_count_to_compile_aggregate_expression,
394
        settings.max_block_size,
395
        settings.enable_software_prefetch_in_aggregation,
396
        /* only_merge */ false,
397
        settings.optimize_group_by_constant_keys,
398
        settings.min_hit_rate_to_use_consecutive_keys_optimization,
399
        stats_collecting_params);
400

401
    return aggregator_params;
402
}
403

404
SortDescription getSortDescriptionFromNames(const Names & names)
405
{
406
    SortDescription order_descr;
407
    order_descr.reserve(names.size());
408

409
    for (const auto & name : names)
410
        order_descr.emplace_back(name, 1, 1);
411

412
    return order_descr;
413
}
414

415
void addAggregationStep(QueryPlan & query_plan,
416
    const AggregationAnalysisResult & aggregation_analysis_result,
417
    const QueryAnalysisResult & query_analysis_result,
418
    const PlannerContextPtr & planner_context,
419
    const SelectQueryInfo & select_query_info)
420
{
421
    const Settings & settings = planner_context->getQueryContext()->getSettingsRef();
422
    auto aggregator_params = getAggregatorParams(planner_context, aggregation_analysis_result, query_analysis_result, select_query_info);
423

424
    SortDescription sort_description_for_merging;
425
    SortDescription group_by_sort_description;
426

427
    if (settings.force_aggregation_in_order)
428
    {
429
        group_by_sort_description = getSortDescriptionFromNames(aggregation_analysis_result.aggregation_keys);
430
        sort_description_for_merging = group_by_sort_description;
431
    }
432

433
    auto merge_threads = settings.max_threads;
434
    auto temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads
435
        ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
436
        : static_cast<size_t>(settings.max_threads);
437

438
    bool storage_has_evenly_distributed_read = false;
439
    const auto & table_expression_node_to_data = planner_context->getTableExpressionNodeToData();
440

441
    if (table_expression_node_to_data.size() == 1)
442
    {
443
        auto it = table_expression_node_to_data.begin();
444
        const auto & table_expression_node = it->first;
445
        if (const auto * table_node = table_expression_node->as<TableNode>())
446
            storage_has_evenly_distributed_read = table_node->getStorage()->hasEvenlyDistributedRead();
447
        else if (const auto * table_function_node = table_expression_node->as<TableFunctionNode>())
448
            storage_has_evenly_distributed_read = table_function_node->getStorageOrThrow()->hasEvenlyDistributedRead();
449
    }
450

451
    auto aggregating_step = std::make_unique<AggregatingStep>(
452
        query_plan.getCurrentDataStream(),
453
        aggregator_params,
454
        aggregation_analysis_result.grouping_sets_parameters_list,
455
        query_analysis_result.aggregate_final,
456
        settings.max_block_size,
457
        settings.aggregation_in_order_max_block_bytes,
458
        merge_threads,
459
        temporary_data_merge_threads,
460
        storage_has_evenly_distributed_read,
461
        settings.group_by_use_nulls,
462
        std::move(sort_description_for_merging),
463
        std::move(group_by_sort_description),
464
        query_analysis_result.aggregation_should_produce_results_in_order_of_bucket_number,
465
        settings.enable_memory_bound_merging_of_aggregation_results,
466
        settings.force_aggregation_in_order);
467
    query_plan.addStep(std::move(aggregating_step));
468
}
469

470
void addMergingAggregatedStep(QueryPlan & query_plan,
471
    const AggregationAnalysisResult & aggregation_analysis_result,
472
    const QueryAnalysisResult & query_analysis_result,
473
    const PlannerContextPtr & planner_context)
474
{
475
    const auto & query_context = planner_context->getQueryContext();
476
    const auto & settings = query_context->getSettingsRef();
477

478
    /** There are two modes of distributed aggregation.
479
      *
480
      * 1. In different threads read from the remote servers blocks.
481
      * Save all the blocks in the RAM. Merge blocks.
482
      * If the aggregation is two-level - parallelize to the number of buckets.
483
      *
484
      * 2. In one thread, read blocks from different servers in order.
485
      * RAM stores only one block from each server.
486
      * If the aggregation is a two-level aggregation, we consistently merge the blocks of each next level.
487
      *
488
      * The second option consumes less memory (up to 256 times less)
489
      * in the case of two-level aggregation, which is used for large results after GROUP BY,
490
      * but it can work more slowly.
491
      */
492

493
    auto keys = aggregation_analysis_result.aggregation_keys;
494
    if (!aggregation_analysis_result.grouping_sets_parameters_list.empty())
495
        keys.insert(keys.begin(), "__grouping_set");
496

497
    Aggregator::Params params(keys,
498
        aggregation_analysis_result.aggregate_descriptions,
499
        query_analysis_result.aggregate_overflow_row,
500
        settings.max_threads,
501
        settings.max_block_size,
502
        settings.min_hit_rate_to_use_consecutive_keys_optimization);
503

504
    bool is_remote_storage = false;
505
    bool parallel_replicas_from_merge_tree = false;
506

507
    const auto & table_expression_node_to_data = planner_context->getTableExpressionNodeToData();
508
    if (table_expression_node_to_data.size() == 1)
509
    {
510
        auto it = table_expression_node_to_data.begin();
511
        is_remote_storage = it->second.isRemote();
512
        parallel_replicas_from_merge_tree = it->second.isMergeTree() && query_context->canUseParallelReplicasOnInitiator();
513
    }
514

515
    SortDescription group_by_sort_description;
516

517
    auto merging_aggregated = std::make_unique<MergingAggregatedStep>(
518
        query_plan.getCurrentDataStream(),
519
        params,
520
        query_analysis_result.aggregate_final,
521
        /// Grouping sets don't work with distributed_aggregation_memory_efficient enabled (#43989)
522
        settings.distributed_aggregation_memory_efficient && (is_remote_storage || parallel_replicas_from_merge_tree) && !query_analysis_result.aggregation_with_rollup_or_cube_or_grouping_sets,
523
        settings.max_threads,
524
        settings.aggregation_memory_efficient_merge_threads,
525
        query_analysis_result.aggregation_should_produce_results_in_order_of_bucket_number,
526
        settings.max_block_size,
527
        settings.aggregation_in_order_max_block_bytes,
528
        std::move(group_by_sort_description),
529
        settings.enable_memory_bound_merging_of_aggregation_results);
530
    query_plan.addStep(std::move(merging_aggregated));
531
}
532

533
void addTotalsHavingStep(QueryPlan & query_plan,
534
    const PlannerExpressionsAnalysisResult & expression_analysis_result,
535
    const QueryAnalysisResult & query_analysis_result,
536
    const PlannerContextPtr & planner_context,
537
    const QueryNode & query_node,
538
    std::vector<ActionsDAGPtr> & result_actions_to_execute)
539
{
540
    const auto & query_context = planner_context->getQueryContext();
541
    const auto & settings = query_context->getSettingsRef();
542

543
    const auto & aggregation_analysis_result = expression_analysis_result.getAggregation();
544
    const auto & having_analysis_result = expression_analysis_result.getHaving();
545
    bool need_finalize = !query_node.isGroupByWithRollup() && !query_node.isGroupByWithCube();
546

547
    if (having_analysis_result.filter_actions)
548
        result_actions_to_execute.push_back(having_analysis_result.filter_actions);
549

550
    auto totals_having_step = std::make_unique<TotalsHavingStep>(
551
        query_plan.getCurrentDataStream(),
552
        aggregation_analysis_result.aggregate_descriptions,
553
        query_analysis_result.aggregate_overflow_row,
554
        having_analysis_result.filter_actions,
555
        having_analysis_result.filter_column_name,
556
        having_analysis_result.remove_filter_column,
557
        settings.totals_mode,
558
        settings.totals_auto_threshold,
559
        need_finalize);
560
    query_plan.addStep(std::move(totals_having_step));
561
}
562

563
void addCubeOrRollupStepIfNeeded(QueryPlan & query_plan,
564
    const AggregationAnalysisResult & aggregation_analysis_result,
565
    const QueryAnalysisResult & query_analysis_result,
566
    const PlannerContextPtr & planner_context,
567
    const SelectQueryInfo & select_query_info,
568
    const QueryNode & query_node)
569
{
570
    if (!query_node.isGroupByWithCube() && !query_node.isGroupByWithRollup())
571
        return;
572

573
    const auto & query_context = planner_context->getQueryContext();
574
    const auto & settings = query_context->getSettingsRef();
575

576
    auto aggregator_params = getAggregatorParams(planner_context,
577
        aggregation_analysis_result,
578
        query_analysis_result,
579
        select_query_info,
580
        true /*aggregate_descriptions_remove_arguments*/);
581

582
    if (query_node.isGroupByWithRollup())
583
    {
584
        auto rollup_step = std::make_unique<RollupStep>(
585
            query_plan.getCurrentDataStream(), std::move(aggregator_params), true /*final*/, settings.group_by_use_nulls);
586
        query_plan.addStep(std::move(rollup_step));
587
    }
588
    else if (query_node.isGroupByWithCube())
589
    {
590
        auto cube_step = std::make_unique<CubeStep>(
591
            query_plan.getCurrentDataStream(), std::move(aggregator_params), true /*final*/, settings.group_by_use_nulls);
592
        query_plan.addStep(std::move(cube_step));
593
    }
594
}
595

596
void addDistinctStep(QueryPlan & query_plan,
597
    const QueryAnalysisResult & query_analysis_result,
598
    const PlannerContextPtr & planner_context,
599
    const Names & column_names,
600
    const QueryNode & query_node,
601
    bool before_order,
602
    bool pre_distinct)
603
{
604
    const Settings & settings = planner_context->getQueryContext()->getSettingsRef();
605

606
    UInt64 limit_offset = query_analysis_result.limit_offset;
607
    UInt64 limit_length = query_analysis_result.limit_length;
608

609
    UInt64 limit_hint_for_distinct = 0;
610

611
    /** If after this stage of DISTINCT
612
      * 1. ORDER BY is not executed.
613
      * 2. There is no LIMIT BY.
614
      * Then you can get no more than limit_length + limit_offset of different rows.
615
      */
616
    if ((!query_node.hasOrderBy() || !before_order) && !query_node.hasLimitBy())
617
    {
618
        if (limit_length <= std::numeric_limits<UInt64>::max() - limit_offset)
619
            limit_hint_for_distinct = limit_length + limit_offset;
620
    }
621

622
    SizeLimits limits(settings.max_rows_in_distinct, settings.max_bytes_in_distinct, settings.distinct_overflow_mode);
623

624
    auto distinct_step = std::make_unique<DistinctStep>(
625
        query_plan.getCurrentDataStream(),
626
        limits,
627
        limit_hint_for_distinct,
628
        column_names,
629
        pre_distinct,
630
        settings.optimize_distinct_in_order);
631

632
    distinct_step->setStepDescription(pre_distinct ? "Preliminary DISTINCT" : "DISTINCT");
633
    query_plan.addStep(std::move(distinct_step));
634
}
635

636
void addSortingStep(QueryPlan & query_plan,
637
    const QueryAnalysisResult & query_analysis_result,
638
    const PlannerContextPtr & planner_context)
639
{
640
    const auto & sort_description = query_analysis_result.sort_description;
641
    const auto & query_context = planner_context->getQueryContext();
642
    const Settings & settings = query_context->getSettingsRef();
643
    SortingStep::Settings sort_settings(*query_context);
644

645
    auto sorting_step = std::make_unique<SortingStep>(
646
        query_plan.getCurrentDataStream(),
647
        sort_description,
648
        query_analysis_result.partial_sorting_limit,
649
        sort_settings,
650
        settings.optimize_sorting_by_input_stream_properties);
651
    sorting_step->setStepDescription("Sorting for ORDER BY");
652
    query_plan.addStep(std::move(sorting_step));
653
}
654

655
void addMergeSortingStep(QueryPlan & query_plan,
656
    const QueryAnalysisResult & query_analysis_result,
657
    const PlannerContextPtr & planner_context,
658
    const std::string & description)
659
{
660
    const auto & query_context = planner_context->getQueryContext();
661
    const auto & settings = query_context->getSettingsRef();
662

663
    const auto & sort_description = query_analysis_result.sort_description;
664
    const auto max_block_size = settings.max_block_size;
665

666
    auto merging_sorted = std::make_unique<SortingStep>(query_plan.getCurrentDataStream(),
667
        sort_description,
668
        max_block_size,
669
        query_analysis_result.partial_sorting_limit,
670
        settings.exact_rows_before_limit);
671
    merging_sorted->setStepDescription("Merge sorted streams " + description);
672
    query_plan.addStep(std::move(merging_sorted));
673
}
674

675
void addWithFillStepIfNeeded(QueryPlan & query_plan,
676
    const QueryAnalysisResult & query_analysis_result,
677
    const PlannerContextPtr & planner_context,
678
    const QueryNode & query_node)
679
{
680
    const auto & sort_description = query_analysis_result.sort_description;
681

682
    NameSet column_names_with_fill;
683
    SortDescription fill_description;
684

685
    for (const auto & description : sort_description)
686
    {
687
        if (description.with_fill)
688
        {
689
            fill_description.push_back(description);
690
            column_names_with_fill.insert(description.column_name);
691
        }
692
    }
693

694
    if (fill_description.empty())
695
        return;
696

697
    InterpolateDescriptionPtr interpolate_description;
698

699
    if (query_node.hasInterpolate())
700
    {
701
        auto interpolate_actions_dag = std::make_shared<ActionsDAG>();
702
        auto query_plan_columns = query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName();
703
        for (auto & query_plan_column : query_plan_columns)
704
        {
705
            /// INTERPOLATE actions dag input columns must be non constant
706
            query_plan_column.column = nullptr;
707
            interpolate_actions_dag->addInput(query_plan_column);
708
        }
709

710
        auto & interpolate_list_node = query_node.getInterpolate()->as<ListNode &>();
711
        auto & interpolate_list_nodes = interpolate_list_node.getNodes();
712

713
        if (interpolate_list_nodes.empty())
714
        {
715
            for (const auto * input_node : interpolate_actions_dag->getInputs())
716
            {
717
                if (column_names_with_fill.contains(input_node->result_name))
718
                    continue;
719

720
                interpolate_actions_dag->getOutputs().push_back(input_node);
721
            }
722
        }
723
        else
724
        {
725
            for (auto & interpolate_node : interpolate_list_nodes)
726
            {
727
                auto & interpolate_node_typed = interpolate_node->as<InterpolateNode &>();
728

729
                PlannerActionsVisitor planner_actions_visitor(planner_context);
730
                auto expression_to_interpolate_expression_nodes = planner_actions_visitor.visit(interpolate_actions_dag,
731
                    interpolate_node_typed.getExpression());
732
                if (expression_to_interpolate_expression_nodes.size() != 1)
733
                    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expression to interpolate expected to have single action node");
734

735
                auto interpolate_expression_nodes = planner_actions_visitor.visit(interpolate_actions_dag,
736
                    interpolate_node_typed.getInterpolateExpression());
737
                if (interpolate_expression_nodes.size() != 1)
738
                    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Interpolate expression expected to have single action node");
739

740
                const auto * expression_to_interpolate = expression_to_interpolate_expression_nodes[0];
741
                const auto & expression_to_interpolate_name = expression_to_interpolate->result_name;
742

743
                const auto * interpolate_expression = interpolate_expression_nodes[0];
744
                if (!interpolate_expression->result_type->equals(*expression_to_interpolate->result_type))
745
                {
746
                    interpolate_expression = &interpolate_actions_dag->addCast(*interpolate_expression,
747
                        expression_to_interpolate->result_type,
748
                        interpolate_expression->result_name);
749
                }
750

751
                const auto * alias_node = &interpolate_actions_dag->addAlias(*interpolate_expression, expression_to_interpolate_name);
752
                interpolate_actions_dag->getOutputs().push_back(alias_node);
753
            }
754

755
            interpolate_actions_dag->removeUnusedActions();
756
        }
757

758
        Aliases empty_aliases;
759
        interpolate_description = std::make_shared<InterpolateDescription>(std::move(interpolate_actions_dag), empty_aliases);
760
    }
761

762
    const auto & query_context = planner_context->getQueryContext();
763
    const Settings & settings = query_context->getSettingsRef();
764
    auto filling_step = std::make_unique<FillingStep>(
765
        query_plan.getCurrentDataStream(),
766
        sort_description,
767
        std::move(fill_description),
768
        interpolate_description,
769
        settings.use_with_fill_by_sorting_prefix);
770
    query_plan.addStep(std::move(filling_step));
771
}
772

773
void addLimitByStep(QueryPlan & query_plan,
774
    const LimitByAnalysisResult & limit_by_analysis_result,
775
    const QueryNode & query_node)
776
{
777
    /// Constness of LIMIT BY limit is validated during query analysis stage
778
    UInt64 limit_by_limit = query_node.getLimitByLimit()->as<ConstantNode &>().getValue().safeGet<UInt64>();
779
    UInt64 limit_by_offset = 0;
780

781
    if (query_node.hasLimitByOffset())
782
    {
783
        /// Constness of LIMIT BY offset is validated during query analysis stage
784
        limit_by_offset = query_node.getLimitByOffset()->as<ConstantNode &>().getValue().safeGet<UInt64>();
785
    }
786

787
    auto limit_by_step = std::make_unique<LimitByStep>(query_plan.getCurrentDataStream(),
788
        limit_by_limit,
789
        limit_by_offset,
790
        limit_by_analysis_result.limit_by_column_names);
791
    query_plan.addStep(std::move(limit_by_step));
792
}
793

794
void addPreliminaryLimitStep(QueryPlan & query_plan,
795
    const QueryAnalysisResult & query_analysis_result,
796
    const PlannerContextPtr & planner_context,
797
    bool do_not_skip_offset)
798
{
799
    UInt64 limit_offset = query_analysis_result.limit_offset;
800
    UInt64 limit_length = query_analysis_result.limit_length;
801

802
    if (do_not_skip_offset)
803
    {
804
        if (limit_length > std::numeric_limits<UInt64>::max() - limit_offset)
805
            return;
806

807
        limit_length += limit_offset;
808
        limit_offset = 0;
809
    }
810

811
    const auto & query_context = planner_context->getQueryContext();
812
    const Settings & settings = query_context->getSettingsRef();
813

814
    auto limit = std::make_unique<LimitStep>(query_plan.getCurrentDataStream(), limit_length, limit_offset, settings.exact_rows_before_limit);
815
    limit->setStepDescription(do_not_skip_offset ? "preliminary LIMIT (with OFFSET)" : "preliminary LIMIT (without OFFSET)");
816
    query_plan.addStep(std::move(limit));
817
}
818

819
bool addPreliminaryLimitOptimizationStepIfNeeded(QueryPlan & query_plan,
820
    const QueryAnalysisResult & query_analysis_result,
821
    const PlannerContextPtr planner_context,
822
    const PlannerQueryProcessingInfo & query_processing_info,
823
    const QueryTreeNodePtr & query_tree)
824
{
825
    const auto & query_node = query_tree->as<QueryNode &>();
826
    const auto & query_context = planner_context->getQueryContext();
827
    const auto & settings = query_context->getSettingsRef();
828
    const auto & sort_description = query_analysis_result.sort_description;
829

830
    bool has_withfill = false;
831

832
    for (const auto & desc : sort_description)
833
    {
834
        if (desc.with_fill)
835
        {
836
            has_withfill = true;
837
            break;
838
        }
839
    }
840

841
    bool apply_limit = query_processing_info.getToStage() != QueryProcessingStage::WithMergeableStateAfterAggregation;
842
    bool apply_prelimit = apply_limit &&
843
        query_node.hasLimit() &&
844
        !query_node.isLimitWithTies() &&
845
        !query_node.isGroupByWithTotals() &&
846
        !query_analysis_result.query_has_with_totals_in_any_subquery_in_join_tree &&
847
        !query_analysis_result.query_has_array_join_in_join_tree &&
848
        !query_node.isDistinct() &&
849
        !query_node.hasLimitBy() &&
850
        !settings.extremes &&
851
        !has_withfill;
852
    bool apply_offset = query_processing_info.getToStage() != QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
853
    if (apply_prelimit)
854
    {
855
        addPreliminaryLimitStep(query_plan, query_analysis_result, planner_context, /* do_not_skip_offset= */!apply_offset);
856
        return true;
857
    }
858

859
    return false;
860
}
861

862
/** For distributed query processing, add preliminary sort or distinct or limit
863
  * for first stage of query processing on shard, if there is no GROUP BY, HAVING,
864
  * WINDOW functions.
865
  */
866
void addPreliminarySortOrDistinctOrLimitStepsIfNeeded(QueryPlan & query_plan,
867
    const PlannerExpressionsAnalysisResult & expressions_analysis_result,
868
    const QueryAnalysisResult & query_analysis_result,
869
    const PlannerContextPtr & planner_context,
870
    const PlannerQueryProcessingInfo & query_processing_info,
871
    const QueryTreeNodePtr & query_tree,
872
    std::vector<ActionsDAGPtr> & result_actions_to_execute)
873
{
874
    const auto & query_node = query_tree->as<QueryNode &>();
875

876
    if (query_processing_info.isSecondStage() ||
877
        expressions_analysis_result.hasAggregation() ||
878
        expressions_analysis_result.hasHaving() ||
879
        expressions_analysis_result.hasWindow())
880
        return;
881

882
    if (expressions_analysis_result.hasSort())
883
        addSortingStep(query_plan, query_analysis_result, planner_context);
884

885
    /** For DISTINCT step, pre_distinct = false, because if we have limit and distinct,
886
      * we need to merge streams to one and calculate overall distinct.
887
      * Otherwise we can take several equal values from different streams
888
      * according to limit and skip some distinct values.
889
      */
890
    if (query_node.hasLimit() && query_node.isDistinct())
891
    {
892
        addDistinctStep(query_plan,
893
            query_analysis_result,
894
            planner_context,
895
            expressions_analysis_result.getProjection().projection_column_names,
896
            query_node,
897
            false /*before_order*/,
898
            false /*pre_distinct*/);
899
    }
900

901
    if (expressions_analysis_result.hasLimitBy())
902
    {
903
        const auto & limit_by_analysis_result = expressions_analysis_result.getLimitBy();
904
        addExpressionStep(query_plan, limit_by_analysis_result.before_limit_by_actions, "Before LIMIT BY", result_actions_to_execute);
905
        addLimitByStep(query_plan, limit_by_analysis_result, query_node);
906
    }
907

908
    if (query_node.hasLimit())
909
        addPreliminaryLimitStep(query_plan, query_analysis_result, planner_context, true /*do_not_skip_offset*/);
910
}
911

912
void addWindowSteps(QueryPlan & query_plan,
913
    const PlannerContextPtr & planner_context,
914
    const WindowAnalysisResult & window_analysis_result)
915
{
916
    const auto & query_context = planner_context->getQueryContext();
917
    const auto & settings = query_context->getSettingsRef();
918

919
    auto window_descriptions = window_analysis_result.window_descriptions;
920
    sortWindowDescriptions(window_descriptions);
921

922
    size_t window_descriptions_size = window_descriptions.size();
923

924
    for (size_t i = 0; i < window_descriptions_size; ++i)
925
    {
926
        const auto & window_description = window_descriptions[i];
927

928
        /** We don't need to sort again if the input from previous window already
929
          * has suitable sorting. Also don't create sort steps when there are no
930
          * columns to sort by, because the sort nodes are confused by this. It
931
          * happens in case of `over ()`.
932
          * Even if full_sort_description of both windows match, in case of different
933
          * partitioning we need to add a SortingStep to reshuffle data in the streams.
934
          */
935

936
        bool need_sort = !window_description.full_sort_description.empty();
937
        if (need_sort && i != 0)
938
        {
939
            need_sort = !sortDescriptionIsPrefix(window_description.full_sort_description, window_descriptions[i - 1].full_sort_description)
940
                        || (settings.max_threads != 1 && window_description.partition_by.size() != window_descriptions[i - 1].partition_by.size());
941
        }
942
        if (need_sort)
943
        {
944
            SortingStep::Settings sort_settings(*query_context);
945

946
            auto sorting_step = std::make_unique<SortingStep>(
947
                query_plan.getCurrentDataStream(),
948
                window_description.full_sort_description,
949
                window_description.partition_by,
950
                0 /*limit*/,
951
                sort_settings,
952
                settings.optimize_sorting_by_input_stream_properties);
953
            sorting_step->setStepDescription("Sorting for window '" + window_description.window_name + "'");
954
            query_plan.addStep(std::move(sorting_step));
955
        }
956

957
        // Fan out streams only for the last window to preserve the ordering between windows,
958
        // and WindowTransform works on single stream anyway.
959
        const bool streams_fan_out = settings.query_plan_enable_multithreading_after_window_functions && ((i + 1) == window_descriptions_size);
960

961
        auto window_step
962
            = std::make_unique<WindowStep>(query_plan.getCurrentDataStream(), window_description, window_description.window_functions, streams_fan_out);
963
        window_step->setStepDescription("Window step for window '" + window_description.window_name + "'");
964
        query_plan.addStep(std::move(window_step));
965
    }
966
}
967

968
void addLimitStep(QueryPlan & query_plan,
969
    const QueryAnalysisResult & query_analysis_result,
970
    const PlannerContextPtr & planner_context,
971
    const QueryNode & query_node)
972
{
973
    const auto & query_context = planner_context->getQueryContext();
974
    const auto & settings = query_context->getSettingsRef();
975
    bool always_read_till_end = settings.exact_rows_before_limit;
976
    bool limit_with_ties = query_node.isLimitWithTies();
977

978
    /** Special cases:
979
      *
980
      * 1. If there is WITH TOTALS and there is no ORDER BY, then read the data to the end,
981
      *  otherwise TOTALS is counted according to incomplete data.
982
      *
983
      * 2. If there is no WITH TOTALS and there is a subquery in FROM, and there is WITH TOTALS on one of the levels,
984
      *  then when using LIMIT, you should read the data to the end, rather than cancel the query earlier,
985
      *  because if you cancel the query, we will not get `totals` data from the remote server.
986
      */
987
    if (query_node.isGroupByWithTotals() && !query_node.hasOrderBy())
988
        always_read_till_end = true;
989

990
    if (!query_node.isGroupByWithTotals() && query_analysis_result.query_has_with_totals_in_any_subquery_in_join_tree)
991
        always_read_till_end = true;
992

993
    SortDescription limit_with_ties_sort_description;
994

995
    if (query_node.isLimitWithTies())
996
    {
997
        /// Validated during parser stage
998
        if (!query_node.hasOrderBy())
999
            throw Exception(ErrorCodes::LOGICAL_ERROR, "LIMIT WITH TIES without ORDER BY");
1000

1001
        limit_with_ties_sort_description = query_analysis_result.sort_description;
1002
    }
1003

1004
    UInt64 limit_length = query_analysis_result.limit_length;
1005
    UInt64 limit_offset = query_analysis_result.limit_offset;
1006

1007
    auto limit = std::make_unique<LimitStep>(
1008
        query_plan.getCurrentDataStream(),
1009
        limit_length,
1010
        limit_offset,
1011
        always_read_till_end,
1012
        limit_with_ties,
1013
        limit_with_ties_sort_description);
1014

1015
    if (limit_with_ties)
1016
        limit->setStepDescription("LIMIT WITH TIES");
1017

1018
    query_plan.addStep(std::move(limit));
1019
}
1020

1021
void addExtremesStepIfNeeded(QueryPlan & query_plan, const PlannerContextPtr & planner_context)
1022
{
1023
    const auto & query_context = planner_context->getQueryContext();
1024
    if (!query_context->getSettingsRef().extremes)
1025
        return;
1026

1027
    auto extremes_step = std::make_unique<ExtremesStep>(query_plan.getCurrentDataStream());
1028
    query_plan.addStep(std::move(extremes_step));
1029
}
1030

1031
void addOffsetStep(QueryPlan & query_plan, const QueryAnalysisResult & query_analysis_result)
1032
{
1033
    /// If there is not a LIMIT but an offset
1034
    if (!query_analysis_result.limit_length && query_analysis_result.limit_offset)
1035
    {
1036
        auto offsets_step = std::make_unique<OffsetStep>(query_plan.getCurrentDataStream(), query_analysis_result.limit_offset);
1037
        query_plan.addStep(std::move(offsets_step));
1038
    }
1039
}
1040

1041
void collectSetsFromActionsDAG(const ActionsDAGPtr & dag, std::unordered_set<const FutureSet *> & useful_sets)
1042
{
1043
    for (const auto & node : dag->getNodes())
1044
    {
1045
        if (node.column)
1046
        {
1047
            const IColumn * column = node.column.get();
1048
            if (const auto * column_const = typeid_cast<const ColumnConst *>(column))
1049
                column = &column_const->getDataColumn();
1050

1051
            if (const auto * column_set = typeid_cast<const ColumnSet *>(column))
1052
                useful_sets.insert(column_set->getData().get());
1053
        }
1054

1055
        if (node.type == ActionsDAG::ActionType::FUNCTION && node.function_base->getName() == "indexHint")
1056
        {
1057
            ActionsDAG::NodeRawConstPtrs children;
1058
            if (const auto * adaptor = typeid_cast<const FunctionToFunctionBaseAdaptor *>(node.function_base.get()))
1059
            {
1060
                if (const auto * index_hint = typeid_cast<const FunctionIndexHint *>(adaptor->getFunction().get()))
1061
                {
1062
                    collectSetsFromActionsDAG(index_hint->getActions(), useful_sets);
1063
                }
1064
            }
1065
        }
1066
    }
1067
}
1068

1069
void addBuildSubqueriesForSetsStepIfNeeded(
1070
    QueryPlan & query_plan,
1071
    const SelectQueryOptions & select_query_options,
1072
    const PlannerContextPtr & planner_context,
1073
    const std::vector<ActionsDAGPtr> & result_actions_to_execute)
1074
{
1075
    auto subqueries = planner_context->getPreparedSets().getSubqueries();
1076
    std::unordered_set<const FutureSet *> useful_sets;
1077

1078
    for (const auto & actions_to_execute : result_actions_to_execute)
1079
        collectSetsFromActionsDAG(actions_to_execute, useful_sets);
1080

1081
    auto predicate = [&useful_sets](const auto & set) { return !useful_sets.contains(set.get()); };
1082
    auto it = std::remove_if(subqueries.begin(), subqueries.end(), std::move(predicate));
1083
    subqueries.erase(it, subqueries.end());
1084

1085
    for (auto & subquery : subqueries)
1086
    {
1087
        auto query_tree = subquery->detachQueryTree();
1088
        auto subquery_options = select_query_options.subquery();
1089
        /// I don't know if this is a good decision,
1090
        /// But for now it is done in the same way as in old analyzer.
1091
        /// This would not ignore limits for subqueries (affects mutations only).
1092
        /// See test_build_sets_from_multiple_threads-analyzer.
1093
        subquery_options.ignore_limits = false;
1094
        Planner subquery_planner(
1095
            query_tree,
1096
            subquery_options,
1097
            std::make_shared<GlobalPlannerContext>(nullptr, nullptr, FiltersForTableExpressionMap{}));
1098
        subquery_planner.buildQueryPlanIfNeeded();
1099

1100
        subquery->setQueryPlan(std::make_unique<QueryPlan>(std::move(subquery_planner).extractQueryPlan()));
1101
    }
1102

1103
    if (!subqueries.empty())
1104
    {
1105
        auto step = std::make_unique<DelayedCreatingSetsStep>(
1106
            query_plan.getCurrentDataStream(),
1107
            std::move(subqueries),
1108
            planner_context->getQueryContext());
1109

1110
        query_plan.addStep(std::move(step));
1111
    }
1112
}
1113

1114
/// Support for `additional_result_filter` setting
1115
void addAdditionalFilterStepIfNeeded(QueryPlan & query_plan,
1116
    const QueryNode & query_node,
1117
    const SelectQueryOptions & select_query_options,
1118
    PlannerContextPtr & planner_context
1119
)
1120
{
1121
    if (select_query_options.subquery_depth != 0)
1122
        return;
1123

1124
    const auto & query_context = planner_context->getQueryContext();
1125
    const auto & settings = query_context->getSettingsRef();
1126

1127
    auto additional_result_filter_ast = parseAdditionalResultFilter(settings);
1128
    if (!additional_result_filter_ast)
1129
        return;
1130

1131
    ColumnsDescription fake_column_descriptions;
1132
    NameSet fake_name_set;
1133
    for (const auto & column : query_node.getProjectionColumns())
1134
    {
1135
        fake_column_descriptions.add(ColumnDescription(column.name, column.type));
1136
        fake_name_set.emplace(column.name);
1137
    }
1138

1139
    auto storage = std::make_shared<StorageDummy>(StorageID{"dummy", "dummy"}, fake_column_descriptions);
1140
    auto fake_table_expression = std::make_shared<TableNode>(std::move(storage), query_context);
1141

1142
    auto filter_info = buildFilterInfo(additional_result_filter_ast, fake_table_expression, planner_context, std::move(fake_name_set));
1143
    if (!filter_info.actions || !query_plan.isInitialized())
1144
        return;
1145

1146
    auto filter_step = std::make_unique<FilterStep>(query_plan.getCurrentDataStream(),
1147
        filter_info.actions,
1148
        filter_info.column_name,
1149
        filter_info.do_remove_column);
1150
    filter_step->setStepDescription("additional result filter");
1151
    query_plan.addStep(std::move(filter_step));
1152
}
1153

1154
}
1155

1156
PlannerContextPtr buildPlannerContext(const QueryTreeNodePtr & query_tree_node,
1157
    const SelectQueryOptions & select_query_options,
1158
    GlobalPlannerContextPtr global_planner_context)
1159
{
1160
    auto * query_node = query_tree_node->as<QueryNode>();
1161
    auto * union_node = query_tree_node->as<UnionNode>();
1162

1163
    if (!query_node && !union_node)
1164
        throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
1165
            "Expected QUERY or UNION node. Actual {}",
1166
            query_tree_node->formatASTForErrorMessage());
1167

1168
    auto & mutable_context = query_node ? query_node->getMutableContext() : union_node->getMutableContext();
1169
    size_t max_subquery_depth = mutable_context->getSettingsRef().max_subquery_depth;
1170
    if (max_subquery_depth && select_query_options.subquery_depth > max_subquery_depth)
1171
        throw Exception(ErrorCodes::TOO_DEEP_SUBQUERIES,
1172
            "Too deep subqueries. Maximum: {}",
1173
            max_subquery_depth);
1174

1175
    const auto & client_info = mutable_context->getClientInfo();
1176
    auto min_major = static_cast<UInt64>(DBMS_MIN_MAJOR_VERSION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD);
1177
    auto min_minor = static_cast<UInt64>(DBMS_MIN_MINOR_VERSION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD);
1178

1179
    bool need_to_disable_two_level_aggregation = client_info.query_kind == ClientInfo::QueryKind::SECONDARY_QUERY &&
1180
        client_info.connection_client_version_major < min_major &&
1181
        client_info.connection_client_version_minor < min_minor;
1182

1183
    if (need_to_disable_two_level_aggregation)
1184
    {
1185
        /// Disable two-level aggregation due to version incompatibility
1186
        mutable_context->setSetting("group_by_two_level_threshold", Field(0));
1187
        mutable_context->setSetting("group_by_two_level_threshold_bytes", Field(0));
1188
    }
1189

1190
    if (select_query_options.is_subquery)
1191
        updateContextForSubqueryExecution(mutable_context);
1192

1193
    return std::make_shared<PlannerContext>(mutable_context, std::move(global_planner_context), select_query_options);
1194
}
1195

1196
Planner::Planner(const QueryTreeNodePtr & query_tree_,
1197
    SelectQueryOptions & select_query_options_)
1198
    : query_tree(query_tree_)
1199
    , select_query_options(select_query_options_)
1200
    , planner_context(buildPlannerContext(query_tree, select_query_options,
1201
        std::make_shared<GlobalPlannerContext>(
1202
            findQueryForParallelReplicas(query_tree, select_query_options),
1203
            findTableForParallelReplicas(query_tree, select_query_options),
1204
            collectFiltersForAnalysis(query_tree, select_query_options))))
1205
{
1206
}
1207

1208
Planner::Planner(const QueryTreeNodePtr & query_tree_,
1209
    SelectQueryOptions & select_query_options_,
1210
    GlobalPlannerContextPtr global_planner_context_)
1211
    : query_tree(query_tree_)
1212
    , select_query_options(select_query_options_)
1213
    , planner_context(buildPlannerContext(query_tree_, select_query_options, std::move(global_planner_context_)))
1214
{
1215
}
1216

1217
Planner::Planner(const QueryTreeNodePtr & query_tree_,
1218
    SelectQueryOptions & select_query_options_,
1219
    PlannerContextPtr planner_context_)
1220
    : query_tree(query_tree_)
1221
    , select_query_options(select_query_options_)
1222
    , planner_context(std::move(planner_context_))
1223
{
1224
}
1225

1226
void Planner::buildQueryPlanIfNeeded()
1227
{
1228
    if (query_plan.isInitialized())
1229
        return;
1230

1231
    LOG_TRACE(getLogger("Planner"), "Query {} to stage {}{}",
1232
        query_tree->formatConvertedASTForErrorMessage(),
1233
        QueryProcessingStage::toString(select_query_options.to_stage),
1234
        select_query_options.only_analyze ? " only analyze" : "");
1235

1236
    if (query_tree->getNodeType() == QueryTreeNodeType::UNION)
1237
        buildPlanForUnionNode();
1238
    else
1239
        buildPlanForQueryNode();
1240

1241
    extendQueryContextAndStoragesLifetime(query_plan, planner_context);
1242
}
1243

1244
void Planner::buildPlanForUnionNode()
1245
{
1246
    const auto & union_node = query_tree->as<UnionNode &>();
1247
    auto union_mode = union_node.getUnionMode();
1248
    if (union_mode == SelectUnionMode::UNION_DEFAULT || union_mode == SelectUnionMode::EXCEPT_DEFAULT
1249
        || union_mode == SelectUnionMode::INTERSECT_DEFAULT)
1250
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "UNION mode must be initialized");
1251

1252
    const auto & union_queries_nodes = union_node.getQueries().getNodes();
1253
    size_t queries_size = union_queries_nodes.size();
1254

1255
    std::vector<std::unique_ptr<QueryPlan>> query_plans;
1256
    query_plans.reserve(queries_size);
1257

1258
    Blocks query_plans_headers;
1259
    query_plans_headers.reserve(queries_size);
1260

1261
    for (const auto & query_node : union_queries_nodes)
1262
    {
1263
        Planner query_planner(query_node, select_query_options);
1264
        query_planner.buildQueryPlanIfNeeded();
1265
        for (const auto & row_policy : query_planner.getUsedRowPolicies())
1266
            used_row_policies.insert(row_policy);
1267
        const auto & mapping = query_planner.getQueryNodeToPlanStepMapping();
1268
        query_node_to_plan_step_mapping.insert(mapping.begin(), mapping.end());
1269
        auto query_node_plan = std::make_unique<QueryPlan>(std::move(query_planner).extractQueryPlan());
1270
        query_plans_headers.push_back(query_node_plan->getCurrentDataStream().header);
1271
        query_plans.push_back(std::move(query_node_plan));
1272
    }
1273

1274
    Block union_common_header = buildCommonHeaderForUnion(query_plans_headers, union_mode);
1275
    DataStreams query_plans_streams;
1276
    query_plans_streams.reserve(query_plans.size());
1277

1278
    for (auto & query_node_plan : query_plans)
1279
    {
1280
        if (blocksHaveEqualStructure(query_node_plan->getCurrentDataStream().header, union_common_header))
1281
        {
1282
            query_plans_streams.push_back(query_node_plan->getCurrentDataStream());
1283
            continue;
1284
        }
1285

1286
        auto actions_dag = ActionsDAG::makeConvertingActions(
1287
            query_node_plan->getCurrentDataStream().header.getColumnsWithTypeAndName(),
1288
            union_common_header.getColumnsWithTypeAndName(),
1289
            ActionsDAG::MatchColumnsMode::Position);
1290
        auto converting_step = std::make_unique<ExpressionStep>(query_node_plan->getCurrentDataStream(), std::move(actions_dag));
1291
        converting_step->setStepDescription("Conversion before UNION");
1292
        query_node_plan->addStep(std::move(converting_step));
1293

1294
        query_plans_streams.push_back(query_node_plan->getCurrentDataStream());
1295
    }
1296

1297
    const auto & query_context = planner_context->getQueryContext();
1298
    const auto & settings = query_context->getSettingsRef();
1299
    auto max_threads = settings.max_threads;
1300

1301
    bool is_distinct = union_mode == SelectUnionMode::UNION_DISTINCT || union_mode == SelectUnionMode::INTERSECT_DISTINCT
1302
        || union_mode == SelectUnionMode::EXCEPT_DISTINCT;
1303

1304
    if (union_mode == SelectUnionMode::UNION_ALL || union_mode == SelectUnionMode::UNION_DISTINCT)
1305
    {
1306
        auto union_step = std::make_unique<UnionStep>(std::move(query_plans_streams), max_threads);
1307
        query_plan.unitePlans(std::move(union_step), std::move(query_plans));
1308
    }
1309
    else if (union_mode == SelectUnionMode::INTERSECT_ALL || union_mode == SelectUnionMode::INTERSECT_DISTINCT
1310
        || union_mode == SelectUnionMode::EXCEPT_ALL || union_mode == SelectUnionMode::EXCEPT_DISTINCT)
1311
    {
1312
        IntersectOrExceptStep::Operator intersect_or_except_operator = IntersectOrExceptStep::Operator::UNKNOWN;
1313

1314
        if (union_mode == SelectUnionMode::INTERSECT_ALL)
1315
            intersect_or_except_operator = IntersectOrExceptStep::Operator::INTERSECT_ALL;
1316
        else if (union_mode == SelectUnionMode::INTERSECT_DISTINCT)
1317
            intersect_or_except_operator = IntersectOrExceptStep::Operator::INTERSECT_DISTINCT;
1318
        else if (union_mode == SelectUnionMode::EXCEPT_ALL)
1319
            intersect_or_except_operator = IntersectOrExceptStep::Operator::EXCEPT_ALL;
1320
        else if (union_mode == SelectUnionMode::EXCEPT_DISTINCT)
1321
            intersect_or_except_operator = IntersectOrExceptStep::Operator::EXCEPT_DISTINCT;
1322

1323
        auto union_step
1324
            = std::make_unique<IntersectOrExceptStep>(std::move(query_plans_streams), intersect_or_except_operator, max_threads);
1325
        query_plan.unitePlans(std::move(union_step), std::move(query_plans));
1326
    }
1327

1328
    if (is_distinct)
1329
    {
1330
        /// Add distinct transform
1331
        SizeLimits limits(settings.max_rows_in_distinct, settings.max_bytes_in_distinct, settings.distinct_overflow_mode);
1332

1333
        auto distinct_step = std::make_unique<DistinctStep>(
1334
            query_plan.getCurrentDataStream(),
1335
            limits,
1336
            0 /*limit hint*/,
1337
            query_plan.getCurrentDataStream().header.getNames(),
1338
            false /*pre distinct*/,
1339
            settings.optimize_distinct_in_order);
1340
        query_plan.addStep(std::move(distinct_step));
1341
    }
1342
}
1343

1344
void Planner::buildPlanForQueryNode()
1345
{
1346
    ProfileEvents::increment(ProfileEvents::SelectQueriesWithSubqueries);
1347
    ProfileEvents::increment(ProfileEvents::QueriesWithSubqueries);
1348

1349
    auto & query_node = query_tree->as<QueryNode &>();
1350
    const auto & query_context = planner_context->getQueryContext();
1351

1352
    if (query_node.hasWhere())
1353
    {
1354
        auto condition_constant = tryExtractConstantFromConditionNode(query_node.getWhere());
1355
        if (condition_constant.has_value() && *condition_constant)
1356
            query_node.getWhere() = {};
1357
    }
1358

1359
    SelectQueryInfo select_query_info = buildSelectQueryInfo();
1360

1361
    StorageLimitsList current_storage_limits = storage_limits;
1362
    select_query_info.local_storage_limits = buildStorageLimits(*query_context, select_query_options);
1363
    current_storage_limits.push_back(select_query_info.local_storage_limits);
1364
    select_query_info.storage_limits = std::make_shared<StorageLimitsList>(current_storage_limits);
1365
    select_query_info.has_order_by = query_node.hasOrderBy();
1366
    select_query_info.has_window = hasWindowFunctionNodes(query_tree);
1367
    select_query_info.has_aggregates = hasAggregateFunctionNodes(query_tree);
1368
    select_query_info.need_aggregate = query_node.hasGroupBy() || select_query_info.has_aggregates;
1369

1370
    if (!select_query_info.need_aggregate && query_node.hasHaving())
1371
    {
1372
        if (query_node.hasWhere())
1373
            query_node.getWhere() = mergeConditionNodes({query_node.getWhere(), query_node.getHaving()}, query_context);
1374
        else
1375
            query_node.getWhere() = query_node.getHaving();
1376

1377
        query_node.getHaving() = {};
1378
    }
1379

1380
    collectSets(query_tree, *planner_context);
1381

1382
    const auto & settings = query_context->getSettingsRef();
1383
    if (query_context->canUseTaskBasedParallelReplicas())
1384
    {
1385
        if (!settings.parallel_replicas_allow_in_with_subquery && planner_context->getPreparedSets().hasSubqueries())
1386
        {
1387
            if (settings.allow_experimental_parallel_reading_from_replicas >= 2)
1388
                throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "IN with subquery is not supported with parallel replicas");
1389

1390
            auto & mutable_context = planner_context->getMutableQueryContext();
1391
            mutable_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
1392
            LOG_DEBUG(getLogger("Planner"), "Disabling parallel replicas to execute a query with IN with subquery");
1393
        }
1394
    }
1395

1396
    collectTableExpressionData(query_tree, planner_context);
1397
    checkStoragesSupportTransactions(planner_context);
1398

1399
    const auto & table_filters = planner_context->getGlobalPlannerContext()->filters_for_table_expressions;
1400
    if (!select_query_options.only_analyze && !table_filters.empty()) // && top_level)
1401
    {
1402
        for (auto & [table_node, table_expression_data] : planner_context->getTableExpressionNodeToData())
1403
        {
1404
            auto it = table_filters.find(table_node);
1405
            if (it != table_filters.end())
1406
            {
1407
                const auto & filters = it->second;
1408
                table_expression_data.setFilterActions(filters.filter_actions);
1409
                table_expression_data.setPrewhereInfo(filters.prewhere_info);
1410
            }
1411
        }
1412
    }
1413

1414
    if (query_context->canUseTaskBasedParallelReplicas())
1415
    {
1416
        const auto & table_expression_nodes = planner_context->getTableExpressionNodeToData();
1417
        for (const auto & it : table_expression_nodes)
1418
        {
1419
            auto * table_node = it.first->as<TableNode>();
1420
            if (!table_node)
1421
                continue;
1422

1423
            const auto & modifiers = table_node->getTableExpressionModifiers();
1424
            if (modifiers.has_value() && modifiers->hasFinal())
1425
            {
1426
                if (settings.allow_experimental_parallel_reading_from_replicas >= 2)
1427
                    throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "FINAL modifier is not supported with parallel replicas");
1428
                else
1429
                {
1430
                    LOG_DEBUG(
1431
                        getLogger("Planner"),
1432
                        "FINAL modifier is not supported with parallel replicas. Query will be executed without using them.");
1433
                    auto & mutable_context = planner_context->getMutableQueryContext();
1434
                    mutable_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
1435
                }
1436
            }
1437
        }
1438
    }
1439

1440
    if (!settings.parallel_replicas_custom_key.value.empty())
1441
    {
1442
        /// Check support for JOIN for parallel replicas with custom key
1443
        if (planner_context->getTableExpressionNodeToData().size() > 1)
1444
        {
1445
            if (settings.allow_experimental_parallel_reading_from_replicas >= 2)
1446
                throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "JOINs are not supported with parallel replicas");
1447
            else
1448
            {
1449
                LOG_DEBUG(
1450
                    getLogger("Planner"),
1451
                    "JOINs are not supported with parallel replicas. Query will be executed without using them.");
1452

1453
                auto & mutable_context = planner_context->getMutableQueryContext();
1454
                mutable_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
1455
                mutable_context->setSetting("parallel_replicas_custom_key", String{""});
1456
            }
1457
        }
1458
    }
1459

1460
    JoinTreeQueryPlan join_tree_query_plan;
1461
    if (planner_context->getMutableQueryContext()->canUseTaskBasedParallelReplicas()
1462
        && planner_context->getGlobalPlannerContext()->parallel_replicas_node == &query_node)
1463
    {
1464
        join_tree_query_plan = buildQueryPlanForParallelReplicas(query_node, planner_context, select_query_info.storage_limits);
1465
    }
1466
    else
1467
    {
1468
        auto top_level_identifiers = collectTopLevelColumnIdentifiers(query_tree, planner_context);
1469
        join_tree_query_plan = buildJoinTreeQueryPlan(query_tree,
1470
            select_query_info,
1471
            select_query_options,
1472
            top_level_identifiers,
1473
            planner_context);
1474
    }
1475

1476
    auto from_stage = join_tree_query_plan.from_stage;
1477
    query_plan = std::move(join_tree_query_plan.query_plan);
1478
    used_row_policies = std::move(join_tree_query_plan.used_row_policies);
1479
    auto & mapping = join_tree_query_plan.query_node_to_plan_step_mapping;
1480
    query_node_to_plan_step_mapping.insert(mapping.begin(), mapping.end());
1481

1482
    LOG_TRACE(getLogger("Planner"), "Query {} from stage {} to stage {}{}",
1483
        query_tree->formatConvertedASTForErrorMessage(),
1484
        QueryProcessingStage::toString(from_stage),
1485
        QueryProcessingStage::toString(select_query_options.to_stage),
1486
        select_query_options.only_analyze ? " only analyze" : "");
1487

1488
    if (select_query_options.to_stage == QueryProcessingStage::FetchColumns)
1489
        return;
1490

1491
    PlannerQueryProcessingInfo query_processing_info(from_stage, select_query_options.to_stage);
1492
    QueryAnalysisResult query_analysis_result(query_tree, query_processing_info, planner_context);
1493
    auto expression_analysis_result = buildExpressionAnalysisResult(query_tree,
1494
        query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName(),
1495
        planner_context,
1496
        query_processing_info);
1497

1498
    std::vector<ActionsDAGPtr> result_actions_to_execute = std::move(join_tree_query_plan.actions_dags);
1499

1500
    for (auto & [_, table_expression_data] : planner_context->getTableExpressionNodeToData())
1501
    {
1502
        if (table_expression_data.getPrewhereFilterActions())
1503
            result_actions_to_execute.push_back(table_expression_data.getPrewhereFilterActions());
1504

1505
        if (table_expression_data.getRowLevelFilterActions())
1506
            result_actions_to_execute.push_back(table_expression_data.getRowLevelFilterActions());
1507
    }
1508

1509
    if (query_processing_info.isIntermediateStage())
1510
    {
1511
        addPreliminarySortOrDistinctOrLimitStepsIfNeeded(query_plan,
1512
            expression_analysis_result,
1513
            query_analysis_result,
1514
            planner_context,
1515
            query_processing_info,
1516
            query_tree,
1517
            result_actions_to_execute);
1518

1519
        if (expression_analysis_result.hasAggregation())
1520
        {
1521
            const auto & aggregation_analysis_result = expression_analysis_result.getAggregation();
1522
            addMergingAggregatedStep(query_plan, aggregation_analysis_result, query_analysis_result, planner_context);
1523
        }
1524
    }
1525

1526
    if (query_processing_info.isFirstStage())
1527
    {
1528
        if (expression_analysis_result.hasWhere())
1529
            addFilterStep(query_plan, expression_analysis_result.getWhere(), "WHERE", result_actions_to_execute);
1530

1531
        if (expression_analysis_result.hasAggregation())
1532
        {
1533
            const auto & aggregation_analysis_result = expression_analysis_result.getAggregation();
1534
            if (aggregation_analysis_result.before_aggregation_actions)
1535
                addExpressionStep(query_plan, aggregation_analysis_result.before_aggregation_actions, "Before GROUP BY", result_actions_to_execute);
1536

1537
            addAggregationStep(query_plan, aggregation_analysis_result, query_analysis_result, planner_context, select_query_info);
1538
        }
1539

1540
        /** If we have aggregation, we can't execute any later-stage
1541
          * expressions on shards, neither "Before WINDOW" nor "Before ORDER BY"
1542
          */
1543
        if (!expression_analysis_result.hasAggregation())
1544
        {
1545
            if (expression_analysis_result.hasWindow())
1546
            {
1547
                /** Window functions must be executed on initiator (second_stage).
1548
                  * ORDER BY and DISTINCT might depend on them, so if we have
1549
                  * window functions, we can't execute ORDER BY and DISTINCT
1550
                  * now, on shard (first_stage).
1551
                  */
1552
                const auto & window_analysis_result = expression_analysis_result.getWindow();
1553
                if (window_analysis_result.before_window_actions)
1554
                    addExpressionStep(query_plan, window_analysis_result.before_window_actions, "Before WINDOW", result_actions_to_execute);
1555
            }
1556
            else
1557
            {
1558
                /** There are no window functions, so we can execute the
1559
                  * Projection expressions, preliminary DISTINCT and before ORDER BY expressions
1560
                  * now, on shards (first_stage).
1561
                  */
1562
                const auto & projection_analysis_result = expression_analysis_result.getProjection();
1563
                addExpressionStep(query_plan, projection_analysis_result.projection_actions, "Projection", result_actions_to_execute);
1564

1565
                if (query_node.isDistinct())
1566
                {
1567
                    addDistinctStep(query_plan,
1568
                        query_analysis_result,
1569
                        planner_context,
1570
                        expression_analysis_result.getProjection().projection_column_names,
1571
                        query_node,
1572
                        true /*before_order*/,
1573
                        true /*pre_distinct*/);
1574
                }
1575

1576
                if (expression_analysis_result.hasSort())
1577
                {
1578
                    const auto & sort_analysis_result = expression_analysis_result.getSort();
1579
                    addExpressionStep(query_plan, sort_analysis_result.before_order_by_actions, "Before ORDER BY", result_actions_to_execute);
1580
                }
1581
            }
1582
        }
1583

1584
        addPreliminarySortOrDistinctOrLimitStepsIfNeeded(query_plan,
1585
            expression_analysis_result,
1586
            query_analysis_result,
1587
            planner_context,
1588
            query_processing_info,
1589
            query_tree,
1590
            result_actions_to_execute);
1591
    }
1592

1593
    if (query_processing_info.isSecondStage() || query_processing_info.isFromAggregationState())
1594
    {
1595
        if (query_processing_info.isFromAggregationState())
1596
        {
1597
            /// Aggregation was performed on remote shards
1598
        }
1599
        else if (expression_analysis_result.hasAggregation())
1600
        {
1601
            const auto & aggregation_analysis_result = expression_analysis_result.getAggregation();
1602

1603
            if (!query_processing_info.isFirstStage())
1604
            {
1605
                addMergingAggregatedStep(query_plan, aggregation_analysis_result, query_analysis_result, planner_context);
1606
            }
1607

1608
            bool having_executed = false;
1609

1610
            if (query_node.isGroupByWithTotals())
1611
            {
1612
                addTotalsHavingStep(query_plan, expression_analysis_result, query_analysis_result, planner_context, query_node, result_actions_to_execute);
1613
                having_executed = true;
1614
            }
1615

1616
            addCubeOrRollupStepIfNeeded(query_plan, aggregation_analysis_result, query_analysis_result, planner_context, select_query_info, query_node);
1617

1618
            if (!having_executed && expression_analysis_result.hasHaving())
1619
                addFilterStep(query_plan, expression_analysis_result.getHaving(), "HAVING", result_actions_to_execute);
1620
        }
1621

1622
        if (query_processing_info.isFromAggregationState())
1623
        {
1624
            if (expression_analysis_result.hasWindow())
1625
                throw Exception(ErrorCodes::NOT_IMPLEMENTED,
1626
                    "Window functions does not support processing from WithMergeableStateAfterAggregation");
1627
        }
1628
        else if (expression_analysis_result.hasWindow() || expression_analysis_result.hasAggregation())
1629
        {
1630
            if (expression_analysis_result.hasWindow())
1631
            {
1632
                const auto & window_analysis_result = expression_analysis_result.getWindow();
1633
                if (expression_analysis_result.hasAggregation())
1634
                    addExpressionStep(query_plan, window_analysis_result.before_window_actions, "Before window functions", result_actions_to_execute);
1635

1636
                addWindowSteps(query_plan, planner_context, window_analysis_result);
1637
            }
1638

1639
            const auto & projection_analysis_result = expression_analysis_result.getProjection();
1640
            addExpressionStep(query_plan, projection_analysis_result.projection_actions, "Projection", result_actions_to_execute);
1641

1642
            if (query_node.isDistinct())
1643
            {
1644
                addDistinctStep(query_plan,
1645
                    query_analysis_result,
1646
                    planner_context,
1647
                    expression_analysis_result.getProjection().projection_column_names,
1648
                    query_node,
1649
                    true /*before_order*/,
1650
                    true /*pre_distinct*/);
1651
            }
1652

1653
            if (expression_analysis_result.hasSort())
1654
            {
1655
                const auto & sort_analysis_result = expression_analysis_result.getSort();
1656
                addExpressionStep(query_plan, sort_analysis_result.before_order_by_actions, "Before ORDER BY", result_actions_to_execute);
1657
            }
1658
        }
1659
        else
1660
        {
1661
            /// There are no aggregation or windows, all expressions before ORDER BY executed on shards
1662
        }
1663

1664
        if (expression_analysis_result.hasSort())
1665
        {
1666
            /** If there is an ORDER BY for distributed query processing,
1667
              * but there is no aggregation, then on the remote servers ORDER BY was made
1668
              * and we merge the sorted streams from remote servers.
1669
              *
1670
              * Also in case of remote servers was process the query up to WithMergeableStateAfterAggregationAndLimit
1671
              * (distributed_group_by_no_merge=2 or optimize_distributed_group_by_sharding_key=1 takes place),
1672
              * then merge the sorted streams is enough, since remote servers already did full ORDER BY.
1673
              */
1674
            if (query_processing_info.isFromAggregationState())
1675
                addMergeSortingStep(query_plan, query_analysis_result, planner_context, "after aggregation stage for ORDER BY");
1676
            else if (!query_processing_info.isFirstStage() &&
1677
                !expression_analysis_result.hasAggregation() &&
1678
                !expression_analysis_result.hasWindow() &&
1679
                !(query_node.isGroupByWithTotals() && !query_analysis_result.aggregate_final))
1680
                addMergeSortingStep(query_plan, query_analysis_result, planner_context, "for ORDER BY, without aggregation");
1681
            else
1682
                addSortingStep(query_plan, query_analysis_result, planner_context);
1683
        }
1684

1685
        /** Optimization if there are several sources and there is LIMIT, then first apply the preliminary LIMIT,
1686
          * limiting the number of rows in each up to `offset + limit`.
1687
          */
1688
        bool applied_prelimit = addPreliminaryLimitOptimizationStepIfNeeded(query_plan,
1689
            query_analysis_result,
1690
            planner_context,
1691
            query_processing_info,
1692
            query_tree);
1693

1694
        //// If there was more than one stream, then DISTINCT needs to be performed once again after merging all streams.
1695
        if (!query_processing_info.isFromAggregationState() && query_node.isDistinct())
1696
        {
1697
            addDistinctStep(query_plan,
1698
                query_analysis_result,
1699
                planner_context,
1700
                expression_analysis_result.getProjection().projection_column_names,
1701
                query_node,
1702
                false /*before_order*/,
1703
                false /*pre_distinct*/);
1704
        }
1705

1706
        if (!query_processing_info.isFromAggregationState() && expression_analysis_result.hasLimitBy())
1707
        {
1708
            const auto & limit_by_analysis_result = expression_analysis_result.getLimitBy();
1709
            addExpressionStep(query_plan, limit_by_analysis_result.before_limit_by_actions, "Before LIMIT BY", result_actions_to_execute);
1710
            addLimitByStep(query_plan, limit_by_analysis_result, query_node);
1711
        }
1712

1713
        if (query_node.hasOrderBy())
1714
            addWithFillStepIfNeeded(query_plan, query_analysis_result, planner_context, query_node);
1715

1716
        bool apply_offset = query_processing_info.getToStage() != QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
1717

1718
        if (query_node.hasLimit() && query_node.isLimitWithTies() && apply_offset)
1719
            addLimitStep(query_plan, query_analysis_result, planner_context, query_node);
1720

1721
        addExtremesStepIfNeeded(query_plan, planner_context);
1722

1723
        bool limit_applied = applied_prelimit || (query_node.isLimitWithTies() && apply_offset);
1724
        bool apply_limit = query_processing_info.getToStage() != QueryProcessingStage::WithMergeableStateAfterAggregation;
1725

1726
        /** Limit is no longer needed if there is prelimit.
1727
          *
1728
          * That LIMIT cannot be applied if OFFSET should not be applied, since LIMIT will apply OFFSET too.
1729
          * This is the case for various optimizations for distributed queries,
1730
          * and when LIMIT cannot be applied it will be applied on the initiator anyway.
1731
          */
1732
        if (query_node.hasLimit() && apply_limit && !limit_applied && apply_offset)
1733
            addLimitStep(query_plan, query_analysis_result, planner_context, query_node);
1734
        else if (!limit_applied && apply_offset && query_node.hasOffset())
1735
            addOffsetStep(query_plan, query_analysis_result);
1736

1737
        /// Project names is not done on shards, because initiator will not find columns in blocks
1738
        if (!query_processing_info.isToAggregationState())
1739
        {
1740
            const auto & projection_analysis_result = expression_analysis_result.getProjection();
1741
            addExpressionStep(query_plan, projection_analysis_result.project_names_actions, "Project names", result_actions_to_execute);
1742
        }
1743

1744
        // For additional_result_filter setting
1745
        addAdditionalFilterStepIfNeeded(query_plan, query_node, select_query_options, planner_context);
1746
    }
1747

1748
    if (!select_query_options.only_analyze)
1749
        addBuildSubqueriesForSetsStepIfNeeded(query_plan, select_query_options, planner_context, result_actions_to_execute);
1750

1751
    query_node_to_plan_step_mapping[&query_node] = query_plan.getRootNode();
1752
}
1753

1754
SelectQueryInfo Planner::buildSelectQueryInfo() const
1755
{
1756
    return ::DB::buildSelectQueryInfo(query_tree, planner_context);
1757
}
1758

1759
void Planner::addStorageLimits(const StorageLimitsList & limits)
1760
{
1761
    for (const auto & limit : limits)
1762
        storage_limits.push_back(limit);
1763
}
1764

1765
}
1766

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.