ClickHouse

Форк
0
/
VersionedCollapsingAlgorithm.cpp 
159 строк · 5.0 Кб
1
#include <Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.h>
2
#include <Columns/ColumnsNumber.h>
3
#include <IO/WriteBuffer.h>
4

5
namespace DB
6
{
7

8
/// Hard upper bound on the number of rows kept in the queue of rows sharing one sort key.
/// It caps the queue size derived from the block size limit and is also handed to the base merging algorithm.
static constexpr size_t MAX_ROWS_IN_MULTIVERSION_QUEUE = 8192;
9

10
VersionedCollapsingAlgorithm::VersionedCollapsingAlgorithm(
11
    const Block & header_,
12
    size_t num_inputs,
13
    SortDescription description_,
14
    const String & sign_column_,
15
    size_t max_block_size_rows_,
16
    size_t max_block_size_bytes_,
17
    WriteBuffer * out_row_sources_buf_,
18
    bool use_average_block_sizes)
19
    : IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), out_row_sources_buf_, MAX_ROWS_IN_MULTIVERSION_QUEUE)
20
    , merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size_rows_, max_block_size_bytes_)
21
    /// -1 for +1 in FixedSizeDequeWithGaps's internal buffer. 3 is a reasonable minimum size to collapse anything.
22
    , max_rows_in_queue(std::min(std::max<size_t>(3, max_block_size_rows_), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 1)
23
    , current_keys(max_rows_in_queue)
24
{
25
    sign_column_number = header_.getPositionByName(sign_column_);
26
}
27

28
/// Writes the raw bytes of a single RowSourcePart into `buffer`.
/// When the part occupies exactly one byte, the single-character overload of write() is used;
/// otherwise the whole object is written as a byte range.
inline ALWAYS_INLINE static void writeRowSourcePart(WriteBuffer & buffer, RowSourcePart row_source)
{
    const char * raw_bytes = reinterpret_cast<const char *>(&row_source);

    if constexpr (sizeof(RowSourcePart) != 1)
        buffer.write(raw_bytes, sizeof(RowSourcePart));
    else
        buffer.write(*raw_bytes);
}
35

36
void VersionedCollapsingAlgorithm::insertGap(size_t gap_size)
37
{
38
    if (out_row_sources_buf)
39
    {
40
        for (size_t i = 0; i < gap_size; ++i)
41
        {
42
            writeRowSourcePart(*out_row_sources_buf, current_row_sources.front());
43
            current_row_sources.pop();
44
        }
45
    }
46
}
47

48
void VersionedCollapsingAlgorithm::insertRow(size_t skip_rows, const RowRef & row)
49
{
50
    merged_data.insertRow(*row.all_columns, row.row_num, row.owned_chunk->getNumRows());
51

52
    insertGap(skip_rows);
53

54
    if (out_row_sources_buf)
55
    {
56
        current_row_sources.front().setSkipFlag(false);
57
        writeRowSourcePart(*out_row_sources_buf, current_row_sources.front());
58
        current_row_sources.pop();
59
    }
60
}
61

62
/// Performs one step of the merge.
/// Returns either
///  - a request for the next chunk of some source (Status built from the source order),
///  - a filled output chunk (when `merged_data` has enough rows), or
///  - the final chunk with the "finished" flag set, once all cursors and the queue are exhausted.
IMergingAlgorithm::Status VersionedCollapsingAlgorithm::merge()
{
    /// Outputs the oldest row of `current_keys` together with the gap recorded before it
    /// and tells whether the output chunk has enough rows to be returned.
    auto emit_oldest_row = [this]
    {
        const auto & oldest_row = current_keys.front();
        const auto skipped_before = current_keys.frontGap();

        insertRow(skipped_before, oldest_row);
        current_keys.popFront();

        return merged_data.hasEnoughRows();
    };

    /// Walk over the rows in sort order while there are cursors left.
    while (queue.isValid())
    {
        SortCursor cursor = queue.current();

        if (cursor->isLast() && skipLastRowFor(cursor->order))
        {
            /// Ask for the next block of this source, if there is one.
            queue.removeTop();
            return Status(cursor.impl->order);
        }

        RowRef incoming_row;

        const auto & sign_column = assert_cast<const ColumnInt8 &>(*cursor->all_columns[sign_column_number]);
        Int8 incoming_sign = sign_column.getData()[cursor->getRow()];

        setRowRef(incoming_row, cursor);

        /// Decide how many queued rows have to be written before the incoming row is handled.
        size_t rows_to_flush = 0;
        if (!current_keys.empty())
        {
            if (!incoming_row.hasEqualSortColumnsWith(current_keys.back()))
                rows_to_flush = current_keys.size(); /// The key has changed: flush the whole queue.
            else if (current_keys.size() >= max_rows_in_queue)
                rows_to_flush = 1; /// Same key, but the queue is full: free a single slot.
        }

        /// Returning from inside this loop is fine: the cursor queue has not been touched yet,
        /// so the same incoming row will be examined again on the next call.
        for (; rows_to_flush != 0; --rows_to_flush)
        {
            if (emit_oldest_row())
                return Status(merged_data.pull());
        }

        /// From here on a non-empty queue holds rows with the same key as the incoming row.
        const bool queue_was_empty = current_keys.empty();
        if (queue_was_empty)
            sign_in_queue = incoming_sign;

        if (queue_was_empty || incoming_sign == sign_in_queue)
        {
            current_keys.pushBack(incoming_row);
        }
        else
        {
            /// Opposite sign: drop the last queued row and record a gap of two rows in its place.
            current_keys.popBack();
            current_keys.pushGap(2);
        }

        /// Every consumed row is registered with the skip flag set; insertRow() clears it for the rows that are output.
        if (out_row_sources_buf)
            current_row_sources.emplace(cursor->order, true);

        if (cursor->isLast())
        {
            /// The block is exhausted: take the next one from the same source, if there is one.
            queue.removeTop();
            return Status(cursor.impl->order);
        }

        queue.next();
    }

    /// All inputs are consumed: write out whatever is still queued.
    while (!current_keys.empty())
    {
        if (emit_oldest_row())
            return Status(merged_data.pull());
    }

    /// Write information about the last collapsed rows.
    insertGap(current_keys.frontGap());
    return Status(merged_data.pull(), true);
}
158

159
}
160

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.