ClickHouse

Форк
0
/
TTLAggregationAlgorithm.cpp 
225 строк · 8.8 Кб
1
#include <Processors/TTL/TTLAggregationAlgorithm.h>
#include <Interpreters/Context.h>

#include <vector>
3

4
namespace DB
5
{
6

7
/// Builds the algorithm state for a GROUP BY TTL rule.
///
/// Sizes the per-key and per-aggregate scratch buffers from `description`,
/// constructs the `Aggregator` over `header_` using settings taken from the
/// storage context, and marks the new TTL info as finished when
/// `isMaxTTLExpired()` reports true.
TTLAggregationAlgorithm::TTLAggregationAlgorithm(
    const TTLExpressions & ttl_expressions_,
    const TTLDescription & description_,
    const TTLInfo & old_ttl_info_,
    time_t current_time_,
    bool force_,
    const Block & header_,
    const MergeTreeData & storage_)
    : ITTLAlgorithm(ttl_expressions_, description_, old_ttl_info_, current_time_, force_)
    , header(header_)
{
    const auto & group_keys = description.group_by_keys;
    AggregateDescriptions aggregate_functions = description.aggregate_descriptions;

    /// One slot per GROUP BY key, one slot per aggregate function.
    current_key_value.resize(group_keys.size());
    key_columns.resize(group_keys.size());
    columns_for_aggregator.resize(aggregate_functions.size());

    /// Fetch the context once; both the settings and the temporary data storage come from it.
    const auto context = storage_.getContext();
    const Settings & settings = context->getSettingsRef();

    Aggregator::Params aggregator_params(
        group_keys,
        aggregate_functions,
        false,
        settings.max_rows_to_group_by,
        settings.group_by_overflow_mode,
        /*group_by_two_level_threshold*/0,
        /*group_by_two_level_threshold_bytes*/0,
        settings.max_bytes_before_external_group_by,
        settings.empty_result_for_aggregation_by_empty_set,
        context->getTempDataOnDisk(),
        settings.max_threads,
        settings.min_free_disk_space_for_temporary_data,
        settings.compile_aggregate_expressions,
        settings.min_count_to_compile_aggregate_expression,
        settings.max_block_size,
        settings.enable_software_prefetch_in_aggregation,
        /*only_merge=*/ false,
        settings.optimize_group_by_constant_keys,
        /// NOTE(review): a parallel-parsing chunk size is an odd input for an aggregator;
        /// this looks like it may be a copy-paste slip -- confirm against the
        /// Aggregator::Params constructor signature used by this fork.
        settings.min_chunk_bytes_for_parallel_parsing,
        /*stats_collecting_params=*/ {});

    aggregator = std::make_unique<Aggregator>(header, aggregator_params);

    if (isMaxTTLExpired())
        new_ttl_info.ttl_finished = true;
}
55

56
/// Applies the GROUP BY TTL rule to one input block, replacing `block` with the result.
///
/// - Empty `block` (end of input): flushes whatever is still accumulated in the
///   aggregator into the result; if nothing is accumulated, returns with `block` untouched.
/// - Non-empty `block`: rows whose TTL is expired (and that pass the optional WHERE
///   expression) are collected and fed to the aggregator in runs of consecutive rows
///   sharing the same GROUP BY key; all other rows are copied to the result unchanged.
///   The state of the last run is NOT finalized at the end of the block: it stays in
///   `aggregation_result` and is flushed by a later call.
///
/// If any rows were aggregated or flushed, the TTL expression is re-evaluated on the
/// resulting block and `new_ttl_info` is updated from the rows passing the WHERE filter.
void TTLAggregationAlgorithm::execute(Block & block)
{
    bool some_rows_were_aggregated = false;
    MutableColumns result_columns = header.cloneEmptyColumns();

    if (!block) /// Empty block -- no more data, but we may still have some accumulated rows
    {
        /// No block, all aggregated, just finish
        if (aggregation_result.empty())
            return;

        /// Still have some aggregated data, let's update TTL
        finalizeAggregates(result_columns);
        some_rows_were_aggregated = true;
    }
    else
    {
        MutableColumns aggregate_columns = header.cloneEmptyColumns();

        auto ttl_column = executeExpressionAndGetColumn(ttl_expressions.expression, block, description.result_column);
        auto where_column = executeExpressionAndGetColumn(ttl_expressions.where_expression, block, description.where_result_column);

        const size_t num_rows = block.rows();

        /// Resolve all by-name lookups once instead of for every row and every column:
        /// neither `block` nor `header` changes while the rows are being processed.
        /// The lookups are skipped for a block without rows, so that such a block
        /// never triggers a lookup (and a possible "no such column" error) at all.
        std::vector<const IColumn *> key_sources;
        std::vector<const IColumn *> row_sources;
        std::vector<size_t> row_targets;
        if (num_rows != 0)
        {
            key_sources.reserve(description.group_by_keys.size());
            for (const auto & key_name : description.group_by_keys)
                key_sources.push_back(block.getByName(key_name).column.get());

            const auto column_names = header.getNames();
            row_sources.reserve(column_names.size());
            row_targets.reserve(column_names.size());
            for (const auto & name : column_names)
            {
                row_sources.push_back(block.getByName(name).column.get());
                row_targets.push_back(header.getPositionByName(name));
            }
        }

        size_t rows_aggregated = 0;
        size_t current_key_start = 0;
        size_t rows_with_current_key = 0;

        for (size_t i = 0; i < num_rows; ++i)
        {
            UInt32 cur_ttl = getTimestampByIndex(ttl_column.get(), i);
            bool where_filter_passed = !where_column || where_column->getBool(i);
            bool ttl_expired = isTTLExpired(cur_ttl) && where_filter_passed;

            /// Compare the row's key with the remembered one. Once one component differs,
            /// every following component is overwritten as well, so that
            /// `current_key_value` ends up holding the complete key of this row.
            bool same_as_current = true;
            for (size_t j = 0; j < key_sources.size(); ++j)
            {
                const IColumn * values_column = key_sources[j];
                if (!same_as_current || (*values_column)[i] != current_key_value[j])
                {
                    values_column->get(i, current_key_value[j]);
                    same_as_current = false;
                }
            }

            /// We are observing the row with a new aggregation key.
            /// In this case we definitely need to finish the current aggregation for the previous key and
            /// write results to `result_columns`.
            const bool observing_new_key = !same_as_current;
            /// We are observing the row with the same aggregation key, but TTL is not expired anymore.
            /// In this case we need to finish aggregation here. The current row has to be written as is.
            const bool no_new_rows_to_aggregate_within_the_same_key = same_as_current && !ttl_expired;
            /// The aggregation for this aggregation key is done.
            const bool need_to_flush_aggregation_state = observing_new_key || no_new_rows_to_aggregate_within_the_same_key;

            if (need_to_flush_aggregation_state)
            {
                if (rows_with_current_key)
                {
                    some_rows_were_aggregated = true;
                    calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key);
                }
                finalizeAggregates(result_columns);

                /// The next run starts right after the rows collected so far.
                current_key_start = rows_aggregated;
                rows_with_current_key = 0;
            }

            if (ttl_expired)
            {
                ++rows_with_current_key;
                ++rows_aggregated;
            }

            /// Expired rows are collected for aggregation, the rest is passed through as is.
            MutableColumns & destination = ttl_expired ? aggregate_columns : result_columns;
            for (size_t k = 0; k < row_sources.size(); ++k)
                destination[row_targets[k]]->insertFrom(*row_sources[k], i);
        }

        /// Feed the trailing run to the aggregator; it is finalized by a later call.
        if (rows_with_current_key)
        {
            some_rows_were_aggregated = true;
            calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key);
        }
    }

    block = header.cloneWithColumns(std::move(result_columns));

    /// If some rows were aggregated we have to recalculate ttl info's
    if (some_rows_were_aggregated)
    {
        auto ttl_column_after_aggregation = executeExpressionAndGetColumn(ttl_expressions.expression, block, description.result_column);
        auto where_column_after_aggregation = executeExpressionAndGetColumn(ttl_expressions.where_expression, block, description.where_result_column);
        for (size_t i = 0; i < block.rows(); ++i)
        {
            bool where_filter_passed = !where_column_after_aggregation || where_column_after_aggregation->getBool(i);
            if (where_filter_passed)
                new_ttl_info.update(getTimestampByIndex(ttl_column_after_aggregation.get(), i));
        }
    }
}
171

