ClickHouse

Форк
0
/
MergingAggregatedStep.cpp 
167 строк · 6.4 Кб
1
#include <Interpreters/Context.h>
2
#include <Processors/Merges/FinishAggregatingInOrderTransform.h>
3
#include <Processors/QueryPlan/MergingAggregatedStep.h>
4
#include <Processors/Transforms/AggregatingTransform.h>
5
#include <Processors/Transforms/MemoryBoundMerging.h>
6
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
7
#include <Processors/Transforms/MergingAggregatedTransform.h>
8
#include <QueryPipeline/QueryPipelineBuilder.h>
9

10
namespace DB
11
{
12

13
static bool memoryBoundMergingWillBeUsed(
14
    const DataStream & input_stream,
15
    bool memory_bound_merging_of_aggregation_results_enabled,
16
    const SortDescription & group_by_sort_description)
17
{
18
    return memory_bound_merging_of_aggregation_results_enabled && !group_by_sort_description.empty()
19
        && input_stream.sort_scope >= DataStream::SortScope::Stream && input_stream.sort_description.hasPrefix(group_by_sort_description);
20
}
21

22
/// Traits of this step: it may collapse to a single output stream (when
/// results must come in order of bucket number), and it never preserves the
/// number of streams, the input sorting, or the number of rows.
static ITransformingStep::Traits getTraits(bool should_produce_results_in_order_of_bucket_number)
{
    ITransformingStep::Traits traits
    {
        {
            .returns_single_stream = should_produce_results_in_order_of_bucket_number,
            .preserves_number_of_streams = false,
            .preserves_sorting = false,
        },
        {
            .preserves_number_of_rows = false,
        }
    };

    return traits;
}
36

37
/// Step that merges partially aggregated data (coming e.g. from several
/// remote shards or parallel local streams) into the final aggregation result.
///
/// Note: params_ is read by getHeader() in the base-class initializer (which
/// runs before member initializers), so moving it into `params` below is safe.
MergingAggregatedStep::MergingAggregatedStep(
    const DataStream & input_stream_,
    Aggregator::Params params_,
    bool final_,
    bool memory_efficient_aggregation_,
    size_t max_threads_,
    size_t memory_efficient_merge_threads_,
    bool should_produce_results_in_order_of_bucket_number_,
    size_t max_block_size_,
    size_t memory_bound_merging_max_block_bytes_,
    SortDescription group_by_sort_description_,
    bool memory_bound_merging_of_aggregation_results_enabled_)
    : ITransformingStep(
        input_stream_,
        params_.getHeader(input_stream_.header, final_),
        getTraits(should_produce_results_in_order_of_bucket_number_))
    , params(std::move(params_))
    , final(final_)
    , memory_efficient_aggregation(memory_efficient_aggregation_)
    , max_threads(max_threads_)
    , memory_efficient_merge_threads(memory_efficient_merge_threads_)
    , max_block_size(max_block_size_)
    , memory_bound_merging_max_block_bytes(memory_bound_merging_max_block_bytes_)
    , group_by_sort_description(std::move(group_by_sort_description_))
    , should_produce_results_in_order_of_bucket_number(should_produce_results_in_order_of_bucket_number_)
    , memory_bound_merging_of_aggregation_results_enabled(memory_bound_merging_of_aggregation_results_enabled_)
{
    /// With memory-bound merging plus in-order result production the output
    /// ends up globally sorted by the GROUP BY keys, so advertise that.
    if (memoryBoundMergingWillBeUsed() && should_produce_results_in_order_of_bucket_number)
    {
        output_stream->sort_description = group_by_sort_description;
        output_stream->sort_scope = DataStream::SortScope::Global;
    }
}
70

71
/// Overwrites the sort order of the input stream (called by plan optimizations)
/// and keeps the GROUP BY sort description and output stream in sync with it.
void MergingAggregatedStep::applyOrder(SortDescription sort_description, DataStream::SortScope sort_scope)
{
    /// Remember the overwritten order so updateOutputStream() can re-apply it later.
    is_order_overwritten = true;
    overwritten_sort_scope = sort_scope;

    auto & front_stream = input_streams.front();
    front_stream.sort_description = sort_description;
    front_stream.sort_scope = sort_scope;

    /// Columns might have been reordered during optimization, so the GROUP BY
    /// sort description has to be refreshed as well.
    group_by_sort_description = std::move(sort_description);

    const bool output_globally_sorted
        = memoryBoundMergingWillBeUsed() && should_produce_results_in_order_of_bucket_number;

    if (output_globally_sorted)
    {
        output_stream->sort_scope = DataStream::SortScope::Global;
        output_stream->sort_description = group_by_sort_description;
    }
}
89

90
/// Builds the pipeline fragment that merges partially aggregated data.
/// One of three strategies is chosen:
///  1. Memory-bound merging — input is already sorted by (a prefix of) the
///     GROUP BY keys, so merging can proceed in order with bounded memory;
///  2. Plain merging — squash all streams into one and merge everything at once;
///  3. Memory-efficient merging — merge two-level aggregation buckets in parallel.
void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
    /// `params` is consumed here; this method must not be called twice.
    auto transform_params = std::make_shared<AggregatingTransformParams>(pipeline.getHeader(), std::move(params), final);

    if (memoryBoundMergingWillBeUsed())
    {
        auto transform = std::make_shared<FinishAggregatingInOrderTransform>(
            pipeline.getHeader(),
            pipeline.getNumStreams(),
            transform_params,
            group_by_sort_description,
            max_block_size,
            memory_bound_merging_max_block_bytes);

        pipeline.addTransform(std::move(transform));

        /// Do merge of aggregated data in parallel.
        pipeline.resize(max_threads);

        /// Sorted output from the bucket transform is only needed when the
        /// caller requires results in order of bucket number.
        const auto & required_sort_description
            = should_produce_results_in_order_of_bucket_number ? group_by_sort_description : SortDescription{};

        pipeline.addSimpleTransform(
            [&](const Block &) { return std::make_shared<MergingAggregatedBucketTransform>(transform_params, required_sort_description); });

        if (should_produce_results_in_order_of_bucket_number)
        {
            /// Funnel the parallel streams back together, producing chunks in
            /// the order required for memory-bound merging downstream.
            pipeline.addTransform(
                std::make_shared<SortingAggregatedForMemoryBoundMergingTransform>(pipeline.getHeader(), pipeline.getNumStreams()));
        }

        return;
    }

    if (!memory_efficient_aggregation)
    {
        /// We union several sources into one, paralleling the work.
        pipeline.resize(1);

        /// Now merge the aggregated blocks
        pipeline.addSimpleTransform([&](const Block & header)
                                    { return std::make_shared<MergingAggregatedTransform>(header, transform_params, max_threads); });
    }
    else
    {
        /// A zero setting means "use max_threads for merging as well".
        auto num_merge_threads = memory_efficient_merge_threads
                                 ? static_cast<size_t>(memory_efficient_merge_threads)
                                 : static_cast<size_t>(max_threads);

        pipeline.addMergingAggregatedMemoryEfficientTransform(transform_params, num_merge_threads);
    }

    pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : max_threads);
}
144

145
/// Writes the aggregator parameters into the text EXPLAIN output.
void MergingAggregatedStep::describeActions(FormatSettings & settings) const
{
    params.explain(settings.out, settings.offset);
}
149

150
/// Writes the aggregator parameters into the JSON EXPLAIN output.
void MergingAggregatedStep::describeActions(JSONBuilder::JSONMap & map) const
{
    params.explain(map);
}
154

155
/// Recomputes the output stream after the input stream may have changed.
void MergingAggregatedStep::updateOutputStream()
{
    output_stream = createOutputStream(input_streams.front(), params.getHeader(input_streams.front().header, final), getDataStreamTraits());
    if (is_order_overwritten)  /// Re-apply the previously overwritten order: createOutputStream() has just reset it.
        applyOrder(group_by_sort_description, overwritten_sort_scope);
}
161

162
bool MergingAggregatedStep::memoryBoundMergingWillBeUsed() const
163
{
164
    return DB::memoryBoundMergingWillBeUsed(
165
        input_streams.front(), memory_bound_merging_of_aggregation_results_enabled, group_by_sort_description);
166
}
167
}
168

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.