ClickHouse

Форк
0
/
IMergingAlgorithmWithSharedChunks.cpp 
70 строк · 2.3 Кб
1
#include <Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.h>
2
#include <Processors/Merges/Algorithms/MergeTreePartLevelInfo.h>
3

4
namespace DB
5
{
6

7
IMergingAlgorithmWithSharedChunks::IMergingAlgorithmWithSharedChunks(
8
    Block header_, size_t num_inputs, SortDescription description_, WriteBuffer * out_row_sources_buf_, size_t max_row_refs)
9
    : header(std::move(header_))
10
    , description(std::move(description_))
11
    , chunk_allocator(num_inputs + max_row_refs)
12
    , cursors(num_inputs)
13
    , sources(num_inputs)
14
    , sources_origin_merge_tree_part_level(num_inputs)
15
    , out_row_sources_buf(out_row_sources_buf_)
16
{
17
}
18

19
static void prepareChunk(Chunk & chunk)
20
{
21
    auto num_rows = chunk.getNumRows();
22
    auto columns = chunk.detachColumns();
23
    for (auto & column : columns)
24
        column = column->convertToFullColumnIfConst();
25

26
    chunk.setColumns(std::move(columns), num_rows);
27
}
28

29
void IMergingAlgorithmWithSharedChunks::initialize(Inputs inputs)
30
{
31
    for (size_t source_num = 0; source_num < inputs.size(); ++source_num)
32
    {
33
        if (!inputs[source_num].chunk)
34
            continue;
35

36
        prepareChunk(inputs[source_num].chunk);
37

38
        auto & source = sources[source_num];
39

40
        source.skip_last_row = inputs[source_num].skip_last_row;
41
        source.chunk = chunk_allocator.alloc(inputs[source_num].chunk);
42
        cursors[source_num] = SortCursorImpl(header, source.chunk->getColumns(), description, source_num, inputs[source_num].permutation);
43

44
        source.chunk->all_columns = cursors[source_num].all_columns;
45
        source.chunk->sort_columns = cursors[source_num].sort_columns;
46

47
        sources_origin_merge_tree_part_level[source_num] = getPartLevelFromChunk(*source.chunk);
48
    }
49

50
    queue = SortingQueue<SortCursor>(cursors);
51
}
52

53
void IMergingAlgorithmWithSharedChunks::consume(Input & input, size_t source_num)
54
{
55
    prepareChunk(input.chunk);
56

57
    auto & source = sources[source_num];
58
    source.skip_last_row = input.skip_last_row;
59
    source.chunk = chunk_allocator.alloc(input.chunk);
60
    cursors[source_num].reset(source.chunk->getColumns(), header, input.permutation);
61

62
    source.chunk->all_columns = cursors[source_num].all_columns;
63
    source.chunk->sort_columns = cursors[source_num].sort_columns;
64

65
    sources_origin_merge_tree_part_level[source_num] = getPartLevelFromChunk(*source.chunk);
66

67
    queue.push(cursors[source_num]);
68
}
69

70
}
71

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.