ClickHouse

Форк
0
/
removeRedundantDistinct.cpp 
305 строк · 11.6 Кб
1
#include <memory>
2
#include <Interpreters/ActionsDAG.h>
3
#include <Processors/QueryPlan/AggregatingStep.h>
4
#include <Processors/QueryPlan/ArrayJoinStep.h>
5
#include <Processors/QueryPlan/CubeStep.h>
6
#include <Processors/QueryPlan/DistinctStep.h>
7
#include <Processors/QueryPlan/ExpressionStep.h>
8
#include <Processors/QueryPlan/FillingStep.h>
9
#include <Processors/QueryPlan/FilterStep.h>
10
#include <Processors/QueryPlan/IntersectOrExceptStep.h>
11
#include <Processors/QueryPlan/JoinStep.h>
12
#include <Processors/QueryPlan/LimitByStep.h>
13
#include <Processors/QueryPlan/LimitStep.h>
14
#include <Processors/QueryPlan/MergingAggregatedStep.h>
15
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
16
#include <Processors/QueryPlan/RollupStep.h>
17
#include <Processors/QueryPlan/SortingStep.h>
18
#include <Processors/QueryPlan/TotalsHavingStep.h>
19
#include <Processors/QueryPlan/UnionStep.h>
20
#include <Processors/QueryPlan/WindowStep.h>
21
#include <Common/logger_useful.h>
22
#include <Common/typeid_cast.h>
23

