ClickHouse
167 строк · 6.4 Кб
1#include <Interpreters/Context.h>2#include <Processors/Merges/FinishAggregatingInOrderTransform.h>3#include <Processors/QueryPlan/MergingAggregatedStep.h>4#include <Processors/Transforms/AggregatingTransform.h>5#include <Processors/Transforms/MemoryBoundMerging.h>6#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>7#include <Processors/Transforms/MergingAggregatedTransform.h>8#include <QueryPipeline/QueryPipelineBuilder.h>9
10namespace DB11{
12
13static bool memoryBoundMergingWillBeUsed(14const DataStream & input_stream,15bool memory_bound_merging_of_aggregation_results_enabled,16const SortDescription & group_by_sort_description)17{
18return memory_bound_merging_of_aggregation_results_enabled && !group_by_sort_description.empty()19&& input_stream.sort_scope >= DataStream::SortScope::Stream && input_stream.sort_description.hasPrefix(group_by_sort_description);20}
21
22static ITransformingStep::Traits getTraits(bool should_produce_results_in_order_of_bucket_number)23{
24return ITransformingStep::Traits25{26{27.returns_single_stream = should_produce_results_in_order_of_bucket_number,28.preserves_number_of_streams = false,29.preserves_sorting = false,30},31{32.preserves_number_of_rows = false,33}34};35}
36
37MergingAggregatedStep::MergingAggregatedStep(38const DataStream & input_stream_,39Aggregator::Params params_,40bool final_,41bool memory_efficient_aggregation_,42size_t max_threads_,43size_t memory_efficient_merge_threads_,44bool should_produce_results_in_order_of_bucket_number_,45size_t max_block_size_,46size_t memory_bound_merging_max_block_bytes_,47SortDescription group_by_sort_description_,48bool memory_bound_merging_of_aggregation_results_enabled_)49: ITransformingStep(50input_stream_,51params_.getHeader(input_stream_.header, final_),52getTraits(should_produce_results_in_order_of_bucket_number_))53, params(std::move(params_))54, final(final_)55, memory_efficient_aggregation(memory_efficient_aggregation_)56, max_threads(max_threads_)57, memory_efficient_merge_threads(memory_efficient_merge_threads_)58, max_block_size(max_block_size_)59, memory_bound_merging_max_block_bytes(memory_bound_merging_max_block_bytes_)60, group_by_sort_description(std::move(group_by_sort_description_))61, should_produce_results_in_order_of_bucket_number(should_produce_results_in_order_of_bucket_number_)62, memory_bound_merging_of_aggregation_results_enabled(memory_bound_merging_of_aggregation_results_enabled_)63{
64if (memoryBoundMergingWillBeUsed() && should_produce_results_in_order_of_bucket_number)65{66output_stream->sort_description = group_by_sort_description;67output_stream->sort_scope = DataStream::SortScope::Global;68}69}
70
71void MergingAggregatedStep::applyOrder(SortDescription sort_description, DataStream::SortScope sort_scope)72{
73is_order_overwritten = true;74overwritten_sort_scope = sort_scope;75
76auto & input_stream = input_streams.front();77input_stream.sort_scope = sort_scope;78input_stream.sort_description = sort_description;79
80/// Columns might be reordered during optimization, so we better to update sort description.81group_by_sort_description = std::move(sort_description);82
83if (memoryBoundMergingWillBeUsed() && should_produce_results_in_order_of_bucket_number)84{85output_stream->sort_description = group_by_sort_description;86output_stream->sort_scope = DataStream::SortScope::Global;87}88}
89
90void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)91{
92auto transform_params = std::make_shared<AggregatingTransformParams>(pipeline.getHeader(), std::move(params), final);93
94if (memoryBoundMergingWillBeUsed())95{96auto transform = std::make_shared<FinishAggregatingInOrderTransform>(97pipeline.getHeader(),98pipeline.getNumStreams(),99transform_params,100group_by_sort_description,101max_block_size,102memory_bound_merging_max_block_bytes);103
104pipeline.addTransform(std::move(transform));105
106/// Do merge of aggregated data in parallel.107pipeline.resize(max_threads);108
109const auto & required_sort_description110= should_produce_results_in_order_of_bucket_number ? group_by_sort_description : SortDescription{};111
112pipeline.addSimpleTransform(113[&](const Block &) { return std::make_shared<MergingAggregatedBucketTransform>(transform_params, required_sort_description); });114
115if (should_produce_results_in_order_of_bucket_number)116{117pipeline.addTransform(118std::make_shared<SortingAggregatedForMemoryBoundMergingTransform>(pipeline.getHeader(), pipeline.getNumStreams()));119}120
121return;122}123
124if (!memory_efficient_aggregation)125{126/// We union several sources into one, paralleling the work.127pipeline.resize(1);128
129/// Now merge the aggregated blocks130pipeline.addSimpleTransform([&](const Block & header)131{ return std::make_shared<MergingAggregatedTransform>(header, transform_params, max_threads); });132}133else134{135auto num_merge_threads = memory_efficient_merge_threads136? static_cast<size_t>(memory_efficient_merge_threads)137: static_cast<size_t>(max_threads);138
139pipeline.addMergingAggregatedMemoryEfficientTransform(transform_params, num_merge_threads);140}141
142pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : max_threads);143}
144
145void MergingAggregatedStep::describeActions(FormatSettings & settings) const146{
147return params.explain(settings.out, settings.offset);148}
149
150void MergingAggregatedStep::describeActions(JSONBuilder::JSONMap & map) const151{
152params.explain(map);153}
154
155void MergingAggregatedStep::updateOutputStream()156{
157output_stream = createOutputStream(input_streams.front(), params.getHeader(input_streams.front().header, final), getDataStreamTraits());158if (is_order_overwritten) /// overwrite order again159applyOrder(group_by_sort_description, overwritten_sort_scope);160}
161
162bool MergingAggregatedStep::memoryBoundMergingWillBeUsed() const163{
164return DB::memoryBoundMergingWillBeUsed(165input_streams.front(), memory_bound_merging_of_aggregation_results_enabled, group_by_sort_description);166}
167}
168