ClickHouse

Форк
0
/
optimizeReadInOrder.cpp 
1096 строк · 39.4 Кб
1
#include <Columns/IColumn.h>
2
#include <DataTypes/DataTypeAggregateFunction.h>
3
#include <Functions/IFunction.h>
4
#include <Interpreters/ActionsDAG.h>
5
#include <Interpreters/ArrayJoinAction.h>
6
#include <Interpreters/InterpreterSelectQuery.h>
7
#include <Interpreters/TableJoin.h>
8
#include <Parsers/ASTWindowDefinition.h>
9
#include <Processors/QueryPlan/AggregatingStep.h>
10
#include <Processors/QueryPlan/ArrayJoinStep.h>
11
#include <Processors/QueryPlan/CreatingSetsStep.h>
12
#include <Processors/QueryPlan/CubeStep.h>
13
#include <Processors/QueryPlan/DistinctStep.h>
14
#include <Processors/QueryPlan/ExpressionStep.h>
15
#include <Processors/QueryPlan/FilterStep.h>
16
#include <Processors/QueryPlan/ITransformingStep.h>
17
#include <Processors/QueryPlan/JoinStep.h>
18
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
19
#include <Processors/QueryPlan/Optimizations/actionsDAGUtils.h>
20
#include <Processors/QueryPlan/ReadFromMergeTree.h>
21
#include <Processors/QueryPlan/SortingStep.h>
22
#include <Processors/QueryPlan/TotalsHavingStep.h>
23
#include <Processors/QueryPlan/UnionStep.h>
24
#include <Processors/QueryPlan/WindowStep.h>
25
#include <Storages/StorageMerge.h>
26
#include <Common/typeid_cast.h>
27

28
#include <stack>
29

30

