1#include "ReadFromMemoryStorageStep.h"2
3#include <atomic>4#include <functional>5#include <memory>6
7#include <Interpreters/getColumnFromBlock.h>8#include <Interpreters/inplaceBlockConversions.h>9#include <Interpreters/InterpreterSelectQuery.h>10#include <Storages/StorageSnapshot.h>11#include <Storages/StorageMemory.h>12
13#include <QueryPipeline/Pipe.h>14#include <QueryPipeline/QueryPipelineBuilder.h>15#include <Processors/ISource.h>16#include <Processors/Sources/NullSource.h>17
18namespace DB19{
20
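/// Source that reads blocks directly from StorageMemory's in-memory block list.
/// Several sources can run in parallel and share a single atomic block index;
/// when the read is delayed (global subqueries), the block list is fetched lazily
/// via initializer_func on the first call to generate().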
class MemorySource : public ISource
{
    using InitializerFunc = std::function<void(std::shared_ptr<const Blocks> &)>;
public:

    MemorySource(
        Names column_names_,
        const StorageSnapshotPtr & storage_snapshot,
        std::shared_ptr<const Blocks> data_,
        std::shared_ptr<std::atomic<size_t>> parallel_execution_index_,
        InitializerFunc initializer_func_ = {})
        : ISource(storage_snapshot->getSampleBlockForColumns(column_names_))
        , column_names_and_types(storage_snapshot->getColumnsByNames(
              GetColumnsOptions(GetColumnsOptions::All).withSubcolumns().withExtendedObjects(), column_names_))
        , data(data_)
        , parallel_execution_index(parallel_execution_index_)
        , initializer_func(std::move(initializer_func_))
    {
    }

    String getName() const override { return "Memory"; }

protected:
    Chunk generate() override
    {
        if (initializer_func)
        {
            initializer_func(data);
            initializer_func = {};
        }

        size_t current_index = getAndIncrementExecutionIndex();

        if (!data || current_index >= data->size())
        {
            return {};
        }

        const Block & src = (*data)[current_index];

        Columns columns;
        size_t num_columns = column_names_and_types.size();
        columns.reserve(num_columns);

        auto name_and_type = column_names_and_types.begin();
        for (size_t i = 0; i < num_columns; ++i)
        {
            columns.emplace_back(tryGetColumnFromBlock(src, *name_and_type));
            ++name_and_type;
        }

        fillMissingColumns(columns, src.rows(), column_names_and_types, column_names_and_types, {}, nullptr);
        assert(std::all_of(columns.begin(), columns.end(), [](const auto & column) { return column != nullptr; }));

        return Chunk(std::move(columns), src.rows());
    }

private:
    size_t getAndIncrementExecutionIndex()
    {
        if (parallel_execution_index)
        {
            return (*parallel_execution_index)++;
        }
        else
        {
            return execution_index++;
        }
    }

    const NamesAndTypesList column_names_and_types;
    size_t execution_index = 0;
    std::shared_ptr<const Blocks> data;
    std::shared_ptr<std::atomic<size_t>> parallel_execution_index;
    InitializerFunc initializer_func;
};

ReadFromMemoryStorageStep::ReadFromMemoryStorageStep(
    const Names & columns_to_read_,
    const SelectQueryInfo & query_info_,
    const StorageSnapshotPtr & storage_snapshot_,
    const ContextPtr & context_,
    StoragePtr storage_,
    const size_t num_streams_,
    const bool delay_read_for_global_sub_queries_)
    : SourceStepWithFilter(
        DataStream{.header = storage_snapshot_->getSampleBlockForColumns(columns_to_read_)},
        columns_to_read_,
        query_info_,
        storage_snapshot_,
        context_)
    , columns_to_read(columns_to_read_)
    , storage(std::move(storage_))
    , num_streams(num_streams_)
    , delay_read_for_global_sub_queries(delay_read_for_global_sub_queries_)
{
}

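/// Build the pipeline from makePipe(); if the resulting pipe is empty,
/// fall back to a NullSource so the step still produces the expected header.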
void ReadFromMemoryStorageStep::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
    auto pipe = makePipe();

    if (pipe.empty())
    {
        assert(output_stream != std::nullopt);
        pipe = Pipe(std::make_shared<NullSource>(output_stream->header));
    }

    pipeline.init(std::move(pipe));
}

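/// Create one MemorySource per stream over the snapshot's blocks, all sharing an
/// atomic index for parallel reads. For global subqueries the read is delayed and
/// a single source fetches the blocks lazily from the storage.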
Pipe ReadFromMemoryStorageStep::makePipe()
{
    storage_snapshot->check(columns_to_read);

    const auto & snapshot_data = assert_cast<const StorageMemory::SnapshotData &>(*storage_snapshot->data);
    auto current_data = snapshot_data.blocks;

    if (delay_read_for_global_sub_queries)
    {
        /// Note: for a global subquery we use a single source.
        /// Mainly, the reason is that at this point the table is empty,
        /// and we don't know how many blocks are going to be inserted into it.
        ///
        /// It may seem not optimal, but actually data from such a table is used to fill
        /// a set for IN or a hash table for JOIN, which can't be done concurrently.
        /// Since no other manipulation with the data is done, multiple sources shouldn't give any profit.

        return Pipe(std::make_shared<MemorySource>(
            columns_to_read,
            storage_snapshot,
            nullptr /* data */,
            nullptr /* parallel execution index */,
            [my_storage = storage](std::shared_ptr<const Blocks> & data_to_initialize)
            {
                data_to_initialize = assert_cast<const StorageMemory &>(*my_storage).data.get();
            }));
    }

    size_t size = current_data->size();

    if (num_streams > size)
        num_streams = size;

    Pipes pipes;

    auto parallel_execution_index = std::make_shared<std::atomic<size_t>>(0);

    for (size_t stream = 0; stream < num_streams; ++stream)
    {
        pipes.emplace_back(std::make_shared<MemorySource>(columns_to_read, storage_snapshot, current_data, parallel_execution_index));
    }
    return Pipe::unitePipes(std::move(pipes));
}

}