ClickHouse

Форк
0
/
IntersectOrExceptStep.cpp 
92 строки · 3.2 Кб
1
#include <Processors/QueryPlan/IntersectOrExceptStep.h>
2

3
#include <Interpreters/Context.h>
4
#include <Interpreters/ExpressionActions.h>
5
#include <QueryPipeline/QueryPipelineBuilder.h>
6
#include <Processors/Sources/NullSource.h>
7
#include <Processors/Transforms/ExpressionTransform.h>
8
#include <Processors/Transforms/IntersectOrExceptTransform.h>
9
#include <Processors/ResizeProcessor.h>
10

11

12
namespace DB
13
{
14

15
namespace ErrorCodes
16
{
17
    extern const int LOGICAL_ERROR;
18
}
19

20
static Block checkHeaders(const DataStreams & input_streams_)
21
{
22
    if (input_streams_.empty())
23
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot perform intersect/except on empty set of query plan steps");
24

25
    Block res = input_streams_.front().header;
26
    for (const auto & stream : input_streams_)
27
        assertBlocksHaveEqualStructure(stream.header, res, "IntersectOrExceptStep");
28

29
    return res;
30
}
31

32
IntersectOrExceptStep::IntersectOrExceptStep(
33
    DataStreams input_streams_, Operator operator_, size_t max_threads_)
34
    : header(checkHeaders(input_streams_))
35
    , current_operator(operator_)
36
    , max_threads(max_threads_)
37
{
38
    input_streams = std::move(input_streams_);
39
    output_stream = DataStream{.header = header};
40
}
41

42
QueryPipelineBuilderPtr IntersectOrExceptStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &)
43
{
44
    auto pipeline = std::make_unique<QueryPipelineBuilder>();
45

46
    if (pipelines.empty())
47
    {
48
        QueryPipelineProcessorsCollector collector(*pipeline, this);
49
        pipeline->init(Pipe(std::make_shared<NullSource>(output_stream->header)));
50
        processors = collector.detachProcessors();
51
        return pipeline;
52
    }
53

54
    for (auto & cur_pipeline : pipelines)
55
    {
56
        /// Just in case.
57
        if (!isCompatibleHeader(cur_pipeline->getHeader(), getOutputStream().header))
58
        {
59
            QueryPipelineProcessorsCollector collector(*cur_pipeline, this);
60
            auto converting_dag = ActionsDAG::makeConvertingActions(
61
                cur_pipeline->getHeader().getColumnsWithTypeAndName(),
62
                getOutputStream().header.getColumnsWithTypeAndName(),
63
                ActionsDAG::MatchColumnsMode::Name);
64

65
            auto converting_actions = std::make_shared<ExpressionActions>(std::move(converting_dag));
66
            cur_pipeline->addSimpleTransform([&](const Block & cur_header)
67
            {
68
                return std::make_shared<ExpressionTransform>(cur_header, converting_actions);
69
            });
70

71
            auto added_processors = collector.detachProcessors();
72
            processors.insert(processors.end(), added_processors.begin(), added_processors.end());
73
        }
74

75
        /// For the case of union.
76
        cur_pipeline->addTransform(std::make_shared<ResizeProcessor>(header, cur_pipeline->getNumStreams(), 1));
77
    }
78

79
    *pipeline = QueryPipelineBuilder::unitePipelines(std::move(pipelines), max_threads, &processors);
80
    auto transform = std::make_shared<IntersectOrExceptTransform>(header, current_operator);
81
    processors.push_back(transform);
82
    pipeline->addTransform(std::move(transform));
83

84
    return pipeline;
85
}
86

87
void IntersectOrExceptStep::describePipeline(FormatSettings & settings) const
88
{
89
    IQueryPlanStep::describePipeline(processors, settings);
90
}
91

92
}
93

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.