// ClickHouse (fork count: 0)
// findParallelReplicasQuery.cpp — 446 lines, 16.6 KB
1
#include <Planner/findQueryForParallelReplicas.h>
2
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
3
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
4
#include <Processors/QueryPlan/JoinStep.h>
5
#include <Processors/QueryPlan/CreatingSetsStep.h>
6
#include <Storages/buildQueryTreeForShard.h>
7
#include <Interpreters/ClusterProxy/executeQuery.h>
8
#include <Planner/PlannerJoinTree.h>
9
#include <Planner/Utils.h>
10
#include <Analyzer/ArrayJoinNode.h>
11
#include <Analyzer/InDepthQueryTreeVisitor.h>
12
#include <Analyzer/JoinNode.h>
13
#include <Analyzer/QueryNode.h>
14
#include <Analyzer/TableNode.h>
15
#include <Analyzer/UnionNode.h>
16
#include <Parsers/ASTSubquery.h>
17
#include <Parsers/queryToString.h>
18
#include <Processors/QueryPlan/ExpressionStep.h>
19
#include <Processors/QueryPlan/FilterStep.h>
20
#include <Storages/MergeTree/MergeTreeData.h>
21
#include <Storages/StorageDummy.h>
22

23
namespace DB
24
{
25

26
namespace ErrorCodes
{
    /// Error codes thrown by this file; they are only declared here and defined elsewhere.
    extern const int UNSUPPORTED_METHOD;
    extern const int LOGICAL_ERROR;
}
31

32
/// Returns a list of (sub)queries (candidates) which may support parallel replicas.
33
/// The rule is :
34
/// subquery has only LEFT or ALL INNER JOIN (or none), and left part is MergeTree table or subquery candidate as well.
35
///
36
/// Additional checks are required, so we return many candidates. The innermost subquery is on top.
37
std::stack<const QueryNode *> getSupportingParallelReplicasQuery(const IQueryTreeNode * query_tree_node)
38
{
39
    std::stack<const QueryNode *> res;
40

41
    while (query_tree_node)
42
    {
43
        auto join_tree_node_type = query_tree_node->getNodeType();
44

45
        switch (join_tree_node_type)
46
        {
47
            case QueryTreeNodeType::TABLE:
48
            {
49
                const auto & table_node = query_tree_node->as<TableNode &>();
50
                const auto & storage = table_node.getStorage();
51
                /// Here we check StorageDummy as well, to support a query tree with replaced storages.
52
                if (std::dynamic_pointer_cast<MergeTreeData>(storage) || typeid_cast<const StorageDummy *>(storage.get()))
53
                    return res;
54

55
                return {};
56
            }
57
            case QueryTreeNodeType::TABLE_FUNCTION:
58
            {
59
                return {};
60
            }
61
            case QueryTreeNodeType::QUERY:
62
            {
63
                const auto & query_node_to_process = query_tree_node->as<QueryNode &>();
64
                query_tree_node = query_node_to_process.getJoinTree().get();
65
                res.push(&query_node_to_process);
66
                break;
67
            }
68
            case QueryTreeNodeType::UNION:
69
            {
70
                const auto & union_node = query_tree_node->as<UnionNode &>();
71
                const auto & union_queries = union_node.getQueries().getNodes();
72

73
                if (union_queries.empty())
74
                    return {};
75

76
                query_tree_node = union_queries.front().get();
77
                break;
78
            }
79
            case QueryTreeNodeType::ARRAY_JOIN:
80
            {
81
                const auto & array_join_node = query_tree_node->as<ArrayJoinNode &>();
82
                query_tree_node = array_join_node.getTableExpression().get();
83
                break;
84
            }
85
            case QueryTreeNodeType::JOIN:
86
            {
87
                const auto & join_node = query_tree_node->as<JoinNode &>();
88
                auto join_kind = join_node.getKind();
89
                auto join_strictness = join_node.getStrictness();
90

91
                bool can_parallelize_join =
92
                    join_kind == JoinKind::Left
93
                    || (join_kind == JoinKind::Inner && join_strictness == JoinStrictness::All);
94

95
                if (!can_parallelize_join)
96
                    return {};
97

98
                query_tree_node = join_node.getLeftTableExpression().get();
99
                break;
100
            }
101
            default:
102
            {
103
                throw Exception(ErrorCodes::LOGICAL_ERROR,
104
                                "Unexpected node type for table expression. "
105
                                "Expected table, table function, query, union, join or array join. Actual {}",
106
                                query_tree_node->getNodeTypeName());
107
            }
108
        }
109
    }
110

111
    return res;
112
}
113

114
/// Query tree visitor that prepares, for every TABLE and TABLE_FUNCTION node it visits, a replacement
/// TableNode backed by a StorageDummy. The dummy storage keeps the storage ID and the storage snapshot
/// of the original node and exposes all of its columns (with extended objects and virtual columns).
/// The dummy node keeps the alias of the original node.
/// Replacements are only collected in `replacement_map` (keyed by the original node); the visited
/// tree itself is not modified.
class ReplaceTableNodeToDummyVisitor : public InDepthQueryTreeVisitor<ReplaceTableNodeToDummyVisitor, true>
{
public:
    using Base = InDepthQueryTreeVisitor<ReplaceTableNodeToDummyVisitor, true>;
    using Base::Base;

    void visitImpl(const QueryTreeNodePtr & node)
    {
        auto * table_node = node->as<TableNode>();
        auto * table_function_node = node->as<TableFunctionNode>();

        /// Only table-like nodes get a replacement.
        if (!table_node && !table_function_node)
            return;

        const auto & storage_snapshot = table_node
            ? table_node->getStorageSnapshot()
            : table_function_node->getStorageSnapshot();

        auto all_columns_options = GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects().withVirtuals();
        ColumnsDescription dummy_columns(storage_snapshot->getColumns(all_columns_options));

        auto dummy_table_node = std::make_shared<TableNode>(
            std::make_shared<StorageDummy>(
                storage_snapshot->storage.getStorageID(),
                std::move(dummy_columns),
                storage_snapshot),
            context);

        dummy_table_node->setAlias(node->getAlias());
        replacement_map.emplace(node.get(), std::move(dummy_table_node));
    }

    /// Context used to create the dummy table nodes; must be set before visiting.
    ContextPtr context;
    /// Original table / table function node -> its dummy replacement.
    std::unordered_map<const IQueryTreeNode *, QueryTreeNodePtr> replacement_map;
};
145

146
/// Returns a clone of `query` in which every table and table function node is replaced by a TableNode
/// over a StorageDummy (created with `context`). The original tree is left untouched.
QueryTreeNodePtr replaceTablesWithDummyTables(const QueryTreeNodePtr & query, const ContextPtr & context)
{
    ReplaceTableNodeToDummyVisitor collector;
    collector.context = context;
    collector.visit(query);

    const auto & replacements = collector.replacement_map;
    return query->cloneAndReplace(replacements);
}
154

155
/// Find the best candidate for parallel replicas execution by verifying query plan.
156
/// If query plan has only Expression, Filter of Join steps, we can execute it fully remotely and check the next query.
157
/// Otherwise we can execute current query up to WithMergableStage only.
158
const QueryNode * findQueryForParallelReplicas(
159
    std::stack<const QueryNode *> stack,
160
    const std::unordered_map<const QueryNode *, const QueryPlan::Node *> & mapping,
161
    const Settings & settings)
162
{
163
    const QueryPlan::Node * prev_checked_node = nullptr;
164
    const QueryNode * res = nullptr;
165

166
    while (!stack.empty())
167
    {
168
        const QueryNode * subquery_node = stack.top();
169
        stack.pop();
170

171
        auto it = mapping.find(subquery_node);
172
        /// This should not happen ideally.
173
        if (it == mapping.end())
174
            break;
175

176
        const QueryPlan::Node * curr_node = it->second;
177
        const QueryPlan::Node * next_node_to_check = curr_node;
178
        bool can_distribute_full_node = true;
179

180
        while (next_node_to_check && next_node_to_check != prev_checked_node)
181
        {
182
            const auto & children = next_node_to_check->children;
183
            auto * step = next_node_to_check->step.get();
184

185
            if (children.empty())
186
            {
187
                /// Found a source step. This should be possible only in the first iteration.
188
                if (prev_checked_node)
189
                    return nullptr;
190

191
                next_node_to_check = nullptr;
192
            }
193
            else if (children.size() == 1)
194
            {
195
                const auto * expression = typeid_cast<ExpressionStep *>(step);
196
                const auto * filter = typeid_cast<FilterStep *>(step);
197

198
                const auto * creating_sets = typeid_cast<DelayedCreatingSetsStep *>(step);
199
                bool allowed_creating_sets = settings.parallel_replicas_allow_in_with_subquery && creating_sets;
200

201
                if (!expression && !filter && !allowed_creating_sets)
202
                    can_distribute_full_node = false;
203

204
                next_node_to_check = children.front();
205
            }
206
            else
207
            {
208
                const auto * join = typeid_cast<JoinStep *>(step);
209
                /// We've checked that JOIN is INNER/LEFT in query tree.
210
                /// Don't distribute UNION node.
211
                if (!join)
212
                    return res;
213

214
                next_node_to_check = children.front();
215
            }
216
        }
217

218
        /// Current node contains steps like GROUP BY / DISTINCT
219
        /// Will try to execute query up to WithMergableStage
220
        if (!can_distribute_full_node)
221
        {
222
            /// Current query node does not contain subqueries.
223
            /// We can execute parallel replicas over storage::read.
224
            if (!res)
225
                return nullptr;
226

227
            return subquery_node;
228
        }
229

230
        /// Query is simple enough to be fully distributed.
231
        res = subquery_node;
232
        prev_checked_node = curr_node;
233
    }
234

235
    return res;
236
}
237

238
/// Chooses the (sub)query of `query_tree_node` that should be executed with parallel replicas (initiator side).
///
/// Returns nullptr when:
///   - only analysis is requested;
///   - parallel replicas cannot be used on the initiator;
///   - the storage found by getSupportingParallelReplicasQuery does not support parallel replicas;
///   - there is no subquery, so the storage processes parallel replicas by itself;
///   - or no candidate was chosen.
/// A returned pointer always refers to a node of the original `query_tree_node`, never to the temporary
/// tree built here for the analysis.
///
/// Throws UNSUPPORTED_METHOD if `query_tree_node` is neither a QUERY nor a UNION node.
const QueryNode * findQueryForParallelReplicas(const QueryTreeNodePtr & query_tree_node, SelectQueryOptions & select_query_options)
{
    if (select_query_options.only_analyze)
        return nullptr;

    auto * query_node = query_tree_node->as<QueryNode>();
    auto * union_node = query_tree_node->as<UnionNode>();

    if (!query_node && !union_node)
        throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
            "Expected QUERY or UNION node. Actual {}",
            query_tree_node->formatASTForErrorMessage());

    auto context = query_node ? query_node->getContext() : union_node->getContext();

    if (!context->canUseParallelReplicasOnInitiator())
        return nullptr;

    auto stack = getSupportingParallelReplicasQuery(query_tree_node.get());
    /// Empty stack means that storage does not support parallel replicas.
    if (stack.empty())
        return nullptr;

    /// We don't have any subquery and storage can process parallel replicas by itself.
    if (stack.top() == query_tree_node.get())
        return nullptr;

    /// This is needed to avoid infinite recursion.
    auto mutable_context = Context::createCopy(context);
    mutable_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));

    /// Here we replace tables with dummy ones, in order to build a temporary query plan for parallel replicas analysis.
    auto updated_query_tree = replaceTablesWithDummyTables(query_tree_node, mutable_context);

    SelectQueryOptions options;
    Planner planner(updated_query_tree, options, std::make_shared<GlobalPlannerContext>(nullptr, nullptr, FiltersForTableExpressionMap{}));
    planner.buildQueryPlanIfNeeded();

    /// The plan mapping refers to nodes of `updated_query_tree`, but the result must be taken from the
    /// initial query tree. So the list of candidates is built for the updated tree as well, a candidate
    /// is chosen from it, and then translated to the candidate at the same depth of the initial stack.
    auto new_stack = getSupportingParallelReplicasQuery(updated_query_tree.get());
    const auto & mapping = planner.getQueryNodeToPlanStepMapping();
    const auto * res = findQueryForParallelReplicas(new_stack, mapping, context->getSettingsRef());

    if (!res)
        return nullptr;

    /// Walk both stacks in lockstep, from the innermost candidate outwards.
    /// The emptiness check on `stack` guards against popping an empty stack if the depths ever differ.
    while (!new_stack.empty() && !stack.empty())
    {
        if (res == new_stack.top())
            return stack.top();

        stack.pop();
        new_stack.pop();
    }

    /// `res` could not be matched to a node of the initial tree. It must not be returned as is:
    /// it points into `updated_query_tree`, which is destroyed when this function returns.
    return nullptr;
}
300

