ClickHouse
182 строки · 4.8 Кб
1#include <Processors/DelayedPortsProcessor.h>
2
3#include <base/sort.h>
4
5
6namespace DB
7{
8
9namespace ErrorCodes
10{
11extern const int LOGICAL_ERROR;
12}
13
14InputPorts createInputPorts(
15const Block & header,
16size_t num_ports,
17IProcessor::PortNumbers delayed_ports,
18bool assert_main_ports_empty)
19{
20if (!assert_main_ports_empty)
21return InputPorts(num_ports, header);
22
23InputPorts res;
24::sort(delayed_ports.begin(), delayed_ports.end());
25size_t next_delayed_port = 0;
26for (size_t i = 0; i < num_ports; ++i)
27{
28if (next_delayed_port < delayed_ports.size() && i == delayed_ports[next_delayed_port])
29{
30res.emplace_back(header);
31++next_delayed_port;
32}
33else
34res.emplace_back(Block());
35}
36
37return res;
38}
39
40DelayedPortsProcessor::DelayedPortsProcessor(
41const Block & header, size_t num_ports, const PortNumbers & delayed_ports, bool assert_main_ports_empty)
42: IProcessor(createInputPorts(header, num_ports, delayed_ports, assert_main_ports_empty),
43OutputPorts((assert_main_ports_empty ? delayed_ports.size() : num_ports), header))
44, num_delayed_ports(delayed_ports.size())
45{
46port_pairs.resize(num_ports);
47output_to_pair.reserve(outputs.size());
48
49for (const auto & delayed : delayed_ports)
50port_pairs[delayed].is_delayed = true;
51
52auto input_it = inputs.begin();
53auto output_it = outputs.begin();
54for (size_t i = 0; i < num_ports; ++i)
55{
56port_pairs[i].input_port = &*input_it;
57++input_it;
58
59if (port_pairs[i].is_delayed || !assert_main_ports_empty)
60{
61port_pairs[i].output_port = &*output_it;
62output_to_pair.push_back(i);
63++output_it;
64}
65}
66}
67
68void DelayedPortsProcessor::finishPair(PortsPair & pair)
69{
70if (!pair.is_finished)
71{
72if (pair.output_port)
73pair.output_port->finish();
74
75pair.input_port->close();
76
77pair.is_finished = true;
78++num_finished_inputs;
79
80if (pair.output_port)
81++num_finished_outputs;
82
83if (!pair.is_delayed)
84++num_finished_main_inputs;
85}
86}
87
88bool DelayedPortsProcessor::processPair(PortsPair & pair)
89{
90if (pair.output_port && pair.output_port->isFinished())
91{
92finishPair(pair);
93return false;
94}
95
96if (pair.input_port->isFinished())
97{
98finishPair(pair);
99return false;
100}
101
102if (pair.output_port && !pair.output_port->canPush())
103return false;
104
105pair.input_port->setNeeded();
106if (pair.input_port->hasData())
107{
108if (!pair.output_port)
109throw Exception(ErrorCodes::LOGICAL_ERROR,
110"Input port for DelayedPortsProcessor is assumed to have no data, but it has one");
111
112pair.output_port->pushData(pair.input_port->pullData(true));
113}
114
115return true;
116}
117
118
119bool DelayedPortsProcessor::shouldSkipDelayed() const
120{
121return num_finished_main_inputs + num_delayed_ports < port_pairs.size();
122}
123
124IProcessor::Status DelayedPortsProcessor::prepare(const PortNumbers & updated_inputs, const PortNumbers & updated_outputs)
125{
126bool skip_delayed = shouldSkipDelayed();
127bool need_data = false;
128
129if (!are_inputs_initialized && !updated_outputs.empty())
130{
131/// Activate inputs with no output.
132for (const auto & pair : port_pairs)
133if (!pair.output_port)
134pair.input_port->setNeeded();
135
136are_inputs_initialized = true;
137}
138
139for (const auto & output_number : updated_outputs)
140{
141auto & pair = port_pairs[output_to_pair[output_number]];
142
143/// Finish pair of ports earlier if possible.
144if (!pair.is_finished && pair.output_port && pair.output_port->isFinished())
145finishPair(pair);
146else if (!skip_delayed || !pair.is_delayed)
147need_data = processPair(pair) || need_data;
148}
149
150/// Do not wait for delayed ports if all output ports are finished.
151if (num_finished_outputs == outputs.size())
152{
153for (auto & pair : port_pairs)
154finishPair(pair);
155
156return Status::Finished;
157}
158
159for (const auto & input_number : updated_inputs)
160{
161if (!skip_delayed || !port_pairs[input_number].is_delayed)
162need_data = processPair(port_pairs[input_number]) || need_data;
163}
164
165/// In case if main streams are finished at current iteration, start processing delayed streams.
166if (skip_delayed && !shouldSkipDelayed())
167{
168for (auto & pair : port_pairs)
169if (pair.is_delayed)
170need_data = processPair(pair) || need_data;
171}
172
173if (num_finished_inputs == port_pairs.size())
174return Status::Finished;
175
176if (need_data)
177return Status::NeedData;
178
179return Status::PortFull;
180}
181
182}
183