24
namespace DB::QueryPlanOptimizations
25
{
26

27
namespace
28
{
29
    /// Compile-time switch for the debug helpers in this file (logDebug, logActionsDAG):
    /// they test it with `if constexpr`, so while it is false they emit no log messages.
    constexpr bool debug_logging_enabled = false;
30

31
    template <typename T>
32
    void logDebug(String key, const T & value, const char * separator = " : ")
33
    {
34
        if constexpr (debug_logging_enabled)
35
        {
36
            WriteBufferFromOwnString ss;
37
            if constexpr (std::is_pointer_v<T>)
38
                ss << *value;
39
            else
40
                ss << value;
41

42
            LOG_DEBUG(getLogger("redundantDistinct"), "{}{}{}", key, separator, ss.str());
43
        }
44
    }
45

46
    void logActionsDAG(const String & prefix, const ActionsDAGPtr & actions)
47
    {
48
        if constexpr (debug_logging_enabled)
49
            LOG_DEBUG(getLogger("redundantDistinct"), "{} :\n{}", prefix, actions->dumpDAG());
50
    }
51

52
    using DistinctColumns = std::set<std::string_view>;
53
    DistinctColumns getDistinctColumns(const DistinctStep * distinct)
54
    {
55
        /// find non-const columns in DISTINCT
56
        const ColumnsWithTypeAndName & distinct_columns = distinct->getOutputStream().header.getColumnsWithTypeAndName();
57
        std::set<std::string_view> non_const_columns;
58
        std::unordered_set<std::string_view> column_names(cbegin(distinct->getColumnNames()), cend(distinct->getColumnNames()));
59
        for (const auto & column : distinct_columns)
60
        {
61
            if (!isColumnConst(*column.column) && column_names.contains(column.name))
62
                non_const_columns.emplace(column.name);
63
        }
64
        return non_const_columns;
65
    }
66

67
    bool compareAggregationKeysWithDistinctColumns(
68
        const Names & aggregation_keys, const DistinctColumns & distinct_columns, const ActionsDAGPtr & path_actions)
69
    {
70
        logDebug("aggregation_keys", aggregation_keys);
71
        logDebug("aggregation_keys size", aggregation_keys.size());
72
        logDebug("distinct_columns size", distinct_columns.size());
73

74
        std::set<std::string_view> original_distinct_columns;
75
        FindOriginalNodeForOutputName original_node_finder(path_actions);
76
        for (const auto & column : distinct_columns)
77
        {
78
            logDebug("distinct column name", column);
79
            const auto * alias_node = original_node_finder.find(String(column));
80
            if (!alias_node)
81
            {
82
                logDebug("original name for alias is not found", column);
83
                original_distinct_columns.insert(column);
84
            }
85
            else
86
            {
87
                logDebug("alias result name", alias_node->result_name);
88
                original_distinct_columns.insert(alias_node->result_name);
89
            }
90
        }
91
        /// if aggregation keys are part of distinct columns then rows already distinct
92
        for (const auto & key : aggregation_keys)
93
        {
94
            if (!original_distinct_columns.contains(key))
95
            {
96
                logDebug("aggregation key NOT found: {}", key);
97
                return false;
98
            }
99
        }
100
        return true;
101
    }
102

103
    bool checkStepToAllowOptimization(const IQueryPlanStep * step)
104
    {
105
        if (typeid_cast<const DistinctStep *>(step))
106
            return true;
107

108
        if (const auto * const expr = typeid_cast<const ExpressionStep *>(step); expr)
109
            return !expr->getExpression()->hasArrayJoin();
110

111
        if (const auto * const filter = typeid_cast<const FilterStep *>(step); filter)
112
            return !filter->getExpression()->hasArrayJoin();
113

114
        if (typeid_cast<const LimitStep *>(step) || typeid_cast<const LimitByStep *>(step) || typeid_cast<const SortingStep *>(step)
115
            || typeid_cast<const WindowStep *>(step))
116
            return true;
117

118
        /// those steps can be only after AggregatingStep, so we skip them here but check AggregatingStep separately
119
        if (typeid_cast<const CubeStep *>(step) || typeid_cast<const RollupStep *>(step) || typeid_cast<const TotalsHavingStep *>(step))
120
            return true;
121

122
        return false;
123
    }
124

125
    /// build actions DAG from stack of steps
126
    ActionsDAGPtr buildActionsForPlanPath(std::vector<ActionsDAGPtr> & dag_stack)
127
    {
128
        if (dag_stack.empty())
129
            return nullptr;
130

131
        ActionsDAGPtr path_actions = dag_stack.back()->clone();
132
        dag_stack.pop_back();
133
        while (!dag_stack.empty())
134
        {
135
            ActionsDAGPtr clone = dag_stack.back()->clone();
136
            logActionsDAG("DAG to merge", clone);
137
            dag_stack.pop_back();
138
            path_actions->mergeInplace(std::move(*clone));
139
        }
140
        return path_actions;
141
    }
142

143
    /// Walks from the DISTINCT node down the first-child chain looking for an aggregation
    /// (AggregatingStep or MergingAggregatedStep).
    /// Returns true only if an aggregation is reached through allowed steps, with no other
    /// DISTINCT in between, and all its keys are among the columns of this DISTINCT.
    bool passTillAggregation(const QueryPlan::Node * distinct_node)
    {
        const DistinctStep * distinct_step = typeid_cast<DistinctStep *>(distinct_node->step.get());
        chassert(distinct_step);

        /// actions of Expression/Filter steps met on the way, needed to resolve column aliases
        std::vector<ActionsDAGPtr> dag_stack;
        const IQueryPlanStep * aggregation = nullptr;

        for (const QueryPlan::Node * node = distinct_node; !node->children.empty();)
        {
            const IQueryPlanStep * step = node->step.get();

            const bool is_aggregation
                = typeid_cast<const AggregatingStep *>(step) || typeid_cast<const MergingAggregatedStep *>(step);
            if (is_aggregation)
            {
                aggregation = step;
                break;
            }

            if (!checkStepToAllowOptimization(step))
            {
                logDebug("aggregation pass: stopped by allow check on step", step->getName());
                break;
            }

            if (const auto * expression_step = typeid_cast<const ExpressionStep *>(step))
                dag_stack.push_back(expression_step->getExpression());
            else if (const auto * filter_step = typeid_cast<const FilterStep *>(step))
                dag_stack.push_back(filter_step->getExpression());

            node = node->children.front();

            /// another DISTINCT is met before any aggregation: this pass gives up
            if (typeid_cast<DistinctStep *>(node->step.get()))
                return false;
        }

        if (!aggregation)
            return false;

        ActionsDAGPtr actions = buildActionsForPlanPath(dag_stack);
        logActionsDAG("aggregation pass: merged DAG", actions);

        const auto distinct_columns = getDistinctColumns(distinct_step);

        if (const auto * aggregating_step = typeid_cast<const AggregatingStep *>(aggregation))
            return compareAggregationKeysWithDistinctColumns(aggregating_step->getParams().keys, distinct_columns, actions);

        if (const auto * merging_aggregated_step = typeid_cast<const MergingAggregatedStep *>(aggregation))
            return compareAggregationKeysWithDistinctColumns(merging_aggregated_step->getParams().keys, distinct_columns, actions);

        return false;
    }
194

195
    /// Walks from the DISTINCT node down the first-child chain looking for another DISTINCT.
    /// Returns true only if a non-preliminary inner DISTINCT is reached through allowed steps
    /// and it works on the same set of columns (after resolving aliases through the
    /// Expression/Filter steps in between).
    bool passTillDistinct(const QueryPlan::Node * distinct_node)
    {
        const DistinctStep * distinct_step = typeid_cast<DistinctStep *>(distinct_node->step.get());
        chassert(distinct_step);
        const auto outer_columns = getDistinctColumns(distinct_step);

        /// actions of Expression/Filter steps met on the way, needed to resolve column aliases
        std::vector<ActionsDAGPtr> dag_stack;
        const DistinctStep * inner_distinct = nullptr;

        for (const QueryPlan::Node * node = distinct_node; !node->children.empty();)
        {
            const IQueryPlanStep * step = node->step.get();
            if (!checkStepToAllowOptimization(step))
            {
                logDebug("distinct pass: stopped by allow check on step", step->getName());
                break;
            }

            if (const auto * expression_step = typeid_cast<const ExpressionStep *>(step))
                dag_stack.push_back(expression_step->getExpression());
            else if (const auto * filter_step = typeid_cast<const FilterStep *>(step))
                dag_stack.push_back(filter_step->getExpression());

            node = node->children.front();
            inner_distinct = typeid_cast<DistinctStep *>(node->step.get());
            if (inner_distinct)
                break;
        }

        if (!inner_distinct)
            return false;

        /// possible cases (outer distinct -> inner distinct):
        /// final -> preliminary => do nothing
        /// preliminary -> final => try remove preliminary
        /// final -> final => try remove final
        /// preliminary -> preliminary => logical error?
        if (inner_distinct->isPreliminary())
            return false;

        auto inner_columns = getDistinctColumns(inner_distinct);
        if (outer_columns.size() != inner_columns.size())
            return false;

        /// nothing between the two DISTINCTs could rename columns: compare the names directly
        if (dag_stack.empty())
            return outer_columns == inner_columns;

        /// build actions DAG to find original column names
        ActionsDAGPtr path_actions = buildActionsForPlanPath(dag_stack);
        logActionsDAG("distinct pass: merged DAG", path_actions);

        /// compare columns of two DISTINCTs:
        /// each outer column has to resolve to a not yet matched inner column
        FindOriginalNodeForOutputName original_node_finder(path_actions);
        for (const auto & outer_column : outer_columns)
        {
            const auto * original_node = original_node_finder.find(String(outer_column));
            if (!original_node)
                return false;

            const auto matched = inner_columns.find(original_node->result_name);
            if (matched == inner_columns.end())
                return false;

            inner_columns.erase(matched);
        }

        return true;
    }
268

269
    /// A DISTINCT can be removed if either check succeeds: the aggregation pass is tried
    /// first, the inner-DISTINCT pass only if the first one fails.
    bool canRemoveDistinct(const QueryPlan::Node * distinct_node)
    {
        return passTillAggregation(distinct_node) || passTillDistinct(distinct_node);
    }
279
}
280

281
///
282
/// DISTINCT is redundant if DISTINCT on the same columns was executed before
283
/// Trivial example: SELECT DISTINCT * FROM (SELECT DISTINCT * FROM numbers(3))
284
///
285
size_t tryRemoveRedundantDistinct(QueryPlan::Node * parent_node, QueryPlan::Nodes & /* nodes*/)
286
{
287
    bool applied = false;
288
    for (const auto * node : parent_node->children)
289
    {
290
        /// check if it is distinct node
291
        if (typeid_cast<const DistinctStep *>(node->step.get()) == nullptr)
292
            continue;
293

294
        if (canRemoveDistinct(node))
295
        {
296
            /// remove current distinct
297
            chassert(!node->children.empty());
298
            parent_node->children[0] = node->children.front();
299
            applied = true;
300
        }
301
    }
302

303
    return applied;
304
}
305
}
306

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.