301
/// Depth-first search for the first MergeTree (or StorageDummy) table in the query tree.
///
/// QUERY nodes are entered through their join tree, UNION through its first query (if any), ARRAY JOIN
/// through its table expression. For a JOIN the left side is explored first and the right side is
/// remembered; remembered right sides are explored later, most recently remembered first.
/// A table with another storage, a table function, or a missing child ends the current branch.
/// Returns nullptr if no suitable table is found; throws LOGICAL_ERROR on an unexpected node type.
static const TableNode * findTableForParallelReplicas(const IQueryTreeNode * query_tree_node)
{
    std::stack<const IQueryTreeNode *> pending_right_sides;

    while (true)
    {
        if (query_tree_node == nullptr)
        {
            if (pending_right_sides.empty())
                return nullptr;

            query_tree_node = pending_right_sides.top();
            pending_right_sides.pop();
        }

        const auto node_type = query_tree_node->getNodeType();

        if (node_type == QueryTreeNodeType::TABLE)
        {
            const auto & table_node = query_tree_node->as<TableNode &>();
            const auto & storage = table_node.getStorage();

            const bool is_supported_storage = std::dynamic_pointer_cast<MergeTreeData>(storage) != nullptr
                || typeid_cast<const StorageDummy *>(storage.get()) != nullptr;

            if (is_supported_storage)
                return &table_node;

            query_tree_node = nullptr;
        }
        else if (node_type == QueryTreeNodeType::TABLE_FUNCTION)
        {
            query_tree_node = nullptr;
        }
        else if (node_type == QueryTreeNodeType::QUERY)
        {
            const auto & query_node = query_tree_node->as<QueryNode &>();
            query_tree_node = query_node.getJoinTree().get();
        }
        else if (node_type == QueryTreeNodeType::UNION)
        {
            const auto & union_node = query_tree_node->as<UnionNode &>();
            const auto & union_queries = union_node.getQueries().getNodes();

            query_tree_node = union_queries.empty() ? nullptr : union_queries.front().get();
        }
        else if (node_type == QueryTreeNodeType::ARRAY_JOIN)
        {
            const auto & array_join_node = query_tree_node->as<ArrayJoinNode &>();
            query_tree_node = array_join_node.getTableExpression().get();
        }
        else if (node_type == QueryTreeNodeType::JOIN)
        {
            const auto & join_node = query_tree_node->as<JoinNode &>();
            pending_right_sides.push(join_node.getRightTableExpression().get());
            query_tree_node = join_node.getLeftTableExpression().get();
        }
        else
        {
            throw Exception(ErrorCodes::LOGICAL_ERROR,
                            "Unexpected node type for table expression. "
                            "Expected table, table function, query, union, join or array join. Actual {}",
                            query_tree_node->getNodeTypeName());
        }
    }
}
373

