ClickHouse
175 строк · 5.4 Кб
1#include <Processors/QueryPlan/JoinStep.h>
2#include <QueryPipeline/QueryPipelineBuilder.h>
3#include <Processors/Transforms/JoiningTransform.h>
4#include <Interpreters/IJoin.h>
5#include <Interpreters/TableJoin.h>
6#include <IO/Operators.h>
7#include <Common/JSONBuilder.h>
8#include <Common/typeid_cast.h>
9
10namespace DB
11{
12
/// Error codes used in this file; declared here, defined elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
17
18namespace
19{
20
21std::vector<std::pair<String, String>> describeJoinActions(const JoinPtr & join)
22{
23std::vector<std::pair<String, String>> description;
24const auto & table_join = join->getTableJoin();
25
26description.emplace_back("Type", toString(table_join.kind()));
27description.emplace_back("Strictness", toString(table_join.strictness()));
28description.emplace_back("Algorithm", join->getName());
29
30if (table_join.strictness() == JoinStrictness::Asof)
31description.emplace_back("ASOF inequality", toString(table_join.getAsofInequality()));
32
33if (!table_join.getClauses().empty())
34description.emplace_back("Clauses", TableJoin::formatClauses(table_join.getClauses(), true /*short_format*/));
35
36return description;
37}
38
39}
40
41JoinStep::JoinStep(
42const DataStream & left_stream_,
43const DataStream & right_stream_,
44JoinPtr join_,
45size_t max_block_size_,
46size_t max_streams_,
47bool keep_left_read_in_order_)
48: join(std::move(join_)), max_block_size(max_block_size_), max_streams(max_streams_), keep_left_read_in_order(keep_left_read_in_order_)
49{
50updateInputStreams(DataStreams{left_stream_, right_stream_});
51}
52
53QueryPipelineBuilderPtr JoinStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &)
54{
55if (pipelines.size() != 2)
56throw Exception(ErrorCodes::LOGICAL_ERROR, "JoinStep expect two input steps");
57
58if (join->pipelineType() == JoinPipelineType::YShaped)
59{
60auto joined_pipeline = QueryPipelineBuilder::joinPipelinesYShaped(
61std::move(pipelines[0]), std::move(pipelines[1]), join, output_stream->header, max_block_size, &processors);
62joined_pipeline->resize(max_streams);
63return joined_pipeline;
64}
65
66return QueryPipelineBuilder::joinPipelinesRightLeft(
67std::move(pipelines[0]),
68std::move(pipelines[1]),
69join,
70output_stream->header,
71max_block_size,
72max_streams,
73keep_left_read_in_order,
74&processors);
75}
76
77bool JoinStep::allowPushDownToRight() const
78{
79return join->pipelineType() == JoinPipelineType::YShaped || join->pipelineType() == JoinPipelineType::FillRightFirst;
80}
81
82void JoinStep::describePipeline(FormatSettings & settings) const
83{
84IQueryPlanStep::describePipeline(processors, settings);
85}
86
87void JoinStep::describeActions(FormatSettings & settings) const
88{
89String prefix(settings.offset, ' ');
90
91for (const auto & [name, value] : describeJoinActions(join))
92settings.out << prefix << name << ": " << value << '\n';
93}
94
95void JoinStep::describeActions(JSONBuilder::JSONMap & map) const
96{
97for (const auto & [name, value] : describeJoinActions(join))
98map.add(name, value);
99}
100
101void JoinStep::updateOutputStream()
102{
103output_stream = DataStream
104{
105.header = JoiningTransform::transformHeader(input_streams[0].header, join),
106};
107}
108
109static ITransformingStep::Traits getStorageJoinTraits()
110{
111return ITransformingStep::Traits
112{
113{
114.returns_single_stream = false,
115.preserves_number_of_streams = true,
116.preserves_sorting = false,
117},
118{
119.preserves_number_of_rows = false,
120}
121};
122}
123
124FilledJoinStep::FilledJoinStep(const DataStream & input_stream_, JoinPtr join_, size_t max_block_size_)
125: ITransformingStep(
126input_stream_,
127JoiningTransform::transformHeader(input_stream_.header, join_),
128getStorageJoinTraits())
129, join(std::move(join_))
130, max_block_size(max_block_size_)
131{
132if (!join->isFilled())
133throw Exception(ErrorCodes::LOGICAL_ERROR, "FilledJoinStep expects Join to be filled");
134}
135
136void FilledJoinStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
137{
138bool default_totals = false;
139if (!pipeline.hasTotals() && join->getTotals())
140{
141pipeline.addDefaultTotals();
142default_totals = true;
143}
144
145auto finish_counter = std::make_shared<JoiningTransform::FinishCounter>(pipeline.getNumStreams());
146
147pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type)
148{
149bool on_totals = stream_type == QueryPipelineBuilder::StreamType::Totals;
150auto counter = on_totals ? nullptr : finish_counter;
151return std::make_shared<JoiningTransform>(header, output_stream->header, join, max_block_size, on_totals, default_totals, counter);
152});
153}
154
155void FilledJoinStep::updateOutputStream()
156{
157output_stream = createOutputStream(
158input_streams.front(), JoiningTransform::transformHeader(input_streams.front().header, join), getDataStreamTraits());
159}
160
161void FilledJoinStep::describeActions(FormatSettings & settings) const
162{
163String prefix(settings.offset, ' ');
164
165for (const auto & [name, value] : describeJoinActions(join))
166settings.out << prefix << name << ": " << value << '\n';
167}
168
169void FilledJoinStep::describeActions(JSONBuilder::JSONMap & map) const
170{
171for (const auto & [name, value] : describeJoinActions(join))
172map.add(name, value);
173}
174
175}
176