ClickHouse

Форк
0
107 строк · 3.7 Кб
1
#include <type_traits>
2
#include <Interpreters/ExpressionActions.h>
3
#include <Processors/QueryPlan/UnionStep.h>
4
#include <Processors/Sources/NullSource.h>
5
#include <Processors/Transforms/ExpressionTransform.h>
6
#include <QueryPipeline/QueryPipelineBuilder.h>
7
#include <base/defines.h>
8

9
namespace DB
10
{
11

12
namespace ErrorCodes
13
{
14
    extern const int LOGICAL_ERROR;
15
}
16

17
static Block checkHeaders(const DataStreams & input_streams)
18
{
19
    if (input_streams.empty())
20
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unite an empty set of query plan steps");
21

22
    Block res = input_streams.front().header;
23
    for (const auto & stream : input_streams)
24
        assertBlocksHaveEqualStructure(stream.header, res, "UnionStep");
25

26
    return res;
27
}
28

29
UnionStep::UnionStep(DataStreams input_streams_, size_t max_threads_)
30
    : header(checkHeaders(input_streams_))
31
    , max_threads(max_threads_)
32
{
33
    updateInputStreams(std::move(input_streams_));
34
}
35

36
void UnionStep::updateOutputStream()
37
{
38
    if (input_streams.size() == 1)
39
        output_stream = input_streams.front();
40
    else
41
        output_stream = DataStream{.header = header};
42

43
    SortDescription common_sort_description = input_streams.front().sort_description;
44
    DataStream::SortScope sort_scope = input_streams.front().sort_scope;
45
    for (const auto & input_stream : input_streams)
46
    {
47
        common_sort_description = commonPrefix(common_sort_description, input_stream.sort_description);
48
        sort_scope = std::min(sort_scope, input_stream.sort_scope);
49
    }
50
    if (!common_sort_description.empty() && sort_scope >= DataStream::SortScope::Chunk)
51
    {
52
        output_stream->sort_description = common_sort_description;
53
        if (sort_scope == DataStream::SortScope::Global && input_streams.size() > 1)
54
            output_stream->sort_scope = DataStream::SortScope::Stream;
55
        else
56
            output_stream->sort_scope = sort_scope;
57
    }
58
}
59

60
QueryPipelineBuilderPtr UnionStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &)
61
{
62
    auto pipeline = std::make_unique<QueryPipelineBuilder>();
63

64
    if (pipelines.empty())
65
    {
66
        QueryPipelineProcessorsCollector collector(*pipeline, this);
67
        pipeline->init(Pipe(std::make_shared<NullSource>(output_stream->header)));
68
        processors = collector.detachProcessors();
69
        return pipeline;
70
    }
71

72
    for (auto & cur_pipeline : pipelines)
73
    {
74
#if !defined(NDEBUG)
75
        assertCompatibleHeader(cur_pipeline->getHeader(), getOutputStream().header, "UnionStep");
76
#endif
77
        /// Headers for union must be equal.
78
        /// But, just in case, convert it to the same header if not.
79
        if (!isCompatibleHeader(cur_pipeline->getHeader(), getOutputStream().header))
80
        {
81
            QueryPipelineProcessorsCollector collector(*cur_pipeline, this);
82
            auto converting_dag = ActionsDAG::makeConvertingActions(
83
                cur_pipeline->getHeader().getColumnsWithTypeAndName(),
84
                getOutputStream().header.getColumnsWithTypeAndName(),
85
                ActionsDAG::MatchColumnsMode::Name);
86

87
            auto converting_actions = std::make_shared<ExpressionActions>(std::move(converting_dag));
88
            cur_pipeline->addSimpleTransform([&](const Block & cur_header)
89
            {
90
                return std::make_shared<ExpressionTransform>(cur_header, converting_actions);
91
            });
92

93
            auto added_processors = collector.detachProcessors();
94
            processors.insert(processors.end(), added_processors.begin(), added_processors.end());
95
        }
96
    }
97

98
    *pipeline = QueryPipelineBuilder::unitePipelines(std::move(pipelines), max_threads, &processors);
99
    return pipeline;
100
}
101

102
void UnionStep::describePipeline(FormatSettings & settings) const
103
{
104
    IQueryPlanStep::describePipeline(processors, settings);
105
}
106

107
}
108

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.