31
namespace DB::QueryPlanOptimizations
32
{
33

34
static ISourceStep * checkSupportedReadingStep(IQueryPlanStep * step)
35
{
36
    if (auto * reading = typeid_cast<ReadFromMergeTree *>(step))
37
    {
38
        /// Already read-in-order, skip.
39
        if (reading->getQueryInfo().input_order_info)
40
            return nullptr;
41

42
        const auto & sorting_key = reading->getStorageMetadata()->getSortingKey();
43
        if (sorting_key.column_names.empty())
44
            return nullptr;
45

46
        return reading;
47
    }
48

49
    if (auto * merge = typeid_cast<ReadFromMerge *>(step))
50
    {
51
        const auto & tables = merge->getSelectedTables();
52
        if (tables.empty())
53
            return nullptr;
54

55
        for (const auto & table : tables)
56
        {
57
            auto storage = std::get<StoragePtr>(table);
58
            const auto & sorting_key = storage->getInMemoryMetadataPtr()->getSortingKey();
59
            if (sorting_key.column_names.empty())
60
                return nullptr;
61
        }
62

63
        return merge;
64
    }
65

66
    return nullptr;
67
}
68

69
using StepStack = std::vector<IQueryPlanStep*>;
70

71
static QueryPlan::Node * findReadingStep(QueryPlan::Node & node, StepStack & backward_path)
72
{
73
    IQueryPlanStep * step = node.step.get();
74
    if (auto * reading = checkSupportedReadingStep(step))
75
    {
76
        backward_path.push_back(node.step.get());
77
        return &node;
78
    }
79

80
    if (node.children.size() != 1)
81
        return nullptr;
82

83
    backward_path.push_back(node.step.get());
84

85
    if (typeid_cast<ExpressionStep *>(step) || typeid_cast<FilterStep *>(step) || typeid_cast<ArrayJoinStep *>(step))
86
        return findReadingStep(*node.children.front(), backward_path);
87

88
    if (auto * distinct = typeid_cast<DistinctStep *>(step); distinct && distinct->isPreliminary())
89
        return findReadingStep(*node.children.front(), backward_path);
90

91
    return nullptr;
92
}
93

94
void updateStepsDataStreams(StepStack & steps_to_update)
95
{
96
    /// update data stream's sorting properties for found transforms
97
    if (!steps_to_update.empty())
98
    {
99
        const DataStream * input_stream = &steps_to_update.back()->getOutputStream();
100
        chassert(dynamic_cast<ISourceStep *>(steps_to_update.back()));
101
        steps_to_update.pop_back();
102

103
        while (!steps_to_update.empty())
104
        {
105
            auto * transforming_step = dynamic_cast<ITransformingStep *>(steps_to_update.back());
106
            if (!transforming_step)
107
                break;
108

109
            transforming_step->updateInputStream(*input_stream);
110
            input_stream = &steps_to_update.back()->getOutputStream();
111
            steps_to_update.pop_back();
112
        }
113
    }
114
}
115

116
/// FixedColumns are columns which values become constants after filtering.
117
/// In a query "SELECT x, y, z FROM table WHERE x = 1 AND y = 'a' ORDER BY x, y, z"
118
/// Fixed columns are 'x' and 'y'.
119
using FixedColumns = std::unordered_set<const ActionsDAG::Node *>;
120

121
/// Right now we find only simple cases like 'and(..., and(..., and(column = value, ...), ...'
122
/// Injective functions are supported here. For a condition 'injectiveFunction(x) = 5' column 'x' is fixed.
123
static void appendFixedColumnsFromFilterExpression(const ActionsDAG::Node & filter_expression, FixedColumns & fixed_columns)
124
{
125
    std::stack<const ActionsDAG::Node *> stack;
126
    stack.push(&filter_expression);
127

128
    while (!stack.empty())
129
    {
130
        const auto * node = stack.top();
131
        stack.pop();
132
        if (node->type == ActionsDAG::ActionType::FUNCTION)
133
        {
134
            const auto & name = node->function_base->getName();
135
            if (name == "and")
136
            {
137
                for (const auto * arg : node->children)
138
                    stack.push(arg);
139
            }
140
            else if (name == "equals")
141
            {
142
                const ActionsDAG::Node * maybe_fixed_column = nullptr;
143
                size_t num_constant_columns = 0;
144
                for (const auto & child : node->children)
145
                {
146
                    if (child->column)
147
                        ++num_constant_columns;
148
                    else
149
                        maybe_fixed_column = child;
150
                }
151

152
                if (maybe_fixed_column && num_constant_columns + 1 == node->children.size())
153
                {
154
                    //std::cerr << "====== Added fixed column " << maybe_fixed_column->result_name << ' ' << static_cast<const void *>(maybe_fixed_column) << std::endl;
155
                    fixed_columns.insert(maybe_fixed_column);
156

157
                    /// Support injective functions chain.
158
                    const ActionsDAG::Node * maybe_injective = maybe_fixed_column;
159
                    while (maybe_injective->type == ActionsDAG::ActionType::FUNCTION
160
                        && maybe_injective->children.size() == 1
161
                        && maybe_injective->function_base->isInjective({}))
162
                    {
163
                        maybe_injective = maybe_injective->children.front();
164
                        fixed_columns.insert(maybe_injective);
165
                    }
166
                }
167
            }
168
        }
169
    }
170
}
171

172
static void appendExpression(ActionsDAGPtr & dag, const ActionsDAGPtr & expression)
173
{
174
    if (dag)
175
        dag->mergeInplace(std::move(*expression->clone()));
176
    else
177
        dag = expression->clone();
178

179
    dag->projectInput(false);
180
}
181

182
/// This function builds a common DAG which is a merge of DAGs from Filter and Expression steps chain.
183
/// Additionally, build a set of fixed columns.
184
void buildSortingDAG(QueryPlan::Node & node, ActionsDAGPtr & dag, FixedColumns & fixed_columns, size_t & limit)
185
{
186
    IQueryPlanStep * step = node.step.get();
187
    if (auto * reading = typeid_cast<ReadFromMergeTree *>(step))
188
    {
189
        if (const auto prewhere_info = reading->getPrewhereInfo())
190
        {
191
            /// Should ignore limit if there is filtering.
192
            limit = 0;
193

194
            if (prewhere_info->prewhere_actions)
195
            {
196
                //std::cerr << "====== Adding prewhere " << std::endl;
197
                appendExpression(dag, prewhere_info->prewhere_actions);
198
                if (const auto * filter_expression = dag->tryFindInOutputs(prewhere_info->prewhere_column_name))
199
                    appendFixedColumnsFromFilterExpression(*filter_expression, fixed_columns);
200
            }
201
        }
202
        return;
203
    }
204

205
    if (node.children.size() != 1)
206
        return;
207

208
    buildSortingDAG(*node.children.front(), dag, fixed_columns, limit);
209

210
    if (auto * expression = typeid_cast<ExpressionStep *>(step))
211
    {
212
        const auto & actions = expression->getExpression();
213

214
        /// Should ignore limit because arrayJoin() can reduce the number of rows in case of empty array.
215
        if (actions->hasArrayJoin())
216
            limit = 0;
217

218
        appendExpression(dag, actions);
219
    }
220

221
    if (auto * filter = typeid_cast<FilterStep *>(step))
222
    {
223
        /// Should ignore limit if there is filtering.
224
        limit = 0;
225

226
        appendExpression(dag, filter->getExpression());
227
        if (const auto * filter_expression = dag->tryFindInOutputs(filter->getFilterColumnName()))
228
            appendFixedColumnsFromFilterExpression(*filter_expression, fixed_columns);
229
    }
230

231
    if (auto * array_join = typeid_cast<ArrayJoinStep *>(step))
232
    {
233
        /// Should ignore limit because ARRAY JOIN can reduce the number of rows in case of empty array.
234
        /// But in case of LEFT ARRAY JOIN the result number of rows is always bigger.
235
        if (!array_join->arrayJoin()->is_left)
236
            limit = 0;
237

238
        const auto & array_joined_columns = array_join->arrayJoin()->columns;
239

240
        if (dag)
241
        {
242
            /// Remove array joined columns from outputs.
243
            /// Types are changed after ARRAY JOIN, and we can't use this columns anyway.
244
            ActionsDAG::NodeRawConstPtrs outputs;
245
            outputs.reserve(dag->getOutputs().size());
246

247
            for (const auto & output : dag->getOutputs())
248
            {
249
                if (!array_joined_columns.contains(output->result_name))
250
                    outputs.push_back(output);
251
            }
252

253
            dag->getOutputs() = std::move(outputs);
254
        }
255
    }
256
}
257

258
/// Add more functions to fixed columns.
259
/// Functions result is fixed if all arguments are fixed or constants.
260
void enreachFixedColumns(const ActionsDAG & dag, FixedColumns & fixed_columns)
261
{
262
    struct Frame
263
    {
264
        const ActionsDAG::Node * node;
265
        size_t next_child = 0;
266
    };
267

268
    std::stack<Frame> stack;
269
    std::unordered_set<const ActionsDAG::Node *> visited;
270
    for (const auto & node : dag.getNodes())
271
    {
272
        if (visited.contains(&node))
273
            continue;
274

275
        stack.push({&node});
276
        visited.insert(&node);
277
        while (!stack.empty())
278
        {
279
            auto & frame = stack.top();
280
            for (; frame.next_child < frame.node->children.size(); ++frame.next_child)
281
                if (!visited.contains(frame.node->children[frame.next_child]))
282
                    break;
283

284
            if (frame.next_child < frame.node->children.size())
285
            {
286
                const auto * child = frame.node->children[frame.next_child];
287
                visited.insert(child);
288
                stack.push({child});
289
                ++frame.next_child;
290
            }
291
            else
292
            {
293
                /// Ignore constants here, will check them separately
294
                if (!frame.node->column)
295
                {
296
                    if (frame.node->type == ActionsDAG::ActionType::ALIAS)
297
                    {
298
                        if (fixed_columns.contains(frame.node->children.at(0)))
299
                            fixed_columns.insert(frame.node);
300
                    }
301
                    else if (frame.node->type == ActionsDAG::ActionType::FUNCTION)
302
                    {
303
                        if (frame.node->function_base->isDeterministicInScopeOfQuery())
304
                        {
305
                            //std::cerr << "*** enreachFixedColumns check " << frame.node->result_name << std::endl;
306
                            bool all_args_fixed_or_const = true;
307
                            for (const auto * child : frame.node->children)
308
                            {
309
                                if (!child->column && !fixed_columns.contains(child))
310
                                {
311
                                    //std::cerr << "*** enreachFixedColumns fail " << child->result_name <<  ' ' << static_cast<const void *>(child) << std::endl;
312
                                    all_args_fixed_or_const = false;
313
                                }
314
                            }
315

316
                            if (all_args_fixed_or_const)
317
                            {
318
                                //std::cerr << "*** enreachFixedColumns add " << frame.node->result_name << ' ' << static_cast<const void *>(frame.node) << std::endl;
319
                                fixed_columns.insert(frame.node);
320
                            }
321
                        }
322
                    }
323
                }
324

325
                stack.pop();
326
            }
327
        }
328
    }
329
}
330

331
InputOrderInfoPtr buildInputOrderInfo(
332
    const FixedColumns & fixed_columns,
333
    const ActionsDAGPtr & dag,
334
    const SortDescription & description,
335
    const ActionsDAG & sorting_key_dag,
336
    const Names & sorting_key_columns,
337
    size_t limit)
338
{
339
    //std::cerr << "------- buildInputOrderInfo " << std::endl;
340
    SortDescription order_key_prefix_descr;
341
    order_key_prefix_descr.reserve(description.size());
342

343
    MatchedTrees::Matches matches;
344
    FixedColumns fixed_key_columns;
345

346
    if (dag)
347
    {
348
        matches = matchTrees(sorting_key_dag.getOutputs(), *dag);
349

350
        for (const auto & [node, match] : matches)
351
        {
352
            //std::cerr << "------- matching " << static_cast<const void *>(node) << " " << node->result_name
353
            //    << " to " << static_cast<const void *>(match.node) << " " << (match.node ? match.node->result_name : "") << std::endl;
354
            if (!match.monotonicity || match.monotonicity->strict)
355
            {
356
                if (match.node && fixed_columns.contains(node))
357
                    fixed_key_columns.insert(match.node);
358
            }
359
        }
360

361
        enreachFixedColumns(sorting_key_dag, fixed_key_columns);
362
    }
363

364
    /// This is a result direction we will read from MergeTree
365
    ///  1 - in order,
366
    /// -1 - in reverse order,
367
    ///  0 - usual read, don't apply optimization
368
    ///
369
    /// So far, 0 means any direction is possible. It is ok for constant prefix.
370
    int read_direction = 0;
371
    size_t next_description_column = 0;
372
    size_t next_sort_key = 0;
373

374
    while (next_description_column < description.size() && next_sort_key < sorting_key_columns.size())
375
    {
376
        const auto & sorting_key_column = sorting_key_columns[next_sort_key];
377
        const auto & sort_column_description = description[next_description_column];
378

379
        /// If required order depend on collation, it cannot be matched with primary key order.
380
        /// Because primary keys cannot have collations.
381
        if (sort_column_description.collator)
382
            break;
383

384
        /// Direction for current sort key.
385
        int current_direction = 0;
386
        bool strict_monotonic = true;
387

388
        const ActionsDAG::Node * sort_column_node = sorting_key_dag.tryFindInOutputs(sorting_key_column);
389
        /// This should not happen.
390
        if (!sort_column_node)
391
            break;
392

393
        if (!dag)
394
        {
395
            /// This is possible if there were no Expression or Filter steps in Plan.
396
            /// Example: SELECT * FROM tab ORDER BY a, b
397

398
            if (sort_column_node->type != ActionsDAG::ActionType::INPUT)
399
                break;
400

401
            if (sort_column_description.column_name != sorting_key_column)
402
                break;
403

404
            current_direction = sort_column_description.direction;
405

406

407
            //std::cerr << "====== (no dag) Found direct match" << std::endl;
408

409
            ++next_description_column;
410
            ++next_sort_key;
411
        }
412
        else
413
        {
414
            const ActionsDAG::Node * sort_node = dag->tryFindInOutputs(sort_column_description.column_name);
415
             /// It is possible when e.g. sort by array joined column.
416
            if (!sort_node)
417
                break;
418

419
            const auto & match = matches[sort_node];
420

421
            //std::cerr << "====== Finding match for " << sort_column_node->result_name << ' ' << static_cast<const void *>(sort_column_node) << std::endl;
422

423
            if (match.node && match.node == sort_column_node)
424
            {
425
                //std::cerr << "====== Found direct match" << std::endl;
426

427
                /// We try to find the match first even if column is fixed. In this case, potentially more keys will match.
428
                /// Example: 'table (x Int32, y Int32) ORDER BY x + 1, y + 1'
429
                ///          'SELECT x, y FROM table WHERE x = 42 ORDER BY x + 1, y + 1'
430
                /// Here, 'x + 1' would be a fixed point. But it is reasonable to read-in-order.
431

432
                current_direction = sort_column_description.direction;
433
                if (match.monotonicity)
434
                {
435
                    current_direction *= match.monotonicity->direction;
436
                    strict_monotonic = match.monotonicity->strict;
437
                }
438

439
                ++next_description_column;
440
                ++next_sort_key;
441
            }
442
            else if (fixed_key_columns.contains(sort_column_node))
443
            {
444
                //std::cerr << "+++++++++ Found fixed key by match" << std::endl;
445
                ++next_sort_key;
446
            }
447
            else
448
            {
449

450
                //std::cerr << "====== Check for fixed const : " << bool(sort_node->column) << " fixed : " << fixed_columns.contains(sort_node) << std::endl;
451
                bool is_fixed_column = sort_node->column || fixed_columns.contains(sort_node);
452
                if (!is_fixed_column)
453
                    break;
454

455
                order_key_prefix_descr.push_back(sort_column_description);
456
                ++next_description_column;
457
            }
458
        }
459

460
        /// read_direction == 0 means we can choose any global direction.
461
        /// current_direction == 0 means current key if fixed and any direction is possible for it.
462
        if (current_direction && read_direction && current_direction != read_direction)
463
            break;
464

465
        if (read_direction == 0)
466
            read_direction = current_direction;
467

468
        if (current_direction)
469
            order_key_prefix_descr.push_back(sort_column_description);
470

471
        if (current_direction && !strict_monotonic)
472
            break;
473
    }
474

475
    if (read_direction == 0 || order_key_prefix_descr.empty())
476
        return nullptr;
477

478
    return std::make_shared<InputOrderInfo>(order_key_prefix_descr, next_sort_key, read_direction, limit);
479
}
480

481
/// We really need three different sort descriptions here.
482
/// For example:
483
///
484
///   create table tab (a Int32, b Int32, c Int32, d Int32) engine = MergeTree order by (a, b, c);
485
///   select a, any(b), c, d from tab where b = 1 group by a, c, d order by c, d;
486
///
487
/// We would like to have:
488
/// (a, b, c) - a sort description for reading from table (it's into input_order)
489
/// (a, c) - a sort description for merging (an input of AggregatingInOrderTransfrom is sorted by this GROUP BY keys)
490
/// (a, c, d) - a group by soer description (an input of FinishAggregatingInOrderTransform is sorted by all GROUP BY keys)
491
///
492
/// Sort description from input_order is not actually used. ReadFromMergeTree reads only PK prefix size.
493
/// We should remove it later.
494
struct AggregationInputOrder
495
{
496
    InputOrderInfoPtr input_order;
497
    SortDescription sort_description_for_merging;
498
    SortDescription group_by_sort_description;
499
};
500

501
AggregationInputOrder buildInputOrderInfo(
502
    const FixedColumns & fixed_columns,
503
    const ActionsDAGPtr & dag,
504
    const Names & group_by_keys,
505
    const ActionsDAG & sorting_key_dag,
506
    const Names & sorting_key_columns)
507
{
508
    MatchedTrees::Matches matches;
509
    FixedColumns fixed_key_columns;
510

511
    /// For every column in PK find any match from GROUP BY key.
512
    using ReverseMatches = std::unordered_map<const ActionsDAG::Node *, MatchedTrees::Matches::const_iterator>;
513
    ReverseMatches reverse_matches;
514

515
    if (dag)
516
    {
517
        matches = matchTrees(sorting_key_dag.getOutputs(), *dag);
518

519
        for (const auto & [node, match] : matches)
520
        {
521
            if (!match.monotonicity || match.monotonicity->strict)
522
            {
523
                if (match.node && fixed_columns.contains(node))
524
                    fixed_key_columns.insert(match.node);
525
            }
526
        }
527

528
        enreachFixedColumns(sorting_key_dag, fixed_key_columns);
529

530
        for (const auto * output : dag->getOutputs())
531
        {
532
            auto it = matches.find(output);
533
            const MatchedTrees::Match * match = &it->second;
534
            if (match->node)
535
            {
536
                auto [jt, inserted] = reverse_matches.emplace(match->node, it);
537
                if (!inserted)
538
                {
539
                    /// Find the best match for PK node.
540
                    /// Direct match > strict monotonic > monotonic.
541
                    const MatchedTrees::Match * prev_match = &jt->second->second;
542
                    bool is_better = prev_match->monotonicity && !match->monotonicity;
543
                    if (!is_better)
544
                    {
545
                        bool both_monotionic = prev_match->monotonicity && match->monotonicity;
546
                        is_better = both_monotionic && match->monotonicity->strict && !prev_match->monotonicity->strict;
547
                    }
548

549
                    if (is_better)
550
                        jt->second = it;
551
                }
552
            }
553
        }
554
    }
555

556
    /// This is a result direction we will read from MergeTree
557
    ///  1 - in order,
558
    /// -1 - in reverse order,
559
    ///  0 - usual read, don't apply optimization
560
    ///
561
    /// So far, 0 means any direction is possible. It is ok for constant prefix.
562
    int read_direction = 0;
563
    size_t next_sort_key = 0;
564
    std::unordered_set<std::string_view> not_matched_group_by_keys(group_by_keys.begin(), group_by_keys.end());
565

566
    SortDescription group_by_sort_description;
567
    group_by_sort_description.reserve(group_by_keys.size());
568

569
    SortDescription order_key_prefix_descr;
570
    order_key_prefix_descr.reserve(sorting_key_columns.size());
571

572
    while (!not_matched_group_by_keys.empty() && next_sort_key < sorting_key_columns.size())
573
    {
574
        const auto & sorting_key_column = sorting_key_columns[next_sort_key];
575

576
        /// Direction for current sort key.
577
        int current_direction = 0;
578
        bool strict_monotonic = true;
579
        std::unordered_set<std::string_view>::iterator group_by_key_it;
580

581
        const ActionsDAG::Node * sort_column_node = sorting_key_dag.tryFindInOutputs(sorting_key_column);
582
        /// This should not happen.
583
        if (!sort_column_node)
584
            break;
585

586
        if (!dag)
587
        {
588
            /// This is possible if there were no Expression or Filter steps in Plan.
589
            /// Example: SELECT * FROM tab ORDER BY a, b
590

591
            if (sort_column_node->type != ActionsDAG::ActionType::INPUT)
592
                break;
593

594
            group_by_key_it = not_matched_group_by_keys.find(sorting_key_column);
595
            if (group_by_key_it == not_matched_group_by_keys.end())
596
                break;
597

598
            current_direction = 1;
599

600
            //std::cerr << "====== (no dag) Found direct match" << std::endl;
601
            ++next_sort_key;
602
        }
603
        else
604
        {
605
            const MatchedTrees::Match * match = nullptr;
606
            const ActionsDAG::Node * group_by_key_node = nullptr;
607
            if (const auto match_it = reverse_matches.find(sort_column_node); match_it != reverse_matches.end())
608
            {
609
                group_by_key_node = match_it->second->first;
610
                match = &match_it->second->second;
611
            }
612

613
            //std::cerr << "====== Finding match for " << sort_column_node->result_name << ' ' << static_cast<const void *>(sort_column_node) << std::endl;
614

615
            if (match && match->node)
616
                group_by_key_it = not_matched_group_by_keys.find(group_by_key_node->result_name);
617

618
            if (match && match->node && group_by_key_it != not_matched_group_by_keys.end())
619
            {
620
                //std::cerr << "====== Found direct match" << std::endl;
621

622
                current_direction = 1;
623
                if (match->monotonicity)
624
                {
625
                    current_direction *= match->monotonicity->direction;
626
                    strict_monotonic = match->monotonicity->strict;
627
                }
628

629
                ++next_sort_key;
630
            }
631
            else if (fixed_key_columns.contains(sort_column_node))
632
            {
633
                //std::cerr << "+++++++++ Found fixed key by match" << std::endl;
634
                ++next_sort_key;
635
            }
636
            else
637
                break;
638
        }
639

640
        /// read_direction == 0 means we can choose any global direction.
641
        /// current_direction == 0 means current key if fixed and any direction is possible for it.
642
        if (current_direction && read_direction && current_direction != read_direction)
643
            break;
644

645
        if (read_direction == 0 && current_direction != 0)
646
            read_direction = current_direction;
647

648
        if (current_direction)
649
        {
650
            /// Aggregation in order will always read in table order.
651
            /// Here, current_direction is a direction which will be applied to every key.
652
            /// Example:
653
            ///   CREATE TABLE t (x, y, z) ENGINE = MergeTree ORDER BY (x, y)
654
            ///   SELECT ... FROM t GROUP BY negate(y), negate(x), z
655
            /// Here, current_direction will be -1 cause negate() is negative montonic,
656
            /// Prefix sort description for reading will be (negate(y) DESC, negate(x) DESC),
657
            /// Sort description for GROUP BY will be (negate(y) DESC, negate(x) DESC, z).
658
            //std::cerr << "---- adding " << std::string(*group_by_key_it) << std::endl;
659
            group_by_sort_description.emplace_back(SortColumnDescription(std::string(*group_by_key_it), current_direction));
660
            order_key_prefix_descr.emplace_back(SortColumnDescription(std::string(*group_by_key_it), current_direction));
661
            not_matched_group_by_keys.erase(group_by_key_it);
662
        }
663
        else
664
        {
665
            /// If column is fixed, will read it in table order as well.
666
            //std::cerr << "---- adding " << sorting_key_column << std::endl;
667
            order_key_prefix_descr.emplace_back(SortColumnDescription(sorting_key_column, 1));
668
        }
669

670
        if (current_direction && !strict_monotonic)
671
            break;
672
    }
673

674
    if (read_direction == 0 || group_by_sort_description.empty())
675
        return {};
676

677
    SortDescription sort_description_for_merging = group_by_sort_description;
678

679
    for (const auto & key : not_matched_group_by_keys)
680
        group_by_sort_description.emplace_back(SortColumnDescription(std::string(key)));
681

682
    auto input_order = std::make_shared<InputOrderInfo>(order_key_prefix_descr, next_sort_key, /*read_direction*/ 1, /* limit */ 0);
683
    return { std::move(input_order), std::move(sort_description_for_merging), std::move(group_by_sort_description) };
684
}
685

686
InputOrderInfoPtr buildInputOrderInfo(
687
    const ReadFromMergeTree * reading,
688
    const FixedColumns & fixed_columns,
689
    const ActionsDAGPtr & dag,
690
    const SortDescription & description,
691
    size_t limit)
692
{
693
    const auto & sorting_key = reading->getStorageMetadata()->getSortingKey();
694
    const auto & sorting_key_columns = sorting_key.column_names;
695

696
    return buildInputOrderInfo(
697
        fixed_columns,
698
        dag, description,
699
        sorting_key.expression->getActionsDAG(), sorting_key_columns,
700
        limit);
701
}
702

703
InputOrderInfoPtr buildInputOrderInfo(
704
    ReadFromMerge * merge,
705
    const FixedColumns & fixed_columns,
706
    const ActionsDAGPtr & dag,
707
    const SortDescription & description,
708
    size_t limit)
709
{
710
    const auto & tables = merge->getSelectedTables();
711

712
    InputOrderInfoPtr order_info;
713
    for (const auto & table : tables)
714
    {
715
        auto storage = std::get<StoragePtr>(table);
716
        const auto & sorting_key = storage->getInMemoryMetadataPtr()->getSortingKey();
717
        const auto & sorting_key_columns = sorting_key.column_names;
718

719
        if (sorting_key_columns.empty())
720
            return nullptr;
721

722
        auto table_order_info = buildInputOrderInfo(
723
            fixed_columns,
724
            dag, description,
725
            sorting_key.expression->getActionsDAG(), sorting_key_columns,
726
            limit);
727

728
        if (!table_order_info)
729
            return nullptr;
730

731
        if (!order_info)
732
            order_info = table_order_info;
733
        else if (*order_info != *table_order_info)
734
            return nullptr;
735
    }
736

737
    return order_info;
738
}
739

740
AggregationInputOrder buildInputOrderInfo(
741
    ReadFromMergeTree * reading,
742
    const FixedColumns & fixed_columns,
743
    const ActionsDAGPtr & dag,
744
    const Names & group_by_keys)
745
{
746
    const auto & sorting_key = reading->getStorageMetadata()->getSortingKey();
747
    const auto & sorting_key_columns = sorting_key.column_names;
748

749
    return buildInputOrderInfo(
750
        fixed_columns,
751
        dag, group_by_keys,
752
        sorting_key.expression->getActionsDAG(), sorting_key_columns);
753
}
754

755
AggregationInputOrder buildInputOrderInfo(
756
    ReadFromMerge * merge,
757
    const FixedColumns & fixed_columns,
758
    const ActionsDAGPtr & dag,
759
    const Names & group_by_keys)
760
{
761
    const auto & tables = merge->getSelectedTables();
762

763
    AggregationInputOrder order_info;
764
    for (const auto & table : tables)
765
    {
766
        auto storage = std::get<StoragePtr>(table);
767
        const auto & sorting_key = storage->getInMemoryMetadataPtr()->getSortingKey();
768
        const auto & sorting_key_columns = sorting_key.column_names;
769

770
        if (sorting_key_columns.empty())
771
            return {};
772

773
        auto table_order_info = buildInputOrderInfo(
774
            fixed_columns,
775
            dag, group_by_keys,
776
            sorting_key.expression->getActionsDAG(), sorting_key_columns);
777

778
        if (!table_order_info.input_order)
779
            return {};
780

781
        if (!order_info.input_order)
782
            order_info = table_order_info;
783
        else if (*order_info.input_order != *table_order_info.input_order)
784
            return {};
785
    }
786

787
    return order_info;
788
}
789

790
InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & node, StepStack & backward_path)
791
{
792
    QueryPlan::Node * reading_node = findReadingStep(node, backward_path);
793
    if (!reading_node)
794
        return nullptr;
795

796
    const auto & description = sorting.getSortDescription();
797
    size_t limit = sorting.getLimit();
798

799
    ActionsDAGPtr dag;
800
    FixedColumns fixed_columns;
801
    buildSortingDAG(node, dag, fixed_columns, limit);
802

803
    if (dag && !fixed_columns.empty())
804
        enreachFixedColumns(*dag, fixed_columns);
805

806
    if (auto * reading = typeid_cast<ReadFromMergeTree *>(reading_node->step.get()))
807
    {
808
        auto order_info = buildInputOrderInfo(
809
            reading,
810
            fixed_columns,
811
            dag, description,
812
            limit);
813

814
        if (order_info)
815
        {
816
            bool can_read = reading->requestReadingInOrder(order_info->used_prefix_of_sorting_key_size, order_info->direction, order_info->limit);
817
            if (!can_read)
818
                return nullptr;
819
        }
820

821
        return order_info;
822
    }
823
    else if (auto * merge = typeid_cast<ReadFromMerge *>(reading_node->step.get()))
824
    {
825
        auto order_info = buildInputOrderInfo(
826
            merge,
827
            fixed_columns,
828
            dag, description,
829
            limit);
830

831
        if (order_info)
832
        {
833
            bool can_read = merge->requestReadingInOrder(order_info);
834
            if (!can_read)
835
                return nullptr;
836
        }
837

838
        return order_info;
839
    }
840

841
    return nullptr;
842
}
843

