// ClickHouse / DistinctStep.cpp (178 lines, 6.0 KB; forks: 0) — header captured from the source-viewer page.
#include <Processors/QueryPlan/DistinctStep.h>
2
#include <Processors/Transforms/DistinctSortedChunkTransform.h>
3
#include <Processors/Transforms/DistinctSortedTransform.h>
4
#include <Processors/Transforms/DistinctTransform.h>
5
#include <QueryPipeline/QueryPipelineBuilder.h>
6
#include <IO/Operators.h>
7
#include <Common/JSONBuilder.h>
8
#include <Core/SortDescription.h>
9

10
namespace DB
11
{
12

13
/// Step traits for DISTINCT.
/// With `pre_distinct` the step is applied to each input stream separately, so the
/// number of streams and their sorting pass through unchanged. Otherwise (final
/// distinct) the step reports that it returns a single stream. In both modes the
/// number of rows is not preserved.
static ITransformingStep::Traits getTraits(bool pre_distinct)
{
    const bool final_distinct = !pre_distinct;

    ITransformingStep::Traits traits
    {
        {
            .returns_single_stream = final_distinct,
            .preserves_number_of_streams = !final_distinct,
            .preserves_sorting = !final_distinct,
        },
        {
            .preserves_number_of_rows = false,
        }
    };

    return traits;
}
28

29
/// Returns the leading part of `input_sort_desc` whose column names all occur in `columns`.
/// Collection stops at the first sort column that is not one of the DISTINCT columns,
/// so the result is always a prefix of the input sort description (possibly empty).
static SortDescription getSortDescription(const SortDescription & input_sort_desc, const Names& columns)
{
    const auto is_distinct_column = [&columns](const auto & sort_column)
    {
        return std::find(columns.begin(), columns.end(), sort_column.column_name) != columns.end();
    };

    SortDescription prefix;
    for (const auto & sort_column : input_sort_desc)
    {
        if (!is_distinct_column(sort_column))
            return prefix;

        prefix.emplace_back(sort_column);
    }
    return prefix;
}
40

41
/// Creates a DISTINCT step.
/// The output header is the input header unchanged; `pre_distinct_` chooses between
/// per-stream and final distinct (and thereby the step traits), and
/// `optimize_distinct_in_order_` enables the sorted-input transforms in transformPipeline.
DistinctStep::DistinctStep(
    const DataStream & input_stream_,
    const SizeLimits & set_size_limits_,
    UInt64 limit_hint_,
    const Names & columns_,
    bool pre_distinct_,
    bool optimize_distinct_in_order_)
    : ITransformingStep(input_stream_, input_stream_.header, getTraits(pre_distinct_))
    , set_size_limits(set_size_limits_)
    , limit_hint(limit_hint_)
    , columns(columns_)
    , pre_distinct(pre_distinct_)
    , optimize_distinct_in_order(optimize_distinct_in_order_)
{}
59

60
void DistinctStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
    /// Adds a simple transform built by `make_transform(header)` to the pipeline.
    /// The factory handed to the pipeline returns nullptr for every stream type other
    /// than StreamType::Main, so only Main streams receive a DISTINCT processor.
    const auto add_main_transform = [&pipeline](const auto & make_transform)
    {
        pipeline.addSimpleTransform(
            [&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
            {
                if (stream_type != QueryPipelineBuilder::StreamType::Main)
                    return nullptr;
                return make_transform(header);
            });
    };

    /// The final distinct runs on one stream, so collapse the pipeline to a single output first.
    if (!pre_distinct)
        pipeline.resize(1);

    if (optimize_distinct_in_order)
    {
        const auto & input_stream = input_streams.back();
        /// Leading sort keys of the input that are also DISTINCT columns; empty means no in-order path.
        const SortDescription distinct_sort_desc = getSortDescription(input_stream.sort_description, columns);

        if (!distinct_sort_desc.empty())
        {
            if (pre_distinct)
            {
                /// Per-stream distinct over sorted chunks. The last constructor argument is true
                /// only when the input sort scope is Stream (presumably "the whole stream is sorted").
                const bool stream_is_sorted = input_stream.sort_scope == DataStream::SortScope::Stream;
                add_main_transform([&](const Block & header)
                {
                    return std::make_shared<DistinctSortedChunkTransform>(
                        header, set_size_limits, limit_hint, distinct_sort_desc, columns, stream_is_sorted);
                });
                return;
            }

            /// Final distinct over input sorted with Global scope (sorted inside and among chunks).
            if (input_stream.sort_scope == DataStream::SortScope::Global)
            {
                assert(input_stream.has_single_port);

                if (distinct_sort_desc.size() >= columns.size())
                {
                    /// The usable sort prefix is at least as long as the DISTINCT column list,
                    /// so the chunk transform is used with its sorted-stream flag set.
                    add_main_transform([&](const Block & header)
                    {
                        return std::make_shared<DistinctSortedChunkTransform>(
                            header, set_size_limits, limit_hint, distinct_sort_desc, columns, true);
                    });
                    return;
                }

                /// Only part of the DISTINCT columns is sorted: use DistinctSortedTransform when it
                /// reports itself applicable to the current header; otherwise fall through.
                if (DistinctSortedTransform::isApplicable(pipeline.getHeader(), distinct_sort_desc, columns))
                {
                    add_main_transform([&](const Block & header)
                    {
                        return std::make_shared<DistinctSortedTransform>(
                            header, distinct_sort_desc, set_size_limits, limit_hint, columns);
                    });
                    return;
                }
            }
        }
    }

    /// Generic fallback that does not depend on input order.
    add_main_transform([&](const Block & header)
    {
        return std::make_shared<DistinctTransform>(header, set_size_limits, limit_hint, columns);
    });
}
137

138
/// Writes one indented "Columns: ..." line to the text EXPLAIN output:
/// the DISTINCT columns separated by ", ", or "none" when the list is empty.
void DistinctStep::describeActions(FormatSettings & settings) const
{
    const String indent(settings.offset, ' ');
    settings.out << indent << "Columns: ";

    if (columns.empty())
    {
        settings.out << "none";
    }
    else
    {
        settings.out << columns.front();
        for (size_t i = 1; i < columns.size(); ++i)
            settings.out << ", " << columns[i];
    }

    settings.out << '\n';
}
160

161
/// Adds a "Columns" entry to the JSON EXPLAIN map: an array holding every DISTINCT column name in order.
void DistinctStep::describeActions(JSONBuilder::JSONMap & map) const
{
    auto column_names = std::make_unique<JSONBuilder::JSONArray>();

    for (const String & name : columns)
        column_names->add(name);

    map.add("Columns", std::move(column_names));
}
169

170
void DistinctStep::updateOutputStream()
171
{
172
    output_stream = createOutputStream(
173
        input_streams.front(),
174
        input_streams.front().header,
175
        getTraits(pre_distinct).data_stream_traits);
176
}
177

178
}
179

/*
 * Cookie notice captured from the source-viewer web page (not part of the source file):
 *
 * Использование cookies
 *
 * Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.
 *
 * Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.
 *
 * Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.
 */