ClickHouse
446 строк · 16.6 Кб
1#include <Planner/findQueryForParallelReplicas.h>
2#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
3#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
4#include <Processors/QueryPlan/JoinStep.h>
5#include <Processors/QueryPlan/CreatingSetsStep.h>
6#include <Storages/buildQueryTreeForShard.h>
7#include <Interpreters/ClusterProxy/executeQuery.h>
8#include <Planner/PlannerJoinTree.h>
9#include <Planner/Utils.h>
10#include <Analyzer/ArrayJoinNode.h>
11#include <Analyzer/InDepthQueryTreeVisitor.h>
12#include <Analyzer/JoinNode.h>
13#include <Analyzer/QueryNode.h>
14#include <Analyzer/TableNode.h>
15#include <Analyzer/UnionNode.h>
16#include <Parsers/ASTSubquery.h>
17#include <Parsers/queryToString.h>
18#include <Processors/QueryPlan/ExpressionStep.h>
19#include <Processors/QueryPlan/FilterStep.h>
20#include <Storages/MergeTree/MergeTreeData.h>
21#include <Storages/StorageDummy.h>
22
23namespace DB
24{
25
26namespace ErrorCodes
27{
28extern const int LOGICAL_ERROR;
29extern const int UNSUPPORTED_METHOD;
30}
31
32/// Returns a list of (sub)queries (candidates) which may support parallel replicas.
33/// The rule is :
34/// subquery has only LEFT or ALL INNER JOIN (or none), and left part is MergeTree table or subquery candidate as well.
35///
36/// Additional checks are required, so we return many candidates. The innermost subquery is on top.
37std::stack<const QueryNode *> getSupportingParallelReplicasQuery(const IQueryTreeNode * query_tree_node)
38{
39std::stack<const QueryNode *> res;
40
41while (query_tree_node)
42{
43auto join_tree_node_type = query_tree_node->getNodeType();
44
45switch (join_tree_node_type)
46{
47case QueryTreeNodeType::TABLE:
48{
49const auto & table_node = query_tree_node->as<TableNode &>();
50const auto & storage = table_node.getStorage();
51/// Here we check StorageDummy as well, to support a query tree with replaced storages.
52if (std::dynamic_pointer_cast<MergeTreeData>(storage) || typeid_cast<const StorageDummy *>(storage.get()))
53return res;
54
55return {};
56}
57case QueryTreeNodeType::TABLE_FUNCTION:
58{
59return {};
60}
61case QueryTreeNodeType::QUERY:
62{
63const auto & query_node_to_process = query_tree_node->as<QueryNode &>();
64query_tree_node = query_node_to_process.getJoinTree().get();
65res.push(&query_node_to_process);
66break;
67}
68case QueryTreeNodeType::UNION:
69{
70const auto & union_node = query_tree_node->as<UnionNode &>();
71const auto & union_queries = union_node.getQueries().getNodes();
72
73if (union_queries.empty())
74return {};
75
76query_tree_node = union_queries.front().get();
77break;
78}
79case QueryTreeNodeType::ARRAY_JOIN:
80{
81const auto & array_join_node = query_tree_node->as<ArrayJoinNode &>();
82query_tree_node = array_join_node.getTableExpression().get();
83break;
84}
85case QueryTreeNodeType::JOIN:
86{
87const auto & join_node = query_tree_node->as<JoinNode &>();
88auto join_kind = join_node.getKind();
89auto join_strictness = join_node.getStrictness();
90
91bool can_parallelize_join =
92join_kind == JoinKind::Left
93|| (join_kind == JoinKind::Inner && join_strictness == JoinStrictness::All);
94
95if (!can_parallelize_join)
96return {};
97
98query_tree_node = join_node.getLeftTableExpression().get();
99break;
100}
101default:
102{
103throw Exception(ErrorCodes::LOGICAL_ERROR,
104"Unexpected node type for table expression. "
105"Expected table, table function, query, union, join or array join. Actual {}",
106query_tree_node->getNodeTypeName());
107}
108}
109}
110
111return res;
112}
113
114class ReplaceTableNodeToDummyVisitor : public InDepthQueryTreeVisitor<ReplaceTableNodeToDummyVisitor, true>
115{
116public:
117using Base = InDepthQueryTreeVisitor<ReplaceTableNodeToDummyVisitor, true>;
118using Base::Base;
119
120void visitImpl(const QueryTreeNodePtr & node)
121{
122auto * table_node = node->as<TableNode>();
123auto * table_function_node = node->as<TableFunctionNode>();
124
125if (table_node || table_function_node)
126{
127const auto & storage_snapshot = table_node ? table_node->getStorageSnapshot() : table_function_node->getStorageSnapshot();
128auto get_column_options = GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects().withVirtuals();
129
130auto storage_dummy = std::make_shared<StorageDummy>(
131storage_snapshot->storage.getStorageID(),
132ColumnsDescription(storage_snapshot->getColumns(get_column_options)),
133storage_snapshot);
134
135auto dummy_table_node = std::make_shared<TableNode>(std::move(storage_dummy), context);
136
137dummy_table_node->setAlias(node->getAlias());
138replacement_map.emplace(node.get(), std::move(dummy_table_node));
139}
140}
141
142ContextPtr context;
143std::unordered_map<const IQueryTreeNode *, QueryTreeNodePtr> replacement_map;
144};
145
146QueryTreeNodePtr replaceTablesWithDummyTables(const QueryTreeNodePtr & query, const ContextPtr & context)
147{
148ReplaceTableNodeToDummyVisitor visitor;
149visitor.context = context;
150visitor.visit(query);
151
152return query->cloneAndReplace(visitor.replacement_map);
153}
154
155/// Find the best candidate for parallel replicas execution by verifying query plan.
156/// If query plan has only Expression, Filter of Join steps, we can execute it fully remotely and check the next query.
157/// Otherwise we can execute current query up to WithMergableStage only.
158const QueryNode * findQueryForParallelReplicas(
159std::stack<const QueryNode *> stack,
160const std::unordered_map<const QueryNode *, const QueryPlan::Node *> & mapping,
161const Settings & settings)
162{
163const QueryPlan::Node * prev_checked_node = nullptr;
164const QueryNode * res = nullptr;
165
166while (!stack.empty())
167{
168const QueryNode * subquery_node = stack.top();
169stack.pop();
170
171auto it = mapping.find(subquery_node);
172/// This should not happen ideally.
173if (it == mapping.end())
174break;
175
176const QueryPlan::Node * curr_node = it->second;
177const QueryPlan::Node * next_node_to_check = curr_node;
178bool can_distribute_full_node = true;
179
180while (next_node_to_check && next_node_to_check != prev_checked_node)
181{
182const auto & children = next_node_to_check->children;
183auto * step = next_node_to_check->step.get();
184
185if (children.empty())
186{
187/// Found a source step. This should be possible only in the first iteration.
188if (prev_checked_node)
189return nullptr;
190
191next_node_to_check = nullptr;
192}
193else if (children.size() == 1)
194{
195const auto * expression = typeid_cast<ExpressionStep *>(step);
196const auto * filter = typeid_cast<FilterStep *>(step);
197
198const auto * creating_sets = typeid_cast<DelayedCreatingSetsStep *>(step);
199bool allowed_creating_sets = settings.parallel_replicas_allow_in_with_subquery && creating_sets;
200
201if (!expression && !filter && !allowed_creating_sets)
202can_distribute_full_node = false;
203
204next_node_to_check = children.front();
205}
206else
207{
208const auto * join = typeid_cast<JoinStep *>(step);
209/// We've checked that JOIN is INNER/LEFT in query tree.
210/// Don't distribute UNION node.
211if (!join)
212return res;
213
214next_node_to_check = children.front();
215}
216}
217
218/// Current node contains steps like GROUP BY / DISTINCT
219/// Will try to execute query up to WithMergableStage
220if (!can_distribute_full_node)
221{
222/// Current query node does not contain subqueries.
223/// We can execute parallel replicas over storage::read.
224if (!res)
225return nullptr;
226
227return subquery_node;
228}
229
230/// Query is simple enough to be fully distributed.
231res = subquery_node;
232prev_checked_node = curr_node;
233}
234
235return res;
236}
237
238const QueryNode * findQueryForParallelReplicas(const QueryTreeNodePtr & query_tree_node, SelectQueryOptions & select_query_options)
239{
240if (select_query_options.only_analyze)
241return nullptr;
242
243auto * query_node = query_tree_node->as<QueryNode>();
244auto * union_node = query_tree_node->as<UnionNode>();
245
246if (!query_node && !union_node)
247throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
248"Expected QUERY or UNION node. Actual {}",
249query_tree_node->formatASTForErrorMessage());
250
251auto context = query_node ? query_node->getContext() : union_node->getContext();
252
253if (!context->canUseParallelReplicasOnInitiator())
254return nullptr;
255
256auto stack = getSupportingParallelReplicasQuery(query_tree_node.get());
257/// Empty stack means that storage does not support parallel replicas.
258if (stack.empty())
259return nullptr;
260
261/// We don't have any subquery and storage can process parallel replicas by itself.
262if (stack.top() == query_tree_node.get())
263return nullptr;
264
265/// This is needed to avoid infinite recursion.
266auto mutable_context = Context::createCopy(context);
267mutable_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
268
269/// Here we replace tables to dummy, in order to build a temporary query plan for parallel replicas analysis.
270ResultReplacementMap replacement_map;
271auto updated_query_tree = replaceTablesWithDummyTables(query_tree_node, mutable_context);
272
273SelectQueryOptions options;
274Planner planner(updated_query_tree, options, std::make_shared<GlobalPlannerContext>(nullptr, nullptr, FiltersForTableExpressionMap{}));
275planner.buildQueryPlanIfNeeded();
276
277/// This part is a bit clumsy.
278/// We updated a query_tree with dummy storages, and mapping is using updated_query_tree now.
279/// But QueryNode result should be taken from initial query tree.
280/// So that we build a list of candidates again, and call findQueryForParallelReplicas for it.
281auto new_stack = getSupportingParallelReplicasQuery(updated_query_tree.get());
282const auto & mapping = planner.getQueryNodeToPlanStepMapping();
283const auto * res = findQueryForParallelReplicas(new_stack, mapping, context->getSettingsRef());
284
285/// Now, return a query from initial stack.
286if (res)
287{
288while (!new_stack.empty())
289{
290if (res == new_stack.top())
291return stack.top();
292
293stack.pop();
294new_stack.pop();
295}
296}
297
298return res;
299}
300
301static const TableNode * findTableForParallelReplicas(const IQueryTreeNode * query_tree_node)
302{
303std::stack<const IQueryTreeNode *> right_join_nodes;
304while (query_tree_node || !right_join_nodes.empty())
305{
306if (!query_tree_node)
307{
308query_tree_node = right_join_nodes.top();
309right_join_nodes.pop();
310}
311
312auto join_tree_node_type = query_tree_node->getNodeType();
313
314switch (join_tree_node_type)
315{
316case QueryTreeNodeType::TABLE:
317{
318const auto & table_node = query_tree_node->as<TableNode &>();
319const auto & storage = table_node.getStorage();
320if (std::dynamic_pointer_cast<MergeTreeData>(storage) || typeid_cast<const StorageDummy *>(storage.get()))
321return &table_node;
322
323query_tree_node = nullptr;
324break;
325}
326case QueryTreeNodeType::TABLE_FUNCTION:
327{
328query_tree_node = nullptr;
329break;
330}
331case QueryTreeNodeType::QUERY:
332{
333const auto & query_node_to_process = query_tree_node->as<QueryNode &>();
334query_tree_node = query_node_to_process.getJoinTree().get();
335break;
336}
337case QueryTreeNodeType::UNION:
338{
339const auto & union_node = query_tree_node->as<UnionNode &>();
340const auto & union_queries = union_node.getQueries().getNodes();
341
342query_tree_node = nullptr;
343if (!union_queries.empty())
344query_tree_node = union_queries.front().get();
345
346break;
347}
348case QueryTreeNodeType::ARRAY_JOIN:
349{
350const auto & array_join_node = query_tree_node->as<ArrayJoinNode &>();
351query_tree_node = array_join_node.getTableExpression().get();
352break;
353}
354case QueryTreeNodeType::JOIN:
355{
356const auto & join_node = query_tree_node->as<JoinNode &>();
357query_tree_node = join_node.getLeftTableExpression().get();
358right_join_nodes.push(join_node.getRightTableExpression().get());
359break;
360}
361default:
362{
363throw Exception(ErrorCodes::LOGICAL_ERROR,
364"Unexpected node type for table expression. "
365"Expected table, table function, query, union, join or array join. Actual {}",
366query_tree_node->getNodeTypeName());
367}
368}
369}
370
371return nullptr;
372}
373
374const TableNode * findTableForParallelReplicas(const QueryTreeNodePtr & query_tree_node, SelectQueryOptions & select_query_options)
375{
376if (select_query_options.only_analyze)
377return nullptr;
378
379auto * query_node = query_tree_node->as<QueryNode>();
380auto * union_node = query_tree_node->as<UnionNode>();
381
382if (!query_node && !union_node)
383throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
384"Expected QUERY or UNION node. Actual {}",
385query_tree_node->formatASTForErrorMessage());
386
387auto context = query_node ? query_node->getContext() : union_node->getContext();
388
389if (!context->canUseParallelReplicasOnFollower())
390return nullptr;
391
392return findTableForParallelReplicas(query_tree_node.get());
393}
394
395JoinTreeQueryPlan buildQueryPlanForParallelReplicas(
396const QueryNode & query_node,
397const PlannerContextPtr & planner_context,
398std::shared_ptr<const StorageLimitsList> storage_limits)
399{
400auto processed_stage = QueryProcessingStage::WithMergeableState;
401auto context = planner_context->getQueryContext();
402
403QueryTreeNodePtr modified_query_tree = query_node.clone();
404
405Block initial_header = InterpreterSelectQueryAnalyzer::getSampleBlock(
406modified_query_tree, context, SelectQueryOptions(processed_stage).analyze());
407
408rewriteJoinToGlobalJoin(modified_query_tree, context);
409modified_query_tree = buildQueryTreeForShard(planner_context, modified_query_tree);
410ASTPtr modified_query_ast = queryNodeToDistributedSelectQuery(modified_query_tree);
411
412Block header = InterpreterSelectQueryAnalyzer::getSampleBlock(
413modified_query_tree, context, SelectQueryOptions(processed_stage).analyze());
414
415ClusterProxy::SelectStreamFactory select_stream_factory =
416ClusterProxy::SelectStreamFactory(
417header,
418{},
419{},
420processed_stage);
421
422QueryPlan query_plan;
423ClusterProxy::executeQueryWithParallelReplicas(
424query_plan,
425select_stream_factory,
426modified_query_ast,
427context,
428storage_limits);
429
430auto converting = ActionsDAG::makeConvertingActions(
431header.getColumnsWithTypeAndName(),
432initial_header.getColumnsWithTypeAndName(),
433ActionsDAG::MatchColumnsMode::Position);
434
435/// initial_header is a header expected by initial query.
436/// header is a header which is returned by the follower.
437/// They are different because tables will have different aliases (e.g. _table1 or _table5).
438/// Here we just rename columns by position, with the hope the types would match.
439auto step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), std::move(converting));
440step->setStepDescription("Convert distributed names");
441query_plan.addStep(std::move(step));
442
443return {std::move(query_plan), std::move(processed_stage), {}, {}, {}};
444}
445
446}
447