844
AggregationInputOrder buildInputOrderInfo(AggregatingStep & aggregating, QueryPlan::Node & node, StepStack & backward_path)
845
{
846
    QueryPlan::Node * reading_node = findReadingStep(node, backward_path);
847
    if (!reading_node)
848
        return {};
849

850
    const auto & keys = aggregating.getParams().keys;
851
    size_t limit = 0;
852

853
    ActionsDAGPtr dag;
854
    FixedColumns fixed_columns;
855
    buildSortingDAG(node, dag, fixed_columns, limit);
856

857
    if (dag && !fixed_columns.empty())
858
        enreachFixedColumns(*dag, fixed_columns);
859

860
    if (auto * reading = typeid_cast<ReadFromMergeTree *>(reading_node->step.get()))
861
    {
862
        auto order_info = buildInputOrderInfo(
863
            reading,
864
            fixed_columns,
865
            dag, keys);
866

867
        if (order_info.input_order)
868
        {
869
            bool can_read = reading->requestReadingInOrder(
870
                order_info.input_order->used_prefix_of_sorting_key_size,
871
                order_info.input_order->direction,
872
                order_info.input_order->limit);
873
            if (!can_read)
874
                return {};
875
        }
876

877
        return order_info;
878
    }
879
    else if (auto * merge = typeid_cast<ReadFromMerge *>(reading_node->step.get()))
880
    {
881
        auto order_info = buildInputOrderInfo(
882
            merge,
883
            fixed_columns,
884
            dag, keys);
885

886
        if (order_info.input_order)
887
        {
888
            bool can_read = merge->requestReadingInOrder(order_info.input_order);
889
            if (!can_read)
890
                return {};
891
        }
892

893
        return order_info;
894
    }
895

896
    return {};
897
}
898

