ClickHouse
55 строк · 1.8 Кб
1#include <Processors/QueryPlan/ITransformingStep.h>2#include <QueryPipeline/QueryPipelineBuilder.h>3
4namespace DB5{
6
7ITransformingStep::ITransformingStep(DataStream input_stream, Block output_header, Traits traits, bool collect_processors_)8: transform_traits(std::move(traits.transform_traits))9, collect_processors(collect_processors_)10, data_stream_traits(std::move(traits.data_stream_traits))11{
12input_streams.emplace_back(std::move(input_stream));13output_stream = createOutputStream(input_streams.front(), std::move(output_header), data_stream_traits);14}
15
16DataStream ITransformingStep::createOutputStream(17const DataStream & input_stream,18Block output_header,19const DataStreamTraits & stream_traits)20{
21DataStream output_stream{.header = std::move(output_header)};22
23output_stream.has_single_port = stream_traits.returns_single_stream24|| (input_stream.has_single_port && stream_traits.preserves_number_of_streams);25
26if (stream_traits.preserves_sorting)27{28output_stream.sort_description = input_stream.sort_description;29output_stream.sort_scope = input_stream.sort_scope;30}31
32return output_stream;33}
34
35
36QueryPipelineBuilderPtr ITransformingStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & settings)37{
38if (collect_processors)39{40QueryPipelineProcessorsCollector collector(*pipelines.front(), this);41transformPipeline(*pipelines.front(), settings);42processors = collector.detachProcessors();43}44else45transformPipeline(*pipelines.front(), settings);46
47return std::move(pipelines.front());48}
49
50void ITransformingStep::describePipeline(FormatSettings & settings) const51{
52IQueryPlanStep::describePipeline(processors, settings);53}
54
55}
56