ClickHouse

Форк
0
/
FinishAggregatingInOrderAlgorithm.cpp 
176 строк · 5.8 Кб
1
#include <Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.h>
2
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
3
#include <Processors/Transforms/AggregatingTransform.h>
4
#include <Processors/Transforms/AggregatingInOrderTransform.h>
5
#include <Core/SortCursor.h>
6

7
#include <base/range.h>
8

9
namespace DB
10
{
11

12
/// Error codes referenced in this translation unit; only declared here.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
16

17
/// Builds the per-input state from a freshly received chunk.
///
/// Stores the chunk's columns, its row count and the supplied byte estimate.
/// For a non-empty chunk it additionally collects raw pointers to the columns
/// that take part in the sort description, in description order.
FinishAggregatingInOrderAlgorithm::State::State(const Chunk & chunk, const SortDescriptionWithPositions & desc, Int64 total_bytes_)
    : all_columns(chunk.getColumns())
    , num_rows(chunk.getNumRows())
    , total_bytes(total_bytes_)
{
    /// An empty chunk has no columns to index into, so there is nothing to collect.
    if (!chunk)
        return;

    sorting_columns.reserve(desc.size());
    for (const auto & sort_key : desc)
    {
        /// The pointers stay valid for as long as all_columns keeps the columns alive.
        const auto * key_column = all_columns[sort_key.column_number].get();
        sorting_columns.emplace_back(key_column);
    }
}
27

28
/// Creates the algorithm for `num_inputs_` sorted inputs.
///
/// @param header_               Block used to resolve sort column names to positions.
/// @param num_inputs_           Number of inputs that will be passed to initialize().
/// @param params_               Aggregation parameters; stored as-is (shared pointer).
/// @param description_          Sort description given by column names.
/// @param max_block_size_rows_  Row threshold after which accumulated chunks are emitted for merging.
/// @param max_block_size_bytes_ Byte threshold after which accumulated chunks are emitted for merging.
FinishAggregatingInOrderAlgorithm::FinishAggregatingInOrderAlgorithm(
    const Block & header_,
    size_t num_inputs_,
    AggregatingTransformParamsPtr params_,
    const SortDescription & description_,
    size_t max_block_size_rows_,
    size_t max_block_size_bytes_)
    : header(header_)
    , num_inputs(num_inputs_)
    /// params_ is a by-value sink parameter: move it instead of copying to avoid
    /// an extra atomic reference-count increment/decrement.
    , params(std::move(params_))
    , max_block_size_rows(max_block_size_rows_)
    , max_block_size_bytes(max_block_size_bytes_)
{
    /// Pair every sort column with its position in the header so that later
    /// comparisons can address columns by index.
    for (const auto & column_description : description_)
        description.emplace_back(column_description, header_.getPositionByName(column_description.column_name));
}
40

41
/// Takes ownership of the initial inputs and builds a state for each of them.
void FinishAggregatingInOrderAlgorithm::initialize(Inputs inputs)
{
    current_inputs = std::move(inputs);
    states.resize(num_inputs);

    /// Feed every initial input through the regular consume() path.
    size_t source_num = 0;
    while (source_num < num_inputs)
    {
        consume(current_inputs[source_num], source_num);
        ++source_num;
    }
}
48

49
/// Replaces the state of input `source_num` with one built from `input.chunk`.
///
/// A chunk without rows is ignored and the previous state is left untouched.
/// Throws LOGICAL_ERROR if the chunk carries no chunk info.
void FinishAggregatingInOrderAlgorithm::consume(Input & input, size_t source_num)
{
    if (!input.chunk.hasRows())
        return;

    const auto & chunk_info = input.chunk.getChunkInfo();
    if (!chunk_info)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk info was not set for chunk in FinishAggregatingInOrderAlgorithm");

    /// Will be set by AggregatingInOrderTransform during local aggregation; will be nullptr during merging on initiator.
    const auto * bytes_info = typeid_cast<const ChunkInfoWithAllocatedBytes *>(chunk_info.get());

    /// Without that info the byte estimate for the chunk is zero.
    const Int64 allocated_bytes = bytes_info ? bytes_info->allocated_bytes : 0;

    states[source_num] = State{input.chunk, description, allocated_bytes};
}
65

66
/// Performs one step of the algorithm.
///
/// 1. If earlier steps left exhausted inputs pending, requests the next one of them.
/// 2. Otherwise picks the valid input whose last row is the smallest, schedules that
///    chunk entirely and, for every other valid input, schedules rows up to the first
///    row that is greater than that last row.
/// 3. Hands the scheduled ranges to addToAggregation() and requests one exhausted input.
///    If enough rows or bytes have been accumulated, the returned status also carries
///    a chunk produced by prepareToMerge().
///
/// When no input is valid, returns the remaining accumulated chunks with the second
/// Status argument set to true (presumably the "finished" flag - confirm against IMergingAlgorithm::Status).
IMergingAlgorithm::Status FinishAggregatingInOrderAlgorithm::merge()
{
    /// Drain pending requests one per call before doing any further work.
    if (!inputs_to_update.empty())
    {
        Status pending(inputs_to_update.back());
        inputs_to_update.pop_back();
        return pending;
    }

    /// Find the input with smallest last row.
    std::optional<size_t> pivot;
    for (size_t input_idx = 0; input_idx < num_inputs; ++input_idx)
    {
        const auto & candidate = states[input_idx];
        if (!candidate.isValid())
            continue;

        /// The first valid input becomes the pivot unconditionally; later ones must compare smaller.
        const bool is_smaller = !pivot
            || less(candidate.sorting_columns, states[*pivot].sorting_columns,
                    candidate.num_rows - 1, states[*pivot].num_rows - 1, description);

        if (is_smaller)
            pivot = input_idx;
    }

    /// No valid inputs are left: flush whatever has been accumulated.
    if (!pivot)
        return Status(prepareToMerge(), true);

    /// Chunk at the pivot input will be aggregated entirely.
    auto & pivot_state = states[*pivot];
    pivot_state.to_row = pivot_state.num_rows;

    /// Find the positions up to which need to aggregate in other chunks.
    for (size_t input_idx = 0; input_idx < num_inputs; ++input_idx)
    {
        auto & other = states[input_idx];
        if (input_idx == *pivot || !other.isValid())
            continue;

        /// std::upper_bound calls the comparator as (value, element): the value is a row
        /// of the pivot chunk, the element is a row of the chunk being searched.
        const auto pivot_row_is_less = [&](size_t pivot_row, size_t other_row)
        {
            return less(pivot_state.sorting_columns, other.sorting_columns, pivot_row, other_row, description);
        };

        auto remaining_rows = collections::range(other.current_row, other.num_rows);
        auto first_greater = std::upper_bound(
            remaining_rows.begin(), remaining_rows.end(), pivot_state.num_rows - 1, pivot_row_is_less);

        if (first_greater == remaining_rows.end())
            other.to_row = other.num_rows;
        else
            other.to_row = *first_greater;
    }

    addToAggregation();

    /// At least one chunk should be fully aggregated.
    assert(!inputs_to_update.empty());
    Status status(inputs_to_update.back());
    inputs_to_update.pop_back();

    /// Do not merge blocks, if there are too few rows or bytes.
    const bool enough_rows = accumulated_rows >= max_block_size_rows;
    const bool enough_bytes = accumulated_bytes >= max_block_size_bytes;
    if (enough_rows || enough_bytes)
        status.chunk = prepareToMerge();

    return status;
}
126

127
/// Packs all accumulated chunks into a single result chunk and resets the counters.
///
/// The returned chunk only carries a ChunksToMerge info object, which receives the
/// accumulated chunks and a sequential chunk number.
Chunk FinishAggregatingInOrderAlgorithm::prepareToMerge()
{
    /// Start a new accumulation window.
    accumulated_rows = 0;
    accumulated_bytes = 0;

    auto merge_info = std::make_shared<ChunksToMerge>();
    merge_info->chunks = std::make_unique<Chunks>(std::move(chunks));
    merge_info->chunk_num = chunk_num++;

    Chunk result;
    result.setChunkInfo(std::move(merge_info));
    return result;
}
140

141
void FinishAggregatingInOrderAlgorithm::addToAggregation()
142
{
143
    for (size_t i = 0; i < num_inputs; ++i)
144
    {
145
        const auto & state = states[i];
146
        if (!state.isValid() || state.current_row == state.to_row)
147
            continue;
148

149
        size_t current_rows = state.to_row - state.current_row;
150
        if (current_rows == state.num_rows)
151
        {
152
            chunks.emplace_back(state.all_columns, current_rows);
153
        }
154
        else
155
        {
156
            Columns new_columns;
157
            new_columns.reserve(state.all_columns.size());
158
            for (const auto & column : state.all_columns)
159
                new_columns.emplace_back(column->cut(state.current_row, current_rows));
160

161
            chunks.emplace_back(std::move(new_columns), current_rows);
162
        }
163

164
        chunks.back().setChunkInfo(std::make_shared<AggregatedChunkInfo>());
165
        states[i].current_row = states[i].to_row;
166

167
        /// We assume that sizes in bytes of rows are almost the same.
168
        accumulated_bytes += static_cast<size_t>(static_cast<double>(states[i].total_bytes) * current_rows / states[i].num_rows);
169
        accumulated_rows += current_rows;
170

171
        if (!states[i].isValid())
172
            inputs_to_update.push_back(i);
173
    }
174
}
175

176
}
177

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.