ClickHouse

Форк
0
/
IMergingTransform.cpp 
225 строк · 5.3 Кб
1
#include <Processors/Merges/IMergingTransform.h>
2

3
namespace DB
4
{
5

6
namespace ErrorCodes
7
{
8
    extern const int LOGICAL_ERROR;
9
    extern const int NOT_IMPLEMENTED;
10
}
11

12
IMergingTransformBase::IMergingTransformBase(
13
    size_t num_inputs,
14
    const Block & input_header,
15
    const Block & output_header,
16
    bool have_all_inputs_,
17
    UInt64 limit_hint_,
18
    bool always_read_till_end_)
19
    : IProcessor(InputPorts(num_inputs, input_header), {output_header})
20
    , have_all_inputs(have_all_inputs_)
21
    , limit_hint(limit_hint_)
22
    , always_read_till_end(always_read_till_end_)
23
{
24
}
25

26
static InputPorts createPorts(const Blocks & blocks)
27
{
28
    InputPorts ports;
29
    for (const auto & block : blocks)
30
        ports.emplace_back(block);
31
    return ports;
32
}
33

34
IMergingTransformBase::IMergingTransformBase(
35
    const Blocks & input_headers,
36
    const Block & output_header,
37
    bool have_all_inputs_,
38
    UInt64 limit_hint_,
39
    bool always_read_till_end_)
40
    : IProcessor(createPorts(input_headers), {output_header})
41
    , have_all_inputs(have_all_inputs_)
42
    , limit_hint(limit_hint_)
43
    , always_read_till_end(always_read_till_end_)
44
{
45
}
46

47
void IMergingTransformBase::onNewInput()
48
{
49
    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "onNewInput is not implemented for {}", getName());
50
}
51

52
void IMergingTransformBase::addInput()
53
{
54
    if (have_all_inputs)
55
        throw Exception(ErrorCodes::LOGICAL_ERROR, "IMergingTransform already have all inputs.");
56

57
    inputs.emplace_back(outputs.front().getHeader(), this);
58
    onNewInput();
59
}
60

61
void IMergingTransformBase::setHaveAllInputs()
62
{
63
    if (have_all_inputs)
64
        throw Exception(ErrorCodes::LOGICAL_ERROR, "IMergingTransform already have all inputs.");
65

66
    have_all_inputs = true;
67
}
68

69
IProcessor::Status IMergingTransformBase::prepareInitializeInputs()
70
{
71
    /// Add information about inputs.
72
    if (input_states.empty())
73
    {
74
        input_states.reserve(inputs.size());
75
        for (auto & input : inputs)
76
            input_states.emplace_back(input);
77

78
        state.init_chunks.resize(inputs.size());
79
    }
80

81
    /// Check for inputs we need.
82
    bool all_inputs_has_data = true;
83
    auto it = inputs.begin();
84
    for (size_t i = 0; it != inputs.end(); ++i, ++it)
85
    {
86
        auto & input = *it;
87
        if (input.isFinished())
88
            continue;
89

90
        if (input_states[i].is_initialized)
91
            continue;
92

93
        input.setNeeded();
94

95
        if (!input.hasData())
96
        {
97
            all_inputs_has_data = false;
98
            continue;
99
        }
100

101
        /// setNotNeeded after reading first chunk, because in optimismtic case
102
        /// (e.g. with optimized 'ORDER BY primary_key LIMIT n' and small 'n')
103
        /// we won't have to read any chunks anymore;
104
        auto chunk = input.pull(limit_hint != 0);
105
        if ((limit_hint && chunk.getNumRows() < limit_hint) || always_read_till_end)
106
            input.setNeeded();
107

108
        if (!chunk.hasRows())
109
        {
110
            if (!input.isFinished())
111
            {
112
                input.setNeeded();
113
                all_inputs_has_data = false;
114
            }
115

116
            continue;
117
        }
118

119
        state.init_chunks[i].set(std::move(chunk));
120
        input_states[i].is_initialized = true;
121
    }
122

123
    if (!all_inputs_has_data)
124
        return Status::NeedData;
125

126
    is_initialized = true;
127
    return Status::Ready;
128
}
129

130
IProcessor::Status IMergingTransformBase::prepare()
131
{
132
    if (!have_all_inputs)
133
        return Status::NeedData;
134

135
    auto & output = outputs.front();
136

137
    /// Special case for no inputs.
138
    if (inputs.empty())
139
    {
140
        output.finish();
141
        onFinish();
142
        return Status::Finished;
143
    }
144

145
    /// Check can output.
146

147
    if (output.isFinished())
148
    {
149
        for (auto & in : inputs)
150
            in.close();
151

152
        onFinish();
153
        return Status::Finished;
154
    }
155

156
    /// Do not disable inputs, so they can be executed in parallel.
157
    bool is_port_full = !output.canPush();
158

159
    /// Push if has data.
160
    if ((state.output_chunk || state.output_chunk.hasChunkInfo()) && !is_port_full)
161
        output.push(std::move(state.output_chunk));
162

163
    if (!is_initialized)
164
        return prepareInitializeInputs();
165

166
    if (state.is_finished)
167
    {
168
        if (is_port_full)
169
            return Status::PortFull;
170

171
        if (always_read_till_end)
172
        {
173
            for (auto & input : inputs)
174
            {
175
                if (!input.isFinished())
176
                {
177
                    input.setNeeded();
178
                    if (input.hasData())
179
                        std::ignore = input.pull();
180

181
                    return Status::NeedData;
182
                }
183
            }
184
        }
185

186
        for (auto & input : inputs)
187
            input.close();
188

189
        outputs.front().finish();
190

191
        onFinish();
192
        return Status::Finished;
193
    }
194

195
    if (state.need_data)
196
    {
197
        auto & input = input_states[state.next_input_to_read].port;
198
        if (!input.isFinished())
199
        {
200
            input.setNeeded();
201

202
            if (!input.hasData())
203
                return Status::NeedData;
204

205
            state.input_chunk.set(input.pull());
206
            if (!state.input_chunk.chunk.hasRows() && !input.isFinished())
207
                return Status::NeedData;
208

209
            state.has_input = true;
210
        }
211
        else
212
        {
213
            state.no_data = true;
214
        }
215

216
        state.need_data = false;
217
    }
218

219
    if (is_port_full)
220
        return Status::PortFull;
221

222
    return Status::Ready;
223
}
224

225
}
226

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.