ClickHouse

Форк
0
572 строки · 17.5 Кб
1
#include <stack>
2

3
#include <Common/JSONBuilder.h>
4

5
#include <Interpreters/ActionsDAG.h>
6
#include <Interpreters/ArrayJoinAction.h>
7

8
#include <IO/Operators.h>
9
#include <IO/WriteBuffer.h>
10

11
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
12
#include <Processors/QueryPlan/IQueryPlanStep.h>
13
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
14
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
15
#include <Processors/QueryPlan/QueryPlan.h>
16
#include <Processors/QueryPlan/ReadFromMergeTree.h>
17
#include <Processors/QueryPlan/ITransformingStep.h>
18
#include <Processors/QueryPlan/QueryPlanVisitor.h>
19

20
#include <QueryPipeline/QueryPipelineBuilder.h>
21

22

23
namespace DB
24
{
25

26
/// Error codes used in this file. They are only declared here (`extern`); the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
30

31
/// Special member functions are compiler-generated; they are defined out of line in this translation unit.
/// Only move construction and move assignment are defined here (both noexcept).
QueryPlan::QueryPlan() = default;
QueryPlan::~QueryPlan() = default;
QueryPlan::QueryPlan(QueryPlan &&) noexcept = default;
QueryPlan & QueryPlan::operator=(QueryPlan &&) noexcept = default;
35

36
void QueryPlan::checkInitialized() const
37
{
38
    if (!isInitialized())
39
        throw Exception(ErrorCodes::LOGICAL_ERROR, "QueryPlan was not initialized");
40
}
41

42
void QueryPlan::checkNotCompleted() const
43
{
44
    if (isCompleted())
45
        throw Exception(ErrorCodes::LOGICAL_ERROR, "QueryPlan was already completed");
46
}
47

48
bool QueryPlan::isCompleted() const
49
{
50
    return isInitialized() && !root->step->hasOutputStream();
51
}
52

53
/// Returns the output stream of the root step.
/// Throws LOGICAL_ERROR if the plan is not initialized or is already completed.
const DataStream & QueryPlan::getCurrentDataStream() const
{
    checkInitialized();
    checkNotCompleted();

    auto & top_step = *root->step;
    return top_step.getOutputStream();
}
59

60
/// Makes `step` the root of this (so far uninitialized) plan and attaches the roots of `plans` as its children.
///
/// The nodes of every plan in `plans` are spliced into this plan's node list.
/// Throws LOGICAL_ERROR when:
///  - this plan is already initialized;
///  - the number of plans differs from the number of input streams of `step`;
///  - the current header of the i-th plan does not have the same structure as the i-th input header of `step`.
void QueryPlan::unitePlans(QueryPlanStepPtr step, std::vector<std::unique_ptr<QueryPlan>> plans)
{
    if (isInitialized())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unite plans because current QueryPlan is already initialized");

    const auto & inputs = step->getInputStreams();
    const size_t num_inputs = inputs.size();
    if (num_inputs != plans.size())
        throw Exception(
            ErrorCodes::LOGICAL_ERROR,
            "Cannot unite QueryPlans using {} because step has different number of inputs. Has {} plans and {} inputs",
            step->getName(),
            plans.size(),
            num_inputs);

    for (size_t i = 0; i < num_inputs; ++i)
    {
        const auto & step_header = inputs[i].header;
        /// getCurrentDataStream() verifies that plans[i] is initialized, so plans[i]->root is usable below.
        const auto & plan_header = plans[i]->getCurrentDataStream().header;
        if (!blocksHaveEqualStructure(step_header, plan_header))
            throw Exception(
                ErrorCodes::LOGICAL_ERROR,
                "Cannot unite QueryPlans using {} because it has incompatible header with plan {} plan header: {} step header: {}",
                step->getName(),
                /// This plan has no root yet (checked at the top), so report the root step of the offending child plan.
                plans[i]->root->step->getName(),
                plan_header.dumpStructure(),
                step_header.dumpStructure());
    }

    /// Take ownership of all nodes of the child plans; Node pointers stay valid across the splice.
    for (auto & plan : plans)
        nodes.splice(nodes.end(), std::move(plan->nodes));

    nodes.emplace_back(Node{.step = std::move(step)});
    root = &nodes.back();

    for (auto & plan : plans)
        root->children.emplace_back(plan->root);

    for (auto & plan : plans)
    {
        max_threads = std::max(max_threads, plan->max_threads);
        /// NOTE(review): plain move-assignment inside a loop - confirm that the resource holder's
        /// move assignment accumulates resources rather than replacing the ones taken from earlier plans.
        resources = std::move(plan->resources);
    }
}
104

105
/// Appends `step` on top of the plan and makes it the new root.
///
/// A step without inputs may only be added to an uninitialized plan (it becomes the first node).
/// A step with exactly one input may only be added to an initialized plan, and its input header
/// must have the same structure as the output header of the current root.
/// Any other combination throws LOGICAL_ERROR; so does adding to a completed plan.
void QueryPlan::addStep(QueryPlanStepPtr step)
{
    checkNotCompleted();

    const size_t inputs_count = step->getInputStreams().size();
    const bool has_root = isInitialized();

    if (inputs_count > 1)
        throw Exception(
            ErrorCodes::LOGICAL_ERROR,
            "Cannot add step {} to QueryPlan because it has {} inputs but {} input expected",
            step->getName(),
            inputs_count,
            has_root ? 1 : 0);

    if (inputs_count == 0)
    {
        if (has_root)
            throw Exception(
                ErrorCodes::LOGICAL_ERROR,
                "Cannot add step {} to QueryPlan because step has no inputs, but QueryPlan is already initialized",
                step->getName());

        nodes.emplace_back(Node{.step = std::move(step)});
    }
    else
    {
        if (!has_root)
            throw Exception(
                ErrorCodes::LOGICAL_ERROR,
                "Cannot add step {} to QueryPlan because step has input, but QueryPlan is not initialized",
                step->getName());

        const auto & root_header = root->step->getOutputStream().header;
        const auto & step_header = step->getInputStreams().front().header;
        if (!blocksHaveEqualStructure(root_header, step_header))
            throw Exception(
                ErrorCodes::LOGICAL_ERROR,
                "Cannot add step {} to QueryPlan because it has incompatible header with root step {} root header: {} step header: {}",
                step->getName(),
                root->step->getName(),
                root_header.dumpStructure(),
                step_header.dumpStructure());

        /// The previous root becomes the only child of the new node.
        nodes.emplace_back(Node{.step = std::move(step), .children = {root}});
    }

    root = &nodes.back();
}
155

156
/// Optimizes the plan and converts it into a pipeline builder.
///
/// The tree is walked depth-first with an explicit stack: a node's pipeline is produced by
/// `updatePipeline` only after the pipelines of all of its children have been built.
/// Throws LOGICAL_ERROR if the plan is not initialized.
QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
    const QueryPlanOptimizationSettings & optimization_settings,
    const BuildQueryPipelineSettings & build_pipeline_settings)
{
    checkInitialized();
    optimize(optimization_settings);

    /// Traversal state of one node: the node and the pipelines already built for its children.
    struct Frame
    {
        Node * node = {};
        QueryPipelineBuilders pipelines = {};
    };

    std::stack<Frame> pending;
    pending.push(Frame{.node = root});

    /// Pipeline of the most recently finished node, waiting to be handed over to its parent frame.
    QueryPipelineBuilderPtr finished;

    while (!pending.empty())
    {
        auto & current = pending.top();

        if (finished)
        {
            current.pipelines.emplace_back(std::move(finished));
            finished = nullptr;
        }

        /// The number of collected pipelines is also the index of the next child to descend into.
        const size_t built_children = current.pipelines.size();
        if (built_children != current.node->children.size())
        {
            pending.push(Frame{.node = current.node->children[built_children]});
            continue;
        }

        /// The thread limit is applied only to pipelines created by nodes that had no child pipelines.
        const bool no_child_pipelines = current.pipelines.empty();
        finished = current.node->step->updatePipeline(std::move(current.pipelines), build_pipeline_settings);

        if (no_child_pipelines && max_threads)
            finished->limitMaxThreads(max_threads);

        pending.pop();
    }

    finished->setProgressCallback(build_pipeline_settings.progress_callback);
    finished->setProcessListElement(build_pipeline_settings.process_list_element);
    finished->addResources(std::move(resources));

    return finished;
}
205

