ClickHouse
79 строк · 1.9 Кб
1#include "Connection.h"
2
3#if USE_LIBPQXX
4#include <Common/logger_useful.h>
5
6
7namespace postgres
8{
9
10Connection::Connection(const ConnectionInfo & connection_info_, bool replication_, size_t num_tries_)
11: connection_info(connection_info_), replication(replication_), num_tries(num_tries_)
12, log(getLogger("PostgreSQLReplicaConnection"))
13{
14if (replication)
15connection_info = {fmt::format("{} replication=database", connection_info.connection_string), connection_info.host_port};
16}
17
18void Connection::execWithRetry(const std::function<void(pqxx::nontransaction &)> & exec)
19{
20for (size_t try_no = 0; try_no < num_tries; ++try_no)
21{
22try
23{
24pqxx::nontransaction tx(getRef());
25exec(tx);
26break;
27}
28catch (const pqxx::broken_connection & e)
29{
30LOG_DEBUG(log, "Cannot execute query due to connection failure, attempt: {}/{}. (Message: {})",
31try_no, num_tries, e.what());
32
33if (try_no + 1 == num_tries)
34throw;
35}
36}
37}
38
39pqxx::connection & Connection::getRef()
40{
41connect();
42return *connection;
43}
44
45void Connection::tryUpdateConnection()
46{
47try
48{
49updateConnection();
50}
51catch (const pqxx::broken_connection & e)
52{
53LOG_ERROR(log, "Unable to update connection: {}", e.what());
54}
55}
56
57void Connection::updateConnection()
58{
59if (connection)
60connection->close();
61
62/// Always throws if there is no connection.
63connection = std::make_unique<pqxx::connection>(connection_info.connection_string);
64
65if (replication)
66connection->set_variable("default_transaction_isolation", "'repeatable read'");
67
68LOG_DEBUG(getLogger("PostgreSQLConnection"), "New connection to {}", connection_info.host_port);
69}
70
71void Connection::connect()
72{
73if (!connection || !connection->is_open())
74updateConnection();
75}
76
77}
78
79#endif
80