// ClickHouse
// 370 lines · 10.8 KB
1#include <Processors/LimitTransform.h>2
3
4namespace DB5{
6
7namespace ErrorCodes8{
9extern const int LOGICAL_ERROR;10}
11
12LimitTransform::LimitTransform(13const Block & header_, UInt64 limit_, UInt64 offset_, size_t num_streams,14bool always_read_till_end_, bool with_ties_,15SortDescription description_)16: IProcessor(InputPorts(num_streams, header_), OutputPorts(num_streams, header_))17, limit(limit_), offset(offset_)18, always_read_till_end(always_read_till_end_)19, with_ties(with_ties_), description(std::move(description_))20{
21if (num_streams != 1 && with_ties)22throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot use LimitTransform with multiple ports and ties");23
24ports_data.resize(num_streams);25
26size_t cur_stream = 0;27for (auto & input : inputs)28{29ports_data[cur_stream].input_port = &input;30++cur_stream;31}32
33cur_stream = 0;34for (auto & output : outputs)35{36ports_data[cur_stream].output_port = &output;37++cur_stream;38}39
40for (const auto & desc : description)41sort_column_positions.push_back(header_.getPositionByName(desc.column_name));42}
43
44Chunk LimitTransform::makeChunkWithPreviousRow(const Chunk & chunk, UInt64 row) const45{
46assert(row < chunk.getNumRows());47ColumnRawPtrs current_columns = extractSortColumns(chunk.getColumns());48MutableColumns last_row_sort_columns;49for (size_t i = 0; i < current_columns.size(); ++i)50{51last_row_sort_columns.emplace_back(current_columns[i]->cloneEmpty());52last_row_sort_columns[i]->insertFrom(*current_columns[i], row);53}54return Chunk(std::move(last_row_sort_columns), 1);55}
56
57
58IProcessor::Status LimitTransform::prepare(59const PortNumbers & updated_input_ports,60const PortNumbers & updated_output_ports)61{
62bool has_full_port = false;63
64auto process_pair = [&](size_t pos)65{66auto status = preparePair(ports_data[pos]);67
68switch (status)69{70case IProcessor::Status::Finished:71{72if (!ports_data[pos].is_finished)73{74ports_data[pos].is_finished = true;75++num_finished_port_pairs;76}77
78return;79}80case IProcessor::Status::PortFull:81{82has_full_port = true;83return;84}85case IProcessor::Status::NeedData:86return;87default:88throw Exception(89ErrorCodes::LOGICAL_ERROR, "Unexpected status for LimitTransform::preparePair : {}", IProcessor::statusToName(status));90}91};92
93for (auto pos : updated_input_ports)94process_pair(pos);95
96for (auto pos : updated_output_ports)97process_pair(pos);98
99/// All ports are finished. It may happen even before we reached the limit (has less data then limit).100if (num_finished_port_pairs == ports_data.size())101return Status::Finished;102
103bool limit_is_unreachable = (limit > std::numeric_limits<UInt64>::max() - offset);104
105/// If we reached limit for some port, then close others. Otherwise some sources may infinitely read data.106/// Example: SELECT * FROM system.numbers_mt WHERE number = 1000000 LIMIT 1107if ((!limit_is_unreachable && rows_read >= offset + limit)108&& !previous_row_chunk && !always_read_till_end)109{110for (auto & input : inputs)111input.close();112
113for (auto & output : outputs)114output.finish();115
116return Status::Finished;117}118
119if (has_full_port)120return Status::PortFull;121
122return Status::NeedData;123}
124
125LimitTransform::Status LimitTransform::prepare()126{
127if (ports_data.size() != 1)128throw Exception(ErrorCodes::LOGICAL_ERROR, "prepare without arguments is not supported for multi-port LimitTransform");129
130return prepare({0}, {0});131}
132
133LimitTransform::Status LimitTransform::preparePair(PortsData & data)134{
135auto & output = *data.output_port;136auto & input = *data.input_port;137
138/// Check can output.139bool output_finished = false;140if (output.isFinished())141{142output_finished = true;143if (!always_read_till_end)144{145input.close();146return Status::Finished;147}148}149
150if (!output_finished && !output.canPush())151{152input.setNotNeeded();153return Status::PortFull;154}155
156bool limit_is_unreachable = (limit > std::numeric_limits<UInt64>::max() - offset);157
158/// Check if we are done with pushing.159bool is_limit_reached = !limit_is_unreachable && rows_read >= offset + limit && !previous_row_chunk;160if (is_limit_reached)161{162if (!always_read_till_end)163{164output.finish();165input.close();166return Status::Finished;167}168}169
170/// Check can input.171
172if (input.isFinished())173{174output.finish();175return Status::Finished;176}177
178input.setNeeded();179if (!input.hasData())180return Status::NeedData;181
182data.current_chunk = input.pull(true);183
184auto rows = data.current_chunk.getNumRows();185
186if (rows_before_limit_at_least && !data.input_port_has_counter)187rows_before_limit_at_least->add(rows);188
189/// Skip block (for 'always_read_till_end' case).190if (is_limit_reached || output_finished)191{192data.current_chunk.clear();193if (input.isFinished())194{195output.finish();196return Status::Finished;197}198
199/// Now, we pulled from input, and it must be empty.200input.setNeeded();201return Status::NeedData;202}203
204/// Process block.205
206rows_read += rows;207
208if (rows_read <= offset)209{210data.current_chunk.clear();211
212if (input.isFinished())213{214output.finish();215return Status::Finished;216}217
218/// Now, we pulled from input, and it must be empty.219input.setNeeded();220return Status::NeedData;221}222
223if (rows <= std::numeric_limits<UInt64>::max() - offset && rows_read >= offset + rows224&& !limit_is_unreachable && rows_read <= offset + limit)225{226/// Return the whole chunk.227
228/// Save the last row of current chunk to check if next block begins with the same row (for WITH TIES).229if (with_ties && rows_read == offset + limit)230previous_row_chunk = makeChunkWithPreviousRow(data.current_chunk, data.current_chunk.getNumRows() - 1);231}232else233/// This function may be heavy to execute in prepare. But it happens no more than twice, and make code simpler.234splitChunk(data);235
236bool may_need_more_data_for_ties = previous_row_chunk || rows_read - rows <= offset + limit;237/// No more data is needed.238if (!always_read_till_end && !limit_is_unreachable && rows_read >= offset + limit && !may_need_more_data_for_ties)239input.close();240
241output.push(std::move(data.current_chunk));242
243return Status::PortFull;244}
245
246
247void LimitTransform::splitChunk(PortsData & data)248{
249auto current_chunk_sort_columns = extractSortColumns(data.current_chunk.getColumns());250UInt64 num_rows = data.current_chunk.getNumRows();251UInt64 num_columns = data.current_chunk.getNumColumns();252
253bool limit_is_unreachable = (limit > std::numeric_limits<UInt64>::max() - offset);254
255if (previous_row_chunk && !limit_is_unreachable && rows_read >= offset + limit)256{257/// Scan until the first row, which is not equal to previous_row_chunk (for WITH TIES)258UInt64 current_row_num = 0;259for (; current_row_num < num_rows; ++current_row_num)260{261if (!sortColumnsEqualAt(current_chunk_sort_columns, current_row_num))262break;263}264
265auto columns = data.current_chunk.detachColumns();266
267if (current_row_num < num_rows)268{269previous_row_chunk = {};270for (UInt64 i = 0; i < num_columns; ++i)271columns[i] = columns[i]->cut(0, current_row_num);272}273
274data.current_chunk.setColumns(std::move(columns), current_row_num);275return;276}277
278/// return a piece of the block279UInt64 start = 0;280
281/// ------------[....(...).]282/// <----------------------> rows_read283/// <----------> num_rows284/// <---------------> offset285/// <---> start286
287assert(offset < rows_read);288
289if (offset + num_rows > rows_read)290start = offset + num_rows - rows_read;291
292/// ------------[....(...).]293/// <----------------------> rows_read294/// <----------> num_rows295/// <---------------> offset296/// <---> limit297/// <---> length298/// <---> start299
300/// Or:301
302/// -----------------(------[....)....]303/// <---------------------------------> rows_read304/// <---------> num_rows305/// <---------------> offset306/// <-----------> limit307/// <----> length308/// 0 = start309
310UInt64 length = num_rows - start;311
312if (!limit_is_unreachable && offset + limit < rows_read)313{314if (offset + limit < rows_read - num_rows)315length = 0;316else317length = offset + limit - (rows_read - num_rows) - start;318}319
320/// check if other rows in current block equals to last one in limit321if (with_ties && length)322{323UInt64 current_row_num = start + length;324previous_row_chunk = makeChunkWithPreviousRow(data.current_chunk, current_row_num - 1);325
326for (; current_row_num < num_rows; ++current_row_num)327{328if (!sortColumnsEqualAt(current_chunk_sort_columns, current_row_num))329{330previous_row_chunk = {};331break;332}333}334
335length = current_row_num - start;336}337
338if (length == num_rows)339return;340
341auto columns = data.current_chunk.detachColumns();342
343for (UInt64 i = 0; i < num_columns; ++i)344columns[i] = columns[i]->cut(start, length);345
346data.current_chunk.setColumns(std::move(columns), length);347}
348
349ColumnRawPtrs LimitTransform::extractSortColumns(const Columns & columns) const350{
351ColumnRawPtrs res;352res.reserve(description.size());353for (size_t pos : sort_column_positions)354res.push_back(columns[pos].get());355
356return res;357}
358
359bool LimitTransform::sortColumnsEqualAt(const ColumnRawPtrs & current_chunk_sort_columns, UInt64 current_chunk_row_num) const360{
361assert(current_chunk_sort_columns.size() == previous_row_chunk.getNumColumns());362size_t size = current_chunk_sort_columns.size();363const auto & previous_row_sort_columns = previous_row_chunk.getColumns();364for (size_t i = 0; i < size; ++i)365if (0 != current_chunk_sort_columns[i]->compareAt(current_chunk_row_num, 0, *previous_row_sort_columns[i], 1))366return false;367return true;368}
369
370}
371
372