ClickHouse
109 строк · 2.4 Кб
1#include <Processors/ISimpleTransform.h>
2
3
4namespace DB
5{
6
7ISimpleTransform::ISimpleTransform(Block input_header_, Block output_header_, bool skip_empty_chunks_)
8: IProcessor({std::move(input_header_)}, {std::move(output_header_)})
9, input(inputs.front())
10, output(outputs.front())
11, skip_empty_chunks(skip_empty_chunks_)
12{
13}
14
15ISimpleTransform::Status ISimpleTransform::prepare()
16{
17/// Check can output.
18
19if (output.isFinished())
20{
21input.close();
22return Status::Finished;
23}
24
25if (!output.canPush())
26{
27input.setNotNeeded();
28return Status::PortFull;
29}
30
31/// Output if has data.
32if (has_output)
33{
34output.pushData(std::move(output_data));
35has_output = false;
36
37if (!no_more_data_needed)
38return Status::PortFull;
39
40}
41
42/// Stop if don't need more data.
43if (no_more_data_needed)
44{
45input.close();
46output.finish();
47return Status::Finished;
48}
49
50/// Check can input.
51if (!has_input)
52{
53if (input.isFinished())
54{
55output.finish();
56return Status::Finished;
57}
58
59input.setNeeded();
60
61if (!input.hasData())
62return Status::NeedData;
63
64input_data = input.pullData(set_input_not_needed_after_read);
65has_input = true;
66
67if (input_data.exception)
68/// No more data needed. Exception will be thrown (or swallowed) later.
69input.setNotNeeded();
70}
71
72/// Now transform.
73return Status::Ready;
74}
75
76void ISimpleTransform::work()
77{
78if (input_data.exception)
79{
80/// Skip transform in case of exception.
81output_data = std::move(input_data);
82has_input = false;
83has_output = true;
84return;
85}
86
87try
88{
89transform(input_data.chunk, output_data.chunk);
90}
91catch (DB::Exception &)
92{
93output_data.exception = std::current_exception();
94has_output = true;
95has_input = false;
96return;
97}
98
99has_input = !needInputData();
100
101if (!skip_empty_chunks || output_data.chunk)
102has_output = true;
103
104if (has_output && !output_data.chunk && getOutputPort().getHeader())
105/// Support invariant that chunks must have the same number of columns as header.
106output_data.chunk = Chunk(getOutputPort().getHeader().cloneEmpty().getColumns(), 0);
107}
108
109}
110
111