ClickHouse
45 строк · 1.3 Кб
1#include "ODBCSink.h"
2
3#include <IO/WriteBufferFromString.h>
4#include <Interpreters/Context.h>
5#include <Processors/Formats/IOutputFormat.h>
6#include <Parsers/getInsertQuery.h>
7
8
9namespace DB
10{
11
12
13ODBCSink::ODBCSink(
14nanodbc::ConnectionHolderPtr connection_holder_,
15const std::string & remote_database_name_,
16const std::string & remote_table_name_,
17const Block & sample_block_,
18ContextPtr local_context_,
19IdentifierQuotingStyle quoting_)
20: ISink(sample_block_)
21, log(getLogger("ODBCSink"))
22, connection_holder(std::move(connection_holder_))
23, db_name(remote_database_name_)
24, table_name(remote_table_name_)
25, sample_block(sample_block_)
26, local_context(local_context_)
27, quoting(quoting_)
28{
29description.init(sample_block);
30}
31
32
33void ODBCSink::consume(Chunk chunk)
34{
35auto block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
36WriteBufferFromOwnString values_buf;
37auto writer = local_context->getOutputFormat("Values", values_buf, sample_block);
38writer->write(block);
39
40std::string query = getInsertQuery(db_name, table_name, block.getColumnsWithTypeAndName(), quoting) + values_buf.str();
41execute<void>(connection_holder,
42[&](nanodbc::connection & connection) { execute(connection, query); });
43}
44
45}
46