206
static void explainStep(const IQueryPlanStep & step, JSONBuilder::JSONMap & map, const QueryPlan::ExplainPlanOptions & options)
207
{
208
    map.add("Node Type", step.getName());
209

210
    if (options.description)
211
    {
212
        const auto & description = step.getStepDescription();
213
        if (!description.empty())
214
            map.add("Description", description);
215
    }
216

217
    if (options.header && step.hasOutputStream())
218
    {
219
        auto header_array = std::make_unique<JSONBuilder::JSONArray>();
220

221
        for (const auto & output_column : step.getOutputStream().header)
222
        {
223
            auto column_map = std::make_unique<JSONBuilder::JSONMap>();
224
            column_map->add("Name", output_column.name);
225
            if (output_column.type)
226
                column_map->add("Type", output_column.type->getName());
227

228
            header_array->add(std::move(column_map));
229
        }
230

231
        map.add("Header", std::move(header_array));
232
    }
233

234
    if (options.actions)
235
        step.describeActions(map);
236

237
    if (options.indexes)
238
        step.describeIndexes(map);
239
}
240

241
/// Builds the JSON description of the whole plan.
///
/// Each node becomes a map produced by `explainStep`; the descriptions of its children and of the
/// plans returned by `getChildPlans()` are collected into the "Plans" array of that map.
/// Throws LOGICAL_ERROR if the plan is not initialized.
JSONBuilder::ItemPtr QueryPlan::explainPlan(const ExplainPlanOptions & options)
{
    checkInitialized();

    /// Traversal state of one node.
    struct Frame
    {
        Node * node = {};
        size_t next_child = 0;
        std::unique_ptr<JSONBuilder::JSONMap> node_map = {};
        std::unique_ptr<JSONBuilder::JSONArray> children_array = {};
    };

    std::stack<Frame> pending;
    pending.push(Frame{.node = root});

    /// Description of the most recently finished node; after the loop it holds the root description.
    std::unique_ptr<JSONBuilder::JSONMap> finished;

    while (!pending.empty())
    {
        auto & current = pending.top();
        const auto & children = current.node->children;

        /// First visit of the node: describe the step itself before descending.
        if (current.next_child == 0)
        {
            if (!children.empty())
                current.children_array = std::make_unique<JSONBuilder::JSONArray>();

            current.node_map = std::make_unique<JSONBuilder::JSONMap>();
            explainStep(*current.node->step, *current.node_map, options);
        }

        if (current.next_child < children.size())
        {
            Node * child = children[current.next_child];
            ++current.next_child;
            pending.push(Frame{.node = child});
            continue;
        }

        /// All children are done: append the step's own child plans and close the node.
        auto nested_plans = current.node->step->getChildPlans();

        if (!current.children_array && !nested_plans.empty())
            current.children_array = std::make_unique<JSONBuilder::JSONArray>();

        for (const auto & nested_plan : nested_plans)
            current.children_array->add(nested_plan->explainPlan(options));

        if (current.children_array)
            current.node_map->add("Plans", std::move(current.children_array));

        finished = std::move(current.node_map);
        pending.pop();

        /// Hand the finished description over to the parent, if there is one.
        if (!pending.empty())
            pending.top().children_array->add(std::move(finished));
    }

    return finished;
}
299

