ClickHouse

Форк
0
272 строки · 8.4 Кб
1
#include <Processors/QueryPlan/Optimizations/projectionsCommon.h>
2

3
#include <Processors/QueryPlan/ExpressionStep.h>
4
#include <Processors/QueryPlan/FilterStep.h>
5
#include <Processors/QueryPlan/ReadFromMergeTree.h>
6

7
#include <Common/logger_useful.h>
8
#include <DataTypes/DataTypeNullable.h>
9
#include <Functions/IFunctionAdaptors.h>
10
#include <Functions/FunctionsLogical.h>
11
#include <Interpreters/InterpreterSelectQuery.h>
12
#include <Storages/StorageReplicatedMergeTree.h>
13

14

15
namespace DB
16
{
17

18
namespace ErrorCodes
19
{
20
    /// Thrown by findInOutputs() when the requested filter column has a type that cannot be used as a filter.
    extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
21
}
22

23
namespace QueryPlanOptimizations
24
{
25

26
/// Tells whether projection optimizations may be attempted for the given reading step.
/// Returns false as soon as any of the unsupported conditions below is detected.
bool canUseProjectionForReadingStep(ReadFromMergeTree * reading)
{
    /// An already analyzed result probably means that some projection was applied before.
    if (reading->hasAnalyzedResult())
        return false;

    /// Read modes that are rejected: FINAL, sampling, parallel reading and reading in order.
    /// The checks are evaluated lazily, in this exact order.
    const bool unsupported_read_mode = reading->isQueryWithFinal()
        || reading->isQueryWithSampling()
        || reading->isParallelReadingEnabled()
        || reading->readsInOrder();
    if (unsupported_read_mode)
        return false;

    const auto query_context = reading->getContext();
    const auto & settings = query_context->getSettingsRef();

    /// Projections currently do not support deduplication when moving parts between shards.
    if (settings.allow_experimental_query_deduplication)
        return false;

    /// Projections currently do not support settings which implicitly modify aggregate functions.
    if (settings.aggregate_functions_null_for_empty)
        return false;

    return true;
}
54

55
/// Returns the max added blocks of the underlying replicated table when
/// `select_sequential_consistency` is enabled, or an empty pointer otherwise
/// (setting disabled, or the storage is not a StorageReplicatedMergeTree).
std::shared_ptr<PartitionIdToMaxBlock> getMaxAddedBlocks(ReadFromMergeTree * reading)
{
    const ContextPtr query_context = reading->getContext();
    if (!query_context->getSettingsRef().select_sequential_consistency)
        return {};

    const auto * replicated_storage = dynamic_cast<const StorageReplicatedMergeTree *>(&reading->getMergeTreeData());
    if (!replicated_storage)
        return {};

    return std::make_shared<PartitionIdToMaxBlock>(replicated_storage->getMaxAddedBlocks());
}
67

68
void QueryDAG::appendExpression(const ActionsDAGPtr & expression)
69
{
70
    if (dag)
71
        dag->mergeInplace(std::move(*expression->clone()));
72
    else
73
        dag = expression->clone();
74
}
75

76
/// Looks up the first output of `dag` whose result name equals `name` and treats it as a filter column.
///
/// Returns nullptr when there is no such output, or when the output type is only-Null
/// (Null is allowed as a filter, but then the result is empty, so optimizations are skipped).
/// Throws ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER when the type, after stripping LowCardinality and Nullable, is not UInt8.
///
/// On success the found node is returned and the outputs are adjusted:
/// with `remove` set the output is erased, otherwise it is replaced by a constant column of the same name and type.
const ActionsDAG::Node * findInOutputs(ActionsDAG & dag, const std::string & name, bool remove)
{
    auto & outputs = dag.getOutputs();

    /// Locate the first output carrying the requested name.
    auto position = outputs.begin();
    while (position != outputs.end() && (*position)->result_name != name)
        ++position;

    if (position == outputs.end())
        return nullptr;

    const auto * found = *position;

    /// We allow to use Null as a filter.
    /// In this case, result is empty. Ignore optimizations.
    if (found->result_type->onlyNull())
        return nullptr;

    const bool is_valid_filter_type = isUInt8(removeNullable(removeLowCardinality(found->result_type)));
    if (!is_valid_filter_type)
        throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER,
            "Illegal type {} of column {} for filter. Must be UInt8 or Nullable(UInt8).",
            found->result_type->getName(), name);

    if (remove)
    {
        outputs.erase(position);
        return found;
    }

    /// Keep an output with the same name and type, but backed by a constant column.
    ColumnWithTypeAndName placeholder;
    placeholder.name = found->result_name;
    placeholder.type = found->result_type;
    placeholder.column = placeholder.type->createColumnConst(1, 1);
    *position = &dag.addColumn(std::move(placeholder));

    return found;
}
114

115
/// Recursively walks a single-child chain of plan nodes down to ReadFromMergeTree and, on the way back up,
/// appends the actions of every step to the accumulated DAG while collecting filter nodes into `filter_nodes`.
///
/// Returns false when the chain cannot be represented: a node without exactly one child (other than the reading step),
/// a step that is neither ExpressionStep nor FilterStep, actions containing ARRAY JOIN,
/// or a filter column that findInOutputs() does not return.
bool QueryDAG::buildImpl(QueryPlan::Node & node, ActionsDAG::NodeRawConstPtrs & filter_nodes)
{
    /// Appends `actions` to the DAG, then extracts the filter column from the DAG outputs and remembers it.
    auto append_with_filter = [&](const ActionsDAGPtr & actions, const std::string & column_name, bool remove_column) -> bool
    {
        appendExpression(actions);

        const auto * filter_expression = findInOutputs(*dag, column_name, remove_column);
        if (!filter_expression)
            return false;

        filter_nodes.push_back(filter_expression);
        return true;
    };

    IQueryPlanStep * step = node.step.get();

    if (auto * reading = typeid_cast<ReadFromMergeTree *>(step))
    {
        const auto & prewhere_info = reading->getPrewhereInfo();
        if (!prewhere_info)
            return true;

        /// Row-level filter goes first; its column is never removed from the outputs.
        if (prewhere_info->row_level_filter
            && !append_with_filter(prewhere_info->row_level_filter, prewhere_info->row_level_column_name, false))
            return false;

        if (prewhere_info->prewhere_actions
            && !append_with_filter(prewhere_info->prewhere_actions, prewhere_info->prewhere_column_name, prewhere_info->remove_prewhere_column))
            return false;

        return true;
    }

    /// Only a linear chain is supported; the child is processed before the current step.
    if (node.children.size() != 1 || !buildImpl(*node.children.front(), filter_nodes))
        return false;

    if (auto * expression = typeid_cast<ExpressionStep *>(step))
    {
        const auto & actions = expression->getExpression();
        if (actions->hasArrayJoin())
            return false;

        appendExpression(actions);
        return true;
    }

    if (auto * filter = typeid_cast<FilterStep *>(step))
    {
        const auto & actions = filter->getExpression();
        if (actions->hasArrayJoin())
            return false;

        return append_with_filter(actions, filter->getFilterColumnName(), filter->removesFilterColumn());
    }

    return false;
}
177

178
/// Builds the DAG for the plan subtree rooted at `node` and, when filters were collected,
/// stores the resulting filter in `filter_node` and puts it at the front of the DAG outputs.
///
/// A single filter is aliased as "_projection_filter"; several filters are combined with a logical AND.
/// Returns false when buildImpl() rejects the subtree.
bool QueryDAG::build(QueryPlan::Node & node)
{
    ActionsDAG::NodeRawConstPtrs collected_filters;
    if (!buildImpl(node, collected_filters))
        return false;

    if (collected_filters.empty())
        return true;

    filter_node = collected_filters.back();

    if (collected_filters.size() == 1)
    {
        filter_node = &dag->addAlias(*filter_node, "_projection_filter");
    }
    else
    {
        /// Add a conjunction of all the filters.
        FunctionOverloadResolverPtr and_resolver
            = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionAnd>());

        filter_node = &dag->addFunction(and_resolver, std::move(collected_filters), {});
    }

    auto & dag_outputs = dag->getOutputs();
    dag_outputs.insert(dag_outputs.begin(), filter_node);

    return true;
}
207

