ClickHouse

Форк
0
175 строк · 5.4 Кб
1
#include <Processors/QueryPlan/JoinStep.h>
2
#include <QueryPipeline/QueryPipelineBuilder.h>
3
#include <Processors/Transforms/JoiningTransform.h>
4
#include <Interpreters/IJoin.h>
5
#include <Interpreters/TableJoin.h>
6
#include <IO/Operators.h>
7
#include <Common/JSONBuilder.h>
8
#include <Common/typeid_cast.h>
9

10
namespace DB
11
{
12

13
namespace ErrorCodes
{
    /// Used in this file when a step is wired up inconsistently:
    /// JoinStep receiving a number of input pipelines other than two,
    /// or FilledJoinStep constructed over a join that is not filled.
    extern const int LOGICAL_ERROR;
}
17

18
namespace
19
{
20

21
std::vector<std::pair<String, String>> describeJoinActions(const JoinPtr & join)
22
{
23
    std::vector<std::pair<String, String>> description;
24
    const auto & table_join = join->getTableJoin();
25

26
    description.emplace_back("Type", toString(table_join.kind()));
27
    description.emplace_back("Strictness", toString(table_join.strictness()));
28
    description.emplace_back("Algorithm", join->getName());
29

30
    if (table_join.strictness() == JoinStrictness::Asof)
31
        description.emplace_back("ASOF inequality", toString(table_join.getAsofInequality()));
32

33
    if (!table_join.getClauses().empty())
34
        description.emplace_back("Clauses", TableJoin::formatClauses(table_join.getClauses(), true /*short_format*/));
35

36
    return description;
37
}
38

39
}
40

41
/// Two-input join step. `left_stream_` becomes input 0 and `right_stream_`
/// input 1; updatePipeline() consumes the child pipelines in that order.
/// `max_streams_` and `keep_left_read_in_order_` are forwarded to the
/// pipeline-building helpers in updatePipeline().
JoinStep::JoinStep(
    const DataStream & left_stream_,
    const DataStream & right_stream_,
    JoinPtr join_,
    size_t max_block_size_,
    size_t max_streams_,
    bool keep_left_read_in_order_)
    : join(std::move(join_))
    , max_block_size(max_block_size_)
    , max_streams(max_streams_)
    , keep_left_read_in_order(keep_left_read_in_order_)
{
    DataStreams inputs{left_stream_, right_stream_};
    updateInputStreams(std::move(inputs));
}
52

53
/// Combines exactly two child pipelines (index 0 = left, index 1 = right)
/// into one joined pipeline whose header is this step's output header.
/// Y-shaped joins are built with joinPipelinesYShaped and then resized to
/// `max_streams`; all other pipeline types go through joinPipelinesRightLeft,
/// which receives `max_streams` and `keep_left_read_in_order` itself.
/// `processors` is handed to both builders so the created processors can be
/// collected; describePipeline() later renders that collection.
QueryPipelineBuilderPtr JoinStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &)
{
    if (pipelines.size() != 2)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "JoinStep expect two input steps");

    auto & left = pipelines[0];
    auto & right = pipelines[1];
    const auto & header = output_stream->header;

    if (join->pipelineType() != JoinPipelineType::YShaped)
        return QueryPipelineBuilder::joinPipelinesRightLeft(
            std::move(left),
            std::move(right),
            join,
            header,
            max_block_size,
            max_streams,
            keep_left_read_in_order,
            &processors);

    auto result = QueryPipelineBuilder::joinPipelinesYShaped(
        std::move(left), std::move(right), join, header, max_block_size, &processors);
    result->resize(max_streams);
    return result;
}
76

77
bool JoinStep::allowPushDownToRight() const
78
{
79
    return join->pipelineType() == JoinPipelineType::YShaped || join->pipelineType() == JoinPipelineType::FillRightFirst;
80
}
81

82
/// Describes the processors collected while building this step's pipeline
/// (see updatePipeline) using the generic IQueryPlanStep formatter.
void JoinStep::describePipeline(FormatSettings & settings) const
{
    const auto & collected = processors;
    IQueryPlanStep::describePipeline(collected, settings);
}
86

