ClickHouse

Форк
0
/
ISimpleTransform.cpp 
109 строк · 2.4 Кб
1
#include <Processors/ISimpleTransform.h>
2

3

4
namespace DB
5
{
6

7
ISimpleTransform::ISimpleTransform(Block input_header_, Block output_header_, bool skip_empty_chunks_)
8
    : IProcessor({std::move(input_header_)}, {std::move(output_header_)})
9
    , input(inputs.front())
10
    , output(outputs.front())
11
    , skip_empty_chunks(skip_empty_chunks_)
12
{
13
}
14

15
ISimpleTransform::Status ISimpleTransform::prepare()
16
{
17
    /// Check can output.
18

19
    if (output.isFinished())
20
    {
21
        input.close();
22
        return Status::Finished;
23
    }
24

25
    if (!output.canPush())
26
    {
27
        input.setNotNeeded();
28
        return Status::PortFull;
29
    }
30

31
    /// Output if has data.
32
    if (has_output)
33
    {
34
        output.pushData(std::move(output_data));
35
        has_output = false;
36

37
        if (!no_more_data_needed)
38
            return Status::PortFull;
39

40
    }
41

42
    /// Stop if don't need more data.
43
    if (no_more_data_needed)
44
    {
45
        input.close();
46
        output.finish();
47
        return Status::Finished;
48
    }
49

50
    /// Check can input.
51
    if (!has_input)
52
    {
53
        if (input.isFinished())
54
        {
55
            output.finish();
56
            return Status::Finished;
57
        }
58

59
        input.setNeeded();
60

61
        if (!input.hasData())
62
            return Status::NeedData;
63

64
        input_data = input.pullData(set_input_not_needed_after_read);
65
        has_input = true;
66

67
        if (input_data.exception)
68
            /// No more data needed. Exception will be thrown (or swallowed) later.
69
            input.setNotNeeded();
70
    }
71

72
    /// Now transform.
73
    return Status::Ready;
74
}
75

76
void ISimpleTransform::work()
77
{
78
    if (input_data.exception)
79
    {
80
        /// Skip transform in case of exception.
81
        output_data = std::move(input_data);
82
        has_input = false;
83
        has_output = true;
84
        return;
85
    }
86

87
    try
88
    {
89
        transform(input_data.chunk, output_data.chunk);
90
    }
91
    catch (DB::Exception &)
92
    {
93
        output_data.exception = std::current_exception();
94
        has_output = true;
95
        has_input = false;
96
        return;
97
    }
98

99
    has_input = !needInputData();
100

101
    if (!skip_empty_chunks || output_data.chunk)
102
        has_output = true;
103

104
    if (has_output && !output_data.chunk && getOutputPort().getHeader())
105
        /// Support invariant that chunks must have the same number of columns as header.
106
        output_data.chunk = Chunk(getOutputPort().getHeader().cloneEmpty().getColumns(), 0);
107
}
108

109
}
110

111

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.