ClickHouse

Форк
0
/
useDataParallelAggregation.cpp 
219 строк · 8.5 Кб
1
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
2

3
#include <Functions/IFunction.h>
4
#include <Processors/QueryPlan/AggregatingStep.h>
5
#include <Processors/QueryPlan/ExpressionStep.h>
6
#include <Processors/QueryPlan/FilterStep.h>
7
#include <Processors/QueryPlan/Optimizations/actionsDAGUtils.h>
8
#include <Processors/QueryPlan/ReadFromMergeTree.h>
9

10
#include <stack>
11
#include <unordered_map>
12

13
using namespace DB;
14

15
namespace
16
{
17

18
using NodeSet = std::unordered_set<const ActionsDAG::Node *>;
19
using NodeMap = std::unordered_map<const ActionsDAG::Node *, bool>;
20

21
struct Frame
22
{
23
    const ActionsDAG::Node * node = nullptr;
24
    size_t next_child = 0;
25
};
26

27
/// Tells whether the function held by `node` may be treated as injective.
///
/// That is the case when either
///  - the function itself reports injectivity when asked with an empty list of arguments, or
///  - at most one of its arguments is not a COLUMN node and the function is one of
///    "plus", "minus", "negate", "tuple".
///
/// `node->function_base` is dereferenced unconditionally, so `node` must be a FUNCTION node.
bool isInjectiveFunction(const ActionsDAG::Node * node)
{
    const auto & function = node->function_base;
    if (function->isInjective({}))
        return true;

    size_t constant_args = 0;
    for (const auto & argument : node->children)
    {
        if (argument->type == ActionsDAG::ActionType::COLUMN)
            ++constant_args;
    }

    /// Two or more arguments that are not COLUMN nodes: the name-based rule does not apply.
    if (constant_args + 1 < node->children.size())
        return false;

    static const std::vector<String> injective_with_fixed_args = {"plus", "minus", "negate", "tuple"};
    return std::ranges::find(injective_with_fixed_args, function->getName()) != injective_with_fixed_args.end();
}
39

40
/// Descends from `node` through aliases and injective functions and records in `irreducible`
/// every node at which the descent stops: inputs, COLUMN nodes and non-injective functions.
/// `visited` ensures that each node is handled at most once across the whole traversal.
void removeInjectiveFunctionsFromResultsRecursively(const ActionsDAG::Node * node, NodeSet & irreducible, NodeSet & visited)
{
    /// `insert` tells us whether the node was new, so the membership test and the marking are a single step.
    if (!visited.insert(node).second)
        return;

    switch (node->type)
    {
        case ActionsDAG::ActionType::ALIAS:
            /// An alias is transparent: look at what it refers to.
            assert(node->children.size() == 1);
            removeInjectiveFunctionsFromResultsRecursively(node->children.at(0), irreducible, visited);
            break;
        case ActionsDAG::ActionType::FUNCTION:
            if (isInjectiveFunction(node))
            {
                /// An injective function is replaced by its arguments.
                for (const auto & argument : node->children)
                    removeInjectiveFunctionsFromResultsRecursively(argument, irreducible, visited);
            }
            else
            {
                irreducible.insert(node);
            }
            break;
        case ActionsDAG::ActionType::ARRAY_JOIN:
            UNREACHABLE();
        case ActionsDAG::ActionType::COLUMN:
        case ActionsDAG::ActionType::INPUT:
            /// Nothing to unwrap below these nodes.
            irreducible.insert(node);
            break;
    }
}
73

74
/// Our objective is to replace injective function nodes in `actions` results with its children
75
/// until only the irreducible subset of nodes remains. Against these set of nodes we will match partition key expression
76
/// to determine if it maps all rows with the same value of group by key to the same partition.
77
NodeSet removeInjectiveFunctionsFromResultsRecursively(const ActionsDAGPtr & actions)
78
{
79
    NodeSet irreducible;
80
    NodeSet visited;
81
    for (const auto & node : actions->getOutputs())
82
        removeInjectiveFunctionsFromResultsRecursively(node, irreducible, visited);
83
    return irreducible;
84
}
85

86
bool allOutputsDependsOnlyOnAllowedNodes(
87
    const NodeSet & irreducible_nodes, const MatchedTrees::Matches & matches, const ActionsDAG::Node * node, NodeMap & visited)
88
{
89
    if (visited.contains(node))
90
        return visited[node];
91

92
    bool res = false;
93
    /// `matches` maps partition key nodes into nodes in group by actions
94
    if (matches.contains(node))
95
    {
96
        const auto & match = matches.at(node);
97
        /// Function could be mapped into its argument. In this case .monotonicity != std::nullopt (see matchTrees)
98
        if (match.node && !match.monotonicity)
99
            res = irreducible_nodes.contains(match.node);
100
    }
101

102
    if (!res)
103
    {
104
        switch (node->type)
105
        {
106
            case ActionsDAG::ActionType::ALIAS:
107
                assert(node->children.size() == 1);
108
                res = allOutputsDependsOnlyOnAllowedNodes(irreducible_nodes, matches, node->children.at(0), visited);
109
                break;
110
            case ActionsDAG::ActionType::ARRAY_JOIN:
111
                UNREACHABLE();
112
            case ActionsDAG::ActionType::COLUMN:
113
                /// Constants doesn't matter, so let's always consider them matched.
114
                res = true;
115
                break;
116
            case ActionsDAG::ActionType::FUNCTION:
117
                res = true;
118
                for (const auto & child : node->children)
119
                    res &= allOutputsDependsOnlyOnAllowedNodes(irreducible_nodes, matches, child, visited);
120
                break;
121
            case ActionsDAG::ActionType::INPUT:
122
                break;
123
        }
124
    }
125
    visited[node] = res;
126
    return res;
127
}
128

129
/// Here we check that partition key expression is a deterministic function of the reduced set of group by key nodes.
130
/// No need to explicitly check that each function is deterministic, because it is a guaranteed property of partition key expression (checked on table creation).
131
/// So it is left only to check that each output node depends only on the allowed set of nodes (`irreducible_nodes`).
132
bool allOutputsDependsOnlyOnAllowedNodes(
133
    const ActionsDAG & partition_actions, const NodeSet & irreducible_nodes, const MatchedTrees::Matches & matches)
134
{
135
    NodeMap visited;
136
    bool res = true;
137
    for (const auto & node : partition_actions.getOutputs())
138
        if (node->type != ActionsDAG::ActionType::INPUT)
139
            res &= allOutputsDependsOnlyOnAllowedNodes(irreducible_nodes, matches, node, visited);
140
    return res;
141
}
142

143
/// 0. Partition key columns should be a subset of group by key columns.
144
/// 1. Optimization is applicable if partition by expression is a deterministic function of col1, ..., coln and group by key is injective functions of these col1, ..., coln.
145
/// 2. To find col1, ..., coln we apply removeInjectiveFunctionsFromResultsRecursively to group by key actions.
146
/// 3. We match partition key actions with group by key actions to find col1', ..., coln' in partition key actions.
147
/// 4. We check that partition key is indeed a deterministic function of col1', ..., coln'.
148
bool isPartitionKeySuitsGroupByKey(
149
    const ReadFromMergeTree & reading, const ActionsDAGPtr & group_by_actions, const AggregatingStep & aggregating)
150
{
151
    if (aggregating.isGroupingSets())
152
        return false;
153

154
    if (group_by_actions->hasArrayJoin() || group_by_actions->hasStatefulFunctions() || group_by_actions->hasNonDeterministic())
155
        return false;
156

157
    /// We are interested only in calculations required to obtain group by keys (and not aggregate function arguments for example).
158
    auto key_nodes = group_by_actions->findInOutpus(aggregating.getParams().keys);
159
    auto group_by_key_actions = ActionsDAG::cloneSubDAG(key_nodes, /*remove_aliases=*/ true);
160

161
    const auto & gb_key_required_columns = group_by_key_actions->getRequiredColumnsNames();
162

163
    const auto & partition_actions = reading.getStorageMetadata()->getPartitionKey().expression->getActionsDAG();
164

165
    /// Check that PK columns is a subset of GBK columns.
166
    for (const auto & col : partition_actions.getRequiredColumnsNames())
167
        if (std::ranges::find(gb_key_required_columns, col) == gb_key_required_columns.end())
168
            return false;
169

170
    const auto irreducibe_nodes = removeInjectiveFunctionsFromResultsRecursively(group_by_key_actions);
171

172
    const auto matches = matchTrees(group_by_key_actions->getOutputs(), partition_actions);
173

174
    return allOutputsDependsOnlyOnAllowedNodes(partition_actions, irreducibe_nodes, matches);
175
}
176
}
177

