ClickHouse
160 строк · 4.9 Кб
1#include <AggregateFunctions/IAggregateFunction.h>2#include <IO/Operators.h>3#include <Interpreters/ExpressionActions.h>4#include <Processors/QueryPlan/WindowStep.h>5#include <Processors/Transforms/ExpressionTransform.h>6#include <Processors/Transforms/WindowTransform.h>7#include <QueryPipeline/QueryPipelineBuilder.h>8#include <Common/JSONBuilder.h>9
10namespace DB11{
12
13static ITransformingStep::Traits getTraits(bool preserves_sorting)14{
15return ITransformingStep::Traits16{17{18.returns_single_stream = false,19.preserves_number_of_streams = true,20.preserves_sorting = preserves_sorting,21},22{23.preserves_number_of_rows = true24}25};26}
27
28static Block addWindowFunctionResultColumns(const Block & block,29std::vector<WindowFunctionDescription> window_functions)30{
31auto result = block;32
33for (const auto & f : window_functions)34{35ColumnWithTypeAndName column_with_type;36column_with_type.name = f.column_name;37column_with_type.type = f.aggregate_function->getResultType();38column_with_type.column = column_with_type.type->createColumn();39
40result.insert(column_with_type);41}42
43return result;44}
45
46WindowStep::WindowStep(47const DataStream & input_stream_,48const WindowDescription & window_description_,49const std::vector<WindowFunctionDescription> & window_functions_,50bool streams_fan_out_)51: ITransformingStep(input_stream_, addWindowFunctionResultColumns(input_stream_.header, window_functions_), getTraits(!streams_fan_out_))52, window_description(window_description_)53, window_functions(window_functions_)54, streams_fan_out(streams_fan_out_)55{
56// We don't remove any columns, only add, so probably we don't have to update57// the output DataStream::distinct_columns.58
59window_description.checkValid();60
61}
62
63void WindowStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)64{
65auto num_threads = pipeline.getNumThreads();66
67// This resize is needed for cases such as `over ()` when we don't have a68// sort node, and the input might have multiple streams. The sort node would69// have resized it.70if (window_description.full_sort_description.empty())71pipeline.resize(1);72
73pipeline.addSimpleTransform(74[&](const Block & /*header*/)75{76return std::make_shared<WindowTransform>(77input_streams.front().header, output_stream->header, window_description, window_functions);78});79
80if (streams_fan_out)81{82pipeline.resize(num_threads);83}84
85assertBlocksHaveEqualStructure(pipeline.getHeader(), output_stream->header,86"WindowStep transform for '" + window_description.window_name + "'");87}
88
89void WindowStep::describeActions(FormatSettings & settings) const90{
91String prefix(settings.offset, ' ');92settings.out << prefix << "Window: (";93if (!window_description.partition_by.empty())94{95settings.out << "PARTITION BY ";96for (size_t i = 0; i < window_description.partition_by.size(); ++i)97{98if (i > 0)99{100settings.out << ", ";101}102
103settings.out << window_description.partition_by[i].column_name;104}105}106if (!window_description.partition_by.empty()107&& !window_description.order_by.empty())108{109settings.out << " ";110}111if (!window_description.order_by.empty())112{113settings.out << "ORDER BY "114<< dumpSortDescription(window_description.order_by);115}116settings.out << ")\n";117
118for (size_t i = 0; i < window_functions.size(); ++i)119{120settings.out << prefix << (i == 0 ? "Functions: "121: " ");122settings.out << window_functions[i].column_name << "\n";123}124}
125
126void WindowStep::describeActions(JSONBuilder::JSONMap & map) const127{
128if (!window_description.partition_by.empty())129{130auto partion_columns_array = std::make_unique<JSONBuilder::JSONArray>();131for (const auto & descr : window_description.partition_by)132partion_columns_array->add(descr.column_name);133
134map.add("Partition By", std::move(partion_columns_array));135}136
137if (!window_description.order_by.empty())138map.add("Sort Description", explainSortDescription(window_description.order_by));139
140auto functions_array = std::make_unique<JSONBuilder::JSONArray>();141for (const auto & func : window_functions)142functions_array->add(func.column_name);143
144map.add("Functions", std::move(functions_array));145}
146
147void WindowStep::updateOutputStream()148{
149output_stream = createOutputStream(150input_streams.front(), addWindowFunctionResultColumns(input_streams.front().header, window_functions), getDataStreamTraits());151
152window_description.checkValid();153}
154
155const WindowDescription & WindowStep::getWindowDescription() const156{
157return window_description;158}
159
160}
161