// ClickHouse
// 225 lines · 5.3 KB
1#include <Processors/Merges/IMergingTransform.h>
2
3namespace DB
4{
5
namespace ErrorCodes
{
    /// Error codes thrown from this file. Only declared here (extern);
    /// their definitions live in another translation unit.
    extern const int NOT_IMPLEMENTED;
    extern const int LOGICAL_ERROR;
}
11
12IMergingTransformBase::IMergingTransformBase(
13size_t num_inputs,
14const Block & input_header,
15const Block & output_header,
16bool have_all_inputs_,
17UInt64 limit_hint_,
18bool always_read_till_end_)
19: IProcessor(InputPorts(num_inputs, input_header), {output_header})
20, have_all_inputs(have_all_inputs_)
21, limit_hint(limit_hint_)
22, always_read_till_end(always_read_till_end_)
23{
24}
25
26static InputPorts createPorts(const Blocks & blocks)
27{
28InputPorts ports;
29for (const auto & block : blocks)
30ports.emplace_back(block);
31return ports;
32}
33
34IMergingTransformBase::IMergingTransformBase(
35const Blocks & input_headers,
36const Block & output_header,
37bool have_all_inputs_,
38UInt64 limit_hint_,
39bool always_read_till_end_)
40: IProcessor(createPorts(input_headers), {output_header})
41, have_all_inputs(have_all_inputs_)
42, limit_hint(limit_hint_)
43, always_read_till_end(always_read_till_end_)
44{
45}
46
47void IMergingTransformBase::onNewInput()
48{
49throw Exception(ErrorCodes::NOT_IMPLEMENTED, "onNewInput is not implemented for {}", getName());
50}
51
52void IMergingTransformBase::addInput()
53{
54if (have_all_inputs)
55throw Exception(ErrorCodes::LOGICAL_ERROR, "IMergingTransform already have all inputs.");
56
57inputs.emplace_back(outputs.front().getHeader(), this);
58onNewInput();
59}
60
61void IMergingTransformBase::setHaveAllInputs()
62{
63if (have_all_inputs)
64throw Exception(ErrorCodes::LOGICAL_ERROR, "IMergingTransform already have all inputs.");
65
66have_all_inputs = true;
67}
68
/// Initialization phase of prepare().
/// Tries to obtain the first non-empty chunk from every unfinished input and stores it in
/// state.init_chunks[i] (i = position of the input in `inputs`). Inputs that are already
/// finished are skipped.
/// Returns NeedData while some unfinished input has not yet produced its first non-empty chunk.
/// Otherwise sets is_initialized and returns Ready.
IProcessor::Status IMergingTransformBase::prepareInitializeInputs()
{
    /// Add information about inputs.
    /// Built once, on the first call. prepare() only reaches this after have_all_inputs is set,
    /// so every port added via addInput() is already present in `inputs`.
    if (input_states.empty())
    {
        input_states.reserve(inputs.size());
        for (auto & input : inputs)
            input_states.emplace_back(input);

        state.init_chunks.resize(inputs.size());
    }

    /// Check for inputs we need.
    /// `inputs` is walked by iterator while `i` tracks the matching index into
    /// input_states / state.init_chunks.
    bool all_inputs_has_data = true;
    auto it = inputs.begin();
    for (size_t i = 0; it != inputs.end(); ++i, ++it)
    {
        auto & input = *it;
        if (input.isFinished())
            continue;

        /// Already holds its first chunk from an earlier call.
        if (input_states[i].is_initialized)
            continue;

        input.setNeeded();

        if (!input.hasData())
        {
            all_inputs_has_data = false;
            continue;
        }

        /// setNotNeeded after reading first chunk, because in optimistic case
        /// (e.g. with optimized 'ORDER BY primary_key LIMIT n' and small 'n')
        /// we won't have to read any chunks anymore;
        auto chunk = input.pull(limit_hint != 0);
        /// Re-request data when the first chunk alone is smaller than the limit hint,
        /// or when the input must be read till the end anyway.
        if ((limit_hint && chunk.getNumRows() < limit_hint) || always_read_till_end)
            input.setNeeded();

        /// An empty chunk does not initialize the input. If the port is still open,
        /// keep it needed and wait for a chunk with rows.
        if (!chunk.hasRows())
        {
            if (!input.isFinished())
            {
                input.setNeeded();
                all_inputs_has_data = false;
            }

            continue;
        }

        state.init_chunks[i].set(std::move(chunk));
        input_states[i].is_initialized = true;
    }

    if (!all_inputs_has_data)
        return Status::NeedData;

    is_initialized = true;
    return Status::Ready;
}
129
/// Port-state machine shared by all merging transforms. Phases, in order:
///   1. wait (NeedData) until every input has been declared (have_all_inputs);
///   2. finish if there are no inputs or the output port is already finished;
///   3. push a pending state.output_chunk if the output port can accept it;
///   4. run prepareInitializeInputs() until the first chunks of all inputs are collected;
///   5. when state.is_finished is set (presumably by the subclass's work() — confirm),
///      optionally drain the inputs, then close everything and finish;
///   6. when state.need_data is set, pull one chunk from input_states[state.next_input_to_read].
/// Returns Ready when no port blocks progress, PortFull / NeedData when blocked,
/// and Finished once the output is finished.
IProcessor::Status IMergingTransformBase::prepare()
{
    /// Inputs may still be added via addInput(); wait for setHaveAllInputs().
    if (!have_all_inputs)
        return Status::NeedData;

    auto & output = outputs.front();

    /// Special case for no inputs.
    if (inputs.empty())
    {
        output.finish();
        onFinish();
        return Status::Finished;
    }

    /// Check can output.

    /// Downstream no longer accepts data: close every input and stop.
    if (output.isFinished())
    {
        for (auto & in : inputs)
            in.close();

        onFinish();
        return Status::Finished;
    }

    /// Do not disable inputs, so they can be executed in parallel.
    /// Note: evaluated before the push below, so later checks see the pre-push port state.
    bool is_port_full = !output.canPush();

    /// Push if has data (a chunk that carries only chunk info is forwarded as well).
    if ((state.output_chunk || state.output_chunk.hasChunkInfo()) && !is_port_full)
        output.push(std::move(state.output_chunk));

    if (!is_initialized)
        return prepareInitializeInputs();

    if (state.is_finished)
    {
        if (is_port_full)
            return Status::PortFull;

        /// Drain inputs one at a time: discard whatever the first unfinished input
        /// has and wait for more, until every input reports finished.
        if (always_read_till_end)
        {
            for (auto & input : inputs)
            {
                if (!input.isFinished())
                {
                    input.setNeeded();
                    if (input.hasData())
                        std::ignore = input.pull();

                    return Status::NeedData;
                }
            }
        }

        for (auto & input : inputs)
            input.close();

        outputs.front().finish();

        onFinish();
        return Status::Finished;
    }

    /// The merging state asked for the next chunk from one specific input.
    if (state.need_data)
    {
        auto & input = input_states[state.next_input_to_read].port;
        if (!input.isFinished())
        {
            input.setNeeded();

            if (!input.hasData())
                return Status::NeedData;

            state.input_chunk.set(input.pull());
            /// An empty chunk from a still-open port is not handed over; keep need_data set and wait.
            if (!state.input_chunk.chunk.hasRows() && !input.isFinished())
                return Status::NeedData;

            state.has_input = true;
        }
        else
        {
            /// The requested input is exhausted; signal this through state.no_data.
            state.no_data = true;
        }

        state.need_data = false;
    }

    if (is_port_full)
        return Status::PortFull;

    return Status::Ready;
}
224
225}
226