ClickHouse
70 строк · 2.3 Кб
1#include <Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.h>
2#include <Processors/Merges/Algorithms/MergeTreePartLevelInfo.h>
3
4namespace DB
5{
6
7IMergingAlgorithmWithSharedChunks::IMergingAlgorithmWithSharedChunks(
8Block header_, size_t num_inputs, SortDescription description_, WriteBuffer * out_row_sources_buf_, size_t max_row_refs)
9: header(std::move(header_))
10, description(std::move(description_))
11, chunk_allocator(num_inputs + max_row_refs)
12, cursors(num_inputs)
13, sources(num_inputs)
14, sources_origin_merge_tree_part_level(num_inputs)
15, out_row_sources_buf(out_row_sources_buf_)
16{
17}
18
19static void prepareChunk(Chunk & chunk)
20{
21auto num_rows = chunk.getNumRows();
22auto columns = chunk.detachColumns();
23for (auto & column : columns)
24column = column->convertToFullColumnIfConst();
25
26chunk.setColumns(std::move(columns), num_rows);
27}
28
29void IMergingAlgorithmWithSharedChunks::initialize(Inputs inputs)
30{
31for (size_t source_num = 0; source_num < inputs.size(); ++source_num)
32{
33if (!inputs[source_num].chunk)
34continue;
35
36prepareChunk(inputs[source_num].chunk);
37
38auto & source = sources[source_num];
39
40source.skip_last_row = inputs[source_num].skip_last_row;
41source.chunk = chunk_allocator.alloc(inputs[source_num].chunk);
42cursors[source_num] = SortCursorImpl(header, source.chunk->getColumns(), description, source_num, inputs[source_num].permutation);
43
44source.chunk->all_columns = cursors[source_num].all_columns;
45source.chunk->sort_columns = cursors[source_num].sort_columns;
46
47sources_origin_merge_tree_part_level[source_num] = getPartLevelFromChunk(*source.chunk);
48}
49
50queue = SortingQueue<SortCursor>(cursors);
51}
52
53void IMergingAlgorithmWithSharedChunks::consume(Input & input, size_t source_num)
54{
55prepareChunk(input.chunk);
56
57auto & source = sources[source_num];
58source.skip_last_row = input.skip_last_row;
59source.chunk = chunk_allocator.alloc(input.chunk);
60cursors[source_num].reset(source.chunk->getColumns(), header, input.permutation);
61
62source.chunk->all_columns = cursors[source_num].all_columns;
63source.chunk->sort_columns = cursors[source_num].sort_columns;
64
65sources_origin_merge_tree_part_level[source_num] = getPartLevelFromChunk(*source.chunk);
66
67queue.push(cursors[source_num]);
68}
69
70}
71