374
/// Finds the table that should be read with parallel replicas (follower side).
///
/// Returns nullptr when only analysis is requested or parallel replicas cannot be used on the follower;
/// otherwise returns the result of the depth-first table search over `query_tree_node`.
/// Throws UNSUPPORTED_METHOD if `query_tree_node` is neither a QUERY nor a UNION node.
const TableNode * findTableForParallelReplicas(const QueryTreeNodePtr & query_tree_node, SelectQueryOptions & select_query_options)
{
    if (select_query_options.only_analyze)
        return nullptr;

    ContextPtr context;
    if (const auto * query_node = query_tree_node->as<QueryNode>())
        context = query_node->getContext();
    else if (const auto * union_node = query_tree_node->as<UnionNode>())
        context = union_node->getContext();
    else
        throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
            "Expected QUERY or UNION node. Actual {}",
            query_tree_node->formatASTForErrorMessage());

    return context->canUseParallelReplicasOnFollower()
        ? findTableForParallelReplicas(query_tree_node.get())
        : nullptr;
}
394

395
/// Builds a query plan that executes `query_node` with parallel replicas up to WithMergeableState.
///
/// The query is cloned, its JOINs are rewritten to GLOBAL JOINs, the tree is prepared with
/// buildQueryTreeForShard and converted to the AST that is sent to the replicas. A final
/// "Convert distributed names" expression step renames the columns returned by the followers
/// (by position) to the names the initial query expects.
JoinTreeQueryPlan buildQueryPlanForParallelReplicas(
    const QueryNode & query_node,
    const PlannerContextPtr & planner_context,
    std::shared_ptr<const StorageLimitsList> storage_limits)
{
    const auto processed_stage = QueryProcessingStage::WithMergeableState;
    auto context = planner_context->getQueryContext();

    QueryTreeNodePtr modified_query_tree = query_node.clone();

    /// Header expected by the initial query; computed before the tree is rewritten.
    Block initial_header = InterpreterSelectQueryAnalyzer::getSampleBlock(
        modified_query_tree, context, SelectQueryOptions(processed_stage).analyze());

    rewriteJoinToGlobalJoin(modified_query_tree, context);
    modified_query_tree = buildQueryTreeForShard(planner_context, modified_query_tree);
    ASTPtr modified_query_ast = queryNodeToDistributedSelectQuery(modified_query_tree);

    /// Header returned by the followers for the rewritten query.
    Block header = InterpreterSelectQueryAnalyzer::getSampleBlock(
        modified_query_tree, context, SelectQueryOptions(processed_stage).analyze());

    ClusterProxy::SelectStreamFactory select_stream_factory(header, {}, {}, processed_stage);

    QueryPlan query_plan;
    ClusterProxy::executeQueryWithParallelReplicas(
        query_plan,
        select_stream_factory,
        modified_query_ast,
        context,
        storage_limits);

    /// The two headers differ because tables get different aliases (e.g. _table1 or _table5).
    /// Columns are just renamed by position, with the hope that the types match.
    auto converting = ActionsDAG::makeConvertingActions(
        header.getColumnsWithTypeAndName(),
        initial_header.getColumnsWithTypeAndName(),
        ActionsDAG::MatchColumnsMode::Position);

    auto convert_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), std::move(converting));
    convert_step->setStepDescription("Convert distributed names");
    query_plan.addStep(std::move(convert_step));

    return {std::move(query_plan), processed_stage, {}, {}, {}};
}
445

446
}
447

/*
 * Cookie notice (text of the GitVerse web page this file was copied from, translated from Russian):
 *
 * Use of cookies.
 *
 * We use cookies in accordance with the Privacy Policy and the Cookie Policy.
 *
 * By clicking "Accept", you give SberTech JSC consent to the processing of your personal data
 * in order to improve our website and the GitVerse service, and to make them more convenient to use.
 *
 * You can disable cookies yourself in your browser settings.
 */