ClickHouse
1096 строк · 39.4 Кб
1#include <Columns/IColumn.h>
2#include <DataTypes/DataTypeAggregateFunction.h>
3#include <Functions/IFunction.h>
4#include <Interpreters/ActionsDAG.h>
5#include <Interpreters/ArrayJoinAction.h>
6#include <Interpreters/InterpreterSelectQuery.h>
7#include <Interpreters/TableJoin.h>
8#include <Parsers/ASTWindowDefinition.h>
9#include <Processors/QueryPlan/AggregatingStep.h>
10#include <Processors/QueryPlan/ArrayJoinStep.h>
11#include <Processors/QueryPlan/CreatingSetsStep.h>
12#include <Processors/QueryPlan/CubeStep.h>
13#include <Processors/QueryPlan/DistinctStep.h>
14#include <Processors/QueryPlan/ExpressionStep.h>
15#include <Processors/QueryPlan/FilterStep.h>
16#include <Processors/QueryPlan/ITransformingStep.h>
17#include <Processors/QueryPlan/JoinStep.h>
18#include <Processors/QueryPlan/Optimizations/Optimizations.h>
19#include <Processors/QueryPlan/Optimizations/actionsDAGUtils.h>
20#include <Processors/QueryPlan/ReadFromMergeTree.h>
21#include <Processors/QueryPlan/SortingStep.h>
22#include <Processors/QueryPlan/TotalsHavingStep.h>
23#include <Processors/QueryPlan/UnionStep.h>
24#include <Processors/QueryPlan/WindowStep.h>
25#include <Storages/StorageMerge.h>
26#include <Common/typeid_cast.h>
27
28#include <stack>
29
30
31namespace DB::QueryPlanOptimizations
32{
33
34static ISourceStep * checkSupportedReadingStep(IQueryPlanStep * step)
35{
36if (auto * reading = typeid_cast<ReadFromMergeTree *>(step))
37{
38/// Already read-in-order, skip.
39if (reading->getQueryInfo().input_order_info)
40return nullptr;
41
42const auto & sorting_key = reading->getStorageMetadata()->getSortingKey();
43if (sorting_key.column_names.empty())
44return nullptr;
45
46return reading;
47}
48
49if (auto * merge = typeid_cast<ReadFromMerge *>(step))
50{
51const auto & tables = merge->getSelectedTables();
52if (tables.empty())
53return nullptr;
54
55for (const auto & table : tables)
56{
57auto storage = std::get<StoragePtr>(table);
58const auto & sorting_key = storage->getInMemoryMetadataPtr()->getSortingKey();
59if (sorting_key.column_names.empty())
60return nullptr;
61}
62
63return merge;
64}
65
66return nullptr;
67}
68
69using StepStack = std::vector<IQueryPlanStep*>;
70
71static QueryPlan::Node * findReadingStep(QueryPlan::Node & node, StepStack & backward_path)
72{
73IQueryPlanStep * step = node.step.get();
74if (auto * reading = checkSupportedReadingStep(step))
75{
76backward_path.push_back(node.step.get());
77return &node;
78}
79
80if (node.children.size() != 1)
81return nullptr;
82
83backward_path.push_back(node.step.get());
84
85if (typeid_cast<ExpressionStep *>(step) || typeid_cast<FilterStep *>(step) || typeid_cast<ArrayJoinStep *>(step))
86return findReadingStep(*node.children.front(), backward_path);
87
88if (auto * distinct = typeid_cast<DistinctStep *>(step); distinct && distinct->isPreliminary())
89return findReadingStep(*node.children.front(), backward_path);
90
91return nullptr;
92}
93
94void updateStepsDataStreams(StepStack & steps_to_update)
95{
96/// update data stream's sorting properties for found transforms
97if (!steps_to_update.empty())
98{
99const DataStream * input_stream = &steps_to_update.back()->getOutputStream();
100chassert(dynamic_cast<ISourceStep *>(steps_to_update.back()));
101steps_to_update.pop_back();
102
103while (!steps_to_update.empty())
104{
105auto * transforming_step = dynamic_cast<ITransformingStep *>(steps_to_update.back());
106if (!transforming_step)
107break;
108
109transforming_step->updateInputStream(*input_stream);
110input_stream = &steps_to_update.back()->getOutputStream();
111steps_to_update.pop_back();
112}
113}
114}
115
116/// FixedColumns are columns which values become constants after filtering.
117/// In a query "SELECT x, y, z FROM table WHERE x = 1 AND y = 'a' ORDER BY x, y, z"
118/// Fixed columns are 'x' and 'y'.
119using FixedColumns = std::unordered_set<const ActionsDAG::Node *>;
120
121/// Right now we find only simple cases like 'and(..., and(..., and(column = value, ...), ...'
122/// Injective functions are supported here. For a condition 'injectiveFunction(x) = 5' column 'x' is fixed.
123static void appendFixedColumnsFromFilterExpression(const ActionsDAG::Node & filter_expression, FixedColumns & fixed_columns)
124{
125std::stack<const ActionsDAG::Node *> stack;
126stack.push(&filter_expression);
127
128while (!stack.empty())
129{
130const auto * node = stack.top();
131stack.pop();
132if (node->type == ActionsDAG::ActionType::FUNCTION)
133{
134const auto & name = node->function_base->getName();
135if (name == "and")
136{
137for (const auto * arg : node->children)
138stack.push(arg);
139}
140else if (name == "equals")
141{
142const ActionsDAG::Node * maybe_fixed_column = nullptr;
143size_t num_constant_columns = 0;
144for (const auto & child : node->children)
145{
146if (child->column)
147++num_constant_columns;
148else
149maybe_fixed_column = child;
150}
151
152if (maybe_fixed_column && num_constant_columns + 1 == node->children.size())
153{
154//std::cerr << "====== Added fixed column " << maybe_fixed_column->result_name << ' ' << static_cast<const void *>(maybe_fixed_column) << std::endl;
155fixed_columns.insert(maybe_fixed_column);
156
157/// Support injective functions chain.
158const ActionsDAG::Node * maybe_injective = maybe_fixed_column;
159while (maybe_injective->type == ActionsDAG::ActionType::FUNCTION
160&& maybe_injective->children.size() == 1
161&& maybe_injective->function_base->isInjective({}))
162{
163maybe_injective = maybe_injective->children.front();
164fixed_columns.insert(maybe_injective);
165}
166}
167}
168}
169}
170}
171
172static void appendExpression(ActionsDAGPtr & dag, const ActionsDAGPtr & expression)
173{
174if (dag)
175dag->mergeInplace(std::move(*expression->clone()));
176else
177dag = expression->clone();
178
179dag->projectInput(false);
180}
181
182/// This function builds a common DAG which is a merge of DAGs from Filter and Expression steps chain.
183/// Additionally, build a set of fixed columns.
184void buildSortingDAG(QueryPlan::Node & node, ActionsDAGPtr & dag, FixedColumns & fixed_columns, size_t & limit)
185{
186IQueryPlanStep * step = node.step.get();
187if (auto * reading = typeid_cast<ReadFromMergeTree *>(step))
188{
189if (const auto prewhere_info = reading->getPrewhereInfo())
190{
191/// Should ignore limit if there is filtering.
192limit = 0;
193
194if (prewhere_info->prewhere_actions)
195{
196//std::cerr << "====== Adding prewhere " << std::endl;
197appendExpression(dag, prewhere_info->prewhere_actions);
198if (const auto * filter_expression = dag->tryFindInOutputs(prewhere_info->prewhere_column_name))
199appendFixedColumnsFromFilterExpression(*filter_expression, fixed_columns);
200}
201}
202return;
203}
204
205if (node.children.size() != 1)
206return;
207
208buildSortingDAG(*node.children.front(), dag, fixed_columns, limit);
209
210if (auto * expression = typeid_cast<ExpressionStep *>(step))
211{
212const auto & actions = expression->getExpression();
213
214/// Should ignore limit because arrayJoin() can reduce the number of rows in case of empty array.
215if (actions->hasArrayJoin())
216limit = 0;
217
218appendExpression(dag, actions);
219}
220
221if (auto * filter = typeid_cast<FilterStep *>(step))
222{
223/// Should ignore limit if there is filtering.
224limit = 0;
225
226appendExpression(dag, filter->getExpression());
227if (const auto * filter_expression = dag->tryFindInOutputs(filter->getFilterColumnName()))
228appendFixedColumnsFromFilterExpression(*filter_expression, fixed_columns);
229}
230
231if (auto * array_join = typeid_cast<ArrayJoinStep *>(step))
232{
233/// Should ignore limit because ARRAY JOIN can reduce the number of rows in case of empty array.
234/// But in case of LEFT ARRAY JOIN the result number of rows is always bigger.
235if (!array_join->arrayJoin()->is_left)
236limit = 0;
237
238const auto & array_joined_columns = array_join->arrayJoin()->columns;
239
240if (dag)
241{
242/// Remove array joined columns from outputs.
243/// Types are changed after ARRAY JOIN, and we can't use this columns anyway.
244ActionsDAG::NodeRawConstPtrs outputs;
245outputs.reserve(dag->getOutputs().size());
246
247for (const auto & output : dag->getOutputs())
248{
249if (!array_joined_columns.contains(output->result_name))
250outputs.push_back(output);
251}
252
253dag->getOutputs() = std::move(outputs);
254}
255}
256}
257
258/// Add more functions to fixed columns.
259/// Functions result is fixed if all arguments are fixed or constants.
260void enreachFixedColumns(const ActionsDAG & dag, FixedColumns & fixed_columns)
261{
262struct Frame
263{
264const ActionsDAG::Node * node;
265size_t next_child = 0;
266};
267
268std::stack<Frame> stack;
269std::unordered_set<const ActionsDAG::Node *> visited;
270for (const auto & node : dag.getNodes())
271{
272if (visited.contains(&node))
273continue;
274
275stack.push({&node});
276visited.insert(&node);
277while (!stack.empty())
278{
279auto & frame = stack.top();
280for (; frame.next_child < frame.node->children.size(); ++frame.next_child)
281if (!visited.contains(frame.node->children[frame.next_child]))
282break;
283
284if (frame.next_child < frame.node->children.size())
285{
286const auto * child = frame.node->children[frame.next_child];
287visited.insert(child);
288stack.push({child});
289++frame.next_child;
290}
291else
292{
293/// Ignore constants here, will check them separately
294if (!frame.node->column)
295{
296if (frame.node->type == ActionsDAG::ActionType::ALIAS)
297{
298if (fixed_columns.contains(frame.node->children.at(0)))
299fixed_columns.insert(frame.node);
300}
301else if (frame.node->type == ActionsDAG::ActionType::FUNCTION)
302{
303if (frame.node->function_base->isDeterministicInScopeOfQuery())
304{
305//std::cerr << "*** enreachFixedColumns check " << frame.node->result_name << std::endl;
306bool all_args_fixed_or_const = true;
307for (const auto * child : frame.node->children)
308{
309if (!child->column && !fixed_columns.contains(child))
310{
311//std::cerr << "*** enreachFixedColumns fail " << child->result_name << ' ' << static_cast<const void *>(child) << std::endl;
312all_args_fixed_or_const = false;
313}
314}
315
316if (all_args_fixed_or_const)
317{
318//std::cerr << "*** enreachFixedColumns add " << frame.node->result_name << ' ' << static_cast<const void *>(frame.node) << std::endl;
319fixed_columns.insert(frame.node);
320}
321}
322}
323}
324
325stack.pop();
326}
327}
328}
329}
330
331InputOrderInfoPtr buildInputOrderInfo(
332const FixedColumns & fixed_columns,
333const ActionsDAGPtr & dag,
334const SortDescription & description,
335const ActionsDAG & sorting_key_dag,
336const Names & sorting_key_columns,
337size_t limit)
338{
339//std::cerr << "------- buildInputOrderInfo " << std::endl;
340SortDescription order_key_prefix_descr;
341order_key_prefix_descr.reserve(description.size());
342
343MatchedTrees::Matches matches;
344FixedColumns fixed_key_columns;
345
346if (dag)
347{
348matches = matchTrees(sorting_key_dag.getOutputs(), *dag);
349
350for (const auto & [node, match] : matches)
351{
352//std::cerr << "------- matching " << static_cast<const void *>(node) << " " << node->result_name
353// << " to " << static_cast<const void *>(match.node) << " " << (match.node ? match.node->result_name : "") << std::endl;
354if (!match.monotonicity || match.monotonicity->strict)
355{
356if (match.node && fixed_columns.contains(node))
357fixed_key_columns.insert(match.node);
358}
359}
360
361enreachFixedColumns(sorting_key_dag, fixed_key_columns);
362}
363
364/// This is a result direction we will read from MergeTree
365/// 1 - in order,
366/// -1 - in reverse order,
367/// 0 - usual read, don't apply optimization
368///
369/// So far, 0 means any direction is possible. It is ok for constant prefix.
370int read_direction = 0;
371size_t next_description_column = 0;
372size_t next_sort_key = 0;
373
374while (next_description_column < description.size() && next_sort_key < sorting_key_columns.size())
375{
376const auto & sorting_key_column = sorting_key_columns[next_sort_key];
377const auto & sort_column_description = description[next_description_column];
378
379/// If required order depend on collation, it cannot be matched with primary key order.
380/// Because primary keys cannot have collations.
381if (sort_column_description.collator)
382break;
383
384/// Direction for current sort key.
385int current_direction = 0;
386bool strict_monotonic = true;
387
388const ActionsDAG::Node * sort_column_node = sorting_key_dag.tryFindInOutputs(sorting_key_column);
389/// This should not happen.
390if (!sort_column_node)
391break;
392
393if (!dag)
394{
395/// This is possible if there were no Expression or Filter steps in Plan.
396/// Example: SELECT * FROM tab ORDER BY a, b
397
398if (sort_column_node->type != ActionsDAG::ActionType::INPUT)
399break;
400
401if (sort_column_description.column_name != sorting_key_column)
402break;
403
404current_direction = sort_column_description.direction;
405
406
407//std::cerr << "====== (no dag) Found direct match" << std::endl;
408
409++next_description_column;
410++next_sort_key;
411}
412else
413{
414const ActionsDAG::Node * sort_node = dag->tryFindInOutputs(sort_column_description.column_name);
415/// It is possible when e.g. sort by array joined column.
416if (!sort_node)
417break;
418
419const auto & match = matches[sort_node];
420
421//std::cerr << "====== Finding match for " << sort_column_node->result_name << ' ' << static_cast<const void *>(sort_column_node) << std::endl;
422
423if (match.node && match.node == sort_column_node)
424{
425//std::cerr << "====== Found direct match" << std::endl;
426
427/// We try to find the match first even if column is fixed. In this case, potentially more keys will match.
428/// Example: 'table (x Int32, y Int32) ORDER BY x + 1, y + 1'
429/// 'SELECT x, y FROM table WHERE x = 42 ORDER BY x + 1, y + 1'
430/// Here, 'x + 1' would be a fixed point. But it is reasonable to read-in-order.
431
432current_direction = sort_column_description.direction;
433if (match.monotonicity)
434{
435current_direction *= match.monotonicity->direction;
436strict_monotonic = match.monotonicity->strict;
437}
438
439++next_description_column;
440++next_sort_key;
441}
442else if (fixed_key_columns.contains(sort_column_node))
443{
444//std::cerr << "+++++++++ Found fixed key by match" << std::endl;
445++next_sort_key;
446}
447else
448{
449
450//std::cerr << "====== Check for fixed const : " << bool(sort_node->column) << " fixed : " << fixed_columns.contains(sort_node) << std::endl;
451bool is_fixed_column = sort_node->column || fixed_columns.contains(sort_node);
452if (!is_fixed_column)
453break;
454
455order_key_prefix_descr.push_back(sort_column_description);
456++next_description_column;
457}
458}
459
460/// read_direction == 0 means we can choose any global direction.
461/// current_direction == 0 means current key if fixed and any direction is possible for it.
462if (current_direction && read_direction && current_direction != read_direction)
463break;
464
465if (read_direction == 0)
466read_direction = current_direction;
467
468if (current_direction)
469order_key_prefix_descr.push_back(sort_column_description);
470
471if (current_direction && !strict_monotonic)
472break;
473}
474
475if (read_direction == 0 || order_key_prefix_descr.empty())
476return nullptr;
477
478return std::make_shared<InputOrderInfo>(order_key_prefix_descr, next_sort_key, read_direction, limit);
479}
480
481/// We really need three different sort descriptions here.
482/// For example:
483///
484/// create table tab (a Int32, b Int32, c Int32, d Int32) engine = MergeTree order by (a, b, c);
485/// select a, any(b), c, d from tab where b = 1 group by a, c, d order by c, d;
486///
487/// We would like to have:
488/// (a, b, c) - a sort description for reading from table (it's into input_order)
489/// (a, c) - a sort description for merging (an input of AggregatingInOrderTransfrom is sorted by this GROUP BY keys)
490/// (a, c, d) - a group by soer description (an input of FinishAggregatingInOrderTransform is sorted by all GROUP BY keys)
491///
492/// Sort description from input_order is not actually used. ReadFromMergeTree reads only PK prefix size.
493/// We should remove it later.
494struct AggregationInputOrder
495{
496InputOrderInfoPtr input_order;
497SortDescription sort_description_for_merging;
498SortDescription group_by_sort_description;
499};
500
501AggregationInputOrder buildInputOrderInfo(
502const FixedColumns & fixed_columns,
503const ActionsDAGPtr & dag,
504const Names & group_by_keys,
505const ActionsDAG & sorting_key_dag,
506const Names & sorting_key_columns)
507{
508MatchedTrees::Matches matches;
509FixedColumns fixed_key_columns;
510
511/// For every column in PK find any match from GROUP BY key.
512using ReverseMatches = std::unordered_map<const ActionsDAG::Node *, MatchedTrees::Matches::const_iterator>;
513ReverseMatches reverse_matches;
514
515if (dag)
516{
517matches = matchTrees(sorting_key_dag.getOutputs(), *dag);
518
519for (const auto & [node, match] : matches)
520{
521if (!match.monotonicity || match.monotonicity->strict)
522{
523if (match.node && fixed_columns.contains(node))
524fixed_key_columns.insert(match.node);
525}
526}
527
528enreachFixedColumns(sorting_key_dag, fixed_key_columns);
529
530for (const auto * output : dag->getOutputs())
531{
532auto it = matches.find(output);
533const MatchedTrees::Match * match = &it->second;
534if (match->node)
535{
536auto [jt, inserted] = reverse_matches.emplace(match->node, it);
537if (!inserted)
538{
539/// Find the best match for PK node.
540/// Direct match > strict monotonic > monotonic.
541const MatchedTrees::Match * prev_match = &jt->second->second;
542bool is_better = prev_match->monotonicity && !match->monotonicity;
543if (!is_better)
544{
545bool both_monotionic = prev_match->monotonicity && match->monotonicity;
546is_better = both_monotionic && match->monotonicity->strict && !prev_match->monotonicity->strict;
547}
548
549if (is_better)
550jt->second = it;
551}
552}
553}
554}
555
556/// This is a result direction we will read from MergeTree
557/// 1 - in order,
558/// -1 - in reverse order,
559/// 0 - usual read, don't apply optimization
560///
561/// So far, 0 means any direction is possible. It is ok for constant prefix.
562int read_direction = 0;
563size_t next_sort_key = 0;
564std::unordered_set<std::string_view> not_matched_group_by_keys(group_by_keys.begin(), group_by_keys.end());
565
566SortDescription group_by_sort_description;
567group_by_sort_description.reserve(group_by_keys.size());
568
569SortDescription order_key_prefix_descr;
570order_key_prefix_descr.reserve(sorting_key_columns.size());
571
572while (!not_matched_group_by_keys.empty() && next_sort_key < sorting_key_columns.size())
573{
574const auto & sorting_key_column = sorting_key_columns[next_sort_key];
575
576/// Direction for current sort key.
577int current_direction = 0;
578bool strict_monotonic = true;
579std::unordered_set<std::string_view>::iterator group_by_key_it;
580
581const ActionsDAG::Node * sort_column_node = sorting_key_dag.tryFindInOutputs(sorting_key_column);
582/// This should not happen.
583if (!sort_column_node)
584break;
585
586if (!dag)
587{
588/// This is possible if there were no Expression or Filter steps in Plan.
589/// Example: SELECT * FROM tab ORDER BY a, b
590
591if (sort_column_node->type != ActionsDAG::ActionType::INPUT)
592break;
593
594group_by_key_it = not_matched_group_by_keys.find(sorting_key_column);
595if (group_by_key_it == not_matched_group_by_keys.end())
596break;
597
598current_direction = 1;
599
600//std::cerr << "====== (no dag) Found direct match" << std::endl;
601++next_sort_key;
602}
603else
604{
605const MatchedTrees::Match * match = nullptr;
606const ActionsDAG::Node * group_by_key_node = nullptr;
607if (const auto match_it = reverse_matches.find(sort_column_node); match_it != reverse_matches.end())
608{
609group_by_key_node = match_it->second->first;
610match = &match_it->second->second;
611}
612
613//std::cerr << "====== Finding match for " << sort_column_node->result_name << ' ' << static_cast<const void *>(sort_column_node) << std::endl;
614
615if (match && match->node)
616group_by_key_it = not_matched_group_by_keys.find(group_by_key_node->result_name);
617
618if (match && match->node && group_by_key_it != not_matched_group_by_keys.end())
619{
620//std::cerr << "====== Found direct match" << std::endl;
621
622current_direction = 1;
623if (match->monotonicity)
624{
625current_direction *= match->monotonicity->direction;
626strict_monotonic = match->monotonicity->strict;
627}
628
629++next_sort_key;
630}
631else if (fixed_key_columns.contains(sort_column_node))
632{
633//std::cerr << "+++++++++ Found fixed key by match" << std::endl;
634++next_sort_key;
635}
636else
637break;
638}
639
640/// read_direction == 0 means we can choose any global direction.
641/// current_direction == 0 means current key if fixed and any direction is possible for it.
642if (current_direction && read_direction && current_direction != read_direction)
643break;
644
645if (read_direction == 0 && current_direction != 0)
646read_direction = current_direction;
647
648if (current_direction)
649{
650/// Aggregation in order will always read in table order.
651/// Here, current_direction is a direction which will be applied to every key.
652/// Example:
653/// CREATE TABLE t (x, y, z) ENGINE = MergeTree ORDER BY (x, y)
654/// SELECT ... FROM t GROUP BY negate(y), negate(x), z
655/// Here, current_direction will be -1 cause negate() is negative montonic,
656/// Prefix sort description for reading will be (negate(y) DESC, negate(x) DESC),
657/// Sort description for GROUP BY will be (negate(y) DESC, negate(x) DESC, z).
658//std::cerr << "---- adding " << std::string(*group_by_key_it) << std::endl;
659group_by_sort_description.emplace_back(SortColumnDescription(std::string(*group_by_key_it), current_direction));
660order_key_prefix_descr.emplace_back(SortColumnDescription(std::string(*group_by_key_it), current_direction));
661not_matched_group_by_keys.erase(group_by_key_it);
662}
663else
664{
665/// If column is fixed, will read it in table order as well.
666//std::cerr << "---- adding " << sorting_key_column << std::endl;
667order_key_prefix_descr.emplace_back(SortColumnDescription(sorting_key_column, 1));
668}
669
670if (current_direction && !strict_monotonic)
671break;
672}
673
674if (read_direction == 0 || group_by_sort_description.empty())
675return {};
676
677SortDescription sort_description_for_merging = group_by_sort_description;
678
679for (const auto & key : not_matched_group_by_keys)
680group_by_sort_description.emplace_back(SortColumnDescription(std::string(key)));
681
682auto input_order = std::make_shared<InputOrderInfo>(order_key_prefix_descr, next_sort_key, /*read_direction*/ 1, /* limit */ 0);
683return { std::move(input_order), std::move(sort_description_for_merging), std::move(group_by_sort_description) };
684}
685
686InputOrderInfoPtr buildInputOrderInfo(
687const ReadFromMergeTree * reading,
688const FixedColumns & fixed_columns,
689const ActionsDAGPtr & dag,
690const SortDescription & description,
691size_t limit)
692{
693const auto & sorting_key = reading->getStorageMetadata()->getSortingKey();
694const auto & sorting_key_columns = sorting_key.column_names;
695
696return buildInputOrderInfo(
697fixed_columns,
698dag, description,
699sorting_key.expression->getActionsDAG(), sorting_key_columns,
700limit);
701}
702
703InputOrderInfoPtr buildInputOrderInfo(
704ReadFromMerge * merge,
705const FixedColumns & fixed_columns,
706const ActionsDAGPtr & dag,
707const SortDescription & description,
708size_t limit)
709{
710const auto & tables = merge->getSelectedTables();
711
712InputOrderInfoPtr order_info;
713for (const auto & table : tables)
714{
715auto storage = std::get<StoragePtr>(table);
716const auto & sorting_key = storage->getInMemoryMetadataPtr()->getSortingKey();
717const auto & sorting_key_columns = sorting_key.column_names;
718
719if (sorting_key_columns.empty())
720return nullptr;
721
722auto table_order_info = buildInputOrderInfo(
723fixed_columns,
724dag, description,
725sorting_key.expression->getActionsDAG(), sorting_key_columns,
726limit);
727
728if (!table_order_info)
729return nullptr;
730
731if (!order_info)
732order_info = table_order_info;
733else if (*order_info != *table_order_info)
734return nullptr;
735}
736
737return order_info;
738}
739
740AggregationInputOrder buildInputOrderInfo(
741ReadFromMergeTree * reading,
742const FixedColumns & fixed_columns,
743const ActionsDAGPtr & dag,
744const Names & group_by_keys)
745{
746const auto & sorting_key = reading->getStorageMetadata()->getSortingKey();
747const auto & sorting_key_columns = sorting_key.column_names;
748
749return buildInputOrderInfo(
750fixed_columns,
751dag, group_by_keys,
752sorting_key.expression->getActionsDAG(), sorting_key_columns);
753}
754
755AggregationInputOrder buildInputOrderInfo(
756ReadFromMerge * merge,
757const FixedColumns & fixed_columns,
758const ActionsDAGPtr & dag,
759const Names & group_by_keys)
760{
761const auto & tables = merge->getSelectedTables();
762
763AggregationInputOrder order_info;
764for (const auto & table : tables)
765{
766auto storage = std::get<StoragePtr>(table);
767const auto & sorting_key = storage->getInMemoryMetadataPtr()->getSortingKey();
768const auto & sorting_key_columns = sorting_key.column_names;
769
770if (sorting_key_columns.empty())
771return {};
772
773auto table_order_info = buildInputOrderInfo(
774fixed_columns,
775dag, group_by_keys,
776sorting_key.expression->getActionsDAG(), sorting_key_columns);
777
778if (!table_order_info.input_order)
779return {};
780
781if (!order_info.input_order)
782order_info = table_order_info;
783else if (*order_info.input_order != *table_order_info.input_order)
784return {};
785}
786
787return order_info;
788}
789
790InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & node, StepStack & backward_path)
791{
792QueryPlan::Node * reading_node = findReadingStep(node, backward_path);
793if (!reading_node)
794return nullptr;
795
796const auto & description = sorting.getSortDescription();
797size_t limit = sorting.getLimit();
798
799ActionsDAGPtr dag;
800FixedColumns fixed_columns;
801buildSortingDAG(node, dag, fixed_columns, limit);
802
803if (dag && !fixed_columns.empty())
804enreachFixedColumns(*dag, fixed_columns);
805
806if (auto * reading = typeid_cast<ReadFromMergeTree *>(reading_node->step.get()))
807{
808auto order_info = buildInputOrderInfo(
809reading,
810fixed_columns,
811dag, description,
812limit);
813
814if (order_info)
815{
816bool can_read = reading->requestReadingInOrder(order_info->used_prefix_of_sorting_key_size, order_info->direction, order_info->limit);
817if (!can_read)
818return nullptr;
819}
820
821return order_info;
822}
823else if (auto * merge = typeid_cast<ReadFromMerge *>(reading_node->step.get()))
824{
825auto order_info = buildInputOrderInfo(
826merge,
827fixed_columns,
828dag, description,
829limit);
830
831if (order_info)
832{
833bool can_read = merge->requestReadingInOrder(order_info);
834if (!can_read)
835return nullptr;
836}
837
838return order_info;
839}
840
841return nullptr;
842}
843
844AggregationInputOrder buildInputOrderInfo(AggregatingStep & aggregating, QueryPlan::Node & node, StepStack & backward_path)
845{
846QueryPlan::Node * reading_node = findReadingStep(node, backward_path);
847if (!reading_node)
848return {};
849
850const auto & keys = aggregating.getParams().keys;
851size_t limit = 0;
852
853ActionsDAGPtr dag;
854FixedColumns fixed_columns;
855buildSortingDAG(node, dag, fixed_columns, limit);
856
857if (dag && !fixed_columns.empty())
858enreachFixedColumns(*dag, fixed_columns);
859
860if (auto * reading = typeid_cast<ReadFromMergeTree *>(reading_node->step.get()))
861{
862auto order_info = buildInputOrderInfo(
863reading,
864fixed_columns,
865dag, keys);
866
867if (order_info.input_order)
868{
869bool can_read = reading->requestReadingInOrder(
870order_info.input_order->used_prefix_of_sorting_key_size,
871order_info.input_order->direction,
872order_info.input_order->limit);
873if (!can_read)
874return {};
875}
876
877return order_info;
878}
879else if (auto * merge = typeid_cast<ReadFromMerge *>(reading_node->step.get()))
880{
881auto order_info = buildInputOrderInfo(
882merge,
883fixed_columns,
884dag, keys);
885
886if (order_info.input_order)
887{
888bool can_read = merge->requestReadingInOrder(order_info.input_order);
889if (!can_read)
890return {};
891}
892
893return order_info;
894}
895
896return {};
897}
898
899void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
900{
901if (node.children.size() != 1)
902return;
903
904auto * sorting = typeid_cast<SortingStep *>(node.step.get());
905if (!sorting)
906return;
907
908//std::cerr << "---- optimizeReadInOrder found sorting" << std::endl;
909
910if (sorting->getType() != SortingStep::Type::Full)
911return;
912
913StepStack steps_to_update;
914if (typeid_cast<UnionStep *>(node.children.front()->step.get()))
915{
916auto & union_node = node.children.front();
917
918std::vector<InputOrderInfoPtr> infos;
919const SortDescription * max_sort_descr = nullptr;
920infos.reserve(node.children.size());
921for (auto * child : union_node->children)
922{
923infos.push_back(buildInputOrderInfo(*sorting, *child, steps_to_update));
924
925if (infos.back() && (!max_sort_descr || max_sort_descr->size() < infos.back()->sort_description_for_merging.size()))
926max_sort_descr = &infos.back()->sort_description_for_merging;
927}
928
929if (!max_sort_descr || max_sort_descr->empty())
930return;
931
932for (size_t i = 0; i < infos.size(); ++i)
933{
934const auto & info = infos[i];
935auto & child = union_node->children[i];
936
937QueryPlanStepPtr additional_sorting;
938
939if (!info)
940{
941auto limit = sorting->getLimit();
942/// If we have limit, it's better to sort up to full description and apply limit.
943/// We cannot sort up to partial read-in-order description with limit cause result set can be wrong.
944const auto & descr = limit ? sorting->getSortDescription() : *max_sort_descr;
945additional_sorting = std::make_unique<SortingStep>(
946child->step->getOutputStream(),
947descr,
948limit, /// TODO: support limit with ties
949sorting->getSettings(),
950false);
951}
952else if (info->sort_description_for_merging.size() < max_sort_descr->size())
953{
954additional_sorting = std::make_unique<SortingStep>(
955child->step->getOutputStream(),
956info->sort_description_for_merging,
957*max_sort_descr,
958sorting->getSettings().max_block_size,
9590); /// TODO: support limit with ties
960}
961
962if (additional_sorting)
963{
964auto & sort_node = nodes.emplace_back();
965sort_node.step = std::move(additional_sorting);
966sort_node.children.push_back(child);
967child = &sort_node;
968}
969}
970
971sorting->convertToFinishSorting(*max_sort_descr);
972}
973else if (auto order_info = buildInputOrderInfo(*sorting, *node.children.front(), steps_to_update))
974{
975sorting->convertToFinishSorting(order_info->sort_description_for_merging);
976/// update data stream's sorting properties
977updateStepsDataStreams(steps_to_update);
978}
979}
980
981void optimizeAggregationInOrder(QueryPlan::Node & node, QueryPlan::Nodes &)
982{
983if (node.children.size() != 1)
984return;
985
986auto * aggregating = typeid_cast<AggregatingStep *>(node.step.get());
987if (!aggregating)
988return;
989
990if ((aggregating->inOrder() && !aggregating->explicitSortingRequired()) || aggregating->isGroupingSets())
991return;
992
993/// It just does not work, see 02515_projections_with_totals
994if (aggregating->getParams().overflow_row)
995return;
996
997/// TODO: maybe add support for UNION later.
998std::vector<IQueryPlanStep*> steps_to_update;
999if (auto order_info = buildInputOrderInfo(*aggregating, *node.children.front(), steps_to_update); order_info.input_order)
1000{
1001aggregating->applyOrder(std::move(order_info.sort_description_for_merging), std::move(order_info.group_by_sort_description));
1002/// update data stream's sorting properties
1003updateStepsDataStreams(steps_to_update);
1004}
1005}
1006
1007/// This optimization is obsolete and will be removed.
1008/// optimizeReadInOrder covers it.
1009size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node, QueryPlan::Nodes & /*nodes*/)
1010{
1011/// Find the following sequence of steps, add InputOrderInfo and apply prefix sort description to
1012/// SortingStep:
1013/// WindowStep <- SortingStep <- [Expression] <- ReadFromMergeTree
1014
1015auto * window_node = parent_node;
1016auto * window = typeid_cast<WindowStep *>(window_node->step.get());
1017if (!window)
1018return 0;
1019if (window_node->children.size() != 1)
1020return 0;
1021
1022auto * sorting_node = window_node->children.front();
1023auto * sorting = typeid_cast<SortingStep *>(sorting_node->step.get());
1024if (!sorting)
1025return 0;
1026if (sorting_node->children.size() != 1)
1027return 0;
1028
1029auto * possible_read_from_merge_tree_node = sorting_node->children.front();
1030
1031if (typeid_cast<ExpressionStep *>(possible_read_from_merge_tree_node->step.get()))
1032{
1033if (possible_read_from_merge_tree_node->children.size() != 1)
1034return 0;
1035
1036possible_read_from_merge_tree_node = possible_read_from_merge_tree_node->children.front();
1037}
1038
1039auto * read_from_merge_tree = typeid_cast<ReadFromMergeTree *>(possible_read_from_merge_tree_node->step.get());
1040if (!read_from_merge_tree)
1041{
1042return 0;
1043}
1044
1045auto context = read_from_merge_tree->getContext();
1046const auto & settings = context->getSettings();
1047if (!settings.optimize_read_in_window_order || (settings.optimize_read_in_order && settings.query_plan_read_in_order) || context->getSettingsRef().allow_experimental_analyzer)
1048{
1049return 0;
1050}
1051
1052const auto & query_info = read_from_merge_tree->getQueryInfo();
1053const auto * select_query = query_info.query->as<ASTSelectQuery>();
1054
1055/// TODO: Analyzer syntax analyzer result
1056if (!query_info.syntax_analyzer_result)
1057return 0;
1058
1059ManyExpressionActions order_by_elements_actions;
1060const auto & window_desc = window->getWindowDescription();
1061
1062for (const auto & actions_dag : window_desc.partition_by_actions)
1063{
1064order_by_elements_actions.emplace_back(
1065std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(context, CompileExpressions::yes)));
1066}
1067
1068for (const auto & actions_dag : window_desc.order_by_actions)
1069{
1070order_by_elements_actions.emplace_back(
1071std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(context, CompileExpressions::yes)));
1072}
1073
1074auto order_optimizer = std::make_shared<ReadInOrderOptimizer>(
1075*select_query,
1076order_by_elements_actions,
1077window->getWindowDescription().full_sort_description,
1078query_info.syntax_analyzer_result);
1079
1080/// If we don't have filtration, we can pushdown limit to reading stage for optimizations.
1081UInt64 limit = (select_query->hasFiltration() || select_query->groupBy()) ? 0 : InterpreterSelectQuery::getLimitForSorting(*select_query, context);
1082
1083auto order_info = order_optimizer->getInputOrder(read_from_merge_tree->getStorageMetadata(), context, limit);
1084
1085if (order_info)
1086{
1087bool can_read = read_from_merge_tree->requestReadingInOrder(order_info->used_prefix_of_sorting_key_size, order_info->direction, order_info->limit);
1088if (!can_read)
1089return 0;
1090sorting->convertToFinishSorting(order_info->sort_description_for_merging);
1091}
1092
1093return 0;
1094}
1095
1096}
1097