ClickHouse
236 строк · 7.3 Кб
1#include <exception>
2#include <Processors/QueryPlan/CreatingSetsStep.h>
3#include <Processors/QueryPlan/QueryPlan.h>
4//#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
5#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
6#include <QueryPipeline/QueryPipelineBuilder.h>
7#include <Processors/Transforms/CreatingSetsTransform.h>
8#include <IO/Operators.h>
9#include <Interpreters/ExpressionActions.h>
10#include <Common/JSONBuilder.h>
11#include <Interpreters/PreparedSets.h>
12#include <Interpreters/Context.h>
13
14namespace DB
15{
16
/// Error codes thrown from this translation unit; `extern` declarations only, the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
21
22static ITransformingStep::Traits getTraits()
23{
24return ITransformingStep::Traits
25{
26{
27.returns_single_stream = false,
28.preserves_number_of_streams = true,
29.preserves_sorting = true,
30},
31{
32.preserves_number_of_rows = true,
33}
34};
35}
36
37CreatingSetStep::CreatingSetStep(
38const DataStream & input_stream_,
39SetAndKeyPtr set_and_key_,
40StoragePtr external_table_,
41SizeLimits network_transfer_limits_,
42ContextPtr context_)
43: ITransformingStep(input_stream_, Block{}, getTraits())
44, set_and_key(std::move(set_and_key_))
45, external_table(std::move(external_table_))
46, network_transfer_limits(std::move(network_transfer_limits_))
47, context(std::move(context_))
48{
49}
50
51void CreatingSetStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
52{
53pipeline.addCreatingSetsTransform(getOutputStream().header, std::move(set_and_key), std::move(external_table), network_transfer_limits, context->getPreparedSetsCache());
54}
55
56void CreatingSetStep::updateOutputStream()
57{
58output_stream = createOutputStream(input_streams.front(), Block{}, getDataStreamTraits());
59}
60
61void CreatingSetStep::describeActions(FormatSettings & settings) const
62{
63String prefix(settings.offset, ' ');
64
65settings.out << prefix;
66if (set_and_key->set)
67settings.out << "Set: ";
68
69settings.out << set_and_key->key << '\n';
70}
71
72void CreatingSetStep::describeActions(JSONBuilder::JSONMap & map) const
73{
74if (set_and_key->set)
75map.add("Set", set_and_key->key);
76}
77
78
79CreatingSetsStep::CreatingSetsStep(DataStreams input_streams_)
80{
81if (input_streams_.empty())
82throw Exception(ErrorCodes::LOGICAL_ERROR, "CreatingSetsStep cannot be created with no inputs");
83
84input_streams = std::move(input_streams_);
85output_stream = DataStream{input_streams.front().header};
86
87for (size_t i = 1; i < input_streams.size(); ++i)
88if (input_streams[i].header)
89throw Exception(ErrorCodes::LOGICAL_ERROR, "Creating set input must have empty header. Got: {}",
90input_streams[i].header.dumpStructure());
91}
92
93QueryPipelineBuilderPtr CreatingSetsStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &)
94{
95if (pipelines.empty())
96throw Exception(ErrorCodes::LOGICAL_ERROR, "CreatingSetsStep cannot be created with no inputs");
97
98auto main_pipeline = std::move(pipelines.front());
99if (pipelines.size() == 1)
100return main_pipeline;
101
102pipelines.erase(pipelines.begin());
103
104QueryPipelineBuilder delayed_pipeline;
105if (pipelines.size() > 1)
106{
107QueryPipelineProcessorsCollector collector(delayed_pipeline, this);
108delayed_pipeline = QueryPipelineBuilder::unitePipelines(std::move(pipelines));
109processors = collector.detachProcessors();
110}
111else
112delayed_pipeline = std::move(*pipelines.front());
113
114QueryPipelineProcessorsCollector collector(*main_pipeline, this);
115main_pipeline->addPipelineBefore(std::move(delayed_pipeline));
116auto added_processors = collector.detachProcessors();
117processors.insert(processors.end(), added_processors.begin(), added_processors.end());
118
119return main_pipeline;
120}
121
122void CreatingSetsStep::describePipeline(FormatSettings & settings) const
123{
124IQueryPlanStep::describePipeline(processors, settings);
125}
126
127void addCreatingSetsStep(QueryPlan & query_plan, PreparedSets::Subqueries subqueries, ContextPtr context)
128{
129DataStreams input_streams;
130input_streams.emplace_back(query_plan.getCurrentDataStream());
131
132std::vector<std::unique_ptr<QueryPlan>> plans;
133plans.emplace_back(std::make_unique<QueryPlan>(std::move(query_plan)));
134query_plan = QueryPlan();
135
136for (auto & future_set : subqueries)
137{
138if (future_set->get())
139continue;
140
141auto plan = future_set->build(context);
142if (!plan)
143continue;
144
145input_streams.emplace_back(plan->getCurrentDataStream());
146plans.emplace_back(std::move(plan));
147}
148
149if (plans.size() == 1)
150{
151query_plan = std::move(*plans.front());
152return;
153}
154
155auto creating_sets = std::make_unique<CreatingSetsStep>(std::move(input_streams));
156creating_sets->setStepDescription("Create sets before main query execution");
157query_plan.unitePlans(std::move(creating_sets), std::move(plans));
158}
159
160QueryPipelineBuilderPtr addCreatingSetsTransform(QueryPipelineBuilderPtr pipeline, PreparedSets::Subqueries subqueries, ContextPtr context)
161{
162DataStreams input_streams;
163input_streams.emplace_back(DataStream{pipeline->getHeader()});
164
165QueryPipelineBuilders pipelines;
166pipelines.reserve(1 + subqueries.size());
167pipelines.push_back(std::move(pipeline));
168
169auto plan_settings = QueryPlanOptimizationSettings::fromContext(context);
170auto pipeline_settings = BuildQueryPipelineSettings::fromContext(context);
171
172for (auto & future_set : subqueries)
173{
174if (future_set->get())
175continue;
176
177auto plan = future_set->build(context);
178if (!plan)
179continue;
180
181input_streams.emplace_back(plan->getCurrentDataStream());
182pipelines.emplace_back(plan->buildQueryPipeline(plan_settings, pipeline_settings));
183}
184
185return CreatingSetsStep(input_streams).updatePipeline(std::move(pipelines), pipeline_settings);
186}
187
188std::vector<std::unique_ptr<QueryPlan>> DelayedCreatingSetsStep::makePlansForSets(DelayedCreatingSetsStep && step)
189{
190std::vector<std::unique_ptr<QueryPlan>> plans;
191
192for (auto & future_set : step.subqueries)
193{
194if (future_set->get())
195continue;
196
197auto plan = future_set->build(step.context);
198if (!plan)
199continue;
200
201plan->optimize(QueryPlanOptimizationSettings::fromContext(step.context));
202
203plans.emplace_back(std::move(plan));
204}
205
206return plans;
207}
208
209void addCreatingSetsStep(QueryPlan & query_plan, PreparedSetsPtr prepared_sets, ContextPtr context)
210{
211if (!prepared_sets)
212return;
213
214auto subqueries = prepared_sets->getSubqueries();
215if (subqueries.empty())
216return;
217
218addCreatingSetsStep(query_plan, std::move(subqueries), context);
219}
220
221DelayedCreatingSetsStep::DelayedCreatingSetsStep(
222DataStream input_stream, PreparedSets::Subqueries subqueries_, ContextPtr context_)
223: subqueries(std::move(subqueries_)), context(std::move(context_))
224{
225input_streams = {input_stream};
226output_stream = std::move(input_stream);
227}
228
229QueryPipelineBuilderPtr DelayedCreatingSetsStep::updatePipeline(QueryPipelineBuilders, const BuildQueryPipelineSettings &)
230{
231throw Exception(
232ErrorCodes::LOGICAL_ERROR,
233"Cannot build pipeline in DelayedCreatingSets. This step should be optimized out.");
234}
235
236}
237