ClickHouse — CreatingSetsStep.cpp (236 lines · 7.3 KB)
#include <exception>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Transforms/CreatingSetsTransform.h>
#include <IO/Operators.h>
#include <Interpreters/ExpressionActions.h>
#include <Common/JSONBuilder.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/Context.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

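/// Traits of CreatingSetStep, as declared by the flags below: the step keeps the number
/// of streams, their sorting, and the number of rows passing through the pipeline.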
static ITransformingStep::Traits getTraits()
{
    return ITransformingStep::Traits
    {
        {
            .returns_single_stream = false,
            .preserves_number_of_streams = true,
            .preserves_sorting = true,
        },
        {
            .preserves_number_of_rows = true,
        }
    };
}

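/// CreatingSetStep sits on top of a subquery plan: the CreatingSetsTransform it adds
/// fills the prepared set and/or the external temporary table from the subquery result,
/// subject to the given transfer limits.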
CreatingSetStep::CreatingSetStep(
    const DataStream & input_stream_,
    SetAndKeyPtr set_and_key_,
    StoragePtr external_table_,
    SizeLimits network_transfer_limits_,
    ContextPtr context_)
    : ITransformingStep(input_stream_, Block{}, getTraits())
    , set_and_key(std::move(set_and_key_))
    , external_table(std::move(external_table_))
    , network_transfer_limits(std::move(network_transfer_limits_))
    , context(std::move(context_))
{
}

void CreatingSetStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
    pipeline.addCreatingSetsTransform(getOutputStream().header, std::move(set_and_key), std::move(external_table), network_transfer_limits, context->getPreparedSetsCache());
}

void CreatingSetStep::updateOutputStream()
{
    output_stream = createOutputStream(input_streams.front(), Block{}, getDataStreamTraits());
}

void CreatingSetStep::describeActions(FormatSettings & settings) const
{
    String prefix(settings.offset, ' ');

    settings.out << prefix;
    if (set_and_key->set)
        settings.out << "Set: ";

    settings.out << set_and_key->key << '\n';
}

void CreatingSetStep::describeActions(JSONBuilder::JSONMap & map) const
{
    if (set_and_key->set)
        map.add("Set", set_and_key->key);
}


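/// CreatingSetsStep unites the main query pipeline (the first input) with the pipelines
/// that fill the sets; every input except the first must produce an empty header.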
CreatingSetsStep::CreatingSetsStep(DataStreams input_streams_)
{
    if (input_streams_.empty())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "CreatingSetsStep cannot be created with no inputs");

    input_streams = std::move(input_streams_);
    output_stream = DataStream{input_streams.front().header};

    for (size_t i = 1; i < input_streams.size(); ++i)
        if (input_streams[i].header)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Creating set input must have empty header. Got: {}",
                            input_streams[i].header.dumpStructure());
}

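/// Unite the set-filling pipelines into a single "delayed" pipeline and attach it before
/// the main pipeline, so the sets are ready when the main query starts executing.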
QueryPipelineBuilderPtr CreatingSetsStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &)
{
    if (pipelines.empty())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "CreatingSetsStep cannot be created with no inputs");

    auto main_pipeline = std::move(pipelines.front());
    if (pipelines.size() == 1)
        return main_pipeline;

    pipelines.erase(pipelines.begin());

    QueryPipelineBuilder delayed_pipeline;
    if (pipelines.size() > 1)
    {
        QueryPipelineProcessorsCollector collector(delayed_pipeline, this);
        delayed_pipeline = QueryPipelineBuilder::unitePipelines(std::move(pipelines));
        processors = collector.detachProcessors();
    }
    else
        delayed_pipeline = std::move(*pipelines.front());

    QueryPipelineProcessorsCollector collector(*main_pipeline, this);
    main_pipeline->addPipelineBefore(std::move(delayed_pipeline));
    auto added_processors = collector.detachProcessors();
    processors.insert(processors.end(), added_processors.begin(), added_processors.end());

    return main_pipeline;
}

void CreatingSetsStep::describePipeline(FormatSettings & settings) const
{
    IQueryPlanStep::describePipeline(processors, settings);
}

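/// Build a plan for every subquery whose set has not been created yet and unite those
/// plans with the main plan under a CreatingSetsStep.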
void addCreatingSetsStep(QueryPlan & query_plan, PreparedSets::Subqueries subqueries, ContextPtr context)
{
    DataStreams input_streams;
    input_streams.emplace_back(query_plan.getCurrentDataStream());

    std::vector<std::unique_ptr<QueryPlan>> plans;
    plans.emplace_back(std::make_unique<QueryPlan>(std::move(query_plan)));
    query_plan = QueryPlan();

    for (auto & future_set : subqueries)
    {
        if (future_set->get())
            continue;

        auto plan = future_set->build(context);
        if (!plan)
            continue;

        input_streams.emplace_back(plan->getCurrentDataStream());
        plans.emplace_back(std::move(plan));
    }

    if (plans.size() == 1)
    {
        query_plan = std::move(*plans.front());
        return;
    }

    auto creating_sets = std::make_unique<CreatingSetsStep>(std::move(input_streams));
    creating_sets->setStepDescription("Create sets before main query execution");
    query_plan.unitePlans(std::move(creating_sets), std::move(plans));
}

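/// The same idea on the pipeline level: the set-filling subqueries are built into
/// pipelines and attached to an already constructed main pipeline.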
QueryPipelineBuilderPtr addCreatingSetsTransform(QueryPipelineBuilderPtr pipeline, PreparedSets::Subqueries subqueries, ContextPtr context)
{
    DataStreams input_streams;
    input_streams.emplace_back(DataStream{pipeline->getHeader()});

    QueryPipelineBuilders pipelines;
    pipelines.reserve(1 + subqueries.size());
    pipelines.push_back(std::move(pipeline));

    auto plan_settings = QueryPlanOptimizationSettings::fromContext(context);
    auto pipeline_settings = BuildQueryPipelineSettings::fromContext(context);

    for (auto & future_set : subqueries)
    {
        if (future_set->get())
            continue;

        auto plan = future_set->build(context);
        if (!plan)
            continue;

        input_streams.emplace_back(plan->getCurrentDataStream());
        pipelines.emplace_back(plan->buildQueryPipeline(plan_settings, pipeline_settings));
    }

    return CreatingSetsStep(input_streams).updatePipeline(std::move(pipelines), pipeline_settings);
}

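/// Build and optimize a plan for every subquery of a DelayedCreatingSetsStep whose set
/// has not been created yet.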
std::vector<std::unique_ptr<QueryPlan>> DelayedCreatingSetsStep::makePlansForSets(DelayedCreatingSetsStep && step)
{
    std::vector<std::unique_ptr<QueryPlan>> plans;

    for (auto & future_set : step.subqueries)
    {
        if (future_set->get())
            continue;

        auto plan = future_set->build(step.context);
        if (!plan)
            continue;

        plan->optimize(QueryPlanOptimizationSettings::fromContext(step.context));

        plans.emplace_back(std::move(plan));
    }

    return plans;
}

void addCreatingSetsStep(QueryPlan & query_plan, PreparedSetsPtr prepared_sets, ContextPtr context)
{
    if (!prepared_sets)
        return;

    auto subqueries = prepared_sets->getSubqueries();
    if (subqueries.empty())
        return;

    addCreatingSetsStep(query_plan, std::move(subqueries), context);
}

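/// DelayedCreatingSetsStep only carries the subqueries; it cannot build a pipeline itself
/// and is expected to be replaced by concrete set-creating steps during plan optimization
/// (see the LOGICAL_ERROR thrown from updatePipeline below).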
DelayedCreatingSetsStep::DelayedCreatingSetsStep(
    DataStream input_stream, PreparedSets::Subqueries subqueries_, ContextPtr context_)
    : subqueries(std::move(subqueries_)), context(std::move(context_))
{
    input_streams = {input_stream};
    output_stream = std::move(input_stream);
}

QueryPipelineBuilderPtr DelayedCreatingSetsStep::updatePipeline(QueryPipelineBuilders, const BuildQueryPipelineSettings &)
{
    throw Exception(
        ErrorCodes::LOGICAL_ERROR,
        "Cannot build pipeline in DelayedCreatingSets. This step should be optimized out.");
}

}