ClickHouse

Форк
0
/
optimizeUseAggregateProjection.cpp 
754 строки · 27.7 Кб
1
#include <Processors/QueryPlan/Optimizations/projectionsCommon.h>
2
#include <Processors/QueryPlan/Optimizations/actionsDAGUtils.h>
3
#include <Processors/QueryPlan/AggregatingStep.h>
4
#include <Processors/QueryPlan/ReadFromMergeTree.h>
5
#include <Processors/QueryPlan/ExpressionStep.h>
6
#include <Processors/QueryPlan/FilterStep.h>
7
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
8

9
#include <Processors/Sources/SourceFromSingleChunk.h>
10
#include <Processors/Sources/NullSource.h>
11

12
#include <AggregateFunctions/AggregateFunctionCount.h>
13
#include <Analyzer/JoinNode.h>
14
#include <Analyzer/TableNode.h>
15
#include <Analyzer/QueryTreeBuilder.h>
16
#include <Analyzer/QueryTreePassManager.h>
17
#include <Analyzer/QueryNode.h>
18

19
#include <Common/logger_useful.h>
20
#include <Storages/StorageDummy.h>
21
#include <Storages/VirtualColumnUtils.h>
22
#include <Planner/PlannerExpressionAnalysis.h>
23
#include <Interpreters/InterpreterSelectQuery.h>
24
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
25
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
26
#include <Storages/ProjectionsDescription.h>
27
#include <Parsers/queryToString.h>
28

