ClickHouse
107 строк · 3.7 Кб
1#include <type_traits>
2#include <Interpreters/ExpressionActions.h>
3#include <Processors/QueryPlan/UnionStep.h>
4#include <Processors/Sources/NullSource.h>
5#include <Processors/Transforms/ExpressionTransform.h>
6#include <QueryPipeline/QueryPipelineBuilder.h>
7#include <base/defines.h>
8
9namespace DB
10{
11
namespace ErrorCodes
{
    /// Declared `extern`: the definition of this error code is not part of this file.
    extern const int LOGICAL_ERROR;
}
16
17static Block checkHeaders(const DataStreams & input_streams)
18{
19if (input_streams.empty())
20throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unite an empty set of query plan steps");
21
22Block res = input_streams.front().header;
23for (const auto & stream : input_streams)
24assertBlocksHaveEqualStructure(stream.header, res, "UnionStep");
25
26return res;
27}
28
29UnionStep::UnionStep(DataStreams input_streams_, size_t max_threads_)
30: header(checkHeaders(input_streams_))
31, max_threads(max_threads_)
32{
33updateInputStreams(std::move(input_streams_));
34}
35
36void UnionStep::updateOutputStream()
37{
38if (input_streams.size() == 1)
39output_stream = input_streams.front();
40else
41output_stream = DataStream{.header = header};
42
43SortDescription common_sort_description = input_streams.front().sort_description;
44DataStream::SortScope sort_scope = input_streams.front().sort_scope;
45for (const auto & input_stream : input_streams)
46{
47common_sort_description = commonPrefix(common_sort_description, input_stream.sort_description);
48sort_scope = std::min(sort_scope, input_stream.sort_scope);
49}
50if (!common_sort_description.empty() && sort_scope >= DataStream::SortScope::Chunk)
51{
52output_stream->sort_description = common_sort_description;
53if (sort_scope == DataStream::SortScope::Global && input_streams.size() > 1)
54output_stream->sort_scope = DataStream::SortScope::Stream;
55else
56output_stream->sort_scope = sort_scope;
57}
58}
59
60QueryPipelineBuilderPtr UnionStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &)
61{
62auto pipeline = std::make_unique<QueryPipelineBuilder>();
63
64if (pipelines.empty())
65{
66QueryPipelineProcessorsCollector collector(*pipeline, this);
67pipeline->init(Pipe(std::make_shared<NullSource>(output_stream->header)));
68processors = collector.detachProcessors();
69return pipeline;
70}
71
72for (auto & cur_pipeline : pipelines)
73{
74#if !defined(NDEBUG)
75assertCompatibleHeader(cur_pipeline->getHeader(), getOutputStream().header, "UnionStep");
76#endif
77/// Headers for union must be equal.
78/// But, just in case, convert it to the same header if not.
79if (!isCompatibleHeader(cur_pipeline->getHeader(), getOutputStream().header))
80{
81QueryPipelineProcessorsCollector collector(*cur_pipeline, this);
82auto converting_dag = ActionsDAG::makeConvertingActions(
83cur_pipeline->getHeader().getColumnsWithTypeAndName(),
84getOutputStream().header.getColumnsWithTypeAndName(),
85ActionsDAG::MatchColumnsMode::Name);
86
87auto converting_actions = std::make_shared<ExpressionActions>(std::move(converting_dag));
88cur_pipeline->addSimpleTransform([&](const Block & cur_header)
89{
90return std::make_shared<ExpressionTransform>(cur_header, converting_actions);
91});
92
93auto added_processors = collector.detachProcessors();
94processors.insert(processors.end(), added_processors.begin(), added_processors.end());
95}
96}
97
98*pipeline = QueryPipelineBuilder::unitePipelines(std::move(pipelines), max_threads, &processors);
99return pipeline;
100}
101
102void UnionStep::describePipeline(FormatSettings & settings) const
103{
104IQueryPlanStep::describePipeline(processors, settings);
105}
106
107}
108