ClickHouse
572 строки · 17.5 Кб
1#include <stack>
2
3#include <Common/JSONBuilder.h>
4
5#include <Interpreters/ActionsDAG.h>
6#include <Interpreters/ArrayJoinAction.h>
7
8#include <IO/Operators.h>
9#include <IO/WriteBuffer.h>
10
11#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
12#include <Processors/QueryPlan/IQueryPlanStep.h>
13#include <Processors/QueryPlan/Optimizations/Optimizations.h>
14#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
15#include <Processors/QueryPlan/QueryPlan.h>
16#include <Processors/QueryPlan/ReadFromMergeTree.h>
17#include <Processors/QueryPlan/ITransformingStep.h>
18#include <Processors/QueryPlan/QueryPlanVisitor.h>
19
20#include <QueryPipeline/QueryPipelineBuilder.h>
21
22
23namespace DB
24{
25
/// Error codes thrown from this file; `extern` means they are only declared here and defined elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
30
31QueryPlan::QueryPlan() = default;
32QueryPlan::~QueryPlan() = default;
33QueryPlan::QueryPlan(QueryPlan &&) noexcept = default;
34QueryPlan & QueryPlan::operator=(QueryPlan &&) noexcept = default;
35
36void QueryPlan::checkInitialized() const
37{
38if (!isInitialized())
39throw Exception(ErrorCodes::LOGICAL_ERROR, "QueryPlan was not initialized");
40}
41
42void QueryPlan::checkNotCompleted() const
43{
44if (isCompleted())
45throw Exception(ErrorCodes::LOGICAL_ERROR, "QueryPlan was already completed");
46}
47
48bool QueryPlan::isCompleted() const
49{
50return isInitialized() && !root->step->hasOutputStream();
51}
52
53const DataStream & QueryPlan::getCurrentDataStream() const
54{
55checkInitialized();
56checkNotCompleted();
57return root->step->getOutputStream();
58}
59
60void QueryPlan::unitePlans(QueryPlanStepPtr step, std::vector<std::unique_ptr<QueryPlan>> plans)
61{
62if (isInitialized())
63throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unite plans because current QueryPlan is already initialized");
64
65const auto & inputs = step->getInputStreams();
66size_t num_inputs = step->getInputStreams().size();
67if (num_inputs != plans.size())
68throw Exception(
69ErrorCodes::LOGICAL_ERROR,
70"Cannot unite QueryPlans using {} because step has different number of inputs. Has {} plans and {} inputs",
71step->getName(),
72plans.size(),
73num_inputs);
74
75for (size_t i = 0; i < num_inputs; ++i)
76{
77const auto & step_header = inputs[i].header;
78const auto & plan_header = plans[i]->getCurrentDataStream().header;
79if (!blocksHaveEqualStructure(step_header, plan_header))
80throw Exception(
81ErrorCodes::LOGICAL_ERROR,
82"Cannot unite QueryPlans using {} because it has incompatible header with plan {} plan header: {} step header: {}",
83step->getName(),
84root->step->getName(),
85plan_header.dumpStructure(),
86step_header.dumpStructure());
87}
88
89for (auto & plan : plans)
90nodes.splice(nodes.end(), std::move(plan->nodes));
91
92nodes.emplace_back(Node{.step = std::move(step)});
93root = &nodes.back();
94
95for (auto & plan : plans)
96root->children.emplace_back(plan->root);
97
98for (auto & plan : plans)
99{
100max_threads = std::max(max_threads, plan->max_threads);
101resources = std::move(plan->resources);
102}
103}
104
105void QueryPlan::addStep(QueryPlanStepPtr step)
106{
107checkNotCompleted();
108
109size_t num_input_streams = step->getInputStreams().size();
110
111if (num_input_streams == 0)
112{
113if (isInitialized())
114throw Exception(
115ErrorCodes::LOGICAL_ERROR,
116"Cannot add step {} to QueryPlan because step has no inputs, but QueryPlan is already initialized",
117step->getName());
118
119nodes.emplace_back(Node{.step = std::move(step)});
120root = &nodes.back();
121return;
122}
123
124if (num_input_streams == 1)
125{
126if (!isInitialized())
127throw Exception(
128ErrorCodes::LOGICAL_ERROR,
129"Cannot add step {} to QueryPlan because step has input, but QueryPlan is not initialized",
130step->getName());
131
132const auto & root_header = root->step->getOutputStream().header;
133const auto & step_header = step->getInputStreams().front().header;
134if (!blocksHaveEqualStructure(root_header, step_header))
135throw Exception(
136ErrorCodes::LOGICAL_ERROR,
137"Cannot add step {} to QueryPlan because it has incompatible header with root step {} root header: {} step header: {}",
138step->getName(),
139root->step->getName(),
140root_header.dumpStructure(),
141step_header.dumpStructure());
142
143nodes.emplace_back(Node{.step = std::move(step), .children = {root}});
144root = &nodes.back();
145return;
146}
147
148throw Exception(
149ErrorCodes::LOGICAL_ERROR,
150"Cannot add step {} to QueryPlan because it has {} inputs but {} input expected",
151step->getName(),
152num_input_streams,
153isInitialized() ? 1 : 0);
154}
155
156QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
157const QueryPlanOptimizationSettings & optimization_settings,
158const BuildQueryPipelineSettings & build_pipeline_settings)
159{
160checkInitialized();
161optimize(optimization_settings);
162
163struct Frame
164{
165Node * node = {};
166QueryPipelineBuilders pipelines = {};
167};
168
169QueryPipelineBuilderPtr last_pipeline;
170
171std::stack<Frame> stack;
172stack.push(Frame{.node = root});
173
174while (!stack.empty())
175{
176auto & frame = stack.top();
177
178if (last_pipeline)
179{
180frame.pipelines.emplace_back(std::move(last_pipeline));
181last_pipeline = nullptr;
182}
183
184size_t next_child = frame.pipelines.size();
185if (next_child == frame.node->children.size())
186{
187bool limit_max_threads = frame.pipelines.empty();
188last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings);
189
190if (limit_max_threads && max_threads)
191last_pipeline->limitMaxThreads(max_threads);
192
193stack.pop();
194}
195else
196stack.push(Frame{.node = frame.node->children[next_child]});
197}
198
199last_pipeline->setProgressCallback(build_pipeline_settings.progress_callback);
200last_pipeline->setProcessListElement(build_pipeline_settings.process_list_element);
201last_pipeline->addResources(std::move(resources));
202
203return last_pipeline;
204}
205
206static void explainStep(const IQueryPlanStep & step, JSONBuilder::JSONMap & map, const QueryPlan::ExplainPlanOptions & options)
207{
208map.add("Node Type", step.getName());
209
210if (options.description)
211{
212const auto & description = step.getStepDescription();
213if (!description.empty())
214map.add("Description", description);
215}
216
217if (options.header && step.hasOutputStream())
218{
219auto header_array = std::make_unique<JSONBuilder::JSONArray>();
220
221for (const auto & output_column : step.getOutputStream().header)
222{
223auto column_map = std::make_unique<JSONBuilder::JSONMap>();
224column_map->add("Name", output_column.name);
225if (output_column.type)
226column_map->add("Type", output_column.type->getName());
227
228header_array->add(std::move(column_map));
229}
230
231map.add("Header", std::move(header_array));
232}
233
234if (options.actions)
235step.describeActions(map);
236
237if (options.indexes)
238step.describeIndexes(map);
239}
240
241JSONBuilder::ItemPtr QueryPlan::explainPlan(const ExplainPlanOptions & options)
242{
243checkInitialized();
244
245struct Frame
246{
247Node * node = {};
248size_t next_child = 0;
249std::unique_ptr<JSONBuilder::JSONMap> node_map = {};
250std::unique_ptr<JSONBuilder::JSONArray> children_array = {};
251};
252
253std::stack<Frame> stack;
254stack.push(Frame{.node = root});
255
256std::unique_ptr<JSONBuilder::JSONMap> tree;
257
258while (!stack.empty())
259{
260auto & frame = stack.top();
261
262if (frame.next_child == 0)
263{
264if (!frame.node->children.empty())
265frame.children_array = std::make_unique<JSONBuilder::JSONArray>();
266
267frame.node_map = std::make_unique<JSONBuilder::JSONMap>();
268explainStep(*frame.node->step, *frame.node_map, options);
269}
270
271if (frame.next_child < frame.node->children.size())
272{
273stack.push(Frame{frame.node->children[frame.next_child]});
274++frame.next_child;
275}
276else
277{
278auto child_plans = frame.node->step->getChildPlans();
279
280if (!frame.children_array && !child_plans.empty())
281frame.children_array = std::make_unique<JSONBuilder::JSONArray>();
282
283for (const auto & child_plan : child_plans)
284frame.children_array->add(child_plan->explainPlan(options));
285
286if (frame.children_array)
287frame.node_map->add("Plans", std::move(frame.children_array));
288
289tree.swap(frame.node_map);
290stack.pop();
291
292if (!stack.empty())
293stack.top().children_array->add(std::move(tree));
294}
295}
296
297return tree;
298}
299
300static void explainStep(
301const IQueryPlanStep & step,
302IQueryPlanStep::FormatSettings & settings,
303const QueryPlan::ExplainPlanOptions & options)
304{
305std::string prefix(settings.offset, ' ');
306settings.out << prefix;
307settings.out << step.getName();
308
309const auto & description = step.getStepDescription();
310if (options.description && !description.empty())
311settings.out <<" (" << description << ')';
312
313settings.out.write('\n');
314
315if (options.header)
316{
317settings.out << prefix;
318
319if (!step.hasOutputStream())
320settings.out << "No header";
321else if (!step.getOutputStream().header)
322settings.out << "Empty header";
323else
324{
325settings.out << "Header: ";
326bool first = true;
327
328for (const auto & elem : step.getOutputStream().header)
329{
330if (!first)
331settings.out << "\n" << prefix << " ";
332
333first = false;
334elem.dumpNameAndType(settings.out);
335}
336}
337settings.out.write('\n');
338
339}
340
341if (options.sorting)
342{
343if (step.hasOutputStream())
344{
345settings.out << prefix << "Sorting (" << step.getOutputStream().sort_scope << ")";
346if (step.getOutputStream().sort_scope != DataStream::SortScope::None)
347{
348settings.out << ": ";
349dumpSortDescription(step.getOutputStream().sort_description, settings.out);
350}
351settings.out.write('\n');
352}
353}
354
355if (options.actions)
356step.describeActions(settings);
357
358if (options.indexes)
359step.describeIndexes(settings);
360}
361
362std::string debugExplainStep(const IQueryPlanStep & step)
363{
364WriteBufferFromOwnString out;
365IQueryPlanStep::FormatSettings settings{.out = out};
366QueryPlan::ExplainPlanOptions options{.actions = true};
367explainStep(step, settings, options);
368return out.str();
369}
370
371void QueryPlan::explainPlan(WriteBuffer & buffer, const ExplainPlanOptions & options, size_t indent)
372{
373checkInitialized();
374
375IQueryPlanStep::FormatSettings settings{.out = buffer, .write_header = options.header};
376
377struct Frame
378{
379Node * node = {};
380bool is_description_printed = false;
381size_t next_child = 0;
382};
383
384std::stack<Frame> stack;
385stack.push(Frame{.node = root});
386
387while (!stack.empty())
388{
389auto & frame = stack.top();
390
391if (!frame.is_description_printed)
392{
393settings.offset = (indent + stack.size() - 1) * settings.indent;
394explainStep(*frame.node->step, settings, options);
395frame.is_description_printed = true;
396}
397
398if (frame.next_child < frame.node->children.size())
399{
400stack.push(Frame{frame.node->children[frame.next_child]});
401++frame.next_child;
402}
403else
404{
405auto child_plans = frame.node->step->getChildPlans();
406
407for (const auto & child_plan : child_plans)
408child_plan->explainPlan(buffer, options, indent + stack.size());
409
410stack.pop();
411}
412}
413}
414
415static void explainPipelineStep(IQueryPlanStep & step, IQueryPlanStep::FormatSettings & settings)
416{
417settings.out << String(settings.offset, settings.indent_char) << "(" << step.getName() << ")\n";
418
419size_t current_offset = settings.offset;
420step.describePipeline(settings);
421if (current_offset == settings.offset)
422settings.offset += settings.indent;
423}
424
425void QueryPlan::explainPipeline(WriteBuffer & buffer, const ExplainPipelineOptions & options)
426{
427checkInitialized();
428
429IQueryPlanStep::FormatSettings settings{.out = buffer, .write_header = options.header};
430
431struct Frame
432{
433Node * node = {};
434size_t offset = 0;
435bool is_description_printed = false;
436size_t next_child = 0;
437};
438
439std::stack<Frame> stack;
440stack.push(Frame{.node = root});
441
442while (!stack.empty())
443{
444auto & frame = stack.top();
445
446if (!frame.is_description_printed)
447{
448settings.offset = frame.offset;
449explainPipelineStep(*frame.node->step, settings);
450frame.offset = settings.offset;
451frame.is_description_printed = true;
452}
453
454if (frame.next_child < frame.node->children.size())
455{
456stack.push(Frame{frame.node->children[frame.next_child], frame.offset});
457++frame.next_child;
458}
459else
460stack.pop();
461}
462}
463
464static void updateDataStreams(QueryPlan::Node & root)
465{
466class UpdateDataStreams : public QueryPlanVisitor<UpdateDataStreams, false>
467{
468public:
469explicit UpdateDataStreams(QueryPlan::Node * root_) : QueryPlanVisitor<UpdateDataStreams, false>(root_) { }
470
471static bool visitTopDownImpl(QueryPlan::Node * /*current_node*/, QueryPlan::Node * /*parent_node*/) { return true; }
472
473static void visitBottomUpImpl(QueryPlan::Node * current_node, QueryPlan::Node * /*parent_node*/)
474{
475auto & current_step = *current_node->step;
476if (!current_step.canUpdateInputStream() || current_node->children.empty())
477return;
478
479for (const auto * child : current_node->children)
480{
481if (!child->step->hasOutputStream())
482return;
483}
484
485DataStreams streams;
486streams.reserve(current_node->children.size());
487for (const auto * child : current_node->children)
488streams.emplace_back(child->step->getOutputStream());
489
490current_step.updateInputStreams(std::move(streams));
491}
492};
493
494UpdateDataStreams(&root).visit();
495}
496
497void QueryPlan::optimize(const QueryPlanOptimizationSettings & optimization_settings)
498{
499/// optimization need to be applied before "mergeExpressions" optimization
500/// it removes redundant sorting steps, but keep underlying expressions,
501/// so "mergeExpressions" optimization handles them afterwards
502if (optimization_settings.remove_redundant_sorting)
503QueryPlanOptimizations::tryRemoveRedundantSorting(root);
504
505QueryPlanOptimizations::optimizeTreeFirstPass(optimization_settings, *root, nodes);
506QueryPlanOptimizations::optimizeTreeSecondPass(optimization_settings, *root, nodes);
507QueryPlanOptimizations::optimizeTreeThirdPass(*this, *root, nodes);
508
509updateDataStreams(*root);
510}
511
512void QueryPlan::explainEstimate(MutableColumns & columns)
513{
514checkInitialized();
515
516struct EstimateCounters
517{
518std::string database_name;
519std::string table_name;
520UInt64 parts = 0;
521UInt64 rows = 0;
522UInt64 marks = 0;
523
524EstimateCounters(const std::string & database, const std::string & table) : database_name(database), table_name(table)
525{
526}
527};
528
529using CountersPtr = std::shared_ptr<EstimateCounters>;
530std::unordered_map<std::string, CountersPtr> counters;
531using processNodeFuncType = std::function<void(const Node * node)>;
532processNodeFuncType process_node = [&counters, &process_node] (const Node * node)
533{
534if (!node)
535return;
536if (const auto * step = dynamic_cast<ReadFromMergeTree*>(node->step.get()))
537{
538const auto & id = step->getStorageID();
539auto key = id.database_name + "." + id.table_name;
540auto it = counters.find(key);
541if (it == counters.end())
542{
543it = counters.insert({key, std::make_shared<EstimateCounters>(id.database_name, id.table_name)}).first;
544}
545it->second->parts += step->getSelectedParts();
546it->second->rows += step->getSelectedRows();
547it->second->marks += step->getSelectedMarks();
548}
549for (const auto * child : node->children)
550process_node(child);
551};
552process_node(root);
553
554for (const auto & counter : counters)
555{
556size_t index = 0;
557const auto & database_name = counter.second->database_name;
558const auto & table_name = counter.second->table_name;
559columns[index++]->insertData(database_name.c_str(), database_name.size());
560columns[index++]->insertData(table_name.c_str(), table_name.size());
561columns[index++]->insert(counter.second->parts);
562columns[index++]->insert(counter.second->rows);
563columns[index++]->insert(counter.second->marks);
564}
565}
566
567std::pair<QueryPlan::Nodes, QueryPlanResourceHolder> QueryPlan::detachNodesAndResources(QueryPlan && plan)
568{
569return {std::move(plan.nodes), std::move(plan.resources)};
570}
571
572}
573