ClickHouse

Форк
0
/
enableMemoryBoundMerging.cpp 
94 строки · 3.4 Кб
1
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
2
#include <Processors/QueryPlan/ReadFromRemote.h>
3
#include <Processors/QueryPlan/AggregatingStep.h>
4
#include <Processors/QueryPlan/MergingAggregatedStep.h>
5
#include <Processors/QueryPlan/UnionStep.h>
6

7
namespace DB::QueryPlanOptimizations
8
{
9

10
/// We are trying to find a part of plan like
11
///
12
///          - ReadFromRemote (x N)
13
///  - Union - ReadFromParallelRemoteReplicasStep (x M)
14
///          - Aggregating/MergingAggregated
15
///
16
/// and enable memory bound merging for remote steps if it was enabled for local aggregation.
17
void enableMemoryBoundMerging(QueryPlan::Node & node, QueryPlan::Nodes &)
18
{
19
    auto * root_mergine_aggeregated = typeid_cast<MergingAggregatedStep *>(node.step.get());
20
    if (!root_mergine_aggeregated)
21
        return;
22

23
    const auto & union_node = *node.children.front();
24
    auto * union_step = typeid_cast<UnionStep *>(union_node.step.get());
25
    if (!union_step)
26
        return;
27

28
    std::vector<ReadFromRemote *> reading_steps;
29
    std::vector<ReadFromParallelRemoteReplicasStep *> async_reading_steps;
30
    IQueryPlanStep * local_plan = nullptr;
31

32
    reading_steps.reserve((union_node.children.size()));
33
    async_reading_steps.reserve((union_node.children.size()));
34

35
    for (const auto & child : union_node.children)
36
    {
37
        auto * child_node = child->step.get();
38
        if (auto * reading_step = typeid_cast<ReadFromRemote *>(child_node))
39
            reading_steps.push_back(reading_step);
40
        else if (auto * async_reading_step = typeid_cast<ReadFromParallelRemoteReplicasStep *>(child_node))
41
            async_reading_steps.push_back(async_reading_step);
42
        else if (local_plan)
43
            /// Usually there is a single local plan.
44
            /// TODO: we can support many local plans and calculate common sort description prefix. Do we need it?
45
            return;
46
        else
47
            local_plan = child_node;
48
    }
49

50
    /// We determine output stream sort properties by a local plan (local because otherwise table could be unknown).
51
    /// If no local shard exist for this cluster, no sort properties will be provided, c'est la vie.
52
    if (local_plan == nullptr || (reading_steps.empty() && async_reading_steps.empty()))
53
        return;
54

55
    SortDescription sort_description;
56
    bool enforce_aggregation_in_order = false;
57

58
    if (auto * aggregating_step = typeid_cast<AggregatingStep *>(local_plan))
59
    {
60
        if (aggregating_step->memoryBoundMergingWillBeUsed())
61
        {
62
            sort_description = aggregating_step->getOutputStream().sort_description;
63
            enforce_aggregation_in_order = true;
64
        }
65
    }
66
    else if (auto * mergine_aggeregated = typeid_cast<MergingAggregatedStep *>(local_plan))
67
    {
68
        if (mergine_aggeregated->memoryBoundMergingWillBeUsed())
69
        {
70
            sort_description = mergine_aggeregated->getOutputStream().sort_description;
71
        }
72
    }
73

74
    if (sort_description.empty())
75
        return;
76

77
    for (auto & reading : reading_steps)
78
    {
79
        reading->enforceSorting(sort_description);
80
        if (enforce_aggregation_in_order)
81
            reading->enforceAggregationInOrder();
82
    }
83

84
    for (auto & reading : async_reading_steps)
85
    {
86
        reading->enforceSorting(sort_description);
87
        if (enforce_aggregation_in_order)
88
            reading->enforceAggregationInOrder();
89
    }
90

91
    root_mergine_aggeregated->applyOrder(sort_description, DataStream::SortScope::Stream);
92
}
93

94
}
95

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.