ClickHouse

Форк
0
/
PostgreSQLSource.cpp 
215 строк · 5.5 Кб
1
#include "PostgreSQLSource.h"
2
#include "Common/Exception.h"
3

4
#if USE_LIBPQXX
5
#include <Columns/ColumnNullable.h>
6
#include <Columns/ColumnString.h>
7
#include <Columns/ColumnArray.h>
8
#include <Columns/ColumnsNumber.h>
9
#include <Columns/ColumnDecimal.h>
10
#include <DataTypes/IDataType.h>
11
#include <DataTypes/DataTypeNullable.h>
12
#include <DataTypes/DataTypeArray.h>
13
#include <DataTypes/DataTypesDecimal.h>
14
#include <Interpreters/convertFieldToType.h>
15
#include <IO/ReadHelpers.h>
16
#include <IO/WriteHelpers.h>
17
#include <IO/ReadBufferFromString.h>
18
#include <Common/assert_cast.h>
19
#include <base/range.h>
20
#include <Common/logger_useful.h>
21

22

23
namespace DB
24
{
25

26
/// Error codes referenced in this file. They are only declared here (extern).
namespace ErrorCodes
27
{
28
    /// Used by generate() when a fetched row has more fields than the sample block has columns.
    extern const int TOO_MANY_COLUMNS;
29
}
30

31
template<typename T>
32
PostgreSQLSource<T>::PostgreSQLSource(
33
    postgres::ConnectionHolderPtr connection_holder_,
34
    const std::string & query_str_,
35
    const Block & sample_block,
36
    UInt64 max_block_size_)
37
    : ISource(sample_block.cloneEmpty())
38
    , query_str(query_str_)
39
    , max_block_size(max_block_size_)
40
    , connection_holder(std::move(connection_holder_))
41
{
42
    init(sample_block);
43
}
44

45

46
template<typename T>
47
PostgreSQLSource<T>::PostgreSQLSource(
48
    std::shared_ptr<T> tx_,
49
    const std::string & query_str_,
50
    const Block & sample_block,
51
    UInt64 max_block_size_,
52
    bool auto_commit_)
53
    : ISource(sample_block.cloneEmpty())
54
    , query_str(query_str_)
55
    , tx(std::move(tx_))
56
    , max_block_size(max_block_size_)
57
    , auto_commit(auto_commit_)
58
{
59
    init(sample_block);
60
}
61

62
template<typename T>
63
void PostgreSQLSource<T>::init(const Block & sample_block)
64
{
65
    description.init(sample_block);
66

67
    for (const auto idx : collections::range(0, description.sample_block.columns()))
68
        if (description.types[idx].first == ExternalResultDescription::ValueType::vtArray)
69
            preparePostgreSQLArrayInfo(array_info, idx, description.sample_block.getByPosition(idx).type);
70

71
    /// pqxx::stream_from uses COPY command, will get error if ';' is present
72
    if (query_str.ends_with(';'))
73
        query_str.resize(query_str.size() - 1);
74
}
75

76

77
template<typename T>
78
void PostgreSQLSource<T>::onStart()
79
{
80
    if (!tx)
81
    {
82
        try
83
        {
84
            auto & conn = connection_holder->get();
85
            tx = std::make_shared<T>(conn);
86
        }
87
        catch (const pqxx::broken_connection &)
88
        {
89
            connection_holder->update();
90
            tx = std::make_shared<T>(connection_holder->get());
91
        }
92
    }
93

94
    stream = std::make_unique<pqxx::stream_from>(*tx, pqxx::from_query, std::string_view{query_str});
95
}
96

97
template<typename T>
98
IProcessor::Status PostgreSQLSource<T>::prepare()
99
{
100
    if (!started)
101
    {
102
        onStart();
103
        started = true;
104
    }
105

106
    auto status = ISource::prepare();
107
    if (status == Status::Finished)
108
        onFinish();
109

110
    return status;
111
}
112

113
template<typename T>
114
Chunk PostgreSQLSource<T>::generate()
115
{
116
    /// Check if pqxx::stream_from is finished
117
    if (!stream || !(*stream))
118
        return {};
119

120
    MutableColumns columns = description.sample_block.cloneEmptyColumns();
121
    size_t num_rows = 0;
122

123
    while (true)
124
    {
125
        const std::vector<pqxx::zview> * row{stream->read_row()};
126

127
        /// row is nullptr if pqxx::stream_from is finished
128
        if (!row)
129
            break;
130

131
        if (row->size() > description.sample_block.columns())
132
            throw Exception(ErrorCodes::TOO_MANY_COLUMNS,
133
                            "Row has too many columns: {}, expected structure: {}",
134
                            row->size(), description.sample_block.dumpStructure());
135

136
        for (const auto idx : collections::range(0, row->size()))
137
        {
138
            const auto & sample = description.sample_block.getByPosition(idx);
139

140
            /// if got NULL type, then pqxx::zview will return nullptr in c_str()
141
            if ((*row)[idx].c_str())
142
            {
143
                if (description.types[idx].second)
144
                {
145
                    ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[idx]);
146
                    const auto & data_type = assert_cast<const DataTypeNullable &>(*sample.type);
147

148
                    insertPostgreSQLValue(
149
                            column_nullable.getNestedColumn(), (*row)[idx],
150
                            description.types[idx].first, data_type.getNestedType(), array_info, idx);
151

152
                    column_nullable.getNullMapData().emplace_back(0);
153
                }
154
                else
155
                {
156
                    insertPostgreSQLValue(
157
                            *columns[idx], (*row)[idx], description.types[idx].first, sample.type, array_info, idx);
158
                }
159
            }
160
            else
161
            {
162
                insertDefaultPostgreSQLValue(*columns[idx], *sample.column);
163
            }
164

165
        }
166

167
        if (++num_rows == max_block_size)
168
            break;
169
    }
170

171
    return Chunk(std::move(columns), num_rows);
172
}
173

174

175
template<typename T>
176
void PostgreSQLSource<T>::onFinish()
177
{
178
    if (stream)
179
        stream->close();
180

181
    if (tx && auto_commit)
182
        tx->commit();
183

184
    is_completed = true;
185
}
186

187
template<typename T>
188
PostgreSQLSource<T>::~PostgreSQLSource()
189
{
190
    if (!is_completed)
191
    {
192
        try
193
        {
194
            stream.reset();
195
            tx.reset();
196
        }
197
        catch (...)
198
        {
199
            tryLogCurrentException(__PRETTY_FUNCTION__);
200
        }
201

202
        if (connection_holder)
203
            connection_holder->setBroken();
204
    }
205
}
206

207
template
208
class PostgreSQLSource<pqxx::ReplicationTransaction>;
209

210
template
211
class PostgreSQLSource<pqxx::ReadTransaction>;
212

213
}
214

215
#endif
216

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.