899
void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
900
{
901
    if (node.children.size() != 1)
902
        return;
903

904
    auto * sorting = typeid_cast<SortingStep *>(node.step.get());
905
    if (!sorting)
906
        return;
907

908
    //std::cerr << "---- optimizeReadInOrder found sorting" << std::endl;
909

910
    if (sorting->getType() != SortingStep::Type::Full)
911
        return;
912

913
    StepStack steps_to_update;
914
    if (typeid_cast<UnionStep *>(node.children.front()->step.get()))
915
    {
916
        auto & union_node = node.children.front();
917

918
        std::vector<InputOrderInfoPtr> infos;
919
        const SortDescription * max_sort_descr = nullptr;
920
        infos.reserve(node.children.size());
921
        for (auto * child : union_node->children)
922
        {
923
            infos.push_back(buildInputOrderInfo(*sorting, *child, steps_to_update));
924

925
            if (infos.back() && (!max_sort_descr || max_sort_descr->size() < infos.back()->sort_description_for_merging.size()))
926
                max_sort_descr = &infos.back()->sort_description_for_merging;
927
        }
928

929
        if (!max_sort_descr || max_sort_descr->empty())
930
            return;
931

932
        for (size_t i = 0; i < infos.size(); ++i)
933
        {
934
            const auto & info = infos[i];
935
            auto & child = union_node->children[i];
936

937
            QueryPlanStepPtr additional_sorting;
938

939
            if (!info)
940
            {
941
                auto limit = sorting->getLimit();
942
                /// If we have limit, it's better to sort up to full description and apply limit.
943
                /// We cannot sort up to partial read-in-order description with limit cause result set can be wrong.
944
                const auto & descr = limit ? sorting->getSortDescription() : *max_sort_descr;
945
                additional_sorting = std::make_unique<SortingStep>(
946
                    child->step->getOutputStream(),
947
                    descr,
948
                    limit, /// TODO: support limit with ties
949
                    sorting->getSettings(),
950
                    false);
951
            }
952
            else if (info->sort_description_for_merging.size() < max_sort_descr->size())
953
            {
954
                additional_sorting = std::make_unique<SortingStep>(
955
                    child->step->getOutputStream(),
956
                    info->sort_description_for_merging,
957
                    *max_sort_descr,
958
                    sorting->getSettings().max_block_size,
959
                    0); /// TODO: support limit with ties
960
            }
961

962
            if (additional_sorting)
963
            {
964
                auto & sort_node = nodes.emplace_back();
965
                sort_node.step = std::move(additional_sorting);
966
                sort_node.children.push_back(child);
967
                child = &sort_node;
968
            }
969
        }
970

971
        sorting->convertToFinishSorting(*max_sort_descr);
972
    }
973
    else if (auto order_info = buildInputOrderInfo(*sorting, *node.children.front(), steps_to_update))
974
    {
975
        sorting->convertToFinishSorting(order_info->sort_description_for_merging);
976
        /// update data stream's sorting properties
977
        updateStepsDataStreams(steps_to_update);
978
    }
979
}
980