300
static void explainStep(
301
    const IQueryPlanStep & step,
302
    IQueryPlanStep::FormatSettings & settings,
303
    const QueryPlan::ExplainPlanOptions & options)
304
{
305
    std::string prefix(settings.offset, ' ');
306
    settings.out << prefix;
307
    settings.out << step.getName();
308

309
    const auto & description = step.getStepDescription();
310
    if (options.description && !description.empty())
311
        settings.out <<" (" << description << ')';
312

313
    settings.out.write('\n');
314

315
    if (options.header)
316
    {
317
        settings.out << prefix;
318

319
        if (!step.hasOutputStream())
320
            settings.out << "No header";
321
        else if (!step.getOutputStream().header)
322
            settings.out << "Empty header";
323
        else
324
        {
325
            settings.out << "Header: ";
326
            bool first = true;
327

328
            for (const auto & elem : step.getOutputStream().header)
329
            {
330
                if (!first)
331
                    settings.out << "\n" << prefix << "        ";
332

333
                first = false;
334
                elem.dumpNameAndType(settings.out);
335
            }
336
        }
337
        settings.out.write('\n');
338

339
    }
340

341
    if (options.sorting)
342
    {
343
        if (step.hasOutputStream())
344
        {
345
            settings.out << prefix << "Sorting (" << step.getOutputStream().sort_scope << ")";
346
            if (step.getOutputStream().sort_scope != DataStream::SortScope::None)
347
            {
348
                settings.out << ": ";
349
                dumpSortDescription(step.getOutputStream().sort_description, settings.out);
350
            }
351
            settings.out.write('\n');
352
        }
353
    }
354

355
    if (options.actions)
356
        step.describeActions(settings);
357

358
    if (options.indexes)
359
        step.describeIndexes(settings);
360
}
361

