ClickHouse

Форк
0
/
distinctReadInOrder.cpp 
148 строк · 6.0 Кб
1
#include <memory>
2
#include <Processors/QueryPlan/DistinctStep.h>
3
#include <Processors/QueryPlan/ExpressionStep.h>
4
#include <Processors/QueryPlan/FilterStep.h>
5
#include <Processors/QueryPlan/ITransformingStep.h>
6
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
7
#include <Processors/QueryPlan/ReadFromMergeTree.h>
8
#include <Common/typeid_cast.h>
9

10
namespace DB::QueryPlanOptimizations
11
{
12
/// build actions DAG from stack of steps
13
static ActionsDAGPtr buildActionsForPlanPath(std::vector<ActionsDAGPtr> & dag_stack)
14
{
15
    if (dag_stack.empty())
16
        return nullptr;
17

18
    ActionsDAGPtr path_actions = dag_stack.back()->clone();
19
    dag_stack.pop_back();
20
    while (!dag_stack.empty())
21
    {
22
        ActionsDAGPtr clone = dag_stack.back()->clone();
23
        dag_stack.pop_back();
24
        path_actions->mergeInplace(std::move(*clone));
25
    }
26
    return path_actions;
27
}
28

29
static std::set<std::string>
30
getOriginalDistinctColumns(const ColumnsWithTypeAndName & distinct_columns, std::vector<ActionsDAGPtr> & dag_stack)
31
{
32
    auto actions = buildActionsForPlanPath(dag_stack);
33
    FindOriginalNodeForOutputName original_node_finder(actions);
34
    std::set<std::string> original_distinct_columns;
35
    for (const auto & column : distinct_columns)
36
    {
37
        /// const columns doesn't affect DISTINCT, so skip them
38
        if (isColumnConst(*column.column))
39
            continue;
40

41
        const auto * input_node = original_node_finder.find(column.name);
42
        if (!input_node)
43
            break;
44

45
        original_distinct_columns.insert(input_node->result_name);
46
    }
47
    return original_distinct_columns;
48
}
49

50
size_t tryDistinctReadInOrder(QueryPlan::Node * parent_node)
51
{
52
    /// check if it is preliminary distinct node
53
    DistinctStep * pre_distinct = nullptr;
54
    if (auto * distinct = typeid_cast<DistinctStep *>(parent_node->step.get()); distinct)
55
    {
56
        if (distinct->isPreliminary())
57
            pre_distinct = distinct;
58
    }
59
    if (!pre_distinct)
60
        return 0;
61

62
    /// walk through the plan
63
    /// (1) check if nodes below preliminary distinct preserve sorting
64
    /// (2) gather transforming steps to update their sorting properties later
65
    /// (3) gather actions DAG to find original names for columns in distinct step later
66
    std::vector<ITransformingStep *> steps_to_update;
67
    QueryPlan::Node * node = parent_node;
68
    std::vector<ActionsDAGPtr> dag_stack;
69
    while (!node->children.empty())
70
    {
71
        auto * step = dynamic_cast<ITransformingStep *>(node->step.get());
72
        if (!step)
73
            return 0;
74

75
        const ITransformingStep::DataStreamTraits & traits = step->getDataStreamTraits();
76
        if (!traits.preserves_sorting)
77
            return 0;
78

79
        steps_to_update.push_back(step);
80

81
        if (const auto * const expr = typeid_cast<const ExpressionStep *>(step); expr)
82
            dag_stack.push_back(expr->getExpression());
83
        else if (const auto * const filter = typeid_cast<const FilterStep *>(step); filter)
84
            dag_stack.push_back(filter->getExpression());
85

86
        node = node->children.front();
87
    }
88

89
    /// check if we read from MergeTree
90
    auto * read_from_merge_tree = typeid_cast<ReadFromMergeTree *>(node->step.get());
91
    if (!read_from_merge_tree)
92
        return 0;
93

94
    /// if reading from merge tree doesn't provide any output order, we can do nothing
95
    /// it means that no ordering can provided or supported for a particular sorting key
96
    /// for example, tuple() or sipHash(string)
97
    if (read_from_merge_tree->getOutputStream().sort_description.empty())
98
        return 0;
99

100
    /// get original names for DISTINCT columns
101
    const ColumnsWithTypeAndName & distinct_columns = pre_distinct->getOutputStream().header.getColumnsWithTypeAndName();
102
    auto original_distinct_columns = getOriginalDistinctColumns(distinct_columns, dag_stack);
103

104
    /// check if DISTINCT has the same columns as sorting key
105
    const Names & sorting_key_columns = read_from_merge_tree->getStorageMetadata()->getSortingKeyColumns();
106
    size_t number_of_sorted_distinct_columns = 0;
107
    for (const auto & column_name : sorting_key_columns)
108
    {
109
        if (!original_distinct_columns.contains(column_name))
110
            break;
111

112
        ++number_of_sorted_distinct_columns;
113
    }
114

115
    /// apply optimization only when distinct columns match or form prefix of sorting key
116
    /// todo: check if reading in order optimization would be beneficial when sorting key is prefix of columns in DISTINCT
117
    if (number_of_sorted_distinct_columns != original_distinct_columns.size())
118
        return 0;
119

120
    /// check if another read in order optimization is already applied
121
    /// apply optimization only if another read in order one uses less sorting columns
122
    /// example: SELECT DISTINCT a, b FROM t ORDER BY a; -- sorting key: a, b
123
    /// if read in order for ORDER BY is already applied, then output sort description will contain only column `a`
124
    /// but we need columns `a, b`, applying read in order for distinct will still benefit `order by`
125
    const DataStream & output_data_stream = read_from_merge_tree->getOutputStream();
126
    const SortDescription & output_sort_desc = output_data_stream.sort_description;
127
    if (output_data_stream.sort_scope != DataStream::SortScope::Chunk && number_of_sorted_distinct_columns <= output_sort_desc.size())
128
        return 0;
129

130
    /// update input order info in read_from_merge_tree step
131
    const int direction = 0; /// for DISTINCT direction doesn't matter, ReadFromMergeTree will choose proper one
132
    bool can_read = read_from_merge_tree->requestReadingInOrder(number_of_sorted_distinct_columns, direction, pre_distinct->getLimitHint());
133
    if (!can_read)
134
        return 0;
135

136
    /// update data stream's sorting properties for found transforms
137
    const DataStream * input_stream = &read_from_merge_tree->getOutputStream();
138
    while (!steps_to_update.empty())
139
    {
140
        steps_to_update.back()->updateInputStream(*input_stream);
141
        input_stream = &steps_to_update.back()->getOutputStream();
142
        steps_to_update.pop_back();
143
    }
144

145
    return 0;
146
}
147

148
}
149

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.