981
void optimizeAggregationInOrder(QueryPlan::Node & node, QueryPlan::Nodes &)
982
{
983
    if (node.children.size() != 1)
984
        return;
985

986
    auto * aggregating = typeid_cast<AggregatingStep *>(node.step.get());
987
    if (!aggregating)
988
        return;
989

990
    if ((aggregating->inOrder() && !aggregating->explicitSortingRequired()) || aggregating->isGroupingSets())
991
        return;
992

993
    /// It just does not work, see 02515_projections_with_totals
994
    if (aggregating->getParams().overflow_row)
995
        return;
996

997
    /// TODO: maybe add support for UNION later.
998
    std::vector<IQueryPlanStep*> steps_to_update;
999
    if (auto order_info = buildInputOrderInfo(*aggregating, *node.children.front(), steps_to_update); order_info.input_order)
1000
    {
1001
        aggregating->applyOrder(std::move(order_info.sort_description_for_merging), std::move(order_info.group_by_sort_description));
1002
        /// update data stream's sorting properties
1003
        updateStepsDataStreams(steps_to_update);
1004
    }
1005
}
1006

1007
/// This optimization is obsolete and will be removed.
1008
/// optimizeReadInOrder covers it.
1009
size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node, QueryPlan::Nodes & /*nodes*/)
1010
{
1011
    /// Find the following sequence of steps, add InputOrderInfo and apply prefix sort description to
1012
    /// SortingStep:
1013
    /// WindowStep <- SortingStep <- [Expression] <- ReadFromMergeTree
1014

1015
    auto * window_node = parent_node;
1016
    auto * window = typeid_cast<WindowStep *>(window_node->step.get());
1017
    if (!window)
1018
        return 0;
1019
    if (window_node->children.size() != 1)
1020
        return 0;
1021

1022
    auto * sorting_node = window_node->children.front();
1023
    auto * sorting = typeid_cast<SortingStep *>(sorting_node->step.get());
1024
    if (!sorting)
1025
        return 0;
1026
    if (sorting_node->children.size() != 1)
1027
        return 0;
1028

1029
    auto * possible_read_from_merge_tree_node = sorting_node->children.front();
1030

1031
    if (typeid_cast<ExpressionStep *>(possible_read_from_merge_tree_node->step.get()))
1032
    {
1033
        if (possible_read_from_merge_tree_node->children.size() != 1)
1034
            return 0;
1035

1036
        possible_read_from_merge_tree_node = possible_read_from_merge_tree_node->children.front();
1037
    }
1038

1039
    auto * read_from_merge_tree = typeid_cast<ReadFromMergeTree *>(possible_read_from_merge_tree_node->step.get());
1040
    if (!read_from_merge_tree)
1041
    {
1042
        return 0;
1043
    }
1044

1045
    auto context = read_from_merge_tree->getContext();
1046
    const auto & settings = context->getSettings();
1047
    if (!settings.optimize_read_in_window_order || (settings.optimize_read_in_order && settings.query_plan_read_in_order) || context->getSettingsRef().allow_experimental_analyzer)
1048
    {
1049
        return 0;
1050
    }
1051

1052
    const auto & query_info = read_from_merge_tree->getQueryInfo();
1053
    const auto * select_query = query_info.query->as<ASTSelectQuery>();
1054

1055
    /// TODO: Analyzer syntax analyzer result
1056
    if (!query_info.syntax_analyzer_result)
1057
        return 0;
1058

1059
    ManyExpressionActions order_by_elements_actions;
1060
    const auto & window_desc = window->getWindowDescription();
1061

1062
    for (const auto & actions_dag : window_desc.partition_by_actions)
1063
    {
1064
        order_by_elements_actions.emplace_back(
1065
            std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(context, CompileExpressions::yes)));
1066
    }
