ClickHouse
215 строк · 5.5 Кб
1#include "PostgreSQLSource.h"
2#include "Common/Exception.h"
3
4#if USE_LIBPQXX
5#include <Columns/ColumnNullable.h>
6#include <Columns/ColumnString.h>
7#include <Columns/ColumnArray.h>
8#include <Columns/ColumnsNumber.h>
9#include <Columns/ColumnDecimal.h>
10#include <DataTypes/IDataType.h>
11#include <DataTypes/DataTypeNullable.h>
12#include <DataTypes/DataTypeArray.h>
13#include <DataTypes/DataTypesDecimal.h>
14#include <Interpreters/convertFieldToType.h>
15#include <IO/ReadHelpers.h>
16#include <IO/WriteHelpers.h>
17#include <IO/ReadBufferFromString.h>
18#include <Common/assert_cast.h>
19#include <base/range.h>
20#include <Common/logger_useful.h>
21
22
23namespace DB
24{
25
/// Error codes used by this translation unit; only declared here (extern).
namespace ErrorCodes
{
    extern const int TOO_MANY_COLUMNS;
}
30
31template<typename T>
32PostgreSQLSource<T>::PostgreSQLSource(
33postgres::ConnectionHolderPtr connection_holder_,
34const std::string & query_str_,
35const Block & sample_block,
36UInt64 max_block_size_)
37: ISource(sample_block.cloneEmpty())
38, query_str(query_str_)
39, max_block_size(max_block_size_)
40, connection_holder(std::move(connection_holder_))
41{
42init(sample_block);
43}
44
45
46template<typename T>
47PostgreSQLSource<T>::PostgreSQLSource(
48std::shared_ptr<T> tx_,
49const std::string & query_str_,
50const Block & sample_block,
51UInt64 max_block_size_,
52bool auto_commit_)
53: ISource(sample_block.cloneEmpty())
54, query_str(query_str_)
55, tx(std::move(tx_))
56, max_block_size(max_block_size_)
57, auto_commit(auto_commit_)
58{
59init(sample_block);
60}
61
62template<typename T>
63void PostgreSQLSource<T>::init(const Block & sample_block)
64{
65description.init(sample_block);
66
67for (const auto idx : collections::range(0, description.sample_block.columns()))
68if (description.types[idx].first == ExternalResultDescription::ValueType::vtArray)
69preparePostgreSQLArrayInfo(array_info, idx, description.sample_block.getByPosition(idx).type);
70
71/// pqxx::stream_from uses COPY command, will get error if ';' is present
72if (query_str.ends_with(';'))
73query_str.resize(query_str.size() - 1);
74}
75
76
77template<typename T>
78void PostgreSQLSource<T>::onStart()
79{
80if (!tx)
81{
82try
83{
84auto & conn = connection_holder->get();
85tx = std::make_shared<T>(conn);
86}
87catch (const pqxx::broken_connection &)
88{
89connection_holder->update();
90tx = std::make_shared<T>(connection_holder->get());
91}
92}
93
94stream = std::make_unique<pqxx::stream_from>(*tx, pqxx::from_query, std::string_view{query_str});
95}
96
97template<typename T>
98IProcessor::Status PostgreSQLSource<T>::prepare()
99{
100if (!started)
101{
102onStart();
103started = true;
104}
105
106auto status = ISource::prepare();
107if (status == Status::Finished)
108onFinish();
109
110return status;
111}
112
113template<typename T>
114Chunk PostgreSQLSource<T>::generate()
115{
116/// Check if pqxx::stream_from is finished
117if (!stream || !(*stream))
118return {};
119
120MutableColumns columns = description.sample_block.cloneEmptyColumns();
121size_t num_rows = 0;
122
123while (true)
124{
125const std::vector<pqxx::zview> * row{stream->read_row()};
126
127/// row is nullptr if pqxx::stream_from is finished
128if (!row)
129break;
130
131if (row->size() > description.sample_block.columns())
132throw Exception(ErrorCodes::TOO_MANY_COLUMNS,
133"Row has too many columns: {}, expected structure: {}",
134row->size(), description.sample_block.dumpStructure());
135
136for (const auto idx : collections::range(0, row->size()))
137{
138const auto & sample = description.sample_block.getByPosition(idx);
139
140/// if got NULL type, then pqxx::zview will return nullptr in c_str()
141if ((*row)[idx].c_str())
142{
143if (description.types[idx].second)
144{
145ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[idx]);
146const auto & data_type = assert_cast<const DataTypeNullable &>(*sample.type);
147
148insertPostgreSQLValue(
149column_nullable.getNestedColumn(), (*row)[idx],
150description.types[idx].first, data_type.getNestedType(), array_info, idx);
151
152column_nullable.getNullMapData().emplace_back(0);
153}
154else
155{
156insertPostgreSQLValue(
157*columns[idx], (*row)[idx], description.types[idx].first, sample.type, array_info, idx);
158}
159}
160else
161{
162insertDefaultPostgreSQLValue(*columns[idx], *sample.column);
163}
164
165}
166
167if (++num_rows == max_block_size)
168break;
169}
170
171return Chunk(std::move(columns), num_rows);
172}
173
174
175template<typename T>
176void PostgreSQLSource<T>::onFinish()
177{
178if (stream)
179stream->close();
180
181if (tx && auto_commit)
182tx->commit();
183
184is_completed = true;
185}
186
187template<typename T>
188PostgreSQLSource<T>::~PostgreSQLSource()
189{
190if (!is_completed)
191{
192try
193{
194stream.reset();
195tx.reset();
196}
197catch (...)
198{
199tryLogCurrentException(__PRETTY_FUNCTION__);
200}
201
202if (connection_holder)
203connection_holder->setBroken();
204}
205}
206
207template
208class PostgreSQLSource<pqxx::ReplicationTransaction>;
209
210template
211class PostgreSQLSource<pqxx::ReadTransaction>;
212
213}
214
215#endif
216