ClickHouse

Форк
0
/
PoolWithFailover.cpp 
139 строк · 5.0 Кб
1
#include "PoolWithFailover.h"
2

3
#if USE_LIBPQXX
4

5
#include "Utils.h"
6
#include <Common/parseRemoteDescription.h>
7
#include <Common/Exception.h>
8
#include <Common/quoteString.h>
9
#include <Common/logger_useful.h>
10
#include <IO/WriteBufferFromString.h>
11
#include <IO/Operators.h>
12

13
namespace DB
14
{
15
namespace ErrorCodes
16
{
17
    extern const int POSTGRESQL_CONNECTION_FAILURE;
18
    extern const int LOGICAL_ERROR;
19
}
20
}
21

22
namespace postgres
23
{
24

25
PoolWithFailover::PoolWithFailover(
26
    const DB::ExternalDataSourcesConfigurationByPriority & configurations_by_priority,
27
    size_t pool_size,
28
    size_t pool_wait_timeout_,
29
    size_t max_tries_,
30
    bool auto_close_connection_)
31
    : pool_wait_timeout(pool_wait_timeout_)
32
    , max_tries(max_tries_)
33
    , auto_close_connection(auto_close_connection_)
34
{
35
    LOG_TRACE(getLogger("PostgreSQLConnectionPool"), "PostgreSQL connection pool size: {}, connection wait timeout: {}, max failover tries: {}",
36
              pool_size, pool_wait_timeout, max_tries_);
37

38
    for (const auto & [priority, configurations] : configurations_by_priority)
39
    {
40
        for (const auto & replica_configuration : configurations)
41
        {
42
            auto connection_info = formatConnectionString(replica_configuration.database,
43
                replica_configuration.host, replica_configuration.port, replica_configuration.username, replica_configuration.password);
44
            replicas_with_priority[priority].emplace_back(connection_info, pool_size);
45
        }
46
    }
47
}
48

49
PoolWithFailover::PoolWithFailover(
50
    const DB::StoragePostgreSQL::Configuration & configuration,
51
    size_t pool_size,
52
    size_t pool_wait_timeout_,
53
    size_t max_tries_,
54
    bool auto_close_connection_)
55
    : pool_wait_timeout(pool_wait_timeout_)
56
    , max_tries(max_tries_)
57
    , auto_close_connection(auto_close_connection_)
58
{
59
    LOG_TRACE(getLogger("PostgreSQLConnectionPool"), "PostgreSQL connection pool size: {}, connection wait timeout: {}, max failover tries: {}",
60
              pool_size, pool_wait_timeout, max_tries_);
61

62
    /// Replicas have the same priority, but traversed replicas are moved to the end of the queue.
63
    for (const auto & [host, port] : configuration.addresses)
64
    {
65
        LOG_DEBUG(getLogger("PostgreSQLPoolWithFailover"), "Adding address host: {}, port: {} to connection pool", host, port);
66
        auto connection_string = formatConnectionString(configuration.database, host, port, configuration.username, configuration.password);
67
        replicas_with_priority[0].emplace_back(connection_string, pool_size);
68
    }
69
}
70

71
ConnectionHolderPtr PoolWithFailover::get()
72
{
73
    std::lock_guard lock(mutex);
74

75
    if (replicas_with_priority.empty())
76
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "No address specified");
77

78
    PreformattedMessage error_message;
79
    for (size_t try_idx = 0; try_idx < max_tries; ++try_idx)
80
    {
81
        for (auto & priority : replicas_with_priority)
82
        {
83
            auto & replicas = priority.second;
84
            for (size_t i = 0; i < replicas.size(); ++i)
85
            {
86
                auto & replica = replicas[i];
87

88
                ConnectionPtr connection;
89
                auto connection_available = replica.pool->tryBorrowObject(connection, []() { return nullptr; }, pool_wait_timeout);
90

91
                if (!connection_available)
92
                {
93
                    LOG_WARNING(log, "Unable to fetch connection within the timeout");
94
                    continue;
95
                }
96

97
                try
98
                {
99
                    /// Create a new connection or reopen an old connection if it became invalid.
100
                    if (!connection)
101
                    {
102
                        connection = std::make_unique<Connection>(replica.connection_info);
103
                        LOG_DEBUG(log, "New connection to {}", connection->getInfoForLog());
104
                    }
105

106
                    connection->connect();
107
                }
108
                catch (const pqxx::broken_connection & pqxx_error)
109
                {
110
                    LOG_ERROR(log, "Connection error: {}", pqxx_error.what());
111
                    error_message = PreformattedMessage::create(
112
                        "Try {}. Connection to {} failed with error: {}\n",
113
                        try_idx + 1, DB::backQuote(replica.connection_info.host_port), pqxx_error.what());
114

115
                    replica.pool->returnObject(std::move(connection));
116
                    continue;
117
                }
118
                catch (...)
119
                {
120
                    replica.pool->returnObject(std::move(connection));
121
                    throw;
122
                }
123

124
                auto connection_holder = std::make_unique<ConnectionHolder>(replica.pool, std::move(connection), auto_close_connection);
125

126
                /// Move all traversed replicas to the end.
127
                if (replicas.size() > 1)
128
                    std::rotate(replicas.begin(), replicas.begin() + i + 1, replicas.end());
129

130
                return connection_holder;
131
            }
132
        }
133
    }
134

135
    throw DB::Exception(error_message, DB::ErrorCodes::POSTGRESQL_CONNECTION_FAILURE);
136
}
137
}
138

139
#endif
140

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.