1067

1068
    for (const auto & actions_dag : window_desc.order_by_actions)
1069
    {
1070
        order_by_elements_actions.emplace_back(
1071
            std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(context, CompileExpressions::yes)));
1072
    }
1073

1074
    auto order_optimizer = std::make_shared<ReadInOrderOptimizer>(
1075
            *select_query,
1076
            order_by_elements_actions,
1077
            window->getWindowDescription().full_sort_description,
1078
            query_info.syntax_analyzer_result);
1079

1080
    /// If we don't have filtration, we can pushdown limit to reading stage for optimizations.
1081
    UInt64 limit = (select_query->hasFiltration() || select_query->groupBy()) ? 0 : InterpreterSelectQuery::getLimitForSorting(*select_query, context);
1082

1083
    auto order_info = order_optimizer->getInputOrder(read_from_merge_tree->getStorageMetadata(), context, limit);
1084

1085
    if (order_info)
1086
    {
1087
        bool can_read = read_from_merge_tree->requestReadingInOrder(order_info->used_prefix_of_sorting_key_size, order_info->direction, order_info->limit);
1088
        if (!can_read)
1089
            return 0;
1090
        sorting->convertToFinishSorting(order_info->sort_description_for_merging);
1091
    }
1092

1093
    return 0;
1094
}
1095

1096
}
1097

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.