ClickHouse
178 lines · 6.0 KB
1#include <Processors/QueryPlan/DistinctStep.h>
2#include <Processors/Transforms/DistinctSortedChunkTransform.h>
3#include <Processors/Transforms/DistinctSortedTransform.h>
4#include <Processors/Transforms/DistinctTransform.h>
5#include <QueryPipeline/QueryPipelineBuilder.h>
6#include <IO/Operators.h>
7#include <Common/JSONBuilder.h>
8#include <Core/SortDescription.h>
9
10namespace DB
11{
12
13static ITransformingStep::Traits getTraits(bool pre_distinct)
14{
15const bool preserves_number_of_streams = pre_distinct;
16return ITransformingStep::Traits
17{
18{
19.returns_single_stream = !pre_distinct,
20.preserves_number_of_streams = preserves_number_of_streams,
21.preserves_sorting = preserves_number_of_streams,
22},
23{
24.preserves_number_of_rows = false,
25}
26};
27}
28
29static SortDescription getSortDescription(const SortDescription & input_sort_desc, const Names& columns)
30{
31SortDescription distinct_sort_desc;
32for (const auto & sort_column_desc : input_sort_desc)
33{
34if (std::find(begin(columns), end(columns), sort_column_desc.column_name) == columns.end())
35break;
36distinct_sort_desc.emplace_back(sort_column_desc);
37}
38return distinct_sort_desc;
39}
40
41DistinctStep::DistinctStep(
42const DataStream & input_stream_,
43const SizeLimits & set_size_limits_,
44UInt64 limit_hint_,
45const Names & columns_,
46bool pre_distinct_,
47bool optimize_distinct_in_order_)
48: ITransformingStep(
49input_stream_,
50input_stream_.header,
51getTraits(pre_distinct_))
52, set_size_limits(set_size_limits_)
53, limit_hint(limit_hint_)
54, columns(columns_)
55, pre_distinct(pre_distinct_)
56, optimize_distinct_in_order(optimize_distinct_in_order_)
57{
58}
59
60void DistinctStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
61{
62if (!pre_distinct)
63pipeline.resize(1);
64
65if (optimize_distinct_in_order)
66{
67const auto & input_stream = input_streams.back();
68const SortDescription distinct_sort_desc = getSortDescription(input_stream.sort_description, columns);
69if (!distinct_sort_desc.empty())
70{
71/// pre-distinct for sorted chunks
72if (pre_distinct)
73{
74pipeline.addSimpleTransform(
75[&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
76{
77if (stream_type != QueryPipelineBuilder::StreamType::Main)
78return nullptr;
79
80return std::make_shared<DistinctSortedChunkTransform>(
81header,
82set_size_limits,
83limit_hint,
84distinct_sort_desc,
85columns,
86input_stream.sort_scope == DataStream::SortScope::Stream);
87});
88return;
89}
90/// final distinct for sorted stream (sorting inside and among chunks)
91if (input_stream.sort_scope == DataStream::SortScope::Global)
92{
93assert(input_stream.has_single_port);
94
95if (distinct_sort_desc.size() < columns.size())
96{
97if (DistinctSortedTransform::isApplicable(pipeline.getHeader(), distinct_sort_desc, columns))
98{
99pipeline.addSimpleTransform(
100[&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
101{
102if (stream_type != QueryPipelineBuilder::StreamType::Main)
103return nullptr;
104
105return std::make_shared<DistinctSortedTransform>(
106header, distinct_sort_desc, set_size_limits, limit_hint, columns);
107});
108return;
109}
110}
111else
112{
113pipeline.addSimpleTransform(
114[&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
115{
116if (stream_type != QueryPipelineBuilder::StreamType::Main)
117return nullptr;
118
119return std::make_shared<DistinctSortedChunkTransform>(
120header, set_size_limits, limit_hint, distinct_sort_desc, columns, true);
121});
122return;
123}
124}
125}
126}
127
128pipeline.addSimpleTransform(
129[&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
130{
131if (stream_type != QueryPipelineBuilder::StreamType::Main)
132return nullptr;
133
134return std::make_shared<DistinctTransform>(header, set_size_limits, limit_hint, columns);
135});
136}
137
138void DistinctStep::describeActions(FormatSettings & settings) const
139{
140String prefix(settings.offset, ' ');
141settings.out << prefix << "Columns: ";
142
143if (columns.empty())
144settings.out << "none";
145else
146{
147bool first = true;
148for (const auto & column : columns)
149{
150if (!first)
151settings.out << ", ";
152first = false;
153
154settings.out << column;
155}
156}
157
158settings.out << '\n';
159}
160
161void DistinctStep::describeActions(JSONBuilder::JSONMap & map) const
162{
163auto columns_array = std::make_unique<JSONBuilder::JSONArray>();
164for (const auto & column : columns)
165columns_array->add(column);
166
167map.add("Columns", std::move(columns_array));
168}
169
170void DistinctStep::updateOutputStream()
171{
172output_stream = createOutputStream(
173input_streams.front(),
174input_streams.front().header,
175getTraits(pre_distinct).data_stream_traits);
176}
177
178}
179