ClickHouse

Форк
0
/
Connection.cpp 
79 строк · 1.9 Кб
1
#include "Connection.h"
2

3
#if USE_LIBPQXX
4
#include <Common/logger_useful.h>
5

6

7
namespace postgres
8
{
9

10
Connection::Connection(const ConnectionInfo & connection_info_, bool replication_, size_t num_tries_)
11
    : connection_info(connection_info_), replication(replication_), num_tries(num_tries_)
12
    , log(getLogger("PostgreSQLReplicaConnection"))
13
{
14
    if (replication)
15
        connection_info = {fmt::format("{} replication=database", connection_info.connection_string), connection_info.host_port};
16
}
17

18
void Connection::execWithRetry(const std::function<void(pqxx::nontransaction &)> & exec)
19
{
20
    for (size_t try_no = 0; try_no < num_tries; ++try_no)
21
    {
22
        try
23
        {
24
            pqxx::nontransaction tx(getRef());
25
            exec(tx);
26
            break;
27
        }
28
        catch (const pqxx::broken_connection & e)
29
        {
30
            LOG_DEBUG(log, "Cannot execute query due to connection failure, attempt: {}/{}. (Message: {})",
31
                      try_no, num_tries, e.what());
32

33
            if (try_no + 1 == num_tries)
34
                throw;
35
        }
36
    }
37
}
38

39
pqxx::connection & Connection::getRef()
40
{
41
    connect();
42
    return *connection;
43
}
44

45
void Connection::tryUpdateConnection()
46
{
47
    try
48
    {
49
        updateConnection();
50
    }
51
    catch (const pqxx::broken_connection & e)
52
    {
53
        LOG_ERROR(log, "Unable to update connection: {}", e.what());
54
    }
55
}
56

57
void Connection::updateConnection()
58
{
59
    if (connection)
60
        connection->close();
61

62
    /// Always throws if there is no connection.
63
    connection = std::make_unique<pqxx::connection>(connection_info.connection_string);
64

65
    if (replication)
66
        connection->set_variable("default_transaction_isolation", "'repeatable read'");
67

68
    LOG_DEBUG(getLogger("PostgreSQLConnection"), "New connection to {}", connection_info.host_port);
69
}
70

71
void Connection::connect()
72
{
73
    if (!connection || !connection->is_open())
74
        updateConnection();
75
}
76

77
}
78

79
#endif
80

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.