ClickHouse

Форк
0
/
LimitTransform.cpp 
370 строк · 10.8 Кб
1
#include <Processors/LimitTransform.h>
2

3

4
namespace DB
5
{
6

7
namespace ErrorCodes
{
    /// Raised below for internal invariant violations (unsupported port configuration,
    /// unexpected processor status, wrong prepare() overload).
    extern const int LOGICAL_ERROR;
}
11

12
LimitTransform::LimitTransform(
    const Block & header_, UInt64 limit_, UInt64 offset_, size_t num_streams,
    bool always_read_till_end_, bool with_ties_,
    SortDescription description_)
    : IProcessor(InputPorts(num_streams, header_), OutputPorts(num_streams, header_))
    , limit(limit_), offset(offset_)
    , always_read_till_end(always_read_till_end_)
    , with_ties(with_ties_), description(std::move(description_))
{
    /// WITH TIES compares incoming rows against a single previous_row_chunk shared by the whole
    /// transform, so it is only supported with exactly one input/output pair.
    if (with_ties && num_streams != 1)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot use LimitTransform with multiple ports and ties");

    ports_data.resize(num_streams);

    /// Bind the i-th input port and the i-th output port to the i-th ports_data entry.
    /// Both port lists were created above with num_streams elements.
    auto input_it = inputs.begin();
    auto output_it = outputs.begin();
    for (auto & port_pair : ports_data)
    {
        port_pair.input_port = &*input_it;
        port_pair.output_port = &*output_it;
        ++input_it;
        ++output_it;
    }

    /// Resolve the positions of the ORDER BY columns once; they are used for WITH TIES comparisons.
    sort_column_positions.reserve(description.size());
    for (const auto & sort_column : description)
        sort_column_positions.emplace_back(header_.getPositionByName(sort_column.column_name));
}
43

44
/// Builds a one-row chunk that holds only the sort columns of `chunk` at position `row`.
/// The result is used as the reference row for WITH TIES comparisons.
Chunk LimitTransform::makeChunkWithPreviousRow(const Chunk & chunk, UInt64 row) const
{
    assert(row < chunk.getNumRows());

    const ColumnRawPtrs sort_columns = extractSortColumns(chunk.getColumns());

    MutableColumns single_row_columns;
    single_row_columns.reserve(sort_columns.size());
    for (const auto * source_column : sort_columns)
    {
        auto & copied_column = single_row_columns.emplace_back(source_column->cloneEmpty());
        copied_column->insertFrom(*source_column, row);
    }

    return Chunk(std::move(single_row_columns), 1);
}
56

57

58
/// Advances every port pair touched by the latest scheduler update and derives the processor status.
///
/// Each updated pair is handed to preparePair(). A pair is counted as finished only the first time it
/// reports Finished. Once the shared row counter covers OFFSET + LIMIT (and no WITH TIES continuation
/// is pending), all remaining ports are closed so that sources do not keep producing data.
IProcessor::Status LimitTransform::prepare(
        const PortNumbers & updated_input_ports,
        const PortNumbers & updated_output_ports)
{
    bool any_output_full = false;

    auto update_pair = [&](size_t pair_index)
    {
        PortsData & pair = ports_data[pair_index];
        const auto pair_status = preparePair(pair);

        if (pair_status == IProcessor::Status::NeedData)
            return;

        if (pair_status == IProcessor::Status::PortFull)
        {
            any_output_full = true;
            return;
        }

        if (pair_status != IProcessor::Status::Finished)
            throw Exception(
                ErrorCodes::LOGICAL_ERROR, "Unexpected status for LimitTransform::preparePair : {}", IProcessor::statusToName(pair_status));

        /// A pair may report Finished repeatedly; count it only once.
        if (!pair.is_finished)
        {
            pair.is_finished = true;
            ++num_finished_port_pairs;
        }
    };

    for (const auto pair_index : updated_input_ports)
        update_pair(pair_index);

    for (const auto pair_index : updated_output_ports)
        update_pair(pair_index);

    /// Every pair is done. This can happen before the limit is reached if the inputs hold fewer rows.
    if (num_finished_port_pairs == ports_data.size())
        return Status::Finished;

    const bool limit_is_unreachable = (limit > std::numeric_limits<UInt64>::max() - offset);
    const bool limit_reached = !limit_is_unreachable && rows_read >= offset + limit;

    /// The limit is reached by one pair: close all the others, otherwise some sources may read forever.
    /// Example: SELECT * FROM system.numbers_mt WHERE number = 1000000 LIMIT 1
    if (limit_reached && !previous_row_chunk && !always_read_till_end)
    {
        for (auto & input : inputs)
            input.close();

        for (auto & output : outputs)
            output.finish();

        return Status::Finished;
    }

    return any_output_full ? Status::PortFull : Status::NeedData;
}
124

125
/// Single-pair convenience overload: treats port pair 0 as updated on both sides.
/// Only valid when the transform was built with exactly one input/output pair.
LimitTransform::Status LimitTransform::prepare()
{
    if (ports_data.size() == 1)
        return prepare({0}, {0});

    throw Exception(ErrorCodes::LOGICAL_ERROR, "prepare without arguments is not supported for multi-port LimitTransform");
}
132

133
/// Advances one input/output port pair by at most one chunk.
///
/// Pulls a chunk from the pair's input, drops the rows that fall into OFFSET, trims the rows past
/// OFFSET + LIMIT (extending the cut for WITH TIES via splitChunk) and pushes the result to the pair's
/// output. rows_read is a member of the transform rather than of `data`, so the limit applies to the
/// total number of rows read across all pairs.
///
/// Returns Finished when the pair is done, PortFull when the output cannot accept data or a chunk has
/// just been pushed, and NeedData when more input is required.
LimitTransform::Status LimitTransform::preparePair(PortsData & data)
{
    auto & out_port = *data.output_port;
    auto & in_port = *data.input_port;

    const bool limit_is_unreachable = (limit > std::numeric_limits<UInt64>::max() - offset);

    /// Drops the chunk that has just been pulled. Finishes the pair if the input is exhausted;
    /// otherwise the input is empty now (we pulled from it), so request the next chunk.
    auto discard_pulled_chunk = [&]() -> Status
    {
        data.current_chunk.clear();

        if (in_port.isFinished())
        {
            out_port.finish();
            return Status::Finished;
        }

        in_port.setNeeded();
        return Status::NeedData;
    };

    /// Check whether the output can still accept data.
    const bool output_finished = out_port.isFinished();
    if (output_finished && !always_read_till_end)
    {
        in_port.close();
        return Status::Finished;
    }

    if (!output_finished && !out_port.canPush())
    {
        in_port.setNotNeeded();
        return Status::PortFull;
    }

    /// All rows up to OFFSET + LIMIT have been read and no WITH TIES continuation is pending.
    const bool is_limit_reached = !limit_is_unreachable && rows_read >= offset + limit && !previous_row_chunk;
    if (is_limit_reached && !always_read_till_end)
    {
        out_port.finish();
        in_port.close();
        return Status::Finished;
    }

    /// Check whether the input can provide data.
    if (in_port.isFinished())
    {
        out_port.finish();
        return Status::Finished;
    }

    in_port.setNeeded();
    if (!in_port.hasData())
        return Status::NeedData;

    data.current_chunk = in_port.pull(true);
    const auto pulled_rows = data.current_chunk.getNumRows();

    if (rows_before_limit_at_least && !data.input_port_has_counter)
        rows_before_limit_at_least->add(pulled_rows);

    /// In the always_read_till_end case the input keeps being drained, but nothing more is forwarded.
    if (is_limit_reached || output_finished)
        return discard_pulled_chunk();

    rows_read += pulled_rows;

    /// The whole chunk lies inside OFFSET.
    if (rows_read <= offset)
        return discard_pulled_chunk();

    /// The overflow guard keeps `offset + pulled_rows` from wrapping around.
    const bool chunk_starts_after_offset
        = pulled_rows <= std::numeric_limits<UInt64>::max() - offset && rows_read >= offset + pulled_rows;
    const bool chunk_ends_within_limit = !limit_is_unreachable && rows_read <= offset + limit;

    if (chunk_starts_after_offset && chunk_ends_within_limit)
    {
        /// The whole chunk is returned. If it ends exactly at the limit, save its last row so the next
        /// chunk can be checked for ties (WITH TIES). An empty chunk has no last row (`pulled_rows - 1`
        /// would wrap around); in that case keep the tie reference row that is already stored.
        if (with_ties && pulled_rows != 0 && rows_read == offset + limit)
            previous_row_chunk = makeChunkWithPreviousRow(data.current_chunk, pulled_rows - 1);
    }
    else
    {
        /// This may be heavy to execute in prepare, but it happens no more than twice and keeps the code simple.
        splitChunk(data);
    }

    const bool may_need_more_data_for_ties = previous_row_chunk || rows_read - pulled_rows <= offset + limit;
    /// No more data is needed.
    if (!always_read_till_end && !limit_is_unreachable && rows_read >= offset + limit && !may_need_more_data_for_ties)
        in_port.close();

    out_port.push(std::move(data.current_chunk));

    return Status::PortFull;
}
245

246

247
/// Trims data.current_chunk down to the rows that belong to the result.
///
/// Expects rows_read to already include the rows of data.current_chunk. Two cases are handled:
///  * WITH TIES continuation: the limit has already been reached and previous_row_chunk holds the last
///    returned row. Only the leading rows equal to it (by the sort columns) are kept; the reference
///    row is cleared as soon as a different row is found.
///  * Otherwise the chunk straddles OFFSET and/or OFFSET + LIMIT. The rows inside that window are kept
///    and, with WITH TIES, the window is extended while rows equal the last row inside the limit.
void LimitTransform::splitChunk(PortsData & data)
{
    const auto sort_columns = extractSortColumns(data.current_chunk.getColumns());
    const UInt64 chunk_rows = data.current_chunk.getNumRows();
    const UInt64 chunk_columns = data.current_chunk.getNumColumns();

    /// Replaces the chunk contents with its rows [from, from + count).
    auto keep_rows = [&](UInt64 from, UInt64 count)
    {
        auto columns = data.current_chunk.detachColumns();
        for (UInt64 column_idx = 0; column_idx < chunk_columns; ++column_idx)
            columns[column_idx] = columns[column_idx]->cut(from, count);
        data.current_chunk.setColumns(std::move(columns), count);
    };

    const bool limit_is_unreachable = (limit > std::numeric_limits<UInt64>::max() - offset);

    if (previous_row_chunk && !limit_is_unreachable && rows_read >= offset + limit)
    {
        /// Keep rows while they tie with the last returned row.
        UInt64 first_mismatch = 0;
        while (first_mismatch < chunk_rows && sortColumnsEqualAt(sort_columns, first_mismatch))
            ++first_mismatch;

        if (first_mismatch < chunk_rows)
        {
            previous_row_chunk = {};
            keep_rows(0, first_mismatch);
        }
        return;
    }

    /// ------------[....(...).]
    /// <----------------------> rows_read
    ///             <----------> chunk_rows
    /// <---------------> offset
    ///             <---> start

    assert(offset < rows_read);

    const UInt64 start = (offset + chunk_rows > rows_read) ? offset + chunk_rows - rows_read : 0;

    /// ------------[....(...).]
    /// <----------------------> rows_read
    ///             <----------> chunk_rows
    /// <---------------> offset
    ///                  <---> limit
    ///                  <---> length
    ///             <---> start
    ///
    /// Or:
    ///
    /// -----------------(------[....)....]
    /// <---------------------------------> rows_read
    ///                         <---------> chunk_rows
    /// <---------------> offset
    ///                  <-----------> limit
    ///                         <----> length
    ///                         0 = start

    UInt64 length = chunk_rows - start;

    if (!limit_is_unreachable && offset + limit < rows_read)
    {
        const UInt64 rows_before_chunk = rows_read - chunk_rows;
        length = (offset + limit < rows_before_chunk) ? 0 : offset + limit - rows_before_chunk - start;
    }

    /// WITH TIES: extend the window while rows equal the last row inside the limit.
    if (with_ties && length != 0)
    {
        UInt64 window_end = start + length;
        previous_row_chunk = makeChunkWithPreviousRow(data.current_chunk, window_end - 1);

        while (window_end < chunk_rows)
        {
            if (!sortColumnsEqualAt(sort_columns, window_end))
            {
                previous_row_chunk = {};
                break;
            }
            ++window_end;
        }

        length = window_end - start;
    }

    if (length != chunk_rows)
        keep_rows(start, length);
}
348

349
/// Returns non-owning pointers to the ORDER BY columns of `columns`, in description order,
/// using the positions resolved in the constructor.
ColumnRawPtrs LimitTransform::extractSortColumns(const Columns & columns) const
{
    ColumnRawPtrs sort_columns;
    sort_columns.reserve(sort_column_positions.size());

    for (const size_t column_index : sort_column_positions)
        sort_columns.emplace_back(columns[column_index].get());

    return sort_columns;
}
358

359
bool LimitTransform::sortColumnsEqualAt(const ColumnRawPtrs & current_chunk_sort_columns, UInt64 current_chunk_row_num) const
360
{
361
    assert(current_chunk_sort_columns.size() == previous_row_chunk.getNumColumns());
362
    size_t size = current_chunk_sort_columns.size();
363
    const auto & previous_row_sort_columns = previous_row_chunk.getColumns();
364
    for (size_t i = 0; i < size; ++i)
365
        if (0 != current_chunk_sort_columns[i]->compareAt(current_chunk_row_num, 0, *previous_row_sort_columns[i], 1))
366
            return false;
367
    return true;
368
}
369

370
}
371

372

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.