172
void TTLAggregationAlgorithm::calculateAggregates(const MutableColumns & aggregate_columns, size_t start_pos, size_t length)
173
{
174
    Columns aggregate_chunk;
175
    aggregate_chunk.reserve(aggregate_columns.size());
176
    for (const auto & name : header.getNames())
177
    {
178
        const auto & column = aggregate_columns[header.getPositionByName(name)];
179
        ColumnPtr chunk_column = column->cut(start_pos, length);
180
        aggregate_chunk.emplace_back(std::move(chunk_column));
181
    }
182

183
    aggregator->executeOnBlock(
184
        aggregate_chunk, /* row_begin= */ 0, length,
185
        aggregation_result, key_columns, columns_for_aggregator, no_more_keys);
186

187
}
188

189
/// Flushes the accumulated aggregation state into `result_columns`.
///
/// When `aggregation_result` holds data, it is converted to blocks; for each block the
/// SET expressions from `description.set_parts` are executed, then the GROUP BY key
/// columns and the SET result columns are appended to the matching result columns.
/// `aggregation_result` is invalidated in every case, even when it was empty.
void TTLAggregationAlgorithm::finalizeAggregates(MutableColumns & result_columns)
{
    if (!aggregation_result.empty())
    {
        /// Appends all rows of column `source_name` of `from` to the result column
        /// located at the header position of `target_name`.
        const auto append_column = [&](auto & from, const auto & source_name, const auto & target_name)
        {
            const IColumn * source = from.getByName(source_name).column.get();
            auto & target = result_columns[header.getPositionByName(target_name)];
            target->insertRangeFrom(*source, 0, from.rows());
        };

        auto aggregated_blocks = aggregator->convertToBlocks(aggregation_result, true, 1);

        for (auto & aggregated_block : aggregated_blocks)
        {
            /// Compute the SET expressions first: their results are read below.
            for (const auto & set_part : description.set_parts)
                set_part.expression->execute(aggregated_block);

            for (const auto & key_name : description.group_by_keys)
                append_column(aggregated_block, key_name, key_name);

            for (const auto & set_part : description.set_parts)
                append_column(aggregated_block, set_part.expression_result_column_name, set_part.column_name);
        }
    }

    aggregation_result.invalidate();
}
218

219
void TTLAggregationAlgorithm::finalize(const MutableDataPartPtr & data_part) const
220
{
221
    data_part->ttl_infos.group_by_ttl[description.result_column] = new_ttl_info;
222
    data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max);
223
}
224

225
}
226

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.