ClickHouse
88 строк · 3.4 Кб
1#include <Processors/QueryPlan/DistributedCreateLocalPlan.h>
2
3#include <Common/config_version.h>
4#include <Common/checkStackSize.h>
5#include <Core/ProtocolDefines.h>
6#include <Interpreters/ActionsDAG.h>
7#include <Interpreters/InterpreterSelectQuery.h>
8#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
9#include <Processors/QueryPlan/ExpressionStep.h>
10
11namespace DB
12{
13
14namespace
15{
16
17void addConvertingActions(QueryPlan & plan, const Block & header, bool has_missing_objects)
18{
19if (blocksHaveEqualStructure(plan.getCurrentDataStream().header, header))
20return;
21
22auto mode = has_missing_objects ? ActionsDAG::MatchColumnsMode::Position : ActionsDAG::MatchColumnsMode::Name;
23
24auto get_converting_dag = [mode](const Block & block_, const Block & header_)
25{
26/// Convert header structure to expected.
27/// Also we ignore constants from result and replace it with constants from header.
28/// It is needed for functions like `now64()` or `randConstant()` because their values may be different.
29return ActionsDAG::makeConvertingActions(
30block_.getColumnsWithTypeAndName(),
31header_.getColumnsWithTypeAndName(),
32mode,
33true);
34};
35
36auto convert_actions_dag = get_converting_dag(plan.getCurrentDataStream().header, header);
37auto converting = std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), convert_actions_dag);
38plan.addStep(std::move(converting));
39}
40
41}
42
43std::unique_ptr<QueryPlan> createLocalPlan(
44const ASTPtr & query_ast,
45const Block & header,
46ContextPtr context,
47QueryProcessingStage::Enum processed_stage,
48size_t shard_num,
49size_t shard_count,
50bool has_missing_objects)
51{
52checkStackSize();
53
54auto query_plan = std::make_unique<QueryPlan>();
55auto new_context = Context::createCopy(context);
56
57/// Do not push down limit to local plan, as it will break `rows_before_limit_at_least` counter.
58if (processed_stage == QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit)
59processed_stage = QueryProcessingStage::WithMergeableStateAfterAggregation;
60
61/// Do not apply AST optimizations, because query
62/// is already optimized and some optimizations
63/// can be applied only for non-distributed tables
64/// and we can produce query, inconsistent with remote plans.
65auto select_query_options = SelectQueryOptions(processed_stage)
66.setShardInfo(static_cast<UInt32>(shard_num), static_cast<UInt32>(shard_count))
67.ignoreASTOptimizations();
68
69if (context->getSettingsRef().allow_experimental_analyzer)
70{
71/// For Analyzer, identifier in GROUP BY/ORDER BY/LIMIT BY lists has been resolved to
72/// ConstantNode in QueryTree if it is an alias of a constant, so we should not replace
73/// ConstantNode with ProjectionNode again(https://github.com/ClickHouse/ClickHouse/issues/62289).
74new_context->setSetting("enable_positional_arguments", Field(false));
75auto interpreter = InterpreterSelectQueryAnalyzer(query_ast, new_context, select_query_options);
76query_plan = std::make_unique<QueryPlan>(std::move(interpreter).extractQueryPlan());
77}
78else
79{
80auto interpreter = InterpreterSelectQuery(query_ast, new_context, select_query_options);
81interpreter.buildQueryPlan(*query_plan);
82}
83
84addConvertingActions(*query_plan, header, has_missing_objects);
85return query_plan;
86}
87
88}
89