178
namespace DB::QueryPlanOptimizations
179
{
180

181
/// Tries to make the aggregation process each partition independently.
///
/// Recognised plan shape (top to bottom): AggregatingStep -> ExpressionStep -> [FilterStep ->] ReadFromMergeTree.
/// If the shape matches, the reading step does not already output each partition through a separate port
/// and the partition key suits the group by key, the reading step is asked to output each partition
/// through a separate port; when that request succeeds, the aggregating step is told to skip merging.
///
/// `node` is the candidate aggregating node (may be null); the second parameter is unused.
/// Always returns 0, whether or not the optimization was applied.
size_t tryAggregatePartitionsIndependently(QueryPlan::Node * node, QueryPlan::Nodes &)
{
    if (!node || node->children.size() != 1)
        return 0;

    auto * aggregating_step = typeid_cast<AggregatingStep *>(node->step.get());
    if (!aggregating_step)
        return 0;

    const auto * expression_node = node->children.front();
    const auto * expression_step = typeid_cast<const ExpressionStep *>(expression_node->step.get());
    if (!expression_step)
        return 0;

    /// `front()` on an empty container is undefined behaviour, so validate the shape first,
    /// the same way it is done for the filter node below.
    if (expression_node->children.size() != 1)
        return 0;

    const auto * below_expression = expression_node->children.front();
    auto * maybe_reading_step = below_expression->step.get();

    /// An optional filter may sit between the expression and the reading step: look through it.
    if (typeid_cast<const FilterStep *>(maybe_reading_step))
    {
        const auto * filter_node = below_expression;
        if (filter_node->children.size() != 1 || !filter_node->children.front()->step)
            return 0;
        maybe_reading_step = filter_node->children.front()->step.get();
    }

    auto * reading = typeid_cast<ReadFromMergeTree *>(maybe_reading_step);
    if (!reading)
        return 0;

    if (!reading->willOutputEachPartitionThroughSeparatePort()
        && isPartitionKeySuitsGroupByKey(*reading, expression_step->getExpression(), *aggregating_step))
    {
        /// Merging is skipped only if the reading step accepted the request.
        if (reading->requestOutputEachPartitionThroughSeparatePort())
            aggregating_step->skipMerging();
    }

    return 0;
}
218

219
}
220

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.