// ClickHouse
// 1765 lines · 75.6 KB
1#include <Planner/Planner.h>
2
3#include <Columns/ColumnConst.h>
4#include <Columns/ColumnSet.h>
5#include <Core/ProtocolDefines.h>
6#include <Common/ProfileEvents.h>
7#include <Common/logger_useful.h>
8
9#include <DataTypes/DataTypeString.h>
10
11#include <Functions/FunctionFactory.h>
12#include <Functions/CastOverloadResolver.h>
13#include <Functions/indexHint.h>
14
15#include <QueryPipeline/Pipe.h>
16#include <Processors/Sources/SourceFromSingleChunk.h>
17#include <Processors/QueryPlan/QueryPlan.h>
18#include <Processors/QueryPlan/ExpressionStep.h>
19#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
20#include <Processors/QueryPlan/FilterStep.h>
21#include <Processors/QueryPlan/UnionStep.h>
22#include <Processors/QueryPlan/DistinctStep.h>
23#include <Processors/QueryPlan/IntersectOrExceptStep.h>
24#include <Processors/QueryPlan/CreatingSetsStep.h>
25#include <Processors/QueryPlan/AggregatingStep.h>
26#include <Processors/QueryPlan/MergingAggregatedStep.h>
27#include <Processors/QueryPlan/SortingStep.h>
28#include <Processors/QueryPlan/FillingStep.h>
29#include <Processors/QueryPlan/LimitStep.h>
30#include <Processors/QueryPlan/OffsetStep.h>
31#include <Processors/QueryPlan/ExtremesStep.h>
32#include <Processors/QueryPlan/TotalsHavingStep.h>
33#include <Processors/QueryPlan/RollupStep.h>
34#include <Processors/QueryPlan/CubeStep.h>
35#include <Processors/QueryPlan/LimitByStep.h>
36#include <Processors/QueryPlan/WindowStep.h>
37#include <Processors/QueryPlan/ReadNothingStep.h>
38#include <QueryPipeline/QueryPipelineBuilder.h>
39
40#include <Interpreters/Context.h>
41#include <Interpreters/StorageID.h>
42
43#include <Storages/ColumnsDescription.h>
44#include <Storages/IStorage.h>
45#include <Storages/MergeTree/MergeTreeData.h>
46#include <Storages/SelectQueryInfo.h>
47#include <Storages/StorageDistributed.h>
48#include <Storages/StorageDummy.h>
49#include <Storages/StorageMerge.h>
50
51#include <Analyzer/Utils.h>
52#include <Analyzer/ColumnNode.h>
53#include <Analyzer/ConstantNode.h>
54#include <Analyzer/FunctionNode.h>
55#include <Analyzer/SortNode.h>
56#include <Analyzer/InterpolateNode.h>
57#include <Analyzer/WindowNode.h>
58#include <Analyzer/TableNode.h>
59#include <Analyzer/TableFunctionNode.h>
60#include <Analyzer/QueryNode.h>
61#include <Analyzer/UnionNode.h>
62#include <Analyzer/JoinNode.h>
63#include <Analyzer/ArrayJoinNode.h>
64#include <Analyzer/QueryTreeBuilder.h>
65#include <Analyzer/QueryTreePassManager.h>
66#include <Analyzer/AggregationUtils.h>
67#include <Analyzer/WindowFunctionsUtils.h>
68
69#include <Planner/findQueryForParallelReplicas.h>
70#include <Planner/Utils.h>
71#include <Planner/PlannerContext.h>
72#include <Planner/PlannerActionsVisitor.h>
73#include <Planner/PlannerJoins.h>
74#include <Planner/PlannerAggregation.h>
75#include <Planner/PlannerSorting.h>
76#include <Planner/PlannerWindowFunctions.h>
77#include <Planner/CollectSets.h>
78#include <Planner/CollectTableExpressionData.h>
79#include <Planner/PlannerJoinTree.h>
80#include <Planner/PlannerExpressionAnalysis.h>
81#include <Planner/CollectColumnIdentifiers.h>
82#include <Planner/PlannerQueryProcessingInfo.h>
83
84namespace ProfileEvents
85{
86extern const Event SelectQueriesWithSubqueries;
87extern const Event QueriesWithSubqueries;
88}
89
90namespace DB
91{
92
93namespace ErrorCodes
94{
95extern const int UNSUPPORTED_METHOD;
96extern const int LOGICAL_ERROR;
97extern const int BAD_ARGUMENTS;
98extern const int TOO_DEEP_SUBQUERIES;
99extern const int NOT_IMPLEMENTED;
100extern const int SUPPORT_IS_DISABLED;
101}
102
103namespace
104{
105
106/** Check that table and table function table expressions from planner context support transactions.
107*
108* There is precondition that table expression data for table expression nodes is collected in planner context.
109*/
110void checkStoragesSupportTransactions(const PlannerContextPtr & planner_context)
111{
112const auto & query_context = planner_context->getQueryContext();
113if (!query_context->getSettingsRef().throw_on_unsupported_query_inside_transaction)
114return;
115
116if (!query_context->getCurrentTransaction())
117return;
118
119for (const auto & [table_expression, _] : planner_context->getTableExpressionNodeToData())
120{
121StoragePtr storage;
122if (auto * table_node = table_expression->as<TableNode>())
123storage = table_node->getStorage();
124else if (auto * table_function_node = table_expression->as<TableFunctionNode>())
125storage = table_function_node->getStorage();
126
127if (storage && !storage->supportsTransactions())
128throw Exception(ErrorCodes::NOT_IMPLEMENTED,
129"Storage {} (table {}) does not support transactions",
130storage->getName(),
131storage->getStorageID().getNameForLogs());
132}
133}
134
/** Storages can rely on the fact that filters for a storage will be available for analysis
  * before the getQueryProcessingStage method is called.
  *
  * StorageDistributed skip unused shards optimization relies on this.
  * Parallel replicas estimation relies on this too.
  * StorageMerge common header calculation relies on this too.
  *
  * Collecting the filters that will be applied to a specific table in the presence of JOINs
  * requires running the query plan optimization pipeline.
  *
  * Algorithm:
  * 1. Replace all table expressions in the query tree with dummy tables.
  * 2. Build the query plan.
  * 3. Optimize the query plan.
  * 4. Extract filters from ReadFromDummy query plan steps at the query plan leaf nodes.
  */

FiltersForTableExpressionMap collectFiltersForAnalysis(const QueryTreeNodePtr & query_tree, const QueryTreeNodes & table_nodes, const ContextPtr & query_context)
{
    bool collect_filters = false;
    const auto & settings = query_context->getSettingsRef();

    bool parallel_replicas_estimation_enabled
        = query_context->canUseParallelReplicasOnInitiator() && settings.parallel_replicas_min_number_of_rows_per_replica > 0;

    /// Filter collection is expensive (it builds and optimizes a whole throw-away plan),
    /// so only do it when at least one storage can actually make use of the filters.
    for (const auto & table_expression : table_nodes)
    {
        auto * table_node = table_expression->as<TableNode>();
        auto * table_function_node = table_expression->as<TableFunctionNode>();
        if (!table_node && !table_function_node)
            continue;

        const auto & storage = table_node ? table_node->getStorage() : table_function_node->getStorage();
        if (typeid_cast<const StorageDistributed *>(storage.get()) || typeid_cast<const StorageMerge *>(storage.get())
            || (parallel_replicas_estimation_enabled && std::dynamic_pointer_cast<MergeTreeData>(storage)))
        {
            collect_filters = true;
            break;
        }
    }

    if (!collect_filters)
        return {};

    ResultReplacementMap replacement_map;

    /// Step 1 of the algorithm above: swap real tables for dummy ones, remembering the mapping.
    auto updated_query_tree = replaceTableExpressionsWithDummyTables(query_tree, table_nodes, query_context, &replacement_map);

    /// Reverse index: dummy storage pointer -> original table expression node,
    /// used to attribute extracted filters back to the real tables.
    std::unordered_map<const IStorage *, QueryTreeNodePtr> dummy_storage_to_table;

    for (auto & [from_table_expression, dummy_table_expression] : replacement_map)
    {
        auto * dummy_storage = dummy_table_expression->as<TableNode &>().getStorage().get();
        dummy_storage_to_table.emplace(dummy_storage, from_table_expression);
    }

    /// Steps 2 and 3: build and optimize a plan over the dummy tables.
    SelectQueryOptions select_query_options;
    Planner planner(updated_query_tree, select_query_options);
    planner.buildQueryPlanIfNeeded();

    auto & result_query_plan = planner.getQueryPlan();

    auto optimization_settings = QueryPlanOptimizationSettings::fromContext(query_context);
    result_query_plan.optimize(optimization_settings);

    FiltersForTableExpressionMap res;

    /// Step 4: depth-first walk of the optimized plan collecting pushed-down filters
    /// from every ReadFromDummy leaf step.
    std::vector<QueryPlan::Node *> nodes_to_process;
    nodes_to_process.push_back(result_query_plan.getRootNode());

    while (!nodes_to_process.empty())
    {
        const auto * node_to_process = nodes_to_process.back();
        nodes_to_process.pop_back();
        nodes_to_process.insert(nodes_to_process.end(), node_to_process->children.begin(), node_to_process->children.end());

        auto * read_from_dummy = typeid_cast<ReadFromDummy *>(node_to_process->step.get());
        if (!read_from_dummy)
            continue;

        auto filter_actions = read_from_dummy->getFilterActionsDAG();
        const auto & table_node = dummy_storage_to_table.at(&read_from_dummy->getStorage());
        res[table_node] = FiltersForTableExpression{std::move(filter_actions), read_from_dummy->getPrewhereInfo()};
    }

    return res;
}
222
223FiltersForTableExpressionMap collectFiltersForAnalysis(const QueryTreeNodePtr & query_tree_node, SelectQueryOptions & select_query_options)
224{
225if (select_query_options.only_analyze)
226return {};
227
228auto * query_node = query_tree_node->as<QueryNode>();
229auto * union_node = query_tree_node->as<UnionNode>();
230
231if (!query_node && !union_node)
232throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
233"Expected QUERY or UNION node. Actual {}",
234query_tree_node->formatASTForErrorMessage());
235
236auto context = query_node ? query_node->getContext() : union_node->getContext();
237
238auto table_expressions_nodes
239= extractTableExpressions(query_tree_node, false /* add_array_join */, true /* recursive */);
240
241return collectFiltersForAnalysis(query_tree_node, table_expressions_nodes, context);
242}
243
244/// Extend lifetime of query context, storages, and table locks
245void extendQueryContextAndStoragesLifetime(QueryPlan & query_plan, const PlannerContextPtr & planner_context)
246{
247query_plan.addInterpreterContext(planner_context->getQueryContext());
248
249for (const auto & [table_expression, _] : planner_context->getTableExpressionNodeToData())
250{
251if (auto * table_node = table_expression->as<TableNode>())
252{
253query_plan.addStorageHolder(table_node->getStorage());
254query_plan.addTableLock(table_node->getStorageLock());
255}
256else if (auto * table_function_node = table_expression->as<TableFunctionNode>())
257{
258query_plan.addStorageHolder(table_function_node->getStorage());
259}
260}
261}
262
/// Aggregation/sorting/limit facts derived once from the query tree, the processing
/// stage info, and settings; consumed by the step-building helpers below.
class QueryAnalysisResult
{
public:
    QueryAnalysisResult(const QueryTreeNodePtr & query_tree,
        const PlannerQueryProcessingInfo & query_processing_info,
        const PlannerContextPtr & planner_context)
    {
        const auto & query_node = query_tree->as<QueryNode &>();
        const auto & query_context = planner_context->getQueryContext();
        const auto & settings = query_context->getSettingsRef();

        /// Overflow row is produced for WITH TOTALS when grouping may cut rows off
        /// (max_rows_to_group_by with ANY overflow mode) and the totals mode wants them.
        aggregate_overflow_row = query_node.isGroupByWithTotals() && settings.max_rows_to_group_by
            && settings.group_by_overflow_mode == OverflowMode::ANY && settings.totals_mode != TotalsMode::AFTER_HAVING_EXCLUSIVE;
        /// Aggregation can be finalized in one pass only past WithMergeableState and
        /// without TOTALS/ROLLUP/CUBE (those finalize in their own later steps).
        aggregate_final = query_processing_info.getToStage() > QueryProcessingStage::WithMergeableState
            && !query_node.isGroupByWithTotals() && !query_node.isGroupByWithRollup() && !query_node.isGroupByWithCube();
        aggregation_with_rollup_or_cube_or_grouping_sets = query_node.isGroupByWithRollup() || query_node.isGroupByWithCube() ||
            query_node.isGroupByWithGroupingSets();
        aggregation_should_produce_results_in_order_of_bucket_number
            = query_processing_info.getToStage() == QueryProcessingStage::WithMergeableState
            && (settings.distributed_aggregation_memory_efficient || settings.enable_memory_bound_merging_of_aggregation_results);

        query_has_array_join_in_join_tree = queryHasArrayJoinInJoinTree(query_tree);
        query_has_with_totals_in_any_subquery_in_join_tree = queryHasWithTotalsInAnySubqueryInJoinTree(query_tree);

        sort_description = extractSortDescription(query_node.getOrderByNode(), *planner_context);

        if (query_node.hasLimit())
        {
            /// Constness of limit is validated during query analysis stage
            limit_length = query_node.getLimit()->as<ConstantNode &>().getValue().safeGet<UInt64>();

            /// Note: with LIMIT 0 the offset is left at 0 as well.
            if (query_node.hasOffset() && limit_length)
            {
                /// Constness of offset is validated during query analysis stage
                limit_offset = query_node.getOffset()->as<ConstantNode &>().getValue().safeGet<UInt64>();
            }
        }
        else if (query_node.hasOffset())
        {
            /// Constness of offset is validated during query analysis stage
            limit_offset = query_node.getOffset()->as<ConstantNode &>().getValue().safeGet<UInt64>();
        }

        /// Partial sort can be done if there is LIMIT, but no DISTINCT, LIMIT WITH TIES, LIMIT BY, ARRAY JOIN.
        /// The last condition also guards against UInt64 overflow of limit_length + limit_offset.
        if (limit_length != 0 &&
            !query_node.isDistinct() &&
            !query_node.isLimitWithTies() &&
            !query_node.hasLimitBy() &&
            !query_has_array_join_in_join_tree &&
            limit_length <= std::numeric_limits<UInt64>::max() - limit_offset)
        {
            partial_sorting_limit = limit_length + limit_offset;
        }
    }

    bool aggregate_overflow_row = false;
    bool aggregate_final = false;
    bool aggregation_with_rollup_or_cube_or_grouping_sets = false;
    bool aggregation_should_produce_results_in_order_of_bucket_number = false;
    bool query_has_array_join_in_join_tree = false;
    bool query_has_with_totals_in_any_subquery_in_join_tree = false;
    /// ORDER BY description extracted from the query node.
    SortDescription sort_description;
    /// LIMIT length; 0 means no LIMIT.
    UInt64 limit_length = 0;
    /// OFFSET; 0 when absent (or when LIMIT is 0, see constructor).
    UInt64 limit_offset = 0;
    /// limit_length + limit_offset when partial sorting is applicable, otherwise 0.
    UInt64 partial_sorting_limit = 0;
};
329
330void addExpressionStep(QueryPlan & query_plan,
331const ActionsDAGPtr & expression_actions,
332const std::string & step_description,
333std::vector<ActionsDAGPtr> & result_actions_to_execute)
334{
335result_actions_to_execute.push_back(expression_actions);
336auto expression_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), expression_actions);
337expression_step->setStepDescription(step_description);
338query_plan.addStep(std::move(expression_step));
339}
340
341void addFilterStep(QueryPlan & query_plan,
342const FilterAnalysisResult & filter_analysis_result,
343const std::string & step_description,
344std::vector<ActionsDAGPtr> & result_actions_to_execute)
345{
346result_actions_to_execute.push_back(filter_analysis_result.filter_actions);
347auto where_step = std::make_unique<FilterStep>(query_plan.getCurrentDataStream(),
348filter_analysis_result.filter_actions,
349filter_analysis_result.filter_column_name,
350filter_analysis_result.remove_filter_column);
351where_step->setStepDescription(step_description);
352query_plan.addStep(std::move(where_step));
353}
354
/** Build Aggregator::Params for an aggregation step from the analysis results and
  * query-level settings.
  *
  * When aggregate_descriptions_remove_arguments is set, argument names are stripped from
  * the aggregate descriptions (the ROLLUP/CUBE code path passes true here).
  */
