ClickHouse

Форк
0
/
removeRedundantSorting.cpp 
271 строка · 11.0 Кб
1
#include <AggregateFunctions/AggregateFunctionFactory.h>
2
#include <Interpreters/FullSortingMergeJoin.h>
3
#include <Processors/QueryPlan/AggregatingStep.h>
4
#include <Processors/QueryPlan/ExpressionStep.h>
5
#include <Processors/QueryPlan/FillingStep.h>
6
#include <Processors/QueryPlan/FilterStep.h>
7
#include <Processors/QueryPlan/ITransformingStep.h>
8
#include <Processors/QueryPlan/JoinStep.h>
9
#include <Processors/QueryPlan/LimitByStep.h>
10
#include <Processors/QueryPlan/LimitStep.h>
11
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
12
#include <Processors/QueryPlan/QueryPlanVisitor.h>
13
#include <Processors/QueryPlan/ReadFromMergeTree.h>
14
#include <Processors/QueryPlan/ReadFromRemote.h>
15
#include <Processors/QueryPlan/SortingStep.h>
16
#include <Processors/QueryPlan/UnionStep.h>
17
#include <Processors/QueryPlan/WindowStep.h>
18
#include <Common/logger_useful.h>
19
#include <Common/typeid_cast.h>
20

21
namespace DB::QueryPlanOptimizations
22
{
23
/// Compile-time switch passed as the second template argument of QueryPlanVisitor by the visitor in this file.
/// NOTE(review): presumably turns on the visitor's logStep() debug output when set to true - confirm in QueryPlanVisitor.h.
constexpr bool debug_logging_enabled = false;
24

25
/// Query plan visitor which unlinks SortingStep nodes whose order is overridden by a step above them.
///
/// While the plan is traversed top-down, nodes which affect row order (LIMIT, LIMIT BY, FILL, sorting,
/// aggregation) are collected in a stack. When a SortingStep is met below such a node, the node on top of
/// that stack and the steps between it and the sorting are inspected; if none of them needs the order
/// produced by the sorting, the sorting node is replaced in its parent by the sorting's own child.
class RemoveRedundantSorting : public QueryPlanVisitor<RemoveRedundantSorting, debug_logging_enabled>
{
    /// stack with nodes which affect order
    /// nodes added when traversing top-down
    /// as soon as all children for the node on top of stack are traversed, the node is removed from stack
    std::vector<QueryPlan::Node *> nodes_affect_order;

public:
    explicit RemoveRedundantSorting(QueryPlan::Node * root_) : QueryPlanVisitor<RemoveRedundantSorting, debug_logging_enabled>(root_) { }

    /// Top-down hook, invoked for `current_node` with its `parent_node`.
    /// Returns false when `current_node` was a sorting which has just been removed from the plan
    /// (a frame for the node which replaced it is pushed onto the traversal stack), true otherwise.
    bool visitTopDownImpl(QueryPlan::Node * current_node, QueryPlan::Node * parent_node)
    {
        IQueryPlanStep * current_step = current_node->step.get();

        /// if there is parent node which can affect order and current step is sorting
        /// then check if we can remove the sorting step (and corresponding expression step)
        if (!nodes_affect_order.empty() && typeid_cast<SortingStep *>(current_step))
        {
            if (tryRemoveSorting(current_node, parent_node))
            {
                logStep("step affect sorting", nodes_affect_order.back());
                logStep("removed from plan", current_node);

                auto & frame = stack.back();
                /// mark removed node as visited
                frame.next_child = frame.node->children.size();

                /// current sorting step has been removed from plan, its parent has a new child, need to visit it.
                /// The removed node still holds its own children, and its first child is exactly the node which
                /// took its place, so push it directly instead of assuming it is child #0 of the parent.
                auto next_frame = FrameWithParent{.node = current_node->children.front(), .parent_node = parent_node};
                stack.push_back(next_frame);
                logStep("push", next_frame.node);
                return false;
            }
        }

        if (typeid_cast<LimitStep *>(current_step)
            || typeid_cast<LimitByStep *>(current_step) /// (1) if there are LIMITs on top of ORDER BY, the ORDER BY is non-removable
            || typeid_cast<FillingStep *>(current_step) /// (2) if ORDER BY is with FILL WITH, it is non-removable
            || typeid_cast<SortingStep *>(current_step) /// (3) ORDER BY will change order of previous sorting
            || typeid_cast<AggregatingStep *>(current_step)) /// (4) aggregation change order
        {
            logStep("nodes_affect_order/push", current_node);
            nodes_affect_order.push_back(current_node);
        }

        return true;
    }

    /// Bottom-up hook: pops `current_node` from `nodes_affect_order` if it is on top of that stack.
    void visitBottomUpImpl(QueryPlan::Node * current_node, QueryPlan::Node *)
    {
        /// we come here when all children of current_node are visited,
        /// so, if it's a node which affect order, remove it from the corresponding stack
        if (!nodes_affect_order.empty() && nodes_affect_order.back() == current_node)
        {
            logStep("nodes_affect_order/pop", current_node);
            nodes_affect_order.pop_back();
        }
    }

private:
    /// Replaces `sorting_node` with its first child in `parent_node->children` if the sorting is removable,
    /// then re-propagates input streams up the traversal stack.
    /// Returns true if the sorting was removed, false if the plan was left untouched.
    /// Expects `nodes_affect_order` to be non-empty (the caller checks it).
    bool tryRemoveSorting(QueryPlan::Node * sorting_node, QueryPlan::Node * parent_node)
    {
        if (!canRemoveCurrentSorting())
            return false;

        /// the node which takes the place of the sorting in the plan
        QueryPlan::Node * replacement = sorting_node->children.front();

        /// remove sorting
        for (auto & child : parent_node->children)
        {
            if (child == sorting_node)
            {
                child = replacement;
                break;
            }
        }

        /// sorting removed, so need to update sorting traits for upstream steps
        const DataStream * input_stream = &replacement->step->getOutputStream();
        chassert(parent_node == (stack.rbegin() + 1)->node); /// skip element on top of stack since it's sorting which was just removed
        for (StackWithParent::const_reverse_iterator it = stack.rbegin() + 1; it != stack.rend(); ++it)
        {
            const QueryPlan::Node * node = it->node;
            /// skip removed sorting steps
            auto * step = node->step.get();
            if (typeid_cast<const SortingStep *>(step) && node != nodes_affect_order.back())
                continue;

            logStep("update sorting traits", node);

            auto * trans = dynamic_cast<ITransformingStep *>(step);
            if (!trans)
            {
                logStep("stop update sorting traits: node is not transforming step", node);
                break;
            }

            trans->updateInputStream(*input_stream);
            input_stream = &trans->getOutputStream();

            /// update sorting properties though stack until reach node which affects order (inclusive)
            if (node == nodes_affect_order.back())
            {
                logStep("stop update sorting traits: reached node which affect order", node);
                break;
            }
        }

        return true;
    }

    /// Returns true if the sorting on top of the traversal stack can be removed: both the nearest
    /// order-affecting node and the path between it and the sorting must allow it.
    bool canRemoveCurrentSorting()
    {
        chassert(!stack.empty());
        chassert(typeid_cast<const SortingStep *>(stack.back().node->step.get()));

        return checkNodeAffectingOrder(nodes_affect_order.back()) && checkPathFromCurrentSortingNode(nodes_affect_order.back());
    }

    /// Returns true if the step of `node_affect_order` makes a sorting below it redundant:
    /// either an aggregation which is not "in order" and has no order-dependent aggregate function,
    /// or a sorting of type Full. Returns false for LIMIT, LIMIT BY, FILL and any other step.
    static bool checkNodeAffectingOrder(QueryPlan::Node * node_affect_order)
    {
        IQueryPlanStep * step_affect_order = node_affect_order->step.get();

        /// if there are LIMITs on top of ORDER BY, the ORDER BY is non-removable
        /// if ORDER BY is with FILL WITH, it is non-removable
        if (typeid_cast<LimitStep *>(step_affect_order) || typeid_cast<LimitByStep *>(step_affect_order)
            || typeid_cast<FillingStep *>(step_affect_order))
            return false;

        /// (1) aggregation
        if (const AggregatingStep * parent_aggr = typeid_cast<AggregatingStep *>(step_affect_order); parent_aggr)
        {
            if (parent_aggr->inOrder())
                return false;

            auto const & aggregates = parent_aggr->getParams().aggregates;
            for (const auto & aggregate : aggregates)
            {
                /// bind by reference: no need to copy the function pointer for a read-only check
                const auto & aggregate_function = aggregate.function;
                const String func_name = aggregate_function->getName();

                auto action = NullsAction::EMPTY;
                auto aggregate_function_properties = AggregateFunctionFactory::instance().tryGetProperties(func_name, action);
                if (aggregate_function_properties && aggregate_function_properties->is_order_dependent)
                    return false;

                /// sum*() with Floats depends on order
                /// but currently, there is no way to specify property `is_order_dependent` for combination of aggregating function and data type as argument
                /// so, we check explicitly for sum*() functions with Floats here
                if (func_name.starts_with("sum"))
                {
                    /// only the first argument type is inspected; guard against an empty argument list
                    const auto & argument_types = aggregate_function->getArgumentTypes();
                    if (!argument_types.empty() && WhichDataType(removeNullable(argument_types.front())).isFloat())
                        return false;
                }
            }
            return true;
        }
        /// (2) sorting
        else if (const auto * next_sorting = typeid_cast<const SortingStep *>(step_affect_order); next_sorting)
        {
            if (next_sorting->getType() == SortingStep::Type::Full)
                return true;
        }

        return false;
    }

    /// Inspects the steps on the traversal stack between the sorting on its top (exclusive)
    /// and `node_affect_order` (exclusive). Returns false if any of them prevents removing the sorting:
    /// an expression or filter with stateful functions, a window step, or a join using FullSortingMergeJoin.
    bool checkPathFromCurrentSortingNode(const QueryPlan::Node * node_affect_order)
    {
        chassert(!stack.empty());
        chassert(typeid_cast<const SortingStep *>(stack.back().node->step.get()));

        /// (1) if there is expression with stateful function between current step
        /// and step which affects order, then we need to keep sorting since
        /// stateful function output can depend on order

        /// skip element on top of stack since it's sorting
        for (StackWithParent::const_reverse_iterator it = stack.rbegin() + 1; it != stack.rend(); ++it)
        {
            const QueryPlan::Node * node = it->node;
            /// walking though stack until reach node which affects order
            if (node == node_affect_order)
                break;

            const auto * step = node->step.get();
            /// skip removed sorting steps
            if (typeid_cast<const SortingStep *>(step))
                continue;

            logStep("checking for stateful function", node);
            if (const auto * expr = typeid_cast<const ExpressionStep *>(step); expr)
            {
                if (expr->getExpression()->hasStatefulFunctions())
                    return false;
            }
            else if (const auto * filter = typeid_cast<const FilterStep *>(step); filter)
            {
                if (filter->getExpression()->hasStatefulFunctions())
                    return false;
            }
            else
            {
                /// the scan for stateful functions stops at the first step which is not
                /// a transforming step or does not preserve sorting
                const auto * trans = dynamic_cast<const ITransformingStep *>(step);
                if (!trans)
                    break;

                if (!trans->getDataStreamTraits().preserves_sorting)
                    break;
            }
        }

        /// check steps on stack if there are some which can prevent from removing SortingStep
        for (StackWithParent::const_reverse_iterator it = stack.rbegin() + 1; it != stack.rend(); ++it)
        {
            const QueryPlan::Node * node = it->node;
            /// walking though stack until reach node which affects order
            if (node == node_affect_order)
                break;

            const auto * step = node->step.get();
            /// skip removed sorting steps
            if (typeid_cast<const SortingStep *>(step))
                continue;

            logStep("checking path from current sorting", node);

            /// (2) for window function we do ORDER BY in 2 Sorting steps,
            /// so do not delete Sorting if window function step is on top
            if (typeid_cast<const WindowStep *>(step))
                return false;

            /// (3) a join performed by FullSortingMergeJoin also keeps the sorting
            if (const auto * join_step = typeid_cast<const JoinStep *>(step); join_step)
            {
                if (typeid_cast<const FullSortingMergeJoin *>(join_step->getJoin().get()))
                    return false;
            }
        }

        return true;
    }
};
265

266
/// Entry point of the optimization: builds a RemoveRedundantSorting visitor
/// for the plan rooted at `root` and runs its traversal.
void tryRemoveRedundantSorting(QueryPlan::Node * root)
{
    RemoveRedundantSorting visitor(root);
    visitor.visit();
}
270

271
}
272

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.