ClickHouse

Форк
0
/
RemoteSource.cpp 
254 строки · 6.7 Кб
1
#include <Processors/Sources/RemoteSource.h>
2
#include <QueryPipeline/RemoteQueryExecutor.h>
3
#include <QueryPipeline/RemoteQueryExecutorReadContext.h>
4
#include <QueryPipeline/StreamLocalLimits.h>
5
#include <Processors/Transforms/AggregatingTransform.h>
6
#include <DataTypes/DataTypeAggregateFunction.h>
7

8
namespace DB
9
{
10

11
namespace ErrorCodes
{
    /// Defined in another translation unit; used by RemoteSource::tryGenerate for an unexpected empty packet.
    extern const int LOGICAL_ERROR;
}
15

16
/// Source that streams the result of a query executed remotely through `executor`.
///
/// @param executor              Remote query executor; its header becomes this source's output header.
/// @param add_aggregation_info_ Attach AggregatedChunkInfo (bucket_num / is_overflows) to every produced chunk.
///                              Forced on when the header contains a DataTypeAggregateFunction column.
/// @param async_read_           Read via RemoteQueryExecutor::readAsync() instead of readBlock().
/// @param async_query_sending_  Send the query via RemoteQueryExecutor::sendQueryAsync() instead of sendQuery().
RemoteSource::RemoteSource(RemoteQueryExecutorPtr executor, bool add_aggregation_info_, bool async_read_, bool async_query_sending_)
    : ISource(executor->getHeader(), false)
    , add_aggregation_info(add_aggregation_info_), query_executor(std::move(executor))
    , async_read(async_read_)
    , async_query_sending(async_query_sending_)
{
    /// Add AggregatedChunkInfo if we expect DataTypeAggregateFunction as a result.
    /// A single such column is enough: skip the scan when the flag is already set,
    /// and stop at the first match.
    if (!add_aggregation_info)
    {
        const auto & sample = getPort().getHeader();
        for (const auto & type : sample.getDataTypes())
        {
            if (typeid_cast<const DataTypeAggregateFunction *>(type.get()))
            {
                add_aggregation_info = true;
                break;
            }
        }
    }

    /// NOTE(review): both callbacks below capture `this` and are stored inside query_executor.
    /// In createRemoteSourcePipe that executor is also shared with RemoteTotalsSource / RemoteExtremesSource.
    /// Confirm the callbacks cannot fire after this source has been destroyed.

    /// Progress method will be called on Progress packet: grow the approximate totals and report read rows/bytes.
    query_executor->setProgressCallback([this](const Progress & value)
    {
        if (value.total_rows_to_read)
            addTotalRowsApprox(value.total_rows_to_read);
        if (value.total_bytes_to_read)
            addTotalBytes(value.total_bytes_to_read);
        progress(value.read_rows, value.read_bytes);
    });

    /// Propagate rows_before_limit reported by the remote side. If the remote query applied no LIMIT,
    /// remember to count the rows received by this source instead (they are added when the stream ends).
    query_executor->setProfileInfoCallback([this](const ProfileInfo & info)
    {
        if (!rows_before_limit)
            return;

        if (info.hasAppliedLimit())
            rows_before_limit->add(info.getRowsBeforeLimit());
        else
            manually_add_rows_before_limit_counter = true; /// Remote subquery doesn't contain a limit
    });
}

RemoteSource::~RemoteSource() = default;
51

52
/// Stores the given storage limits with the leaf limits removed.
/// Each entry keeps only its `local_limits`; the leaf part is reset to `{}`.
/// Presumably the leaf limits are enforced by the remote servers themselves — confirm.
/// NOTE(review): `storage_limits_` is dereferenced unconditionally, so callers must not pass null.
void RemoteSource::setStorageLimits(const std::shared_ptr<const StorageLimitsList> & storage_limits_)
{
    StorageLimitsList local_only;
    for (const auto & entry : *storage_limits_)
    {
        StorageLimits stripped{entry.local_limits, {}};
        local_only.emplace_back(std::move(stripped));
    }

    storage_limits = std::make_shared<const StorageLimitsList>(std::move(local_only));
}
61

62
/// Decides the next scheduling step of the source:
///  - cancelled                     -> finish the output port, Finished;
///  - waiting on `fd` (async state) -> Async;
///  - executor already drained      -> Finished;
///  - otherwise delegate to ISource::prepare(). When that reports Finished, return Ready
///    so that one more work() call drains the connection (need_drain).
ISource::Status RemoteSource::prepare()
{
    /// Cancellation has to be checked before reporting Async; otherwise it may lead to an infinite loop.
    if (isCancelled())
    {
        getPort().finish();
        return Status::Finished;
    }

    if (is_async_state)
        return Status::Async;

    if (executor_finished)
        return Status::Finished;

    const Status base_status = ISource::prepare();
    if (base_status != Status::Finished)
        return base_status;

    /// The RemoteQueryExecutor must be finished explicitly, otherwise the connection is reset
    /// because of an "unfinished" query. The drain itself happens in work().
    is_async_state = false;
    need_drain = true;
    return Status::Ready;
}
89

90
void RemoteSource::work()
91
{
92
    /// Connection drain is a heavy operation that may take a long time.
93
    /// Therefore we move connection drain from prepare() to work(), and drain multiple connections in parallel.
94
    /// See issue: https://github.com/ClickHouse/ClickHouse/issues/60844
95
    if (need_drain)
96
    {
97
        query_executor->finish();
98
        executor_finished = true;
99
        return;
100
    }
101
    ISource::work();
102
}
103

104
std::optional<Chunk> RemoteSource::tryGenerate()
105
{
106
    /// onCancel() will do the cancel if the query was sent.
107
    if (isCancelled())
108
        return {};
109

110
    if (!was_query_sent)
111
    {
112
        if (async_query_sending)
113
        {
114
            int fd_ = query_executor->sendQueryAsync();
115
            if (fd_ >= 0)
116
            {
117
                fd = fd_;
118
                is_async_state = true;
119
                return Chunk();
120
            }
121

122
            is_async_state = false;
123
        }
124
        else
125
        {
126
            query_executor->sendQuery();
127
        }
128

129
        was_query_sent = true;
130
    }
131

132
    Block block;
133

134
    if (async_read)
135
    {
136
        auto res = query_executor->readAsync();
137

138
        if (res.getType() == RemoteQueryExecutor::ReadResult::Type::Nothing)
139
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an empty packet from the RemoteQueryExecutor. This is a bug");
140

141
        if (res.getType() == RemoteQueryExecutor::ReadResult::Type::FileDescriptor)
142
        {
143
            fd = res.getFileDescriptor();
144
            is_async_state = true;
145
            return Chunk();
146
        }
147

148
        if (res.getType() == RemoteQueryExecutor::ReadResult::Type::ParallelReplicasToken)
149
        {
150
            is_async_state = false;
151
            return Chunk();
152
        }
153

154
        is_async_state = false;
155

156
        block = res.getBlock();
157
    }
158
    else
159
        block = query_executor->readBlock();
160

161
    if (!block)
162
    {
163
        if (manually_add_rows_before_limit_counter)
164
            rows_before_limit->add(rows);
165

166
        query_executor->finish();
167
        return {};
168
    }
169

170
    UInt64 num_rows = block.rows();
171
    rows += num_rows;
172
    Chunk chunk(block.getColumns(), num_rows);
173

174
    if (add_aggregation_info)
175
    {
176
        auto info = std::make_shared<AggregatedChunkInfo>();
177
        info->bucket_num = block.info.bucket_num;
178
        info->is_overflows = block.info.is_overflows;
179
        chunk.setChunkInfo(std::move(info));
180
    }
181

182
    return chunk;
183
}
184

185
void RemoteSource::onCancel()
186
{
187
    query_executor->cancel();
188
}
189

190
void RemoteSource::onUpdatePorts()
191
{
192
    if (getPort().isFinished())
193
    {
194
        query_executor->finish();
195
    }
196
}
197

198

199
/// Source emitting the TOTALS block obtained from `executor`.
/// The output header is taken from the executor.
RemoteTotalsSource::RemoteTotalsSource(RemoteQueryExecutorPtr executor)
    : ISource(executor->getHeader()), query_executor(std::move(executor))
{
}

RemoteTotalsSource::~RemoteTotalsSource() = default;
206

207
/// Returns the totals block received from the remote side as a chunk,
/// or an empty chunk if the executor has no totals.
Chunk RemoteTotalsSource::generate()
{
    auto totals = query_executor->getTotals();
    if (!totals)
        return {};

    const UInt64 row_count = totals.rows();
    return Chunk(totals.getColumns(), row_count);
}
217

218

219
/// Source emitting the EXTREMES block obtained from `executor`.
/// The output header is taken from the executor.
RemoteExtremesSource::RemoteExtremesSource(RemoteQueryExecutorPtr executor)
    : ISource(executor->getHeader()), query_executor(std::move(executor))
{
}

RemoteExtremesSource::~RemoteExtremesSource() = default;
226

227
/// Returns the extremes block received from the remote side as a chunk,
/// or an empty chunk if the executor has no extremes.
Chunk RemoteExtremesSource::generate()
{
    auto extremes = query_executor->getExtremes();
    if (!extremes)
        return {};

    const UInt64 row_count = extremes.rows();
    return Chunk(extremes.getColumns(), row_count);
}
237

238

239
/// Builds a Pipe that reads the result of `query_executor`.
/// The main output is a RemoteSource; totals and extremes sources are added on request.
/// All sources share the same executor.
Pipe createRemoteSourcePipe(
    RemoteQueryExecutorPtr query_executor,
    bool add_aggregation_info, bool add_totals, bool add_extremes, bool async_read, bool async_query_sending)
{
    auto main_source = std::make_shared<RemoteSource>(query_executor, add_aggregation_info, async_read, async_query_sending);
    Pipe pipe(std::move(main_source));

    if (add_totals)
    {
        auto totals_source = std::make_shared<RemoteTotalsSource>(query_executor);
        pipe.addTotalsSource(std::move(totals_source));
    }

    if (add_extremes)
    {
        auto extremes_source = std::make_shared<RemoteExtremesSource>(query_executor);
        pipe.addExtremesSource(std::move(extremes_source));
    }

    return pipe;
}
253

254
}
255

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.