208
bool analyzeProjectionCandidate(
209
    ProjectionCandidate & candidate,
210
    const ReadFromMergeTree & reading,
211
    const MergeTreeDataSelectExecutor & reader,
212
    const Names & required_column_names,
213
    const RangesInDataParts & parts_with_ranges,
214
    const SelectQueryInfo & query_info,
215
    const ContextPtr & context,
216
    const std::shared_ptr<PartitionIdToMaxBlock> & max_added_blocks,
217
    const ActionsDAGPtr & dag)
218
{
219
    MergeTreeData::DataPartsVector projection_parts;
220
    MergeTreeData::DataPartsVector normal_parts;
221
    std::vector<AlterConversionsPtr> alter_conversions;
222
    for (const auto & part_with_ranges : parts_with_ranges)
223
    {
224
        const auto & created_projections = part_with_ranges.data_part->getProjectionParts();
225
        auto it = created_projections.find(candidate.projection->name);
226
        if (it != created_projections.end() && !it->second->is_broken)
227
        {
228
            projection_parts.push_back(it->second);
229
        }
230
        else
231
        {
232
            normal_parts.push_back(part_with_ranges.data_part);
233
            alter_conversions.push_back(part_with_ranges.alter_conversions);
234
        }
235
    }
236

237
    if (projection_parts.empty())
238
        return false;
239

240
    auto projection_query_info = query_info;
241
    projection_query_info.prewhere_info = nullptr;
242
    projection_query_info.filter_actions_dag = dag;
243

244
    auto projection_result_ptr = reader.estimateNumMarksToRead(
245
        std::move(projection_parts),
246
        required_column_names,
247
        candidate.projection->metadata,
248
        projection_query_info,
249
        context,
250
        context->getSettingsRef().max_threads,
251
        max_added_blocks);
252

253
    candidate.merge_tree_projection_select_result_ptr = std::move(projection_result_ptr);
254
    candidate.sum_marks += candidate.merge_tree_projection_select_result_ptr->selected_marks;
255

256
    if (!normal_parts.empty())
257
    {
258
        /// TODO: We can reuse existing analysis_result by filtering out projection parts
259
        auto normal_result_ptr = reading.selectRangesToRead(std::move(normal_parts), std::move(alter_conversions));
260

261
        if (normal_result_ptr->selected_marks != 0)
262
        {
263
            candidate.sum_marks += normal_result_ptr->selected_marks;
264
            candidate.merge_tree_ordinary_select_result_ptr = std::move(normal_result_ptr);
265
        }
266
    }
267

268
    return true;
269
}
270

271
}
272
}
273

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.