362
std::string debugExplainStep(const IQueryPlanStep & step)
363
{
364
    WriteBufferFromOwnString out;
365
    IQueryPlanStep::FormatSettings settings{.out = out};
366
    QueryPlan::ExplainPlanOptions options{.actions = true};
367
    explainStep(step, settings, options);
368
    return out.str();
369
}
370

371
/// Writes the text description of the whole plan to `buffer`.
///
/// Nodes are printed in pre-order; the offset of a node is (indent + depth) * settings.indent.
/// Plans returned by a step's `getChildPlans()` are printed after the node's children, one level deeper.
/// Throws LOGICAL_ERROR if the plan is not initialized.
void QueryPlan::explainPlan(WriteBuffer & buffer, const ExplainPlanOptions & options, size_t indent)
{
    checkInitialized();

    IQueryPlanStep::FormatSettings settings{.out = buffer, .write_header = options.header};

    /// Traversal state of one node.
    struct Frame
    {
        Node * node = {};
        bool is_description_printed = false;
        size_t next_child = 0;
    };

    std::stack<Frame> pending;
    pending.push(Frame{.node = root});

    while (!pending.empty())
    {
        auto & current = pending.top();

        if (!current.is_description_printed)
        {
            /// The stack size is the depth of the current node, counting the root as 1.
            const size_t depth = pending.size() - 1;
            settings.offset = (indent + depth) * settings.indent;
            explainStep(*current.node->step, settings, options);
            current.is_description_printed = true;
        }

        const auto & children = current.node->children;
        if (current.next_child < children.size())
        {
            Node * child = children[current.next_child];
            ++current.next_child;
            pending.push(Frame{.node = child});
            continue;
        }

        auto nested_plans = current.node->step->getChildPlans();
        for (const auto & nested_plan : nested_plans)
            nested_plan->explainPlan(buffer, options, indent + pending.size());

        pending.pop();
    }
}
414

415
/// Writes "(<step name>)" at the current offset and lets the step describe its pipeline.
/// If the step left `settings.offset` unchanged, the offset is advanced by one indent.
static void explainPipelineStep(IQueryPlanStep & step, IQueryPlanStep::FormatSettings & settings)
{
    settings.out << String(settings.offset, settings.indent_char) << "(" << step.getName() << ")\n";

    const size_t offset_before = settings.offset;
    step.describePipeline(settings);

    const bool offset_untouched = (settings.offset == offset_before);
    if (offset_untouched)
        settings.offset += settings.indent;
}
424

425
/// Writes the pipeline description of every step to `buffer`, visiting nodes in pre-order.
/// A child starts at the offset its parent ended with after `explainPipelineStep`.
/// Throws LOGICAL_ERROR if the plan is not initialized.
void QueryPlan::explainPipeline(WriteBuffer & buffer, const ExplainPipelineOptions & options)
{
    checkInitialized();

    IQueryPlanStep::FormatSettings settings{.out = buffer, .write_header = options.header};

    /// Traversal state of one node.
    struct Frame
    {
        Node * node = {};
        size_t offset = 0;
        bool is_description_printed = false;
        size_t next_child = 0;
    };

    std::stack<Frame> pending;
    pending.push(Frame{.node = root});

    while (!pending.empty())
    {
        auto & current = pending.top();

        if (!current.is_description_printed)
        {
            settings.offset = current.offset;
            explainPipelineStep(*current.node->step, settings);

            /// Remember the offset left by the step so that the children inherit it.
            current.offset = settings.offset;
            current.is_description_printed = true;
        }

        const auto & children = current.node->children;
        if (current.next_child >= children.size())
        {
            pending.pop();
            continue;
        }

        Node * child = children[current.next_child];
        const size_t child_offset = current.offset;
        ++current.next_child;
        pending.push(Frame{.node = child, .offset = child_offset});
    }
}
463