Aggregator::Params getAggregatorParams(const PlannerContextPtr & planner_context,
    const AggregationAnalysisResult & aggregation_analysis_result,
    const QueryAnalysisResult & query_analysis_result,
    const SelectQueryInfo & select_query_info,
    bool aggregate_descriptions_remove_arguments = false)
{
    const auto & query_context = planner_context->getQueryContext();
    const Settings & settings = query_context->getSettingsRef();

    /// Parameters controlling hash table statistics collection during aggregation.
    const auto stats_collecting_params = Aggregator::Params::StatsCollectingParams(
        select_query_info.query,
        settings.collect_hash_table_stats_during_aggregation,
        settings.max_entries_for_hash_table_stats,
        settings.max_size_to_preallocate_for_aggregation);

    auto aggregate_descriptions = aggregation_analysis_result.aggregate_descriptions;
    if (aggregate_descriptions_remove_arguments)
    {
        for (auto & aggregate_description : aggregate_descriptions)
            aggregate_description.argument_names.clear();
    }

    Aggregator::Params aggregator_params = Aggregator::Params(
        aggregation_analysis_result.aggregation_keys,
        aggregate_descriptions,
        query_analysis_result.aggregate_overflow_row,
        settings.max_rows_to_group_by,
        settings.group_by_overflow_mode,
        settings.group_by_two_level_threshold,
        settings.group_by_two_level_threshold_bytes,
        settings.max_bytes_before_external_group_by,
        /// Empty result is also produced for constant-only keys on an empty set when enabled.
        settings.empty_result_for_aggregation_by_empty_set
        || (settings.empty_result_for_aggregation_by_constant_keys_on_empty_set && aggregation_analysis_result.aggregation_keys.empty()
        && aggregation_analysis_result.group_by_with_constant_keys),
        query_context->getTempDataOnDisk(),
        settings.max_threads,
        settings.min_free_disk_space_for_temporary_data,
        settings.compile_aggregate_expressions,
        settings.min_count_to_compile_aggregate_expression,
        settings.max_block_size,
        settings.enable_software_prefetch_in_aggregation,
        /* only_merge */ false,
        settings.optimize_group_by_constant_keys,
        settings.min_hit_rate_to_use_consecutive_keys_optimization,
        stats_collecting_params);

    return aggregator_params;
}
403
404SortDescription getSortDescriptionFromNames(const Names & names)
405{
406SortDescription order_descr;
407order_descr.reserve(names.size());
408
409for (const auto & name : names)
410order_descr.emplace_back(name, 1, 1);
411
412return order_descr;
413}
414
/// Append the (first-stage) AggregatingStep for GROUP BY to the query plan.
void addAggregationStep(QueryPlan & query_plan,
    const AggregationAnalysisResult & aggregation_analysis_result,
    const QueryAnalysisResult & query_analysis_result,
    const PlannerContextPtr & planner_context,
    const SelectQueryInfo & select_query_info)
{
    const Settings & settings = planner_context->getQueryContext()->getSettingsRef();
    auto aggregator_params = getAggregatorParams(planner_context, aggregation_analysis_result, query_analysis_result, select_query_info);

    SortDescription sort_description_for_merging;
    SortDescription group_by_sort_description;

    /// When aggregation in order is forced, both descriptions follow the aggregation keys.
    if (settings.force_aggregation_in_order)
    {
        group_by_sort_description = getSortDescriptionFromNames(aggregation_analysis_result.aggregation_keys);
        sort_description_for_merging = group_by_sort_description;
    }

    auto merge_threads = settings.max_threads;
    /// 0 for aggregation_memory_efficient_merge_threads means "use max_threads".
    auto temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads
        ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
        : static_cast<size_t>(settings.max_threads);

    /// The evenly-distributed-read property is only known when the query reads a single
    /// table or table function.
    bool storage_has_evenly_distributed_read = false;
    const auto & table_expression_node_to_data = planner_context->getTableExpressionNodeToData();

    if (table_expression_node_to_data.size() == 1)
    {
        auto it = table_expression_node_to_data.begin();
        const auto & table_expression_node = it->first;
        if (const auto * table_node = table_expression_node->as<TableNode>())
            storage_has_evenly_distributed_read = table_node->getStorage()->hasEvenlyDistributedRead();
        else if (const auto * table_function_node = table_expression_node->as<TableFunctionNode>())
            storage_has_evenly_distributed_read = table_function_node->getStorageOrThrow()->hasEvenlyDistributedRead();
    }

    auto aggregating_step = std::make_unique<AggregatingStep>(
        query_plan.getCurrentDataStream(),
        aggregator_params,
        aggregation_analysis_result.grouping_sets_parameters_list,
        query_analysis_result.aggregate_final,
        settings.max_block_size,
        settings.aggregation_in_order_max_block_bytes,
        merge_threads,
        temporary_data_merge_threads,
        storage_has_evenly_distributed_read,
        settings.group_by_use_nulls,
        std::move(sort_description_for_merging),
        std::move(group_by_sort_description),
        query_analysis_result.aggregation_should_produce_results_in_order_of_bucket_number,
        settings.enable_memory_bound_merging_of_aggregation_results,
        settings.force_aggregation_in_order);
    query_plan.addStep(std::move(aggregating_step));
}
469
/// Append a MergingAggregatedStep that merges partially-aggregated (mergeable-state)
/// data, e.g. received from remote servers in distributed aggregation.
void addMergingAggregatedStep(QueryPlan & query_plan,
    const AggregationAnalysisResult & aggregation_analysis_result,
    const QueryAnalysisResult & query_analysis_result,
    const PlannerContextPtr & planner_context)
{
    const auto & query_context = planner_context->getQueryContext();
    const auto & settings = query_context->getSettingsRef();

    /** There are two modes of distributed aggregation.
      *
      * 1. In different threads read from the remote servers blocks.
      * Save all the blocks in the RAM. Merge blocks.
      * If the aggregation is two-level - parallelize to the number of buckets.
      *
      * 2. In one thread, read blocks from different servers in order.
      * RAM stores only one block from each server.
      * If the aggregation is a two-level aggregation, we consistently merge the blocks of each next level.
      *
      * The second option consumes less memory (up to 256 times less)
      * in the case of two-level aggregation, which is used for large results after GROUP BY,
      * but it can work more slowly.
      */

    auto keys = aggregation_analysis_result.aggregation_keys;
    /// GROUPING SETS add a synthetic key column identifying the grouping set of each row.
    if (!aggregation_analysis_result.grouping_sets_parameters_list.empty())
        keys.insert(keys.begin(), "__grouping_set");

    Aggregator::Params params(keys,
        aggregation_analysis_result.aggregate_descriptions,
        query_analysis_result.aggregate_overflow_row,
        settings.max_threads,
        settings.max_block_size,
        settings.min_hit_rate_to_use_consecutive_keys_optimization);

    bool is_remote_storage = false;
    bool parallel_replicas_from_merge_tree = false;

    /// These properties are only determined when the query reads a single table expression.
    const auto & table_expression_node_to_data = planner_context->getTableExpressionNodeToData();
    if (table_expression_node_to_data.size() == 1)
    {
        auto it = table_expression_node_to_data.begin();
        is_remote_storage = it->second.isRemote();
        parallel_replicas_from_merge_tree = it->second.isMergeTree() && query_context->canUseParallelReplicasOnInitiator();
    }

    SortDescription group_by_sort_description;

    auto merging_aggregated = std::make_unique<MergingAggregatedStep>(
        query_plan.getCurrentDataStream(),
        params,
        query_analysis_result.aggregate_final,
        /// Grouping sets don't work with distributed_aggregation_memory_efficient enabled (#43989)
        settings.distributed_aggregation_memory_efficient && (is_remote_storage || parallel_replicas_from_merge_tree) && !query_analysis_result.aggregation_with_rollup_or_cube_or_grouping_sets,
        settings.max_threads,
        settings.aggregation_memory_efficient_merge_threads,
        query_analysis_result.aggregation_should_produce_results_in_order_of_bucket_number,
        settings.max_block_size,
        settings.aggregation_in_order_max_block_bytes,
        std::move(group_by_sort_description),
        settings.enable_memory_bound_merging_of_aggregation_results);
    query_plan.addStep(std::move(merging_aggregated));
}
532
/// Append a TotalsHavingStep (WITH TOTALS plus optional HAVING filtering) to the plan.
/// The HAVING actions DAG, when present, is recorded in `result_actions_to_execute`.
void addTotalsHavingStep(QueryPlan & query_plan,
    const PlannerExpressionsAnalysisResult & expression_analysis_result,
    const QueryAnalysisResult & query_analysis_result,
    const PlannerContextPtr & planner_context,
    const QueryNode & query_node,
    std::vector<ActionsDAGPtr> & result_actions_to_execute)
{
    const auto & query_context = planner_context->getQueryContext();
    const auto & settings = query_context->getSettingsRef();

    const auto & aggregation_analysis_result = expression_analysis_result.getAggregation();
    const auto & having_analysis_result = expression_analysis_result.getHaving();
    /// With ROLLUP/CUBE, finalization happens in the later Rollup/Cube step instead.
    bool need_finalize = !query_node.isGroupByWithRollup() && !query_node.isGroupByWithCube();

    if (having_analysis_result.filter_actions)
        result_actions_to_execute.push_back(having_analysis_result.filter_actions);

    auto totals_having_step = std::make_unique<TotalsHavingStep>(
        query_plan.getCurrentDataStream(),
        aggregation_analysis_result.aggregate_descriptions,
        query_analysis_result.aggregate_overflow_row,
        having_analysis_result.filter_actions,
        having_analysis_result.filter_column_name,
        having_analysis_result.remove_filter_column,
        settings.totals_mode,
        settings.totals_auto_threshold,
        need_finalize);
    query_plan.addStep(std::move(totals_having_step));
}
562
563void addCubeOrRollupStepIfNeeded(QueryPlan & query_plan,
564const AggregationAnalysisResult & aggregation_analysis_result,
565const QueryAnalysisResult & query_analysis_result,
566const PlannerContextPtr & planner_context,
567const SelectQueryInfo & select_query_info,
568const QueryNode & query_node)
569{
570if (!query_node.isGroupByWithCube() && !query_node.isGroupByWithRollup())
571return;
572
573const auto & query_context = planner_context->getQueryContext();
574const auto & settings = query_context->getSettingsRef();
575
576auto aggregator_params = getAggregatorParams(planner_context,
577aggregation_analysis_result,
578query_analysis_result,
579select_query_info,
580true /*aggregate_descriptions_remove_arguments*/);
581
582if (query_node.isGroupByWithRollup())
583{
584auto rollup_step = std::make_unique<RollupStep>(
585query_plan.getCurrentDataStream(), std::move(aggregator_params), true /*final*/, settings.group_by_use_nulls);
586query_plan.addStep(std::move(rollup_step));
587}
588else if (query_node.isGroupByWithCube())
589{
590auto cube_step = std::make_unique<CubeStep>(
591query_plan.getCurrentDataStream(), std::move(aggregator_params), true /*final*/, settings.group_by_use_nulls);
592query_plan.addStep(std::move(cube_step));
593}
594}
595
596void addDistinctStep(QueryPlan & query_plan,
597const QueryAnalysisResult & query_analysis_result,
598const PlannerContextPtr & planner_context,
599const Names & column_names,
600const QueryNode & query_node,
601bool before_order,
602bool pre_distinct)
603{
604const Settings & settings = planner_context->getQueryContext()->getSettingsRef();
605
606UInt64 limit_offset = query_analysis_result.limit_offset;
607UInt64 limit_length = query_analysis_result.limit_length;
608
609UInt64 limit_hint_for_distinct = 0;
610
611/** If after this stage of DISTINCT
612* 1. ORDER BY is not executed.
613* 2. There is no LIMIT BY.
614* Then you can get no more than limit_length + limit_offset of different rows.
615*/
616if ((!query_node.hasOrderBy() || !before_order) && !query_node.hasLimitBy())
617{
618if (limit_length <= std::numeric_limits<UInt64>::max() - limit_offset)
619limit_hint_for_distinct = limit_length + limit_offset;
620}
621
622SizeLimits limits(settings.max_rows_in_distinct, settings.max_bytes_in_distinct, settings.distinct_overflow_mode);
623
624auto distinct_step = std::make_unique<DistinctStep>(
625query_plan.getCurrentDataStream(),
626limits,
627limit_hint_for_distinct,
628column_names,
629pre_distinct,
630settings.optimize_distinct_in_order);
631
632distinct_step->setStepDescription(pre_distinct ? "Preliminary DISTINCT" : "DISTINCT");
633query_plan.addStep(std::move(distinct_step));
634}
635
636void addSortingStep(QueryPlan & query_plan,
637const QueryAnalysisResult & query_analysis_result,
638const PlannerContextPtr & planner_context)
639{
640const auto & sort_description = query_analysis_result.sort_description;
641const auto & query_context = planner_context->getQueryContext();
642const Settings & settings = query_context->getSettingsRef();
643SortingStep::Settings sort_settings(*query_context);
644
645auto sorting_step = std::make_unique<SortingStep>(
646query_plan.getCurrentDataStream(),
647sort_description,
648query_analysis_result.partial_sorting_limit,
649sort_settings,
650settings.optimize_sorting_by_input_stream_properties);
651sorting_step->setStepDescription("Sorting for ORDER BY");
652query_plan.addStep(std::move(sorting_step));
653}
654
655void addMergeSortingStep(QueryPlan & query_plan,
656const QueryAnalysisResult & query_analysis_result,
657const PlannerContextPtr & planner_context,
658const std::string & description)
659{
660const auto & query_context = planner_context->getQueryContext();
661const auto & settings = query_context->getSettingsRef();
662
663const auto & sort_description = query_analysis_result.sort_description;
664const auto max_block_size = settings.max_block_size;
665
666auto merging_sorted = std::make_unique<SortingStep>(query_plan.getCurrentDataStream(),
667sort_description,
668max_block_size,
669query_analysis_result.partial_sorting_limit,
670settings.exact_rows_before_limit);
671merging_sorted->setStepDescription("Merge sorted streams " + description);
672query_plan.addStep(std::move(merging_sorted));
673}
674
/// Append a FillingStep when at least one ORDER BY column uses WITH FILL,
/// building the INTERPOLATE actions DAG when the query has an INTERPOLATE clause.
/// No-op if no sort column has with_fill set.
void addWithFillStepIfNeeded(QueryPlan & query_plan,
    const QueryAnalysisResult & query_analysis_result,
    const PlannerContextPtr & planner_context,
    const QueryNode & query_node)
{
    const auto & sort_description = query_analysis_result.sort_description;

    NameSet column_names_with_fill;
    SortDescription fill_description;

    /// Collect the subset of the sort description that carries WITH FILL.
    for (const auto & description : sort_description)
    {
        if (description.with_fill)
        {
            fill_description.push_back(description);
            column_names_with_fill.insert(description.column_name);
        }
    }

    if (fill_description.empty())
        return;

    InterpolateDescriptionPtr interpolate_description;

    if (query_node.hasInterpolate())
    {
        auto interpolate_actions_dag = std::make_shared<ActionsDAG>();
        auto query_plan_columns = query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName();
        for (auto & query_plan_column : query_plan_columns)
        {
            /// INTERPOLATE actions dag input columns must be non constant
            query_plan_column.column = nullptr;
            interpolate_actions_dag->addInput(query_plan_column);
        }

        auto & interpolate_list_node = query_node.getInterpolate()->as<ListNode &>();
        auto & interpolate_list_nodes = interpolate_list_node.getNodes();

        if (interpolate_list_nodes.empty())
        {
            /// Bare INTERPOLATE (no expression list): pass through every input column
            /// that is not itself a WITH FILL column.
            for (const auto * input_node : interpolate_actions_dag->getInputs())
            {
                if (column_names_with_fill.contains(input_node->result_name))
                    continue;

                interpolate_actions_dag->getOutputs().push_back(input_node);
            }
        }
        else
        {
            /// Explicit INTERPOLATE (expr AS interpolate_expr) list: build one output
            /// per entry, aliased to the interpolated expression's name.
            for (auto & interpolate_node : interpolate_list_nodes)
            {
                auto & interpolate_node_typed = interpolate_node->as<InterpolateNode &>();

                PlannerActionsVisitor planner_actions_visitor(planner_context);
                auto expression_to_interpolate_expression_nodes = planner_actions_visitor.visit(interpolate_actions_dag,
                    interpolate_node_typed.getExpression());
                if (expression_to_interpolate_expression_nodes.size() != 1)
                    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expression to interpolate expected to have single action node");

                auto interpolate_expression_nodes = planner_actions_visitor.visit(interpolate_actions_dag,
                    interpolate_node_typed.getInterpolateExpression());
                if (interpolate_expression_nodes.size() != 1)
                    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Interpolate expression expected to have single action node");

                const auto * expression_to_interpolate = expression_to_interpolate_expression_nodes[0];
                const auto & expression_to_interpolate_name = expression_to_interpolate->result_name;

                const auto * interpolate_expression = interpolate_expression_nodes[0];
                /// Cast the interpolate expression when its type differs from the target column type.
                if (!interpolate_expression->result_type->equals(*expression_to_interpolate->result_type))
                {
                    interpolate_expression = &interpolate_actions_dag->addCast(*interpolate_expression,
                        expression_to_interpolate->result_type,
                        interpolate_expression->result_name);
                }

                const auto * alias_node = &interpolate_actions_dag->addAlias(*interpolate_expression, expression_to_interpolate_name);
                interpolate_actions_dag->getOutputs().push_back(alias_node);
            }

            interpolate_actions_dag->removeUnusedActions();
        }

        Aliases empty_aliases;
        interpolate_description = std::make_shared<InterpolateDescription>(std::move(interpolate_actions_dag), empty_aliases);
    }

    const auto & query_context = planner_context->getQueryContext();
    const Settings & settings = query_context->getSettingsRef();
    auto filling_step = std::make_unique<FillingStep>(
        query_plan.getCurrentDataStream(),
        sort_description,
        std::move(fill_description),
        interpolate_description,
        settings.use_with_fill_by_sorting_prefix);
    query_plan.addStep(std::move(filling_step));
}
772
773void addLimitByStep(QueryPlan & query_plan,
774const LimitByAnalysisResult & limit_by_analysis_result,
775const QueryNode & query_node)
776{
777/// Constness of LIMIT BY limit is validated during query analysis stage
778UInt64 limit_by_limit = query_node.getLimitByLimit()->as<ConstantNode &>().getValue().safeGet<UInt64>();
779UInt64 limit_by_offset = 0;
780
781if (query_node.hasLimitByOffset())
782{
783/// Constness of LIMIT BY offset is validated during query analysis stage
784limit_by_offset = query_node.getLimitByOffset()->as<ConstantNode &>().getValue().safeGet<UInt64>();
785}
786
787auto limit_by_step = std::make_unique<LimitByStep>(query_plan.getCurrentDataStream(),
788limit_by_limit,
789limit_by_offset,
790limit_by_analysis_result.limit_by_column_names);
791query_plan.addStep(std::move(limit_by_step));
792}
793
794void addPreliminaryLimitStep(QueryPlan & query_plan,
795const QueryAnalysisResult & query_analysis_result,
796const PlannerContextPtr & planner_context,
797bool do_not_skip_offset)
798{
799UInt64 limit_offset = query_analysis_result.limit_offset;
800UInt64 limit_length = query_analysis_result.limit_length;
801
802if (do_not_skip_offset)
803{
804if (limit_length > std::numeric_limits<UInt64>::max() - limit_offset)
805return;
806
807limit_length += limit_offset;
808limit_offset = 0;
809}
810
811const auto & query_context = planner_context->getQueryContext();
812const Settings & settings = query_context->getSettingsRef();
813
814auto limit = std::make_unique<LimitStep>(query_plan.getCurrentDataStream(), limit_length, limit_offset, settings.exact_rows_before_limit);
815limit->setStepDescription(do_not_skip_offset ? "preliminary LIMIT (with OFFSET)" : "preliminary LIMIT (without OFFSET)");
816query_plan.addStep(std::move(limit));
817}
818
819bool addPreliminaryLimitOptimizationStepIfNeeded(QueryPlan & query_plan,
820const QueryAnalysisResult & query_analysis_result,
821const PlannerContextPtr planner_context,
822const PlannerQueryProcessingInfo & query_processing_info,
823const QueryTreeNodePtr & query_tree)
824{
825const auto & query_node = query_tree->as<QueryNode &>();
826const auto & query_context = planner_context->getQueryContext();
827const auto & settings = query_context->getSettingsRef();
828const auto & sort_description = query_analysis_result.sort_description;
829
830bool has_withfill = false;
831
832for (const auto & desc : sort_description)
833{
834if (desc.with_fill)
835{
836has_withfill = true;
837break;
838}
839}
840
841bool apply_limit = query_processing_info.getToStage() != QueryProcessingStage::WithMergeableStateAfterAggregation;
842bool apply_prelimit = apply_limit &&
843query_node.hasLimit() &&
844!query_node.isLimitWithTies() &&
845!query_node.isGroupByWithTotals() &&
846!query_analysis_result.query_has_with_totals_in_any_subquery_in_join_tree &&
847!query_analysis_result.query_has_array_join_in_join_tree &&
848!query_node.isDistinct() &&
849!query_node.hasLimitBy() &&
850!settings.extremes &&
851!has_withfill;
852bool apply_offset = query_processing_info.getToStage() != QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
853if (apply_prelimit)
854{
855addPreliminaryLimitStep(query_plan, query_analysis_result, planner_context, /* do_not_skip_offset= */!apply_offset);
856return true;
857}
858
859return false;
860}
861
862/** For distributed query processing, add preliminary sort or distinct or limit
863* for first stage of query processing on shard, if there is no GROUP BY, HAVING,
864* WINDOW functions.
865*/
866void addPreliminarySortOrDistinctOrLimitStepsIfNeeded(QueryPlan & query_plan,
867const PlannerExpressionsAnalysisResult & expressions_analysis_result,
868const QueryAnalysisResult & query_analysis_result,
869const PlannerContextPtr & planner_context,
870const PlannerQueryProcessingInfo & query_processing_info,
871const QueryTreeNodePtr & query_tree,
872std::vector<ActionsDAGPtr> & result_actions_to_execute)
873{
874const auto & query_node = query_tree->as<QueryNode &>();
875
876if (query_processing_info.isSecondStage() ||
877expressions_analysis_result.hasAggregation() ||
878expressions_analysis_result.hasHaving() ||
879expressions_analysis_result.hasWindow())
880return;
881
882if (expressions_analysis_result.hasSort())
883addSortingStep(query_plan, query_analysis_result, planner_context);
884
885/** For DISTINCT step, pre_distinct = false, because if we have limit and distinct,
886* we need to merge streams to one and calculate overall distinct.
887* Otherwise we can take several equal values from different streams
888* according to limit and skip some distinct values.
889*/
890if (query_node.hasLimit() && query_node.isDistinct())
891{
892addDistinctStep(query_plan,
893query_analysis_result,
894planner_context,
895expressions_analysis_result.getProjection().projection_column_names,
896query_node,
897false /*before_order*/,
898false /*pre_distinct*/);
899}
900
901if (expressions_analysis_result.hasLimitBy())
902{
903const auto & limit_by_analysis_result = expressions_analysis_result.getLimitBy();
904addExpressionStep(query_plan, limit_by_analysis_result.before_limit_by_actions, "Before LIMIT BY", result_actions_to_execute);
905addLimitByStep(query_plan, limit_by_analysis_result, query_node);
906}
907
908if (query_node.hasLimit())
909addPreliminaryLimitStep(query_plan, query_analysis_result, planner_context, true /*do_not_skip_offset*/);
910}
911
/// Add a SortingStep/WindowStep pair for every window in the query.
/// Windows are first reordered by sortWindowDescriptions() so that consecutive
/// windows can share sorting when one's sort description is a prefix of the other's.
void addWindowSteps(QueryPlan & query_plan,
    const PlannerContextPtr & planner_context,
    const WindowAnalysisResult & window_analysis_result)
{
    const auto & query_context = planner_context->getQueryContext();
    const auto & settings = query_context->getSettingsRef();

    /// Copy: sortWindowDescriptions() reorders the descriptions in place.
    auto window_descriptions = window_analysis_result.window_descriptions;
    sortWindowDescriptions(window_descriptions);

    size_t window_descriptions_size = window_descriptions.size();

    for (size_t i = 0; i < window_descriptions_size; ++i)
    {
        const auto & window_description = window_descriptions[i];

        /** We don't need to sort again if the input from previous window already
          * has suitable sorting. Also don't create sort steps when there are no
          * columns to sort by, because the sort nodes are confused by this. It
          * happens in case of `over ()`.
          * Even if full_sort_description of both windows match, in case of different
          * partitioning we need to add a SortingStep to reshuffle data in the streams.
          */

        bool need_sort = !window_description.full_sort_description.empty();
        if (need_sort && i != 0)
        {
            /// NOTE(review): the partition comparison only checks sizes, not contents —
            /// presumably sufficient given sortDescriptionIsPrefix() passed; confirm.
            need_sort = !sortDescriptionIsPrefix(window_description.full_sort_description, window_descriptions[i - 1].full_sort_description)
                || (settings.max_threads != 1 && window_description.partition_by.size() != window_descriptions[i - 1].partition_by.size());
        }
        if (need_sort)
        {
            SortingStep::Settings sort_settings(*query_context);

            auto sorting_step = std::make_unique<SortingStep>(
                query_plan.getCurrentDataStream(),
                window_description.full_sort_description,
                window_description.partition_by,
                0 /*limit*/,
                sort_settings,
                settings.optimize_sorting_by_input_stream_properties);
            sorting_step->setStepDescription("Sorting for window '" + window_description.window_name + "'");
            query_plan.addStep(std::move(sorting_step));
        }

        // Fan out streams only for the last window to preserve the ordering between windows,
        // and WindowTransform works on single stream anyway.
        const bool streams_fan_out = settings.query_plan_enable_multithreading_after_window_functions && ((i + 1) == window_descriptions_size);

        auto window_step
            = std::make_unique<WindowStep>(query_plan.getCurrentDataStream(), window_description, window_description.window_functions, streams_fan_out);
        window_step->setStepDescription("Window step for window '" + window_description.window_name + "'");
        query_plan.addStep(std::move(window_step));
    }
}
967
968void addLimitStep(QueryPlan & query_plan,
969const QueryAnalysisResult & query_analysis_result,
970const PlannerContextPtr & planner_context,
971const QueryNode & query_node)
972{
973const auto & query_context = planner_context->getQueryContext();
974const auto & settings = query_context->getSettingsRef();
975bool always_read_till_end = settings.exact_rows_before_limit;
976bool limit_with_ties = query_node.isLimitWithTies();
977
978/** Special cases:
979*
980* 1. If there is WITH TOTALS and there is no ORDER BY, then read the data to the end,
981* otherwise TOTALS is counted according to incomplete data.
982*
983* 2. If there is no WITH TOTALS and there is a subquery in FROM, and there is WITH TOTALS on one of the levels,
984* then when using LIMIT, you should read the data to the end, rather than cancel the query earlier,
985* because if you cancel the query, we will not get `totals` data from the remote server.
986*/
987if (query_node.isGroupByWithTotals() && !query_node.hasOrderBy())
988always_read_till_end = true;
989
990if (!query_node.isGroupByWithTotals() && query_analysis_result.query_has_with_totals_in_any_subquery_in_join_tree)
991always_read_till_end = true;
992
993SortDescription limit_with_ties_sort_description;
994
995if (query_node.isLimitWithTies())
996{
997/// Validated during parser stage
998if (!query_node.hasOrderBy())
999throw Exception(ErrorCodes::LOGICAL_ERROR, "LIMIT WITH TIES without ORDER BY");
1000
1001limit_with_ties_sort_description = query_analysis_result.sort_description;
1002}
1003
1004UInt64 limit_length = query_analysis_result.limit_length;
1005UInt64 limit_offset = query_analysis_result.limit_offset;
1006
1007auto limit = std::make_unique<LimitStep>(
1008query_plan.getCurrentDataStream(),
1009limit_length,
1010limit_offset,
1011always_read_till_end,
1012limit_with_ties,
1013limit_with_ties_sort_description);
1014
1015if (limit_with_ties)
1016limit->setStepDescription("LIMIT WITH TIES");
1017
1018query_plan.addStep(std::move(limit));
1019}
1020
1021void addExtremesStepIfNeeded(QueryPlan & query_plan, const PlannerContextPtr & planner_context)
1022{
1023const auto & query_context = planner_context->getQueryContext();
1024if (!query_context->getSettingsRef().extremes)
1025return;
1026
1027auto extremes_step = std::make_unique<ExtremesStep>(query_plan.getCurrentDataStream());
1028query_plan.addStep(std::move(extremes_step));
1029}
1030
1031void addOffsetStep(QueryPlan & query_plan, const QueryAnalysisResult & query_analysis_result)
1032{
1033/// If there is not a LIMIT but an offset
1034if (!query_analysis_result.limit_length && query_analysis_result.limit_offset)
1035{
1036auto offsets_step = std::make_unique<OffsetStep>(query_plan.getCurrentDataStream(), query_analysis_result.limit_offset);
1037query_plan.addStep(std::move(offsets_step));
1038}
1039}
1040
1041void collectSetsFromActionsDAG(const ActionsDAGPtr & dag, std::unordered_set<const FutureSet *> & useful_sets)
1042{
1043for (const auto & node : dag->getNodes())
1044{
1045if (node.column)
1046{
1047const IColumn * column = node.column.get();
1048if (const auto * column_const = typeid_cast<const ColumnConst *>(column))
1049column = &column_const->getDataColumn();
1050
1051if (const auto * column_set = typeid_cast<const ColumnSet *>(column))
1052useful_sets.insert(column_set->getData().get());
1053}
1054
1055if (node.type == ActionsDAG::ActionType::FUNCTION && node.function_base->getName() == "indexHint")
1056{
1057ActionsDAG::NodeRawConstPtrs children;
1058if (const auto * adaptor = typeid_cast<const FunctionToFunctionBaseAdaptor *>(node.function_base.get()))
1059{
1060if (const auto * index_hint = typeid_cast<const FunctionIndexHint *>(adaptor->getFunction().get()))
1061{
1062collectSetsFromActionsDAG(index_hint->getActions(), useful_sets);
1063}
1064}
1065}
1066}
1067}
1068
1069void addBuildSubqueriesForSetsStepIfNeeded(
1070QueryPlan & query_plan,
1071const SelectQueryOptions & select_query_options,
1072const PlannerContextPtr & planner_context,
1073const std::vector<ActionsDAGPtr> & result_actions_to_execute)
1074{
1075auto subqueries = planner_context->getPreparedSets().getSubqueries();
1076std::unordered_set<const FutureSet *> useful_sets;
1077
1078for (const auto & actions_to_execute : result_actions_to_execute)
1079collectSetsFromActionsDAG(actions_to_execute, useful_sets);
1080
1081auto predicate = [&useful_sets](const auto & set) { return !useful_sets.contains(set.get()); };
1082auto it = std::remove_if(subqueries.begin(), subqueries.end(), std::move(predicate));
1083subqueries.erase(it, subqueries.end());
1084
1085for (auto & subquery : subqueries)
1086{
1087auto query_tree = subquery->detachQueryTree();
1088auto subquery_options = select_query_options.subquery();
1089/// I don't know if this is a good decision,
1090/// But for now it is done in the same way as in old analyzer.
1091/// This would not ignore limits for subqueries (affects mutations only).
1092/// See test_build_sets_from_multiple_threads-analyzer.
1093subquery_options.ignore_limits = false;
1094Planner subquery_planner(
1095query_tree,
1096subquery_options,
1097std::make_shared<GlobalPlannerContext>(nullptr, nullptr, FiltersForTableExpressionMap{}));
1098subquery_planner.buildQueryPlanIfNeeded();
1099
1100subquery->setQueryPlan(std::make_unique<QueryPlan>(std::move(subquery_planner).extractQueryPlan()));
1101}
1102
1103if (!subqueries.empty())
1104{
1105auto step = std::make_unique<DelayedCreatingSetsStep>(
1106query_plan.getCurrentDataStream(),
1107std::move(subqueries),
1108planner_context->getQueryContext());
1109
1110query_plan.addStep(std::move(step));
1111}
1112}
1113
1114/// Support for `additional_result_filter` setting
1115void addAdditionalFilterStepIfNeeded(QueryPlan & query_plan,
1116const QueryNode & query_node,
1117const SelectQueryOptions & select_query_options,
1118PlannerContextPtr & planner_context
1119)
1120{
1121if (select_query_options.subquery_depth != 0)
1122return;
1123
1124const auto & query_context = planner_context->getQueryContext();
1125const auto & settings = query_context->getSettingsRef();
1126
1127auto additional_result_filter_ast = parseAdditionalResultFilter(settings);
1128if (!additional_result_filter_ast)
1129return;
1130
1131ColumnsDescription fake_column_descriptions;
1132NameSet fake_name_set;
1133for (const auto & column : query_node.getProjectionColumns())
1134{
1135fake_column_descriptions.add(ColumnDescription(column.name, column.type));
1136fake_name_set.emplace(column.name);
1137}
1138
1139auto storage = std::make_shared<StorageDummy>(StorageID{"dummy", "dummy"}, fake_column_descriptions);
1140auto fake_table_expression = std::make_shared<TableNode>(std::move(storage), query_context);
1141
1142auto filter_info = buildFilterInfo(additional_result_filter_ast, fake_table_expression, planner_context, std::move(fake_name_set));
1143if (!filter_info.actions || !query_plan.isInitialized())
1144return;
1145
1146auto filter_step = std::make_unique<FilterStep>(query_plan.getCurrentDataStream(),
1147filter_info.actions,
1148filter_info.column_name,
1149filter_info.do_remove_column);
1150filter_step->setStepDescription("additional result filter");
1151query_plan.addStep(std::move(filter_step));
1152}
1153
1154}
1155
1156PlannerContextPtr buildPlannerContext(const QueryTreeNodePtr & query_tree_node,
1157const SelectQueryOptions & select_query_options,
1158GlobalPlannerContextPtr global_planner_context)
1159{
1160auto * query_node = query_tree_node->as<QueryNode>();
1161auto * union_node = query_tree_node->as<UnionNode>();
1162
1163if (!query_node && !union_node)
1164throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
1165"Expected QUERY or UNION node. Actual {}",
1166query_tree_node->formatASTForErrorMessage());
1167
1168auto & mutable_context = query_node ? query_node->getMutableContext() : union_node->getMutableContext();
1169size_t max_subquery_depth = mutable_context->getSettingsRef().max_subquery_depth;
1170if (max_subquery_depth && select_query_options.subquery_depth > max_subquery_depth)
1171throw Exception(ErrorCodes::TOO_DEEP_SUBQUERIES,
1172"Too deep subqueries. Maximum: {}",
1173max_subquery_depth);
1174
1175const auto & client_info = mutable_context->getClientInfo();
1176auto min_major = static_cast<UInt64>(DBMS_MIN_MAJOR_VERSION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD);
1177auto min_minor = static_cast<UInt64>(DBMS_MIN_MINOR_VERSION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD);
1178
1179bool need_to_disable_two_level_aggregation = client_info.query_kind == ClientInfo::QueryKind::SECONDARY_QUERY &&
1180client_info.connection_client_version_major < min_major &&
1181client_info.connection_client_version_minor < min_minor;
1182
1183if (need_to_disable_two_level_aggregation)
1184{
1185/// Disable two-level aggregation due to version incompatibility
1186mutable_context->setSetting("group_by_two_level_threshold", Field(0));
1187mutable_context->setSetting("group_by_two_level_threshold_bytes", Field(0));
1188}
1189
1190if (select_query_options.is_subquery)
1191updateContextForSubqueryExecution(mutable_context);
1192
1193return std::make_shared<PlannerContext>(mutable_context, std::move(global_planner_context), select_query_options);
1194}
1195
/// Top-level constructor: creates a fresh GlobalPlannerContext, including the
/// query/table chosen for parallel replicas and the filters collected for
/// table-expression analysis, then builds the PlannerContext from it.
Planner::Planner(const QueryTreeNodePtr & query_tree_,
    SelectQueryOptions & select_query_options_)
    : query_tree(query_tree_)
    , select_query_options(select_query_options_)
    , planner_context(buildPlannerContext(query_tree, select_query_options,
        std::make_shared<GlobalPlannerContext>(
            findQueryForParallelReplicas(query_tree, select_query_options),
            findTableForParallelReplicas(query_tree, select_query_options),
            collectFiltersForAnalysis(query_tree, select_query_options))))
{
}
1207
/// Constructor reusing an existing GlobalPlannerContext (e.g. when planning
/// subqueries that must share the outer query's global planner state).
Planner::Planner(const QueryTreeNodePtr & query_tree_,
    SelectQueryOptions & select_query_options_,
    GlobalPlannerContextPtr global_planner_context_)
    : query_tree(query_tree_)
    , select_query_options(select_query_options_)
    , planner_context(buildPlannerContext(query_tree_, select_query_options, std::move(global_planner_context_)))
{
}
1216
/// Constructor taking a fully built PlannerContext as-is; no context
/// construction or validation happens here.
Planner::Planner(const QueryTreeNodePtr & query_tree_,
    SelectQueryOptions & select_query_options_,
    PlannerContextPtr planner_context_)
    : query_tree(query_tree_)
    , select_query_options(select_query_options_)
    , planner_context(std::move(planner_context_))
{
}
1225
1226void Planner::buildQueryPlanIfNeeded()
1227{
1228if (query_plan.isInitialized())
1229return;
1230
1231LOG_TRACE(getLogger("Planner"), "Query {} to stage {}{}",
1232query_tree->formatConvertedASTForErrorMessage(),
1233QueryProcessingStage::toString(select_query_options.to_stage),
1234select_query_options.only_analyze ? " only analyze" : "");
1235
1236if (query_tree->getNodeType() == QueryTreeNodeType::UNION)
1237buildPlanForUnionNode();
1238else
1239buildPlanForQueryNode();
1240
1241extendQueryContextAndStoragesLifetime(query_plan, planner_context);
1242}
1243
/// Build the plan for a UNION/INTERSECT/EXCEPT node: plan each participating
/// SELECT, convert every sub-plan's output to a common header, unite the plans
/// with a UnionStep or IntersectOrExceptStep, and deduplicate the result for
/// the *_DISTINCT modes.
void Planner::buildPlanForUnionNode()
{
    const auto & union_node = query_tree->as<UnionNode &>();
    auto union_mode = union_node.getUnionMode();
    /// DEFAULT modes must have been resolved to ALL/DISTINCT during analysis.
    if (union_mode == SelectUnionMode::UNION_DEFAULT || union_mode == SelectUnionMode::EXCEPT_DEFAULT
        || union_mode == SelectUnionMode::INTERSECT_DEFAULT)
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "UNION mode must be initialized");

    const auto & union_queries_nodes = union_node.getQueries().getNodes();
    size_t queries_size = union_queries_nodes.size();

    std::vector<std::unique_ptr<QueryPlan>> query_plans;
    query_plans.reserve(queries_size);

    Blocks query_plans_headers;
    query_plans_headers.reserve(queries_size);

    /// Plan each participating query, accumulating its row policies and
    /// query-node-to-plan-step mappings into this planner's state.
    for (const auto & query_node : union_queries_nodes)
    {
        Planner query_planner(query_node, select_query_options);
        query_planner.buildQueryPlanIfNeeded();
        for (const auto & row_policy : query_planner.getUsedRowPolicies())
            used_row_policies.insert(row_policy);
        const auto & mapping = query_planner.getQueryNodeToPlanStepMapping();
        query_node_to_plan_step_mapping.insert(mapping.begin(), mapping.end());
        auto query_node_plan = std::make_unique<QueryPlan>(std::move(query_planner).extractQueryPlan());
        query_plans_headers.push_back(query_node_plan->getCurrentDataStream().header);
        query_plans.push_back(std::move(query_node_plan));
    }

    /// Compute the common result header and add a positional conversion step to
    /// every sub-plan whose output structure differs from it.
    Block union_common_header = buildCommonHeaderForUnion(query_plans_headers, union_mode);
    DataStreams query_plans_streams;
    query_plans_streams.reserve(query_plans.size());

    for (auto & query_node_plan : query_plans)
    {
        if (blocksHaveEqualStructure(query_node_plan->getCurrentDataStream().header, union_common_header))
        {
            query_plans_streams.push_back(query_node_plan->getCurrentDataStream());
            continue;
        }

        auto actions_dag = ActionsDAG::makeConvertingActions(
            query_node_plan->getCurrentDataStream().header.getColumnsWithTypeAndName(),
            union_common_header.getColumnsWithTypeAndName(),
            ActionsDAG::MatchColumnsMode::Position);
        auto converting_step = std::make_unique<ExpressionStep>(query_node_plan->getCurrentDataStream(), std::move(actions_dag));
        converting_step->setStepDescription("Conversion before UNION");
        query_node_plan->addStep(std::move(converting_step));

        query_plans_streams.push_back(query_node_plan->getCurrentDataStream());
    }

    const auto & query_context = planner_context->getQueryContext();
    const auto & settings = query_context->getSettingsRef();
    auto max_threads = settings.max_threads;

    /// For *_DISTINCT modes an extra dedup step is appended after uniting the plans.
    bool is_distinct = union_mode == SelectUnionMode::UNION_DISTINCT || union_mode == SelectUnionMode::INTERSECT_DISTINCT
        || union_mode == SelectUnionMode::EXCEPT_DISTINCT;

    if (union_mode == SelectUnionMode::UNION_ALL || union_mode == SelectUnionMode::UNION_DISTINCT)
    {
        auto union_step = std::make_unique<UnionStep>(std::move(query_plans_streams), max_threads);
        query_plan.unitePlans(std::move(union_step), std::move(query_plans));
    }
    else if (union_mode == SelectUnionMode::INTERSECT_ALL || union_mode == SelectUnionMode::INTERSECT_DISTINCT
        || union_mode == SelectUnionMode::EXCEPT_ALL || union_mode == SelectUnionMode::EXCEPT_DISTINCT)
    {
        /// Map the union mode to the corresponding IntersectOrExceptStep operator.
        IntersectOrExceptStep::Operator intersect_or_except_operator = IntersectOrExceptStep::Operator::UNKNOWN;

        if (union_mode == SelectUnionMode::INTERSECT_ALL)
            intersect_or_except_operator = IntersectOrExceptStep::Operator::INTERSECT_ALL;
        else if (union_mode == SelectUnionMode::INTERSECT_DISTINCT)
            intersect_or_except_operator = IntersectOrExceptStep::Operator::INTERSECT_DISTINCT;
        else if (union_mode == SelectUnionMode::EXCEPT_ALL)
            intersect_or_except_operator = IntersectOrExceptStep::Operator::EXCEPT_ALL;
        else if (union_mode == SelectUnionMode::EXCEPT_DISTINCT)
            intersect_or_except_operator = IntersectOrExceptStep::Operator::EXCEPT_DISTINCT;

        auto union_step
            = std::make_unique<IntersectOrExceptStep>(std::move(query_plans_streams), intersect_or_except_operator, max_threads);
        query_plan.unitePlans(std::move(union_step), std::move(query_plans));
    }

    if (is_distinct)
    {
        /// Add distinct transform
        SizeLimits limits(settings.max_rows_in_distinct, settings.max_bytes_in_distinct, settings.distinct_overflow_mode);

        auto distinct_step = std::make_unique<DistinctStep>(
            query_plan.getCurrentDataStream(),
            limits,
            0 /*limit hint*/,
            query_plan.getCurrentDataStream().header.getNames(),
            false /*pre distinct*/,
            settings.optimize_distinct_in_order);
        query_plan.addStep(std::move(distinct_step));
    }
}
1343
1344void Planner::buildPlanForQueryNode()
1345{
1346ProfileEvents::increment(ProfileEvents::SelectQueriesWithSubqueries);
1347ProfileEvents::increment(ProfileEvents::QueriesWithSubqueries);
1348
1349auto & query_node = query_tree->as<QueryNode &>();
1350const auto & query_context = planner_context->getQueryContext();
1351
1352if (query_node.hasWhere())
1353{
1354auto condition_constant = tryExtractConstantFromConditionNode(query_node.getWhere());
1355if (condition_constant.has_value() && *condition_constant)
1356query_node.getWhere() = {};
1357}
1358
1359SelectQueryInfo select_query_info = buildSelectQueryInfo();
1360
1361StorageLimitsList current_storage_limits = storage_limits;
1362select_query_info.local_storage_limits = buildStorageLimits(*query_context, select_query_options);
1363current_storage_limits.push_back(select_query_info.local_storage_limits);
1364select_query_info.storage_limits = std::make_shared<StorageLimitsList>(current_storage_limits);
1365select_query_info.has_order_by = query_node.hasOrderBy();
1366select_query_info.has_window = hasWindowFunctionNodes(query_tree);
1367select_query_info.has_aggregates = hasAggregateFunctionNodes(query_tree);
1368select_query_info.need_aggregate = query_node.hasGroupBy() || select_query_info.has_aggregates;
1369
1370if (!select_query_info.need_aggregate && query_node.hasHaving())
1371{
1372if (query_node.hasWhere())
1373query_node.getWhere() = mergeConditionNodes({query_node.getWhere(), query_node.getHaving()}, query_context);
1374else
1375query_node.getWhere() = query_node.getHaving();
1376
1377query_node.getHaving() = {};
1378}
1379
1380collectSets(query_tree, *planner_context);
1381
1382const auto & settings = query_context->getSettingsRef();
1383if (query_context->canUseTaskBasedParallelReplicas())
1384{
1385if (!settings.parallel_replicas_allow_in_with_subquery && planner_context->getPreparedSets().hasSubqueries())
1386{
1387if (settings.allow_experimental_parallel_reading_from_replicas >= 2)
1388throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "IN with subquery is not supported with parallel replicas");
1389
1390auto & mutable_context = planner_context->getMutableQueryContext();
1391mutable_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
1392LOG_DEBUG(getLogger("Planner"), "Disabling parallel replicas to execute a query with IN with subquery");
1393}
1394}
1395
1396collectTableExpressionData(query_tree, planner_context);
1397checkStoragesSupportTransactions(planner_context);
1398
1399const auto & table_filters = planner_context->getGlobalPlannerContext()->filters_for_table_expressions;
1400if (!select_query_options.only_analyze && !table_filters.empty()) // && top_level)
1401{
1402for (auto & [table_node, table_expression_data] : planner_context->getTableExpressionNodeToData())
1403{
1404auto it = table_filters.find(table_node);
1405if (it != table_filters.end())
1406{
1407const auto & filters = it->second;
1408table_expression_data.setFilterActions(filters.filter_actions);
1409table_expression_data.setPrewhereInfo(filters.prewhere_info);
1410}
1411}
1412}
1413
1414if (query_context->canUseTaskBasedParallelReplicas())
1415{
1416const auto & table_expression_nodes = planner_context->getTableExpressionNodeToData();
1417for (const auto & it : table_expression_nodes)
1418{
1419auto * table_node = it.first->as<TableNode>();
1420if (!table_node)
1421continue;
1422
1423const auto & modifiers = table_node->getTableExpressionModifiers();
1424if (modifiers.has_value() && modifiers->hasFinal())
1425{
1426if (settings.allow_experimental_parallel_reading_from_replicas >= 2)
1427throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "FINAL modifier is not supported with parallel replicas");
1428else
1429{
1430LOG_DEBUG(
1431getLogger("Planner"),
1432"FINAL modifier is not supported with parallel replicas. Query will be executed without using them.");
1433auto & mutable_context = planner_context->getMutableQueryContext();
1434mutable_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
1435}
1436}
1437}
1438}
1439
1440if (!settings.parallel_replicas_custom_key.value.empty())
1441{
1442/// Check support for JOIN for parallel replicas with custom key
1443if (planner_context->getTableExpressionNodeToData().size() > 1)
1444{
1445if (settings.allow_experimental_parallel_reading_from_replicas >= 2)
1446throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "JOINs are not supported with parallel replicas");
1447else
1448{
1449LOG_DEBUG(
1450getLogger("Planner"),
1451"JOINs are not supported with parallel replicas. Query will be executed without using them.");
1452
1453auto & mutable_context = planner_context->getMutableQueryContext();
1454mutable_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
1455mutable_context->setSetting("parallel_replicas_custom_key", String{""});
1456}
1457}
1458}
1459
1460JoinTreeQueryPlan join_tree_query_plan;
1461if (planner_context->getMutableQueryContext()->canUseTaskBasedParallelReplicas()
1462&& planner_context->getGlobalPlannerContext()->parallel_replicas_node == &query_node)
1463{
1464join_tree_query_plan = buildQueryPlanForParallelReplicas(query_node, planner_context, select_query_info.storage_limits);
1465}
1466else
1467{
1468auto top_level_identifiers = collectTopLevelColumnIdentifiers(query_tree, planner_context);
1469join_tree_query_plan = buildJoinTreeQueryPlan(query_tree,
1470select_query_info,
1471select_query_options,
1472top_level_identifiers,
1473planner_context);
1474}
1475
1476auto from_stage = join_tree_query_plan.from_stage;
1477query_plan = std::move(join_tree_query_plan.query_plan);
1478used_row_policies = std::move(join_tree_query_plan.used_row_policies);
1479auto & mapping = join_tree_query_plan.query_node_to_plan_step_mapping;
1480query_node_to_plan_step_mapping.insert(mapping.begin(), mapping.end());
1481
1482LOG_TRACE(getLogger("Planner"), "Query {} from stage {} to stage {}{}",
1483query_tree->formatConvertedASTForErrorMessage(),
1484QueryProcessingStage::toString(from_stage),
1485QueryProcessingStage::toString(select_query_options.to_stage),
1486select_query_options.only_analyze ? " only analyze" : "");
1487
1488if (select_query_options.to_stage == QueryProcessingStage::FetchColumns)
1489return;
1490
1491PlannerQueryProcessingInfo query_processing_info(from_stage, select_query_options.to_stage);
1492QueryAnalysisResult query_analysis_result(query_tree, query_processing_info, planner_context);
1493auto expression_analysis_result = buildExpressionAnalysisResult(query_tree,
1494query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName(),
1495planner_context,
1496query_processing_info);
1497
1498std::vector<ActionsDAGPtr> result_actions_to_execute = std::move(join_tree_query_plan.actions_dags);
1499
1500for (auto & [_, table_expression_data] : planner_context->getTableExpressionNodeToData())
1501{
1502if (table_expression_data.getPrewhereFilterActions())
1503result_actions_to_execute.push_back(table_expression_data.getPrewhereFilterActions());
1504
1505if (table_expression_data.getRowLevelFilterActions())
1506result_actions_to_execute.push_back(table_expression_data.getRowLevelFilterActions());
1507}
1508
1509if (query_processing_info.isIntermediateStage())
1510{
1511addPreliminarySortOrDistinctOrLimitStepsIfNeeded(query_plan,
1512expression_analysis_result,
1513query_analysis_result,
1514planner_context,
1515query_processing_info,
1516query_tree,
1517result_actions_to_execute);
1518
1519if (expression_analysis_result.hasAggregation())
1520{
1521const auto & aggregation_analysis_result = expression_analysis_result.getAggregation();
1522addMergingAggregatedStep(query_plan, aggregation_analysis_result, query_analysis_result, planner_context);
1523}
1524}
1525
1526if (query_processing_info.isFirstStage())
1527{
1528if (expression_analysis_result.hasWhere())
1529addFilterStep(query_plan, expression_analysis_result.getWhere(), "WHERE", result_actions_to_execute);
1530
1531if (expression_analysis_result.hasAggregation())
1532{
1533const auto & aggregation_analysis_result = expression_analysis_result.getAggregation();
1534if (aggregation_analysis_result.before_aggregation_actions)
1535addExpressionStep(query_plan, aggregation_analysis_result.before_aggregation_actions, "Before GROUP BY", result_actions_to_execute);
1536
1537addAggregationStep(query_plan, aggregation_analysis_result, query_analysis_result, planner_context, select_query_info);
1538}
1539
1540/** If we have aggregation, we can't execute any later-stage
1541* expressions on shards, neither "Before WINDOW" nor "Before ORDER BY"
1542*/
1543if (!expression_analysis_result.hasAggregation())
1544{
1545if (expression_analysis_result.hasWindow())
1546{
1547/** Window functions must be executed on initiator (second_stage).
1548* ORDER BY and DISTINCT might depend on them, so if we have
1549* window functions, we can't execute ORDER BY and DISTINCT
1550* now, on shard (first_stage).
1551*/
1552const auto & window_analysis_result = expression_analysis_result.getWindow();
1553if (window_analysis_result.before_window_actions)
1554addExpressionStep(query_plan, window_analysis_result.before_window_actions, "Before WINDOW", result_actions_to_execute);
1555}
1556else
1557{
1558/** There are no window functions, so we can execute the
1559* Projection expressions, preliminary DISTINCT and before ORDER BY expressions
1560* now, on shards (first_stage).
1561*/
1562const auto & projection_analysis_result = expression_analysis_result.getProjection();
1563addExpressionStep(query_plan, projection_analysis_result.projection_actions, "Projection", result_actions_to_execute);
1564
1565if (query_node.isDistinct())
1566{
1567addDistinctStep(query_plan,
1568query_analysis_result,
1569planner_context,
1570expression_analysis_result.getProjection().projection_column_names,
1571query_node,
1572true /*before_order*/,
1573true /*pre_distinct*/);
1574}
1575
1576if (expression_analysis_result.hasSort())
1577{
1578const auto & sort_analysis_result = expression_analysis_result.getSort();
1579addExpressionStep(query_plan, sort_analysis_result.before_order_by_actions, "Before ORDER BY", result_actions_to_execute);
1580}
1581}
1582}
1583
1584addPreliminarySortOrDistinctOrLimitStepsIfNeeded(query_plan,
1585expression_analysis_result,
1586query_analysis_result,
1587planner_context,
1588query_processing_info,
1589query_tree,
1590result_actions_to_execute);
1591}
1592
1593if (query_processing_info.isSecondStage() || query_processing_info.isFromAggregationState())
1594{
1595if (query_processing_info.isFromAggregationState())
1596{
1597/// Aggregation was performed on remote shards
1598}
1599else if (expression_analysis_result.hasAggregation())
1600{
1601const auto & aggregation_analysis_result = expression_analysis_result.getAggregation();
1602
1603if (!query_processing_info.isFirstStage())
1604{
1605addMergingAggregatedStep(query_plan, aggregation_analysis_result, query_analysis_result, planner_context);
1606}
1607
1608bool having_executed = false;
1609
1610if (query_node.isGroupByWithTotals())
1611{
1612addTotalsHavingStep(query_plan, expression_analysis_result, query_analysis_result, planner_context, query_node, result_actions_to_execute);
1613having_executed = true;
1614}
1615
1616addCubeOrRollupStepIfNeeded(query_plan, aggregation_analysis_result, query_analysis_result, planner_context, select_query_info, query_node);
1617
1618if (!having_executed && expression_analysis_result.hasHaving())
1619addFilterStep(query_plan, expression_analysis_result.getHaving(), "HAVING", result_actions_to_execute);
1620}
1621
1622if (query_processing_info.isFromAggregationState())
1623{
1624if (expression_analysis_result.hasWindow())
1625throw Exception(ErrorCodes::NOT_IMPLEMENTED,
1626"Window functions does not support processing from WithMergeableStateAfterAggregation");
1627}
1628else if (expression_analysis_result.hasWindow() || expression_analysis_result.hasAggregation())
1629{
1630if (expression_analysis_result.hasWindow())
1631{
1632const auto & window_analysis_result = expression_analysis_result.getWindow();
1633if (expression_analysis_result.hasAggregation())
1634addExpressionStep(query_plan, window_analysis_result.before_window_actions, "Before window functions", result_actions_to_execute);
1635
1636addWindowSteps(query_plan, planner_context, window_analysis_result);
1637}
1638
1639const auto & projection_analysis_result = expression_analysis_result.getProjection();
1640addExpressionStep(query_plan, projection_analysis_result.projection_actions, "Projection", result_actions_to_execute);
1641
1642if (query_node.isDistinct())
1643{
1644addDistinctStep(query_plan,
1645query_analysis_result,
1646planner_context,
1647expression_analysis_result.getProjection().projection_column_names,
1648query_node,
1649true /*before_order*/,
1650true /*pre_distinct*/);
1651}
1652
1653if (expression_analysis_result.hasSort())
1654{
1655const auto & sort_analysis_result = expression_analysis_result.getSort();
1656addExpressionStep(query_plan, sort_analysis_result.before_order_by_actions, "Before ORDER BY", result_actions_to_execute);
1657}
1658}
1659else
1660{
1661/// There are no aggregation or windows, all expressions before ORDER BY executed on shards
1662}
1663
1664if (expression_analysis_result.hasSort())
1665{
1666/** If there is an ORDER BY for distributed query processing,
1667* but there is no aggregation, then on the remote servers ORDER BY was made
1668* and we merge the sorted streams from remote servers.
1669*
1670* Also, in case the remote servers processed the query up to WithMergeableStateAfterAggregationAndLimit
1671* (distributed_group_by_no_merge=2 or optimize_distributed_group_by_sharding_key=1 takes place),
1672* then merging the sorted streams is enough, since the remote servers already did a full ORDER BY.
1673*/
1674if (query_processing_info.isFromAggregationState())
1675addMergeSortingStep(query_plan, query_analysis_result, planner_context, "after aggregation stage for ORDER BY");
1676else if (!query_processing_info.isFirstStage() &&
1677!expression_analysis_result.hasAggregation() &&
1678!expression_analysis_result.hasWindow() &&
1679!(query_node.isGroupByWithTotals() && !query_analysis_result.aggregate_final))
1680addMergeSortingStep(query_plan, query_analysis_result, planner_context, "for ORDER BY, without aggregation");
1681else
1682addSortingStep(query_plan, query_analysis_result, planner_context);
1683}
1684
1685/** Optimization if there are several sources and there is LIMIT, then first apply the preliminary LIMIT,
1686* limiting the number of rows in each up to `offset + limit`.
1687*/
1688bool applied_prelimit = addPreliminaryLimitOptimizationStepIfNeeded(query_plan,
1689query_analysis_result,
1690planner_context,
1691query_processing_info,
1692query_tree);
1693
1694/// If there was more than one stream, then DISTINCT needs to be performed once again after merging all streams.
1695if (!query_processing_info.isFromAggregationState() && query_node.isDistinct())
1696{
1697addDistinctStep(query_plan,
1698query_analysis_result,
1699planner_context,
1700expression_analysis_result.getProjection().projection_column_names,
1701query_node,
1702false /*before_order*/,
1703false /*pre_distinct*/);
1704}
1705
1706if (!query_processing_info.isFromAggregationState() && expression_analysis_result.hasLimitBy())
1707{
1708const auto & limit_by_analysis_result = expression_analysis_result.getLimitBy();
1709addExpressionStep(query_plan, limit_by_analysis_result.before_limit_by_actions, "Before LIMIT BY", result_actions_to_execute);
1710addLimitByStep(query_plan, limit_by_analysis_result, query_node);
1711}
1712
1713if (query_node.hasOrderBy())
1714addWithFillStepIfNeeded(query_plan, query_analysis_result, planner_context, query_node);
1715
1716bool apply_offset = query_processing_info.getToStage() != QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
1717
1718if (query_node.hasLimit() && query_node.isLimitWithTies() && apply_offset)
1719addLimitStep(query_plan, query_analysis_result, planner_context, query_node);
1720
1721addExtremesStepIfNeeded(query_plan, planner_context);
1722
1723bool limit_applied = applied_prelimit || (query_node.isLimitWithTies() && apply_offset);
1724bool apply_limit = query_processing_info.getToStage() != QueryProcessingStage::WithMergeableStateAfterAggregation;
1725
1726/** Limit is no longer needed if there is prelimit.
1727*
1728* That LIMIT cannot be applied if OFFSET should not be applied, since LIMIT will apply OFFSET too.
1729* This is the case for various optimizations for distributed queries,
1730* and when LIMIT cannot be applied it will be applied on the initiator anyway.
1731*/
1732if (query_node.hasLimit() && apply_limit && !limit_applied && apply_offset)
1733addLimitStep(query_plan, query_analysis_result, planner_context, query_node);
1734else if (!limit_applied && apply_offset && query_node.hasOffset())
1735addOffsetStep(query_plan, query_analysis_result);
1736
1737/// Project names is not done on shards, because initiator will not find columns in blocks
1738if (!query_processing_info.isToAggregationState())
1739{
1740const auto & projection_analysis_result = expression_analysis_result.getProjection();
1741addExpressionStep(query_plan, projection_analysis_result.project_names_actions, "Project names", result_actions_to_execute);
1742}
1743
1744// For additional_result_filter setting
1745addAdditionalFilterStepIfNeeded(query_plan, query_node, select_query_options, planner_context);
1746}
1747
1748if (!select_query_options.only_analyze)
1749addBuildSubqueriesForSetsStepIfNeeded(query_plan, select_query_options, planner_context, result_actions_to_execute);
1750
1751query_node_to_plan_step_mapping[&query_node] = query_plan.getRootNode();
1752}
1753
1754SelectQueryInfo Planner::buildSelectQueryInfo() const
1755{
1756return ::DB::buildSelectQueryInfo(query_tree, planner_context);
1757}
1758
1759void Planner::addStorageLimits(const StorageLimitsList & limits)
1760{
1761for (const auto & limit : limits)
1762storage_limits.push_back(limit);
1763}
1764
1765}
1766