ClickHouse
139 строк · 5.0 Кб
1#include "PoolWithFailover.h"
2
3#if USE_LIBPQXX
4
5#include "Utils.h"
6#include <Common/parseRemoteDescription.h>
7#include <Common/Exception.h>
8#include <Common/quoteString.h>
9#include <Common/logger_useful.h>
10#include <IO/WriteBufferFromString.h>
11#include <IO/Operators.h>
12
13namespace DB
14{
15namespace ErrorCodes
16{
17extern const int POSTGRESQL_CONNECTION_FAILURE;
18extern const int LOGICAL_ERROR;
19}
20}
21
22namespace postgres
23{
24
25PoolWithFailover::PoolWithFailover(
26const DB::ExternalDataSourcesConfigurationByPriority & configurations_by_priority,
27size_t pool_size,
28size_t pool_wait_timeout_,
29size_t max_tries_,
30bool auto_close_connection_)
31: pool_wait_timeout(pool_wait_timeout_)
32, max_tries(max_tries_)
33, auto_close_connection(auto_close_connection_)
34{
35LOG_TRACE(getLogger("PostgreSQLConnectionPool"), "PostgreSQL connection pool size: {}, connection wait timeout: {}, max failover tries: {}",
36pool_size, pool_wait_timeout, max_tries_);
37
38for (const auto & [priority, configurations] : configurations_by_priority)
39{
40for (const auto & replica_configuration : configurations)
41{
42auto connection_info = formatConnectionString(replica_configuration.database,
43replica_configuration.host, replica_configuration.port, replica_configuration.username, replica_configuration.password);
44replicas_with_priority[priority].emplace_back(connection_info, pool_size);
45}
46}
47}
48
49PoolWithFailover::PoolWithFailover(
50const DB::StoragePostgreSQL::Configuration & configuration,
51size_t pool_size,
52size_t pool_wait_timeout_,
53size_t max_tries_,
54bool auto_close_connection_)
55: pool_wait_timeout(pool_wait_timeout_)
56, max_tries(max_tries_)
57, auto_close_connection(auto_close_connection_)
58{
59LOG_TRACE(getLogger("PostgreSQLConnectionPool"), "PostgreSQL connection pool size: {}, connection wait timeout: {}, max failover tries: {}",
60pool_size, pool_wait_timeout, max_tries_);
61
62/// Replicas have the same priority, but traversed replicas are moved to the end of the queue.
63for (const auto & [host, port] : configuration.addresses)
64{
65LOG_DEBUG(getLogger("PostgreSQLPoolWithFailover"), "Adding address host: {}, port: {} to connection pool", host, port);
66auto connection_string = formatConnectionString(configuration.database, host, port, configuration.username, configuration.password);
67replicas_with_priority[0].emplace_back(connection_string, pool_size);
68}
69}
70
71ConnectionHolderPtr PoolWithFailover::get()
72{
73std::lock_guard lock(mutex);
74
75if (replicas_with_priority.empty())
76throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "No address specified");
77
78PreformattedMessage error_message;
79for (size_t try_idx = 0; try_idx < max_tries; ++try_idx)
80{
81for (auto & priority : replicas_with_priority)
82{
83auto & replicas = priority.second;
84for (size_t i = 0; i < replicas.size(); ++i)
85{
86auto & replica = replicas[i];
87
88ConnectionPtr connection;
89auto connection_available = replica.pool->tryBorrowObject(connection, []() { return nullptr; }, pool_wait_timeout);
90
91if (!connection_available)
92{
93LOG_WARNING(log, "Unable to fetch connection within the timeout");
94continue;
95}
96
97try
98{
99/// Create a new connection or reopen an old connection if it became invalid.
100if (!connection)
101{
102connection = std::make_unique<Connection>(replica.connection_info);
103LOG_DEBUG(log, "New connection to {}", connection->getInfoForLog());
104}
105
106connection->connect();
107}
108catch (const pqxx::broken_connection & pqxx_error)
109{
110LOG_ERROR(log, "Connection error: {}", pqxx_error.what());
111error_message = PreformattedMessage::create(
112"Try {}. Connection to {} failed with error: {}\n",
113try_idx + 1, DB::backQuote(replica.connection_info.host_port), pqxx_error.what());
114
115replica.pool->returnObject(std::move(connection));
116continue;
117}
118catch (...)
119{
120replica.pool->returnObject(std::move(connection));
121throw;
122}
123
124auto connection_holder = std::make_unique<ConnectionHolder>(replica.pool, std::move(connection), auto_close_connection);
125
126/// Move all traversed replicas to the end.
127if (replicas.size() > 1)
128std::rotate(replicas.begin(), replicas.begin() + i + 1, replicas.end());
129
130return connection_holder;
131}
132}
133}
134
135throw DB::Exception(error_message, DB::ErrorCodes::POSTGRESQL_CONNECTION_FAILURE);
136}
137}
138
139#endif
140