464
/// Walks the tree under `root` and, bottom-up, feeds every step the current output streams of its children.
static void updateDataStreams(QueryPlan::Node & root)
{
    class UpdateDataStreams : public QueryPlanVisitor<UpdateDataStreams, false>
    {
    public:
        explicit UpdateDataStreams(QueryPlan::Node * root_) : QueryPlanVisitor<UpdateDataStreams, false>(root_) { }

        /// Nothing to do on the way down.
        static bool visitTopDownImpl(QueryPlan::Node * /*current_node*/, QueryPlan::Node * /*parent_node*/) { return true; }

        /// Updates the input streams of `current_node`'s step.
        /// The node is skipped when the step cannot update its input, when it has no children,
        /// or when any child step has no output stream.
        static void visitBottomUpImpl(QueryPlan::Node * current_node, QueryPlan::Node * /*parent_node*/)
        {
            auto & step = *current_node->step;
            if (!step.canUpdateInputStream())
                return;

            const auto & children = current_node->children;
            if (children.empty())
                return;

            DataStreams child_outputs;
            child_outputs.reserve(children.size());

            for (const auto * child : children)
            {
                /// A single child without an output stream cancels the whole update.
                if (!child->step->hasOutputStream())
                    return;

                child_outputs.emplace_back(child->step->getOutputStream());
            }

            step.updateInputStreams(std::move(child_outputs));
        }
    };

    UpdateDataStreams visitor(&root);
    visitor.visit();
}
496

497
void QueryPlan::optimize(const QueryPlanOptimizationSettings & optimization_settings)
498
{
499
    /// optimization need to be applied before "mergeExpressions" optimization
500
    /// it removes redundant sorting steps, but keep underlying expressions,
501
    /// so "mergeExpressions" optimization handles them afterwards
502
    if (optimization_settings.remove_redundant_sorting)
503
        QueryPlanOptimizations::tryRemoveRedundantSorting(root);
504

505
    QueryPlanOptimizations::optimizeTreeFirstPass(optimization_settings, *root, nodes);
506
    QueryPlanOptimizations::optimizeTreeSecondPass(optimization_settings, *root, nodes);
507
    QueryPlanOptimizations::optimizeTreeThirdPass(*this, *root, nodes);
508

509
    updateDataStreams(*root);
510
}
511

512
/// Sums the selected parts, rows and marks of all ReadFromMergeTree steps per "database.table"
/// and appends one row per table to `columns`
/// (column order: database name, table name, parts, rows, marks).
/// Throws LOGICAL_ERROR if the plan is not initialized.
void QueryPlan::explainEstimate(MutableColumns & columns)
{
    checkInitialized();

    /// Accumulated totals for one table.
    struct EstimateCounters
    {
        std::string database_name;
        std::string table_name;
        UInt64 parts = 0;
        UInt64 rows = 0;
        UInt64 marks = 0;

        EstimateCounters(const std::string & database, const std::string & table) : database_name(database), table_name(table)
        {
        }
    };

    std::unordered_map<std::string, std::shared_ptr<EstimateCounters>> totals_by_table;

    /// Recursive pre-order walk; std::function is needed so that the lambda can call itself.
    std::function<void(const Node *)> collect = [&totals_by_table, &collect](const Node * node)
    {
        if (!node)
            return;

        const auto * read_step = dynamic_cast<ReadFromMergeTree *>(node->step.get());
        if (read_step)
        {
            const auto & storage_id = read_step->getStorageID();
            auto key = storage_id.database_name + "." + storage_id.table_name;

            auto found = totals_by_table.find(key);
            if (found == totals_by_table.end())
            {
                auto fresh = std::make_shared<EstimateCounters>(storage_id.database_name, storage_id.table_name);
                found = totals_by_table.insert({key, fresh}).first;
            }

            auto & totals = *found->second;
            totals.parts += read_step->getSelectedParts();
            totals.rows += read_step->getSelectedRows();
            totals.marks += read_step->getSelectedMarks();
        }

        for (const auto * child : node->children)
            collect(child);
    };

    collect(root);

    for (const auto & entry : totals_by_table)
    {
        const auto & totals = *entry.second;

        columns[0]->insertData(totals.database_name.c_str(), totals.database_name.size());
        columns[1]->insertData(totals.table_name.c_str(), totals.table_name.size());
        columns[2]->insert(totals.parts);
        columns[3]->insert(totals.rows);
        columns[4]->insert(totals.marks);
    }
}
566

567
/// Moves the node list and the resource holder out of `plan` and returns them as a pair.
std::pair<QueryPlan::Nodes, QueryPlanResourceHolder> QueryPlan::detachNodesAndResources(QueryPlan && plan)
{
    return std::pair<QueryPlan::Nodes, QueryPlanResourceHolder>(std::move(plan.nodes), std::move(plan.resources));
}
571

572
}
573

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.