ClickHouse
225 строк · 8.8 Кб
#include <Processors/TTL/TTLAggregationAlgorithm.h>
#include <Interpreters/Context.h>

#include <vector>
3
4namespace DB
5{
6
7TTLAggregationAlgorithm::TTLAggregationAlgorithm(
8const TTLExpressions & ttl_expressions_,
9const TTLDescription & description_,
10const TTLInfo & old_ttl_info_,
11time_t current_time_,
12bool force_,
13const Block & header_,
14const MergeTreeData & storage_)
15: ITTLAlgorithm(ttl_expressions_, description_, old_ttl_info_, current_time_, force_)
16, header(header_)
17{
18current_key_value.resize(description.group_by_keys.size());
19
20const auto & keys = description.group_by_keys;
21
22key_columns.resize(description.group_by_keys.size());
23AggregateDescriptions aggregates = description.aggregate_descriptions;
24
25columns_for_aggregator.resize(description.aggregate_descriptions.size());
26const Settings & settings = storage_.getContext()->getSettingsRef();
27
28Aggregator::Params params(
29keys,
30aggregates,
31false,
32settings.max_rows_to_group_by,
33settings.group_by_overflow_mode,
34/*group_by_two_level_threshold*/0,
35/*group_by_two_level_threshold_bytes*/0,
36settings.max_bytes_before_external_group_by,
37settings.empty_result_for_aggregation_by_empty_set,
38storage_.getContext()->getTempDataOnDisk(),
39settings.max_threads,
40settings.min_free_disk_space_for_temporary_data,
41settings.compile_aggregate_expressions,
42settings.min_count_to_compile_aggregate_expression,
43settings.max_block_size,
44settings.enable_software_prefetch_in_aggregation,
45/*only_merge=*/ false,
46settings.optimize_group_by_constant_keys,
47settings.min_chunk_bytes_for_parallel_parsing,
48/*stats_collecting_params=*/ {});
49
50aggregator = std::make_unique<Aggregator>(header, params);
51
52if (isMaxTTLExpired())
53new_ttl_info.ttl_finished = true;
54}
55
56void TTLAggregationAlgorithm::execute(Block & block)
57{
58
59bool some_rows_were_aggregated = false;
60MutableColumns result_columns = header.cloneEmptyColumns();
61
62if (!block) /// Empty block -- no more data, but we may still have some accumulated rows
63{
64if (!aggregation_result.empty()) /// Still have some aggregated data, let's update TTL
65{
66finalizeAggregates(result_columns);
67some_rows_were_aggregated = true;
68}
69else /// No block, all aggregated, just finish
70{
71return;
72}
73}
74else
75{
76const auto & column_names = header.getNames();
77MutableColumns aggregate_columns = header.cloneEmptyColumns();
78
79auto ttl_column = executeExpressionAndGetColumn(ttl_expressions.expression, block, description.result_column);
80auto where_column = executeExpressionAndGetColumn(ttl_expressions.where_expression, block, description.where_result_column);
81
82size_t rows_aggregated = 0;
83size_t current_key_start = 0;
84size_t rows_with_current_key = 0;
85
86for (size_t i = 0; i < block.rows(); ++i)
87{
88UInt32 cur_ttl = getTimestampByIndex(ttl_column.get(), i);
89bool where_filter_passed = !where_column || where_column->getBool(i);
90bool ttl_expired = isTTLExpired(cur_ttl) && where_filter_passed;
91
92bool same_as_current = true;
93for (size_t j = 0; j < description.group_by_keys.size(); ++j)
94{
95const String & key_column = description.group_by_keys[j];
96const IColumn * values_column = block.getByName(key_column).column.get();
97if (!same_as_current || (*values_column)[i] != current_key_value[j])
98{
99values_column->get(i, current_key_value[j]);
100same_as_current = false;
101}
102}
103
104/// We are observing the row with new the aggregation key.
105/// In this case we definitely need to finish the current aggregation for the previuos key and
106/// write results to `result_columns`.
107const bool observing_new_key = !same_as_current;
108/// We are observing the row with the same aggregation key, but TTL is not expired anymore.
109/// In this case we need to finish aggregation here. The current row has to be written as is.
110const bool no_new_rows_to_aggregate_within_the_same_key = same_as_current && !ttl_expired;
111/// The aggregation for this aggregation key is done.
112const bool need_to_flush_aggregation_state = observing_new_key || no_new_rows_to_aggregate_within_the_same_key;
113
114if (need_to_flush_aggregation_state)
115{
116if (rows_with_current_key)
117{
118some_rows_were_aggregated = true;
119calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key);
120}
121finalizeAggregates(result_columns);
122
123current_key_start = rows_aggregated;
124rows_with_current_key = 0;
125}
126
127if (ttl_expired)
128{
129++rows_with_current_key;
130++rows_aggregated;
131for (const auto & name : column_names)
132{
133const IColumn * values_column = block.getByName(name).column.get();
134auto & column = aggregate_columns[header.getPositionByName(name)];
135column->insertFrom(*values_column, i);
136}
137}
138else
139{
140for (const auto & name : column_names)
141{
142const IColumn * values_column = block.getByName(name).column.get();
143auto & column = result_columns[header.getPositionByName(name)];
144column->insertFrom(*values_column, i);
145}
146}
147}
148
149if (rows_with_current_key)
150{
151some_rows_were_aggregated = true;
152calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key);
153}
154}
155
156block = header.cloneWithColumns(std::move(result_columns));
157
158/// If some rows were aggregated we have to recalculate ttl info's
159if (some_rows_were_aggregated)
160{
161auto ttl_column_after_aggregation = executeExpressionAndGetColumn(ttl_expressions.expression, block, description.result_column);
162auto where_column_after_aggregation = executeExpressionAndGetColumn(ttl_expressions.where_expression, block, description.where_result_column);
163for (size_t i = 0; i < block.rows(); ++i)
164{
165bool where_filter_passed = !where_column_after_aggregation || where_column_after_aggregation->getBool(i);
166if (where_filter_passed)
167new_ttl_info.update(getTimestampByIndex(ttl_column_after_aggregation.get(), i));
168}
169}
170}
171
172void TTLAggregationAlgorithm::calculateAggregates(const MutableColumns & aggregate_columns, size_t start_pos, size_t length)
173{
174Columns aggregate_chunk;
175aggregate_chunk.reserve(aggregate_columns.size());
176for (const auto & name : header.getNames())
177{
178const auto & column = aggregate_columns[header.getPositionByName(name)];
179ColumnPtr chunk_column = column->cut(start_pos, length);
180aggregate_chunk.emplace_back(std::move(chunk_column));
181}
182
183aggregator->executeOnBlock(
184aggregate_chunk, /* row_begin= */ 0, length,
185aggregation_result, key_columns, columns_for_aggregator, no_more_keys);
186
187}
188
189void TTLAggregationAlgorithm::finalizeAggregates(MutableColumns & result_columns)
190{
191if (!aggregation_result.empty())
192{
193auto aggregated_res = aggregator->convertToBlocks(aggregation_result, true, 1);
194
195for (auto & agg_block : aggregated_res)
196{
197for (const auto & it : description.set_parts)
198it.expression->execute(agg_block);
199
200for (const auto & name : description.group_by_keys)
201{
202const IColumn * values_column = agg_block.getByName(name).column.get();
203auto & result_column = result_columns[header.getPositionByName(name)];
204result_column->insertRangeFrom(*values_column, 0, agg_block.rows());
205}
206
207for (const auto & it : description.set_parts)
208{
209const IColumn * values_column = agg_block.getByName(it.expression_result_column_name).column.get();
210auto & result_column = result_columns[header.getPositionByName(it.column_name)];
211result_column->insertRangeFrom(*values_column, 0, agg_block.rows());
212}
213}
214}
215
216aggregation_result.invalidate();
217}
218
219void TTLAggregationAlgorithm::finalize(const MutableDataPartPtr & data_part) const
220{
221data_part->ttl_infos.group_by_ttl[description.result_column] = new_ttl_info;
222data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max);
223}
224
225}
226