ClickHouse

Форк
0
/
ReadFromMemoryStorageStep.cpp 
176 строк · 5.5 Кб
1
#include "ReadFromMemoryStorageStep.h"
2

3
#include <algorithm>
#include <atomic>
#include <functional>
#include <memory>
6

7
#include <Interpreters/getColumnFromBlock.h>
8
#include <Interpreters/inplaceBlockConversions.h>
9
#include <Interpreters/InterpreterSelectQuery.h>
10
#include <Storages/StorageSnapshot.h>
11
#include <Storages/StorageMemory.h>
12

13
#include <QueryPipeline/Pipe.h>
14
#include <QueryPipeline/QueryPipelineBuilder.h>
15
#include <Processors/ISource.h>
16
#include <Processors/Sources/NullSource.h>
17

18
namespace DB
19
{
20

21
class MemorySource : public ISource
22
{
23
    using InitializerFunc = std::function<void(std::shared_ptr<const Blocks> &)>;
24
public:
25

26
    MemorySource(
27
        Names column_names_,
28
        const StorageSnapshotPtr & storage_snapshot,
29
        std::shared_ptr<const Blocks> data_,
30
        std::shared_ptr<std::atomic<size_t>> parallel_execution_index_,
31
        InitializerFunc initializer_func_ = {})
32
        : ISource(storage_snapshot->getSampleBlockForColumns(column_names_))
33
        , column_names_and_types(storage_snapshot->getColumnsByNames(
34
              GetColumnsOptions(GetColumnsOptions::All).withSubcolumns().withExtendedObjects(), column_names_))
35
        , data(data_)
36
        , parallel_execution_index(parallel_execution_index_)
37
        , initializer_func(std::move(initializer_func_))
38
    {
39
    }
40

41
    String getName() const override { return "Memory"; }
42

43
protected:
44
    Chunk generate() override
45
    {
46
        if (initializer_func)
47
        {
48
            initializer_func(data);
49
            initializer_func = {};
50
        }
51

52
        size_t current_index = getAndIncrementExecutionIndex();
53

54
        if (!data || current_index >= data->size())
55
        {
56
            return {};
57
        }
58

59
        const Block & src = (*data)[current_index];
60

61
        Columns columns;
62
        size_t num_columns = column_names_and_types.size();
63
        columns.reserve(num_columns);
64

65
        auto name_and_type = column_names_and_types.begin();
66
        for (size_t i = 0; i < num_columns; ++i)
67
        {
68
            columns.emplace_back(tryGetColumnFromBlock(src, *name_and_type));
69
            ++name_and_type;
70
        }
71

72
        fillMissingColumns(columns, src.rows(), column_names_and_types, column_names_and_types, {}, nullptr);
73
        assert(std::all_of(columns.begin(), columns.end(), [](const auto & column) { return column != nullptr; }));
74

75
        return Chunk(std::move(columns), src.rows());
76
    }
77

78
private:
79
    size_t getAndIncrementExecutionIndex()
80
    {
81
        if (parallel_execution_index)
82
        {
83
            return (*parallel_execution_index)++;
84
        }
85
        else
86
        {
87
            return execution_index++;
88
        }
89
    }
90

91
    const NamesAndTypesList column_names_and_types;
92
    size_t execution_index = 0;
93
    std::shared_ptr<const Blocks> data;
94
    std::shared_ptr<std::atomic<size_t>> parallel_execution_index;
95
    InitializerFunc initializer_func;
96
};
97

98
/// Query-plan step that reads `columns_to_read_` from a Memory table.
///
/// The step's output header is the snapshot's sample block for the requested columns.
/// `num_streams_` is the upper bound on the number of parallel sources makePipe() creates.
/// The storage pointer is kept so that, when `delay_read_for_global_sub_queries_` is set, the
/// source can take the table's data when reading starts instead of from the snapshot (see makePipe()).
ReadFromMemoryStorageStep::ReadFromMemoryStorageStep(
    const Names & columns_to_read_,
    const SelectQueryInfo & query_info_,
    const StorageSnapshotPtr & storage_snapshot_,
    const ContextPtr & context_,
    StoragePtr storage_,
    const size_t num_streams_,
    const bool delay_read_for_global_sub_queries_)
    : SourceStepWithFilter(
        DataStream{.header = storage_snapshot_->getSampleBlockForColumns(columns_to_read_)},
        columns_to_read_,
        query_info_,
        storage_snapshot_,
        context_)
    , columns_to_read(columns_to_read_)
    , storage(std::move(storage_))
    , num_streams(num_streams_)
    , delay_read_for_global_sub_queries(delay_read_for_global_sub_queries_)
{
}
118

119
/// Initializes `pipeline` with the sources built by makePipe(). When makePipe() yields an empty
/// pipe (nothing to read), a NullSource with the step's output header is used instead, so the
/// builder always gets a source with the expected header.
void ReadFromMemoryStorageStep::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
    Pipe memory_pipe = makePipe();
    if (!memory_pipe.empty())
    {
        pipeline.init(std::move(memory_pipe));
        return;
    }

    /// Fallback: emit no rows, but keep the output header.
    assert(output_stream != std::nullopt);
    pipeline.init(Pipe(std::make_shared<NullSource>(output_stream->header)));
}
131

132
/// Builds the pipe of MemorySource processors reading `columns_to_read` from the Memory storage.
///
/// With `delay_read_for_global_sub_queries`, a single source is returned that takes the storage's
/// data on its first generate(). Otherwise the snapshot's blocks are shared between at most
/// `num_streams` sources (never more sources than blocks), coordinated by one atomic block cursor.
/// Returns an empty pipe when the snapshot has no blocks.
Pipe ReadFromMemoryStorageStep::makePipe()
{
    storage_snapshot->check(columns_to_read);

    if (delay_read_for_global_sub_queries)
    {
        /// Note: for global subquery we use single source.
        /// Mainly, the reason is that at this point table is empty,
        /// and we don't know the number of blocks are going to be inserted into it.
        ///
        /// It may seem to be not optimal, but actually data from such table is used to fill
        /// set for IN or hash table for JOIN, which can't be done concurrently.
        /// Since no other manipulation with data is done, multiple sources shouldn't give any profit.

        return Pipe(std::make_shared<MemorySource>(
            columns_to_read,
            storage_snapshot,
            nullptr /* data */,
            nullptr /* parallel execution index */,
            [my_storage = storage](std::shared_ptr<const Blocks> & data_to_initialize)
            {
                data_to_initialize = assert_cast<const StorageMemory &>(*my_storage).data.get();
            }));
    }

    /// Only the non-delayed path reads from the snapshot, so only it touches the snapshot data.
    const auto & snapshot_data = assert_cast<const StorageMemory::SnapshotData &>(*storage_snapshot->data);
    const auto & current_data = snapshot_data.blocks;

    /// Never create more sources than there are blocks. Clamp locally instead of overwriting the
    /// step's `num_streams` member as a side effect of building the pipe.
    const size_t streams_to_create = std::min<size_t>(num_streams, current_data->size());

    /// One cursor shared by all sources, so each block is emitted by exactly one of them.
    auto parallel_execution_index = std::make_shared<std::atomic<size_t>>(0);

    Pipes pipes;
    pipes.reserve(streams_to_create);
    for (size_t stream = 0; stream < streams_to_create; ++stream)
        pipes.emplace_back(std::make_shared<MemorySource>(columns_to_read, storage_snapshot, current_data, parallel_execution_index));

    return Pipe::unitePipes(std::move(pipes));
}
175

176
}
177

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.