ClickHouse

Форк
0
/
DelayedPortsProcessor.cpp 
182 строки · 4.8 Кб
1
#include <Processors/DelayedPortsProcessor.h>
2

3
#include <base/sort.h>
4

5

6
namespace DB
7
{
8

9
namespace ErrorCodes
10
{
11
    extern const int LOGICAL_ERROR;
12
}
13

14
InputPorts createInputPorts(
15
    const Block & header,
16
    size_t num_ports,
17
    IProcessor::PortNumbers delayed_ports,
18
    bool assert_main_ports_empty)
19
{
20
    if (!assert_main_ports_empty)
21
        return InputPorts(num_ports, header);
22

23
    InputPorts res;
24
    ::sort(delayed_ports.begin(), delayed_ports.end());
25
    size_t next_delayed_port = 0;
26
    for (size_t i = 0; i < num_ports; ++i)
27
    {
28
        if (next_delayed_port < delayed_ports.size() && i == delayed_ports[next_delayed_port])
29
        {
30
            res.emplace_back(header);
31
            ++next_delayed_port;
32
        }
33
        else
34
            res.emplace_back(Block());
35
    }
36

37
    return res;
38
}
39

40
DelayedPortsProcessor::DelayedPortsProcessor(
41
    const Block & header, size_t num_ports, const PortNumbers & delayed_ports, bool assert_main_ports_empty)
42
    : IProcessor(createInputPorts(header, num_ports, delayed_ports, assert_main_ports_empty),
43
                 OutputPorts((assert_main_ports_empty ? delayed_ports.size() : num_ports), header))
44
    , num_delayed_ports(delayed_ports.size())
45
{
46
    port_pairs.resize(num_ports);
47
    output_to_pair.reserve(outputs.size());
48

49
    for (const auto & delayed : delayed_ports)
50
        port_pairs[delayed].is_delayed = true;
51

52
    auto input_it = inputs.begin();
53
    auto output_it = outputs.begin();
54
    for (size_t i = 0; i < num_ports; ++i)
55
    {
56
        port_pairs[i].input_port = &*input_it;
57
        ++input_it;
58

59
        if (port_pairs[i].is_delayed || !assert_main_ports_empty)
60
        {
61
            port_pairs[i].output_port = &*output_it;
62
            output_to_pair.push_back(i);
63
            ++output_it;
64
        }
65
    }
66
}
67

68
void DelayedPortsProcessor::finishPair(PortsPair & pair)
69
{
70
    if (!pair.is_finished)
71
    {
72
        if (pair.output_port)
73
            pair.output_port->finish();
74

75
        pair.input_port->close();
76

77
        pair.is_finished = true;
78
        ++num_finished_inputs;
79

80
        if (pair.output_port)
81
            ++num_finished_outputs;
82

83
        if (!pair.is_delayed)
84
            ++num_finished_main_inputs;
85
    }
86
}
87

88
bool DelayedPortsProcessor::processPair(PortsPair & pair)
89
{
90
    if (pair.output_port && pair.output_port->isFinished())
91
    {
92
        finishPair(pair);
93
        return false;
94
    }
95

96
    if (pair.input_port->isFinished())
97
    {
98
        finishPair(pair);
99
        return false;
100
    }
101

102
    if (pair.output_port && !pair.output_port->canPush())
103
        return false;
104

105
    pair.input_port->setNeeded();
106
    if (pair.input_port->hasData())
107
    {
108
        if (!pair.output_port)
109
            throw Exception(ErrorCodes::LOGICAL_ERROR,
110
                            "Input port for DelayedPortsProcessor is assumed to have no data, but it has one");
111

112
        pair.output_port->pushData(pair.input_port->pullData(true));
113
    }
114

115
    return true;
116
}
117

118

119
bool DelayedPortsProcessor::shouldSkipDelayed() const
120
{
121
    return num_finished_main_inputs + num_delayed_ports < port_pairs.size();
122
}
123

124
IProcessor::Status DelayedPortsProcessor::prepare(const PortNumbers & updated_inputs, const PortNumbers & updated_outputs)
125
{
126
    bool skip_delayed = shouldSkipDelayed();
127
    bool need_data = false;
128

129
    if (!are_inputs_initialized && !updated_outputs.empty())
130
    {
131
        /// Activate inputs with no output.
132
        for (const auto & pair : port_pairs)
133
            if (!pair.output_port)
134
                pair.input_port->setNeeded();
135

136
        are_inputs_initialized = true;
137
    }
138

139
    for (const auto & output_number : updated_outputs)
140
    {
141
        auto & pair = port_pairs[output_to_pair[output_number]];
142

143
        /// Finish pair of ports earlier if possible.
144
        if (!pair.is_finished && pair.output_port && pair.output_port->isFinished())
145
            finishPair(pair);
146
        else if (!skip_delayed || !pair.is_delayed)
147
            need_data = processPair(pair) || need_data;
148
    }
149

150
    /// Do not wait for delayed ports if all output ports are finished.
151
    if (num_finished_outputs == outputs.size())
152
    {
153
        for (auto & pair : port_pairs)
154
            finishPair(pair);
155

156
        return Status::Finished;
157
    }
158

159
    for (const auto & input_number : updated_inputs)
160
    {
161
        if (!skip_delayed || !port_pairs[input_number].is_delayed)
162
            need_data = processPair(port_pairs[input_number]) || need_data;
163
    }
164

165
    /// In case if main streams are finished at current iteration, start processing delayed streams.
166
    if (skip_delayed && !shouldSkipDelayed())
167
    {
168
        for (auto & pair : port_pairs)
169
            if (pair.is_delayed)
170
                need_data = processPair(pair) || need_data;
171
    }
172

173
    if (num_finished_inputs == port_pairs.size())
174
        return Status::Finished;
175

176
    if (need_data)
177
        return Status::NeedData;
178

179
    return Status::PortFull;
180
}
181

182
}
183

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.