ClickHouse: optimizeUseNormalProjection.cpp

#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/Optimizations/projectionsCommon.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/Sources/NullSource.h>
#include <Common/logger_useful.h>
#include <Storages/ProjectionsDescription.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <algorithm>

namespace DB::QueryPlanOptimizations
{

/// Normal projection analysis result in case it can be applied.
/// For now, it is empty.
/// Normal projection can be used only if it contains all required source columns.
/// It would not be hard to support pre-computed expressions and filtration.
struct NormalProjectionCandidate : public ProjectionCandidate
{
};

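/// Build a DAG that materializes columns which are constant in the projection
/// header but not in the main table header, so both inputs of the Union step
/// created below have identical block structure.
/// Returns nullptr when nothing needs to be materialized.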
static ActionsDAGPtr makeMaterializingDAG(const Block & proj_header, const Block main_header)
{
    /// Materialize constants in case we don't have them in the output header.
    /// This may happen e.g. if we have PREWHERE.

    size_t num_columns = main_header.columns();
    /// This is an error; it will cause a block structure mismatch later.
    if (proj_header.columns() != num_columns)
        return nullptr;

    std::vector<size_t> const_positions;
    for (size_t i = 0; i < num_columns; ++i)
    {
        auto col_proj = proj_header.getByPosition(i).column;
        auto col_main = main_header.getByPosition(i).column;
        bool is_proj_const = col_proj && isColumnConst(*col_proj);
        bool is_main_proj = col_main && isColumnConst(*col_main);
        if (is_proj_const && !is_main_proj)
            const_positions.push_back(i);
    }

    if (const_positions.empty())
        return nullptr;

    ActionsDAGPtr dag = std::make_unique<ActionsDAG>();
    auto & outputs = dag->getOutputs();
    for (const auto & col : proj_header.getColumnsWithTypeAndName())
        outputs.push_back(&dag->addInput(col));

    for (auto pos : const_positions)
    {
        auto & output = outputs[pos];
        output = &dag->materializeNode(*output);
    }

    return dag;
}

static bool hasAllRequiredColumns(const ProjectionDescription * projection, const Names & required_columns)
{
    for (const auto & col : required_columns)
    {
        if (!projection->sample_block.has(col))
            return false;
    }

    return true;
}


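/// Try to replace reading from a MergeTree table with reading from one of its
/// normal projections. A projection is used only if it contains all columns
/// required by the query and its analysis selects fewer marks to read than
/// ordinary reading. Returns true if the query plan was rewritten.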
bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
{
    const auto & frame = stack.back();

    auto * reading = typeid_cast<ReadFromMergeTree *>(frame.node->step.get());
    if (!reading)
        return false;

    if (!canUseProjectionForReadingStep(reading))
        return false;

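    /// Walk up the stack past the chain of Filter/Expression steps sitting above
    /// the reading step; `iter` stops at the first ancestor that is neither
    /// (or at the topmost node), i.e. the node whose child subtree we may rewire.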
    auto iter = stack.rbegin();
    while (std::next(iter) != stack.rend())
    {
        iter = std::next(iter);

        if (!typeid_cast<FilterStep *>(iter->node->step.get()) &&
            !typeid_cast<ExpressionStep *>(iter->node->step.get()))
            break;
    }

    /// Dangling query plan node. This might be generated by StorageMerge.
    if (iter->node->step.get() == reading)
        return false;

    const auto metadata = reading->getStorageMetadata();
    const auto & projections = metadata->projections;

    std::vector<const ProjectionDescription *> normal_projections;
    for (const auto & projection : projections)
        if (projection.type == ProjectionDescription::Type::Normal)
            normal_projections.push_back(&projection);

    if (normal_projections.empty())
        return false;

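    /// If the preferred_optimize_projection_name setting names one of the normal
    /// projections, consider only that projection.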
    ContextPtr context = reading->getContext();
    auto it = std::find_if(normal_projections.begin(), normal_projections.end(), [&](const auto * projection)
    {
        return projection->name == context->getSettings().preferred_optimize_projection_name.value;
    });

    if (it != normal_projections.end())
    {
        const ProjectionDescription * preferred_projection = *it;
        normal_projections.clear();
        normal_projections.push_back(preferred_projection);
    }

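    /// Collect the Filter/Expression steps above the reading step into a single DAG.
    /// If a filter is present, the DAG is used both for analyzing projection parts
    /// and for re-applying the filter on top of the projection reading step below.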
    QueryDAG query;
    {
        auto & child = iter->node->children[iter->next_child - 1];
        if (!query.build(*child))
            return false;

        if (query.dag)
            query.dag->removeUnusedActions();
    }

    std::list<NormalProjectionCandidate> candidates;
    NormalProjectionCandidate * best_candidate = nullptr;

    const Names & required_columns = reading->getAllColumnNames();
    const auto & parts = reading->getParts();
    const auto & alter_conversions = reading->getAlterConvertionsForParts();
    const auto & query_info = reading->getQueryInfo();
    MergeTreeDataSelectExecutor reader(reading->getMergeTreeData());

    auto ordinary_reading_select_result = reading->selectRangesToRead(parts, alter_conversions);
    size_t ordinary_reading_marks = ordinary_reading_select_result->selected_marks;

    /// Nothing to read. Ignore projections.
    if (ordinary_reading_marks == 0)
    {
        reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
        return false;
    }

    const auto & parts_with_ranges = ordinary_reading_select_result->parts_with_ranges;

    std::shared_ptr<PartitionIdToMaxBlock> max_added_blocks = getMaxAddedBlocks(reading);

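    /// Analyze every normal projection that provides all required columns and keep
    /// the candidate that would read the fewest marks, but only if it is strictly
    /// cheaper than ordinary reading from the table.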
    for (const auto * projection : normal_projections)
    {
        if (!hasAllRequiredColumns(projection, required_columns))
            continue;

        auto & candidate = candidates.emplace_back();
        candidate.projection = projection;

        bool analyzed = analyzeProjectionCandidate(
            candidate,
            *reading,
            reader,
            required_columns,
            parts_with_ranges,
            query_info,
            context,
            max_added_blocks,
            query.filter_node ? query.dag : nullptr);

        if (!analyzed)
            continue;

        if (candidate.sum_marks >= ordinary_reading_marks)
            continue;

        if (best_candidate == nullptr || candidate.sum_marks < best_candidate->sum_marks)
            best_candidate = &candidate;
    }

    if (!best_candidate)
    {
        reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
        return false;
    }

    auto storage_snapshot = reading->getStorageSnapshot();
    auto proj_snapshot = std::make_shared<StorageSnapshot>(storage_snapshot->storage, storage_snapshot->metadata);
    proj_snapshot->addProjection(best_candidate->projection);

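    /// Read from the projection parts selected during analysis. PREWHERE is cleared
    /// in the copied query info; filtering, if any, is re-applied below on top of
    /// the projection reading step via the collected query DAG.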
    auto query_info_copy = query_info;
    query_info_copy.prewhere_info = nullptr;

    auto projection_reading = reader.readFromParts(
        /*parts=*/ {},
        /*alter_conversions=*/ {},
        required_columns,
        proj_snapshot,
        query_info_copy,
        context,
        reading->getMaxBlockSize(),
        reading->getNumStreams(),
        max_added_blocks,
        best_candidate->merge_tree_projection_select_result_ptr,
        reading->isParallelReadingEnabled());

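    /// If nothing can be read from the projection, keep the plan valid by
    /// substituting a NullSource with the expected header.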
    if (!projection_reading)
    {
        Pipe pipe(std::make_shared<NullSource>(proj_snapshot->getSampleBlockForColumns(required_columns)));
        projection_reading = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
    }

    if (!query_info.is_internal && context->hasQueryContext())
    {
        context->getQueryContext()->addQueryAccessInfo(Context::QualifiedProjectionName
        {
            .storage_id = reading->getMergeTreeData().getStorageID(),
            .projection_name = best_candidate->projection->name,
        });
    }

    bool has_ordinary_parts = best_candidate->merge_tree_ordinary_select_result_ptr != nullptr;
    if (has_ordinary_parts)
        reading->setAnalyzedResult(std::move(best_candidate->merge_tree_ordinary_select_result_ptr));

    projection_reading->setStepDescription(best_candidate->projection->name);

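    /// Rewire the plan: attach the projection reading node, put the collected
    /// filter/expression DAG on top of it if there is one, and either replace the
    /// original subtree entirely or union it with the ordinary reading branch.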
    auto & projection_reading_node = nodes.emplace_back(QueryPlan::Node{.step = std::move(projection_reading)});
    auto * next_node = &projection_reading_node;

    if (query.dag)
    {
        auto & expr_or_filter_node = nodes.emplace_back();

        if (query.filter_node)
        {
            expr_or_filter_node.step = std::make_unique<FilterStep>(
                projection_reading_node.step->getOutputStream(),
                query.dag,
                query.filter_node->result_name,
                true);
        }
        else
            expr_or_filter_node.step = std::make_unique<ExpressionStep>(
                projection_reading_node.step->getOutputStream(),
                query.dag);

        expr_or_filter_node.children.push_back(&projection_reading_node);
        next_node = &expr_or_filter_node;
    }

    if (!has_ordinary_parts)
    {
        /// All parts are taken from projection
        iter->node->children[iter->next_child - 1] = next_node;
    }
    else
    {
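        /// Some parts are still read from the main table: materialize constant
        /// columns if needed so the headers match, then union the ordinary reading
        /// branch with the projection branch.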
265
        const auto & main_stream = iter->node->children[iter->next_child - 1]->step->getOutputStream();
266
        const auto * proj_stream = &next_node->step->getOutputStream();
267

268
        if (auto materializing = makeMaterializingDAG(proj_stream->header, main_stream.header))
269
        {
270
            auto converting = std::make_unique<ExpressionStep>(*proj_stream, materializing);
271
            proj_stream = &converting->getOutputStream();
272
            auto & expr_node = nodes.emplace_back();
273
            expr_node.step = std::move(converting);
274
            expr_node.children.push_back(next_node);
275
            next_node = &expr_node;
276
        }
277

278
        auto & union_node = nodes.emplace_back();
279
        DataStreams input_streams = {main_stream, *proj_stream};
280
        union_node.step = std::make_unique<UnionStep>(std::move(input_streams));
281
        union_node.children = {iter->node->children[iter->next_child - 1], next_node};
282
        iter->node->children[iter->next_child - 1] = &union_node;
283
    }
284

285
    /// Here we remove last steps from stack to be able to optimize again.
286
    /// In theory, read-in-order can be applied to projection.
287
    stack.resize(iter.base() - stack.begin());
288

289
    return true;
290
}
291

292
}
293
