ClickHouse

Форк
0
160 строк · 4.9 Кб
1
#include <AggregateFunctions/IAggregateFunction.h>
2
#include <IO/Operators.h>
3
#include <Interpreters/ExpressionActions.h>
4
#include <Processors/QueryPlan/WindowStep.h>
5
#include <Processors/Transforms/ExpressionTransform.h>
6
#include <Processors/Transforms/WindowTransform.h>
7
#include <QueryPipeline/QueryPipelineBuilder.h>
8
#include <Common/JSONBuilder.h>
9

10
namespace DB
11
{
12

13
/// Builds the traits object handed to ITransformingStep for a window step.
/// The step is declared as not returning a single stream, as keeping the number
/// of streams and the number of rows; whether sorting is preserved is decided
/// by the caller through `preserves_sorting`.
static ITransformingStep::Traits getTraits(bool preserves_sorting)
{
    ITransformingStep::Traits traits
    {
        /// Data stream traits.
        {
            .returns_single_stream = false,
            .preserves_number_of_streams = true,
            .preserves_sorting = preserves_sorting,
        },
        /// Transform traits.
        {
            .preserves_number_of_rows = true
        }
    };

    return traits;
}
27

28
/// Returns a copy of `block` extended with one column per window function.
///
/// For every entry of `window_functions` a column is appended whose name is the
/// function's `column_name`, whose type is the result type reported by its
/// aggregate function, and whose data is a freshly created column of that type.
///
/// The list of functions is only read here, so it is taken by const reference
/// to avoid copying the whole vector of descriptions on every call.
static Block addWindowFunctionResultColumns(const Block & block,
    const std::vector<WindowFunctionDescription> & window_functions)
{
    auto result = block;

    for (const auto & f : window_functions)
    {
        ColumnWithTypeAndName column_with_type;
        column_with_type.name = f.column_name;
        column_with_type.type = f.aggregate_function->getResultType();
        column_with_type.column = column_with_type.type->createColumn();

        /// The temporary is not used afterwards, so hand it over instead of copying.
        result.insert(std::move(column_with_type));
    }

    return result;
}
45

46
/// Creates a window step on top of `input_stream_`.
///
/// The output header is the input header extended with the result columns of
/// `window_functions_`. Sorting is reported as preserved only when
/// `streams_fan_out_` is false. The window description is validated once the
/// members have been initialised.
WindowStep::WindowStep(
    const DataStream & input_stream_,
    const WindowDescription & window_description_,
    const std::vector<WindowFunctionDescription> & window_functions_,
    bool streams_fan_out_)
    : ITransformingStep(
        input_stream_,
        addWindowFunctionResultColumns(input_stream_.header, window_functions_),
        getTraits(!streams_fan_out_))
    , window_description(window_description_)
    , window_functions(window_functions_)
    , streams_fan_out(streams_fan_out_)
{
    /// Columns are only added by this step, never removed, so the output
    /// DataStream::distinct_columns probably does not need to be updated.
    window_description.checkValid();
}
62

63
/// Appends a WindowTransform to every stream of `pipeline`.
///
/// When the window has no full sort description the pipeline is first resized to
/// a single stream. When `streams_fan_out` is set, the pipeline is resized back
/// to the thread count that was read before any of these changes. Finally the
/// resulting header is checked against the step's output header.
void WindowStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
    /// Read before the pipeline is reshaped below.
    const auto initial_num_threads = pipeline.getNumThreads();

    /// Cases such as `over ()` have no sort node, yet the input may consist of
    /// several streams. A sort node would have resized the pipeline for us, so
    /// without one the resize has to happen here.
    const bool has_no_sorting = window_description.full_sort_description.empty();
    if (has_no_sorting)
        pipeline.resize(1);

    auto make_window_transform = [this](const Block & /*header*/)
    {
        return std::make_shared<WindowTransform>(
            input_streams.front().header,
            output_stream->header,
            window_description,
            window_functions);
    };
    pipeline.addSimpleTransform(make_window_transform);

    if (streams_fan_out)
        pipeline.resize(initial_num_threads);

    assertBlocksHaveEqualStructure(
        pipeline.getHeader(),
        output_stream->header,
        "WindowStep transform for '" + window_description.window_name + "'");
}
88

89
/// Writes a textual description of the step to `settings.out`.
///
/// The first line has the form `Window: (PARTITION BY a, b ORDER BY ...)`, where
/// either clause is omitted when the corresponding description is empty. It is
/// followed by one line per window function: the first is labelled
/// `Functions: ` and the rest are padded to line up with it. Every line is
/// indented by `settings.offset` spaces.
void WindowStep::describeActions(FormatSettings & settings) const
{
    const String indent(settings.offset, ' ');
    const auto & partition_by = window_description.partition_by;
    const auto & order_by = window_description.order_by;
    const bool has_partition_by = !partition_by.empty();

    settings.out << indent << "Window: (";

    if (has_partition_by)
    {
        settings.out << "PARTITION BY ";

        bool need_separator = false;
        for (const auto & column : partition_by)
        {
            if (need_separator)
                settings.out << ", ";

            settings.out << column.column_name;
            need_separator = true;
        }
    }

    if (!order_by.empty())
    {
        /// Separate the two clauses only when both of them are printed.
        if (has_partition_by)
            settings.out << " ";

        settings.out << "ORDER BY " << dumpSortDescription(order_by);
    }

    settings.out << ")\n";

    bool is_first_function = true;
    for (const auto & function : window_functions)
    {
        settings.out << indent;
        if (is_first_function)
            settings.out << "Functions: ";
        else
            settings.out << "           ";

        settings.out << function.column_name << "\n";
        is_first_function = false;
    }
}
125

126
/// Adds a structured description of the step to `map`.
///
/// "Partition By" (array of column names) and "Sort Description" are added only
/// when the respective description is non-empty; "Functions" (array of window
/// function column names) is always added.
void WindowStep::describeActions(JSONBuilder::JSONMap & map) const
{
    const auto & partition_by = window_description.partition_by;
    if (!partition_by.empty())
    {
        auto partition_columns = std::make_unique<JSONBuilder::JSONArray>();
        for (const auto & column : partition_by)
            partition_columns->add(column.column_name);

        map.add("Partition By", std::move(partition_columns));
    }

    const auto & order_by = window_description.order_by;
    if (!order_by.empty())
        map.add("Sort Description", explainSortDescription(order_by));

    auto function_names = std::make_unique<JSONBuilder::JSONArray>();
    for (const auto & function : window_functions)
        function_names->add(function.column_name);

    map.add("Functions", std::move(function_names));
}
146

147
void WindowStep::updateOutputStream()
148
{
149
    output_stream = createOutputStream(
150
        input_streams.front(), addWindowFunctionResultColumns(input_streams.front().header, window_functions), getDataStreamTraits());
151

152
    window_description.checkValid();
153
}
154

155
/// Read-only access to the window description stored in this step.
/// The returned reference points at the step's own member.
const WindowDescription & WindowStep::getWindowDescription() const
{
    const WindowDescription & description = window_description;
    return description;
}
159

160
}
161

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.