87
/// Text EXPLAIN output: one "<name>: <value>" line per join property,
/// each indented by `settings.offset` spaces.
void JoinStep::describeActions(FormatSettings & settings) const
{
    const String indent(settings.offset, ' ');
    auto & out = settings.out;

    for (const auto & row : describeJoinActions(join))
        out << indent << row.first << ": " << row.second << '\n';
}
94

95
/// JSON EXPLAIN output: every join property becomes a string entry of `map`.
void JoinStep::describeActions(JSONBuilder::JSONMap & map) const
{
    const auto rows = describeJoinActions(join);
    for (const auto & row : rows)
        map.add(row.first, row.second);
}
100

101
void JoinStep::updateOutputStream()
102
{
103
    output_stream = DataStream
104
    {
105
        .header = JoiningTransform::transformHeader(input_streams[0].header, join),
106
    };
107
}
108

109
/// Traits used by FilledJoinStep.
/// Stream level: not a single stream; stream count is preserved; sorting is not preserved.
/// Transform level: the number of rows may change.
static ITransformingStep::Traits getStorageJoinTraits()
{
    return {
        {
            .returns_single_stream = false,
            .preserves_number_of_streams = true,
            .preserves_sorting = false,
        },
        {
            .preserves_number_of_rows = false,
        },
    };
}
123

124
/// Single-input join step over a join that must already be filled.
/// The output header is computed from the input header by
/// JoiningTransform::transformHeader.
/// Throws LOGICAL_ERROR if `join_` reports it is not filled.
FilledJoinStep::FilledJoinStep(const DataStream & input_stream_, JoinPtr join_, size_t max_block_size_)
    : ITransformingStep(input_stream_, JoiningTransform::transformHeader(input_stream_.header, join_), getStorageJoinTraits())
    , join(std::move(join_))
    , max_block_size(max_block_size_)
{
    if (join->isFilled())
        return;

    throw Exception(ErrorCodes::LOGICAL_ERROR, "FilledJoinStep expects Join to be filled");
}
135

136
/// Appends a JoiningTransform to every stream of `pipeline`.
/// If the join has totals but the pipeline does not, default totals are added
/// first and each transform is told so through `default_totals`.
/// Main streams share one FinishCounter sized by the stream count at that
/// point; the totals stream receives a null counter.
void FilledJoinStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
    const bool default_totals = !pipeline.hasTotals() && join->getTotals();
    if (default_totals)
        pipeline.addDefaultTotals();

    auto finish_counter = std::make_shared<JoiningTransform::FinishCounter>(pipeline.getNumStreams());

    pipeline.addSimpleTransform(
        [this, &finish_counter, default_totals](const Block & header, QueryPipelineBuilder::StreamType stream_type)
        {
            const bool on_totals = (stream_type == QueryPipelineBuilder::StreamType::Totals);

            decltype(finish_counter) counter;
            if (!on_totals)
                counter = finish_counter;

            return std::make_shared<JoiningTransform>(
                header, output_stream->header, join, max_block_size, on_totals, default_totals, counter);
        });
}
154

155
void FilledJoinStep::updateOutputStream()
156
{
157
    output_stream = createOutputStream(
158
        input_streams.front(), JoiningTransform::transformHeader(input_streams.front().header, join), getDataStreamTraits());
159
}
160

161
/// Text EXPLAIN output: one "<name>: <value>" line per join property,
/// each indented by `settings.offset` spaces.
void FilledJoinStep::describeActions(FormatSettings & settings) const
{
    const String indent(settings.offset, ' ');
    auto & out = settings.out;

    for (const auto & row : describeJoinActions(join))
        out << indent << row.first << ": " << row.second << '\n';
}
168

169
/// JSON EXPLAIN output: every join property becomes a string entry of `map`.
void FilledJoinStep::describeActions(JSONBuilder::JSONMap & map) const
{
    const auto rows = describeJoinActions(join);
    for (const auto & row : rows)
        map.add(row.first, row.second);
}
174

175
}
176

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.