ClickHouse
92 строки · 3.2 Кб
1#include <Processors/QueryPlan/IntersectOrExceptStep.h>2
3#include <Interpreters/Context.h>4#include <Interpreters/ExpressionActions.h>5#include <QueryPipeline/QueryPipelineBuilder.h>6#include <Processors/Sources/NullSource.h>7#include <Processors/Transforms/ExpressionTransform.h>8#include <Processors/Transforms/IntersectOrExceptTransform.h>9#include <Processors/ResizeProcessor.h>10
11
12namespace DB13{
14
namespace ErrorCodes
{
    /// Only declared here (extern); thrown below when there are no input streams to combine.
    extern const int LOGICAL_ERROR;
}
19
20static Block checkHeaders(const DataStreams & input_streams_)21{
22if (input_streams_.empty())23throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot perform intersect/except on empty set of query plan steps");24
25Block res = input_streams_.front().header;26for (const auto & stream : input_streams_)27assertBlocksHaveEqualStructure(stream.header, res, "IntersectOrExceptStep");28
29return res;30}
31
32IntersectOrExceptStep::IntersectOrExceptStep(33DataStreams input_streams_, Operator operator_, size_t max_threads_)34: header(checkHeaders(input_streams_))35, current_operator(operator_)36, max_threads(max_threads_)37{
38input_streams = std::move(input_streams_);39output_stream = DataStream{.header = header};40}
41
42QueryPipelineBuilderPtr IntersectOrExceptStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &)43{
44auto pipeline = std::make_unique<QueryPipelineBuilder>();45
46if (pipelines.empty())47{48QueryPipelineProcessorsCollector collector(*pipeline, this);49pipeline->init(Pipe(std::make_shared<NullSource>(output_stream->header)));50processors = collector.detachProcessors();51return pipeline;52}53
54for (auto & cur_pipeline : pipelines)55{56/// Just in case.57if (!isCompatibleHeader(cur_pipeline->getHeader(), getOutputStream().header))58{59QueryPipelineProcessorsCollector collector(*cur_pipeline, this);60auto converting_dag = ActionsDAG::makeConvertingActions(61cur_pipeline->getHeader().getColumnsWithTypeAndName(),62getOutputStream().header.getColumnsWithTypeAndName(),63ActionsDAG::MatchColumnsMode::Name);64
65auto converting_actions = std::make_shared<ExpressionActions>(std::move(converting_dag));66cur_pipeline->addSimpleTransform([&](const Block & cur_header)67{68return std::make_shared<ExpressionTransform>(cur_header, converting_actions);69});70
71auto added_processors = collector.detachProcessors();72processors.insert(processors.end(), added_processors.begin(), added_processors.end());73}74
75/// For the case of union.76cur_pipeline->addTransform(std::make_shared<ResizeProcessor>(header, cur_pipeline->getNumStreams(), 1));77}78
79*pipeline = QueryPipelineBuilder::unitePipelines(std::move(pipelines), max_threads, &processors);80auto transform = std::make_shared<IntersectOrExceptTransform>(header, current_operator);81processors.push_back(transform);82pipeline->addTransform(std::move(transform));83
84return pipeline;85}
86
87void IntersectOrExceptStep::describePipeline(FormatSettings & settings) const88{
89IQueryPlanStep::describePipeline(processors, settings);90}
91
92}
93