29
namespace DB::QueryPlanOptimizations
30
{
31

32
using DAGIndex = std::unordered_map<std::string_view, const ActionsDAG::Node *>;
33
static DAGIndex buildDAGIndex(const ActionsDAG & dag)
34
{
35
    DAGIndex index;
36
    for (const auto * output : dag.getOutputs())
37
        index.emplace(output->result_name, output);
38

39
    return index;
40
}
41

42
/// Required analysis info from aggregate projection.
43
struct AggregateProjectionInfo
44
{
45
    ActionsDAGPtr before_aggregation;
46
    Names keys;
47
    AggregateDescriptions aggregates;
48

49
    /// A context copy from interpreter which was used for analysis.
50
    /// Just in case it is used by some function.
51
    ContextPtr context;
52
};
53

54
/// Get required info from aggregate projection.
55
/// Ideally, this should be pre-calculated and stored inside ProjectionDescription.
56
static AggregateProjectionInfo getAggregatingProjectionInfo(
57
    const ProjectionDescription & projection,
58
    const ContextPtr & context,
59
    const StorageMetadataPtr & metadata_snapshot,
60
    const Block & key_virtual_columns)
61
{
62
    /// This is a bad approach.
63
    /// We'd better have a separate interpreter for projections.
64
    /// Now it's not obvious we didn't miss anything here.
65
    ///
66
    /// Setting ignoreASTOptimizations is used because some of them are invalid for projections.
67
    /// Example: 'SELECT min(c0), max(c0), count() GROUP BY -c0' for minmax_count projection can be rewritten to
68
    /// 'SELECT min(c0), max(c0), count() GROUP BY c0' which is incorrect cause we store a column '-c0' in projection.
69
    InterpreterSelectQuery interpreter(
70
        projection.query_ast,
71
        context,
72
        Pipe(std::make_shared<SourceFromSingleChunk>(metadata_snapshot->getSampleBlock())),
73
        SelectQueryOptions{QueryProcessingStage::WithMergeableState}.ignoreASTOptimizations().ignoreSettingConstraints());
74

75
    const auto & analysis_result = interpreter.getAnalysisResult();
76
    const auto & query_analyzer = interpreter.getQueryAnalyzer();
77

78
    AggregateProjectionInfo info;
79
    info.context = interpreter.getContext();
80
    info.before_aggregation = analysis_result.before_aggregation;
81
    info.keys = query_analyzer->aggregationKeys().getNames();
82
    info.aggregates = query_analyzer->aggregates();
83

84
    /// Add part/partition virtual columns to projection aggregation keys.
85
    /// We can do it because projection is stored for every part separately.
86
    for (const auto & virt_column : key_virtual_columns)
87
    {
88
        const auto * input = &info.before_aggregation->addInput(virt_column);
89
        info.before_aggregation->getOutputs().push_back(input);
90
        info.keys.push_back(virt_column.name);
91
    }
92

93
    return info;
94
}
95

96
struct AggregateFunctionMatch
97
{
98
    const AggregateDescription * description = nullptr;
99
    DataTypes argument_types;
100
};
101

102
using AggregateFunctionMatches = std::vector<AggregateFunctionMatch>;
103

104
/// Here we try to match aggregate functions from the query to
105
/// aggregate functions from projection.
106
std::optional<AggregateFunctionMatches> matchAggregateFunctions(
107
    const AggregateProjectionInfo & info,
108
    const AggregateDescriptions & aggregates,
109
    const MatchedTrees::Matches & matches,
110
    const DAGIndex & query_index,
111
    const DAGIndex & proj_index)
112
{
113
    AggregateFunctionMatches res;
114

115
    /// Index (projection agg function name) -> pos
116
    std::unordered_map<std::string, std::vector<size_t>> projection_aggregate_functions;
117
    for (size_t i = 0; i < info.aggregates.size(); ++i)
118
        projection_aggregate_functions[info.aggregates[i].function->getName()].push_back(i);
119

120
    for (const auto & aggregate : aggregates)
121
    {
122
        /// Get a list of candidates by name first.
123
        auto it = projection_aggregate_functions.find(aggregate.function->getName());
124
        if (it == projection_aggregate_functions.end())
125
        {
126
            // LOG_TRACE(
127
            //     getLogger("optimizeUseProjections"),
128
            //     "Cannot match agg func {} by name {}",
129
            //     aggregate.column_name, aggregate.function->getName());
130

131
            return {};
132
        }
133

134
        size_t num_args = aggregate.argument_names.size();
135

136
        DataTypes argument_types;
137
        argument_types.reserve(num_args);
138

139
        auto & candidates = it->second;
140
        bool found_match = false;
141

142
        for (size_t idx : candidates)
143
        {
144
            argument_types.clear();
145
            const auto & candidate = info.aggregates[idx];
146

147
            /// In some cases it's possible only to check that states are equal,
148
            /// e.g. for quantile(0.3)(...) and quantile(0.5)(...).
149
            ///
150
            /// Note we already checked that aggregate function names are equal,
151
            /// so that functions sum(...) and sumIf(...) with equal states will
152
            /// not match.
153
            if (!candidate.function->getStateType()->equals(*aggregate.function->getStateType()))
154
            {
155
                // LOG_TRACE(getLogger("optimizeUseProjections"), "Cannot match agg func {} vs {} by state {} vs {}",
156
                //     aggregate.column_name, candidate.column_name,
157
                //     candidate.function->getStateType()->getName(), aggregate.function->getStateType()->getName());
158
                continue;
159
            }
160

161
            /// This is a special case for the function count().
162
            /// We can assume that 'count(expr) == count()' if expr is not nullable,
163
            /// which can be verified by simply casting to `AggregateFunctionCount *`.
164
            if (typeid_cast<const AggregateFunctionCount *>(aggregate.function.get()))
165
            {
166
                /// we can ignore arguments for count()
167
                found_match = true;
168
                res.push_back({&candidate, DataTypes()});
169
                break;
170
            }
171

172
            /// Now, function names and types matched.
173
            /// Next, match arguments from DAGs.
174

175
            if (num_args != candidate.argument_names.size())
176
                continue;
177

178
            size_t next_arg = 0;
179
            while (next_arg < num_args)
180
            {
181
                const auto & query_name = aggregate.argument_names[next_arg];
182
                const auto & proj_name = candidate.argument_names[next_arg];
183

184
                auto jt = query_index.find(query_name);
185
                auto kt = proj_index.find(proj_name);
186

187
                /// This should not happen ideally.
188
                if (jt == query_index.end() || kt == proj_index.end())
189
                    break;
190

191
                const auto * query_node = jt->second;
192
                const auto * proj_node = kt->second;
193

194
                auto mt = matches.find(query_node);
195
                if (mt == matches.end())
196
                {
197
                    // LOG_TRACE(
198
                    //     getLogger("optimizeUseProjections"),
199
                    //     "Cannot match agg func {} vs {} : can't match arg {} vs {} : no node in map",
200
                    //     aggregate.column_name, candidate.column_name, query_name, proj_name);
201

202
                    break;
203
                }
204

205
                const auto & node_match = mt->second;
206
                if (node_match.node != proj_node || node_match.monotonicity)
207
                {
208
                    // LOG_TRACE(
209
                    //     getLogger("optimizeUseProjections"),
210
                    //     "Cannot match agg func {} vs {} : can't match arg {} vs {} : no match or monotonicity",
211
                    //     aggregate.column_name, candidate.column_name, query_name, proj_name);
212

213
                    break;
214
                }
215

216
                argument_types.push_back(query_node->result_type);
217
                ++next_arg;
218
            }
219

220
            if (next_arg < aggregate.argument_names.size())
221
                continue;
222

223
            found_match = true;
224
            res.push_back({&candidate, std::move(argument_types)});
225
            break;
226
        }
227

228
        if (!found_match)
229
            return {};
230
    }
231

232
    return res;
233
}
234

235
static void appendAggregateFunctions(
236
    ActionsDAG & proj_dag,
237
    const AggregateDescriptions & aggregates,
238
    const AggregateFunctionMatches & matched_aggregates)
239
{
240
    std::unordered_map<const AggregateDescription *, const ActionsDAG::Node *> inputs;
241

242
    /// Just add all the aggregates to dag inputs.
243
    auto & proj_dag_outputs =  proj_dag.getOutputs();
244
    size_t num_aggregates = aggregates.size();
245
    for (size_t i = 0; i < num_aggregates; ++i)
246
    {
247
        const auto & aggregate = aggregates[i];
248
        const auto & match = matched_aggregates[i];
249
        auto type = std::make_shared<DataTypeAggregateFunction>(aggregate.function, match.argument_types, aggregate.parameters);
250

251
        auto & input = inputs[match.description];
252
        if (!input)
253
            input = &proj_dag.addInput(match.description->column_name, type);
254

255
        const auto * node = input;
256

257
        if (node->result_name != aggregate.column_name)
258
        {
259
            if (DataTypeAggregateFunction::strictEquals(type, node->result_type))
260
            {
261
                node = &proj_dag.addAlias(*node, aggregate.column_name);
262
            }
263
            else
264
            {
265
                /// Cast to aggregate types specified in query if it's not
266
                /// strictly the same as the one specified in projection. This
267
                /// is required to generate correct results during finalization.
268
                node = &proj_dag.addCast(*node, type, aggregate.column_name);
269
            }
270
        }
271

272
        proj_dag_outputs.push_back(node);
273
    }
274
}
275

276
/// Check whether the aggregation (`keys`, `aggregates`) over `query` can be computed
/// from the aggregate projection described by `info`.
///
/// On success, returns a DAG over projection columns whose outputs are:
///  - the query filter column (if any) first, then the query keys, all computed from projection keys only;
///  - the query aggregate states, taken from the matched projection aggregates.
/// Returns nullptr if some aggregate function or some key cannot be matched.
ActionsDAGPtr analyzeAggregateProjection(
    const AggregateProjectionInfo & info,
    const QueryDAG & query,
    const DAGIndex & query_index,
    const Names & keys,
    const AggregateDescriptions & aggregates)
{
    auto proj_index = buildDAGIndex(*info.before_aggregation);

    MatchedTrees::Matches matches = matchTrees(info.before_aggregation->getOutputs(), *query.dag, false /* check_monotonicity */);

    auto matched_aggregates = matchAggregateFunctions(info, aggregates, matches, query_index, proj_index);
    if (!matched_aggregates)
        return {};

    ActionsDAG::NodeRawConstPtrs query_key_nodes;
    std::unordered_set<const ActionsDAG::Node *> proj_key_nodes;

    {
        /// Collect projection key nodes.
        for (const auto & key : info.keys)
        {
            auto it = proj_index.find(key);
            /// This should not happen ideally.
            if (it == proj_index.end())
                return {};

            proj_key_nodes.insert(it->second);
        }

        query_key_nodes.reserve(keys.size() + 1);

        /// We need to add filter column to keys set.
        /// It should be computable from projection keys.
        /// It will be removed in FilterStep.
        if (query.filter_node)
            query_key_nodes.push_back(query.filter_node);

        for (const auto & key : keys)
        {
            auto it = query_index.find(key);
            /// This should not happen ideally.
            if (it == query_index.end())
                return {};

            query_key_nodes.push_back(it->second);
        }
    }

    /// Match query keys with projection keys.
    /// A query key can be any expression depending on projection keys only.
    /// Iterative DFS: a node is resolved if it matches a projection key exactly,
    /// or if all of its children are resolved and it is not an input.

    struct Frame
    {
        const ActionsDAG::Node * node;
        size_t next_child_to_visit = 0;
    };

    std::stack<Frame> stack;
    std::unordered_set<const ActionsDAG::Node *> visited;
    std::unordered_map<const ActionsDAG::Node *, const ActionsDAG::Node *> new_inputs;

    for (const auto * key_node : query_key_nodes)
    {
        if (visited.contains(key_node))
            continue;

        stack.push({.node = key_node});

        while (!stack.empty())
        {
            auto & frame = stack.top();

            if (frame.next_child_to_visit == 0)
            {
                auto jt = matches.find(frame.node);
                if (jt != matches.end())
                {
                    const auto & match = jt->second;
                    if (match.node && !match.monotonicity && proj_key_nodes.contains(match.node))
                    {
                        visited.insert(frame.node);
                        new_inputs[frame.node] = match.node;
                        stack.pop();
                        continue;
                    }
                }
            }

            if (frame.next_child_to_visit < frame.node->children.size())
            {
                /// Advance the frame before pushing; `frame` is not used after the push.
                const auto * child = frame.node->children[frame.next_child_to_visit];
                ++frame.next_child_to_visit;

                /// A visited node is already resolved (replaced by a projection key, or all its
                /// children resolved), and visiting it again would give the same result. Skipping it
                /// avoids re-traversing shared subexpressions, which can blow up exponentially.
                if (!visited.contains(child))
                    stack.push({.node = child});
                continue;
            }

            /// Not a match and there is no matched child: the key depends on a query input
            /// that is not a projection key.
            if (frame.node->type == ActionsDAG::ActionType::INPUT)
                return {};

            /// Not a match, but all children matched.
            visited.insert(frame.node);
            stack.pop();
        }
    }

    auto proj_dag = query.dag->foldActionsByProjection(new_inputs, query_key_nodes);
    appendAggregateFunctions(*proj_dag, aggregates, *matched_aggregates);
    return proj_dag;
}
399

400

401
/// Aggregate projection analysis result in case it can be applied.
402
struct AggregateProjectionCandidate : public ProjectionCandidate
403
{
404
    AggregateProjectionInfo info;
405

406
    /// Actions which need to be applied to columns from projection
407
    /// in order to get all the columns required for aggregation.
408
    ActionsDAGPtr dag;
409
};
410

411
struct MinMaxProjectionCandidate
412
{
413
    AggregateProjectionCandidate candidate;
414
    Block block;
415
};
416

417
struct AggregateProjectionCandidates
418
{
419
    std::vector<AggregateProjectionCandidate> real;
420
    std::optional<MinMaxProjectionCandidate> minmax_projection;
421

422
    /// This flag means that DAG for projection candidate should be used in FilterStep.
423
    bool has_filter = false;
424
};
425

426
/// Collect aggregate projections of the table read by `reading` that can serve `aggregating`.
///
/// The implicit minmax_count projection is tried first. It requires `allow_implicit_projections`,
/// no lightweight-delete parts, and a deterministic filter, and it is accepted only if it produces
/// a non-empty block. Explicit aggregate projections are analyzed only when minmax_count is not used.
/// If the `preferred_optimize_projection_name` setting names one of them, only that one is considered.
AggregateProjectionCandidates getAggregateProjectionCandidates(
    QueryPlan::Node & node,
    AggregatingStep & aggregating,
    ReadFromMergeTree & reading,
    const std::shared_ptr<PartitionIdToMaxBlock> & max_added_blocks,
    bool allow_implicit_projections)
{
    const auto & keys = aggregating.getParams().keys;
    const auto & aggregates = aggregating.getParams().aggregates;
    const auto metadata = reading.getStorageMetadata();
    Block key_virtual_columns = reading.getMergeTreeData().getHeaderWithVirtualsForFilter(metadata);

    AggregateProjectionCandidates candidates;

    const auto & parts = reading.getParts();
    ContextPtr context = reading.getContext();

    const auto & projections = metadata->projections;
    std::vector<const ProjectionDescription *> agg_projections;

    for (const auto & projection : projections)
        if (projection.type == ProjectionDescription::Type::Aggregate)
            agg_projections.push_back(&projection);

    bool can_use_minmax_projection = allow_implicit_projections && metadata->minmax_count_projection
        && !reading.getMergeTreeData().has_lightweight_delete_parts.load();

    if (!can_use_minmax_projection && agg_projections.empty())
        return candidates;

    QueryDAG dag;
    if (!dag.build(*node.children.front()))
        return candidates;

    auto query_index = buildDAGIndex(*dag.dag);

    candidates.has_filter = dag.filter_node != nullptr;
    /// We can't use minmax projection if filter has non-deterministic functions.
    if (dag.filter_node && !VirtualColumnUtils::isDeterministicInScopeOfQuery(dag.filter_node))
        can_use_minmax_projection = false;

    if (can_use_minmax_projection)
    {
        const auto * projection = &*(metadata->minmax_count_projection);
        auto info = getAggregatingProjectionInfo(*projection, context, metadata, key_virtual_columns);
        if (auto proj_dag = analyzeAggregateProjection(info, dag, query_index, keys, aggregates))
        {
            AggregateProjectionCandidate candidate{.info = std::move(info), .dag = std::move(proj_dag)};

            auto block = reading.getMergeTreeData().getMinMaxCountProjectionBlock(
                metadata,
                candidate.dag->getRequiredColumnsNames(),
                (dag.filter_node ? dag.dag : nullptr),
                parts,
                max_added_blocks.get(),
                context);

            // minmax_count_projection cannot be used when there is no data to process, because
            // it will produce incorrect result during constant aggregation.
            // See https://github.com/ClickHouse/ClickHouse/issues/36728
            if (block)
            {
                MinMaxProjectionCandidate minmax;
                minmax.candidate = std::move(candidate);
                minmax.block = std::move(block);
                minmax.candidate.projection = projection;
                candidates.minmax_projection.emplace(std::move(minmax));
            }
        }
    }

    if (!candidates.minmax_projection)
    {
        /// Read the setting once rather than calling context->getSettings() for every
        /// projection inside the predicate (it may copy the whole Settings object).
        const auto preferred_projection_name = context->getSettings().preferred_optimize_projection_name.value;

        auto it = std::find_if(agg_projections.begin(), agg_projections.end(), [&](const auto * projection)
        {
            return projection->name == preferred_projection_name;
        });

        if (it != agg_projections.end())
        {
            const ProjectionDescription * preferred_projection = *it;
            agg_projections.clear();
            agg_projections.push_back(preferred_projection);
        }

        candidates.real.reserve(agg_projections.size());
        for (const auto * projection : agg_projections)
        {
            auto info = getAggregatingProjectionInfo(*projection, context, metadata, key_virtual_columns);
            if (auto proj_dag = analyzeAggregateProjection(info, dag, query_index, keys, aggregates))
            {
                AggregateProjectionCandidate candidate{.info = std::move(info), .dag = std::move(proj_dag)};
                candidate.projection = projection;
                candidates.real.emplace_back(std::move(candidate));
            }
        }
    }

    return candidates;
}
539

540
/// Walk down from `node` through single-child ExpressionStep / FilterStep nodes
/// and return the ReadFromMergeTree node at the bottom, or nullptr if the chain
/// contains any other step or branches.
static QueryPlan::Node * findReadingStep(QueryPlan::Node & node)
{
    QueryPlan::Node * current = &node;
    while (true)
    {
        IQueryPlanStep * current_step = current->step.get();
        if (typeid_cast<ReadFromMergeTree *>(current_step))
            return current;

        const bool is_pass_through = typeid_cast<ExpressionStep *>(current_step) || typeid_cast<FilterStep *>(current_step);
        if (current->children.size() != 1 || !is_pass_through)
            return nullptr;

        current = current->children.front();
    }
}
554

555
/// Try to replace "ReadFromMergeTree -> (Expression/Filter)* -> Aggregating" with reading from
/// an aggregate projection.
///
/// The minmax_count projection is used whenever it is a valid candidate. Otherwise the explicit
/// candidate with the fewest marks is chosen, provided it reads no more marks than ordinary reading.
/// New plan nodes are appended to `nodes`. If all data comes from the projection, aggregation only
/// merges states; otherwise the aggregating step is converted to combine projection and ordinary parts.
///
/// Returns true if the plan was changed.
bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes, bool allow_implicit_projections)
{
    if (node.children.size() != 1)
        return false;

    auto * aggregating = typeid_cast<AggregatingStep *>(node.step.get());
    if (!aggregating)
        return false;

    if (!aggregating->canUseProjection())
        return false;

    QueryPlan::Node * reading_node = findReadingStep(*node.children.front());
    if (!reading_node)
        return false;

    auto * reading = typeid_cast<ReadFromMergeTree *>(reading_node->step.get());
    if (!reading)
        return false;

    if (!canUseProjectionForReadingStep(reading))
        return false;

    std::shared_ptr<PartitionIdToMaxBlock> max_added_blocks = getMaxAddedBlocks(reading);

    auto candidates = getAggregateProjectionCandidates(node, *aggregating, *reading, max_added_blocks, allow_implicit_projections);

    const auto & parts = reading->getParts();
    const auto & alter_conversions = reading->getAlterConvertionsForParts();
    const auto & query_info = reading->getQueryInfo();
    ContextPtr context = reading->getContext();
    MergeTreeDataSelectExecutor reader(reading->getMergeTreeData());
    AggregateProjectionCandidate * best_candidate = nullptr;
    if (candidates.minmax_projection)
    {
        best_candidate = &candidates.minmax_projection->candidate;
    }
    else if (!candidates.real.empty())
    {
        auto ordinary_reading_select_result = reading->selectRangesToRead(parts, alter_conversions);
        size_t ordinary_reading_marks = ordinary_reading_select_result->selected_marks;

        /// Nothing to read. Ignore projections.
        if (ordinary_reading_marks == 0)
        {
            reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
            return false;
        }

        const auto & parts_with_ranges = ordinary_reading_select_result->parts_with_ranges;

        /// Selecting best candidate.
        for (auto & candidate : candidates.real)
        {
            auto required_column_names = candidate.dag->getRequiredColumnsNames();

            bool analyzed = analyzeProjectionCandidate(
                candidate,
                *reading,
                reader,
                required_column_names,
                parts_with_ranges,
                query_info,
                context,
                max_added_blocks,
                candidate.dag);

            if (!analyzed)
                continue;

            /// A projection that reads more than the ordinary reading is not worth it.
            if (candidate.sum_marks > ordinary_reading_marks)
                continue;

            if (best_candidate == nullptr || best_candidate->sum_marks > candidate.sum_marks)
                best_candidate = &candidate;
        }

        if (!best_candidate)
        {
            reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
            return false;
        }
    }
    else
    {
        return false;
    }

    chassert(best_candidate != nullptr);

    QueryPlanStepPtr projection_reading;
    bool has_ordinary_parts = false;

    /// Add reading from projection step.
    if (candidates.minmax_projection)
    {
        /// The minmax_count data is already computed: read it from a single in-memory chunk.
        Pipe pipe(std::make_shared<SourceFromSingleChunk>(std::move(candidates.minmax_projection->block)));
        projection_reading = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
    }
    else
    {
        auto storage_snapshot = reading->getStorageSnapshot();
        auto proj_snapshot = std::make_shared<StorageSnapshot>(storage_snapshot->storage, storage_snapshot->metadata);
        proj_snapshot->addProjection(best_candidate->projection);

        auto projection_query_info = query_info;
        projection_query_info.prewhere_info = nullptr;
        projection_query_info.filter_actions_dag = nullptr;

        projection_reading = reader.readFromParts(
            /* parts = */ {},
            /* alter_conversions = */ {},
            best_candidate->dag->getRequiredColumnsNames(),
            proj_snapshot,
            projection_query_info,
            context,
            reading->getMaxBlockSize(),
            reading->getNumStreams(),
            max_added_blocks,
            best_candidate->merge_tree_projection_select_result_ptr,
            reading->isParallelReadingEnabled());

        if (!projection_reading)
        {
            auto header = proj_snapshot->getSampleBlockForColumns(best_candidate->dag->getRequiredColumnsNames());
            Pipe pipe(std::make_shared<NullSource>(std::move(header)));
            projection_reading = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
        }

        /// Parts that have no projection data are still read by the original reading step.
        has_ordinary_parts = best_candidate->merge_tree_ordinary_select_result_ptr != nullptr;
        if (has_ordinary_parts)
            reading->setAnalyzedResult(std::move(best_candidate->merge_tree_ordinary_select_result_ptr));
    }

    if (!query_info.is_internal && context->hasQueryContext())
    {
        context->getQueryContext()->addQueryAccessInfo(Context::QualifiedProjectionName
        {
            .storage_id = reading->getMergeTreeData().getStorageID(),
            .projection_name = best_candidate->projection->name,
        });
    }

    projection_reading->setStepDescription(best_candidate->projection->name);

    auto & projection_reading_node = nodes.emplace_back(QueryPlan::Node{.step = std::move(projection_reading)});
    auto & expr_or_filter_node = nodes.emplace_back();

    if (candidates.has_filter)
    {
        /// The filter column is the first output of the candidate DAG and is removed after filtering.
        expr_or_filter_node.step = std::make_unique<FilterStep>(
            projection_reading_node.step->getOutputStream(),
            best_candidate->dag,
            best_candidate->dag->getOutputs().front()->result_name,
            true);
    }
    else
    {
        expr_or_filter_node.step = std::make_unique<ExpressionStep>(
            projection_reading_node.step->getOutputStream(),
            best_candidate->dag);
    }

    expr_or_filter_node.children.push_back(&projection_reading_node);

    if (!has_ordinary_parts)
    {
        /// All parts are taken from projection
        aggregating->requestOnlyMergeForAggregateProjection(expr_or_filter_node.step->getOutputStream());
        node.children.front() = &expr_or_filter_node;
    }
    else
    {
        node.step = aggregating->convertToAggregatingProjection(expr_or_filter_node.step->getOutputStream());
        node.children.push_back(&expr_or_filter_node);
    }

    return true;
}
753

754
}
755

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.