ClickHouse
254 строки · 6.7 Кб
1#include <Processors/Sources/RemoteSource.h>2#include <QueryPipeline/RemoteQueryExecutor.h>3#include <QueryPipeline/RemoteQueryExecutorReadContext.h>4#include <QueryPipeline/StreamLocalLimits.h>5#include <Processors/Transforms/AggregatingTransform.h>6#include <DataTypes/DataTypeAggregateFunction.h>7
8namespace DB9{
10
/// Error codes referenced by this translation unit; declared here, defined elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
15
16RemoteSource::RemoteSource(RemoteQueryExecutorPtr executor, bool add_aggregation_info_, bool async_read_, bool async_query_sending_)17: ISource(executor->getHeader(), false)18, add_aggregation_info(add_aggregation_info_), query_executor(std::move(executor))19, async_read(async_read_)20, async_query_sending(async_query_sending_)21{
22/// Add AggregatedChunkInfo if we expect DataTypeAggregateFunction as a result.23const auto & sample = getPort().getHeader();24for (auto & type : sample.getDataTypes())25if (typeid_cast<const DataTypeAggregateFunction *>(type.get()))26add_aggregation_info = true;27
28/// Progress method will be called on Progress packet.29query_executor->setProgressCallback([this](const Progress & value)30{31if (value.total_rows_to_read)32addTotalRowsApprox(value.total_rows_to_read);33if (value.total_bytes_to_read)34addTotalBytes(value.total_bytes_to_read);35progress(value.read_rows, value.read_bytes);36});37
38query_executor->setProfileInfoCallback([this](const ProfileInfo & info)39{40if (rows_before_limit)41{42if (info.hasAppliedLimit())43rows_before_limit->add(info.getRowsBeforeLimit());44else45manually_add_rows_before_limit_counter = true; /// Remote subquery doesn't contain a limit46}47});48}
49
50RemoteSource::~RemoteSource() = default;51
52void RemoteSource::setStorageLimits(const std::shared_ptr<const StorageLimitsList> & storage_limits_)53{
54/// Remove leaf limits for remote source.55StorageLimitsList list;56for (const auto & value : *storage_limits_)57list.emplace_back(StorageLimits{value.local_limits, {}});58
59storage_limits = std::make_shared<const StorageLimitsList>(std::move(list));60}
61
62ISource::Status RemoteSource::prepare()63{
64/// Check if query was cancelled before returning Async status. Otherwise it may lead to infinite loop.65if (isCancelled())66{67getPort().finish();68return Status::Finished;69}70
71if (is_async_state)72return Status::Async;73
74if (executor_finished)75return Status::Finished;76
77Status status = ISource::prepare();78/// To avoid resetting the connection (because of "unfinished" query) in the79/// RemoteQueryExecutor it should be finished explicitly.80if (status == Status::Finished)81{82is_async_state = false;83need_drain = true;84return Status::Ready;85}86
87return status;88}
89
90void RemoteSource::work()91{
92/// Connection drain is a heavy operation that may take a long time.93/// Therefore we move connection drain from prepare() to work(), and drain multiple connections in parallel.94/// See issue: https://github.com/ClickHouse/ClickHouse/issues/6084495if (need_drain)96{97query_executor->finish();98executor_finished = true;99return;100}101ISource::work();102}
103
104std::optional<Chunk> RemoteSource::tryGenerate()105{
106/// onCancel() will do the cancel if the query was sent.107if (isCancelled())108return {};109
110if (!was_query_sent)111{112if (async_query_sending)113{114int fd_ = query_executor->sendQueryAsync();115if (fd_ >= 0)116{117fd = fd_;118is_async_state = true;119return Chunk();120}121
122is_async_state = false;123}124else125{126query_executor->sendQuery();127}128
129was_query_sent = true;130}131
132Block block;133
134if (async_read)135{136auto res = query_executor->readAsync();137
138if (res.getType() == RemoteQueryExecutor::ReadResult::Type::Nothing)139throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an empty packet from the RemoteQueryExecutor. This is a bug");140
141if (res.getType() == RemoteQueryExecutor::ReadResult::Type::FileDescriptor)142{143fd = res.getFileDescriptor();144is_async_state = true;145return Chunk();146}147
148if (res.getType() == RemoteQueryExecutor::ReadResult::Type::ParallelReplicasToken)149{150is_async_state = false;151return Chunk();152}153
154is_async_state = false;155
156block = res.getBlock();157}158else159block = query_executor->readBlock();160
161if (!block)162{163if (manually_add_rows_before_limit_counter)164rows_before_limit->add(rows);165
166query_executor->finish();167return {};168}169
170UInt64 num_rows = block.rows();171rows += num_rows;172Chunk chunk(block.getColumns(), num_rows);173
174if (add_aggregation_info)175{176auto info = std::make_shared<AggregatedChunkInfo>();177info->bucket_num = block.info.bucket_num;178info->is_overflows = block.info.is_overflows;179chunk.setChunkInfo(std::move(info));180}181
182return chunk;183}
184
185void RemoteSource::onCancel()186{
187query_executor->cancel();188}
189
190void RemoteSource::onUpdatePorts()191{
192if (getPort().isFinished())193{194query_executor->finish();195}196}
197
198
199RemoteTotalsSource::RemoteTotalsSource(RemoteQueryExecutorPtr executor)200: ISource(executor->getHeader())201, query_executor(std::move(executor))202{
203}
204
205RemoteTotalsSource::~RemoteTotalsSource() = default;206
207Chunk RemoteTotalsSource::generate()208{
209if (auto block = query_executor->getTotals())210{211UInt64 num_rows = block.rows();212return Chunk(block.getColumns(), num_rows);213}214
215return {};216}
217
218
219RemoteExtremesSource::RemoteExtremesSource(RemoteQueryExecutorPtr executor)220: ISource(executor->getHeader())221, query_executor(std::move(executor))222{
223}
224
225RemoteExtremesSource::~RemoteExtremesSource() = default;226
227Chunk RemoteExtremesSource::generate()228{
229if (auto block = query_executor->getExtremes())230{231UInt64 num_rows = block.rows();232return Chunk(block.getColumns(), num_rows);233}234
235return {};236}
237
238
239Pipe createRemoteSourcePipe(240RemoteQueryExecutorPtr query_executor,241bool add_aggregation_info, bool add_totals, bool add_extremes, bool async_read, bool async_query_sending)242{
243Pipe pipe(std::make_shared<RemoteSource>(query_executor, add_aggregation_info, async_read, async_query_sending));244
245if (add_totals)246pipe.addTotalsSource(std::make_shared<RemoteTotalsSource>(query_executor));247
248if (add_extremes)249pipe.addExtremesSource(std::make_shared<RemoteExtremesSource>(query_executor));250
251return pipe;252}
253
254}
255