ClickHouse

Форк
0
/
OwnSplitChannel.cpp 
176 строк · 5.7 Кб
1
#include "OwnSplitChannel.h"
2
#include "OwnFormattingChannel.h"
3

4
#include <Interpreters/InternalTextLogsQueue.h>
5
#include <Interpreters/TextLog.h>
6
#include <IO/WriteBufferFromFileDescriptor.h>
7
#include <Poco/Message.h>
8
#include <Common/CurrentThread.h>
9
#include <Common/DNSResolver.h>
10
#include <Common/setThreadName.h>
11
#include <Common/LockMemoryExceptionInThread.h>
12
#include <Common/SensitiveDataMasker.h>
13
#include <Common/IO.h>
14

15

16
namespace DB
17
{
18

19
/// Accepts a log message, hides sensitive data in its text if a masker is
/// installed, and hands the (possibly rewritten) message over to tryLogSplit().
void OwnSplitChannel::log(const Poco::Message & msg)
{
#ifndef WITHOUT_TEXT_LOG
    auto thread_logs_queue = CurrentThread::getInternalTextLogsQueue();

    /// Early exit: there are no child channels, and the per-thread queue is
    /// either absent or not interested in this priority/source.
    const bool queue_needs_message
        = thread_logs_queue != nullptr && thread_logs_queue->isNeeded(msg.getPriority(), msg.getSource());
    if (channels.empty() && !queue_needs_message)
        return;
#endif

    auto masker = SensitiveDataMasker::getInstance();
    if (!masker)
    {
        tryLogSplit(msg);
        return;
    }

    /// The masker edits the text in place, so it works on a private copy.
    std::string masked_text = msg.getText();
    auto matches = masker->wipeSensitiveData(masked_text);
    if (matches > 0)
    {
        /// Something was masked: continue with a copy of the original message carrying the modified text.
        Poco::Message masked_msg(msg, masked_text);
        tryLogSplit(masked_msg);
        return;
    }

    tryLogSplit(msg);
}
42

43

44
/// Calls logSplit() and swallows any exception it throws, reporting the
/// failure to stderr instead.
///
/// Errors are caught here so that an unexpected "File not found" (or similar)
/// failure inside logging does not break the caller. For example,
/// DistributedAsyncInsertDirectoryQueue would mark a batch as broken, and
/// some MergeTree code can be affected as well.
///
/// The exception cannot be reported through the regular
/// tryLogCurrentException(), because that would recurse back into logging;
/// it is written to stderr at least.
void OwnSplitChannel::tryLogSplit(const Poco::Message & msg)
{
    /// NOTE(review): presumably prevents memory-limit exceptions in this thread while the message is being logged - confirm.
    LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);

    try
    {
        logSplit(msg);
    }
    catch (...)
    {
        const std::string failure_description = getCurrentExceptionMessage(true);
        const std::string & lost_text = msg.getText();

        /// NOTE: errors of the writes themselves are ignored, since nothing can be done about them.
        writeRetry(STDERR_FILENO, "Cannot add message to the log: ");
        writeRetry(STDERR_FILENO, lost_text.data(), lost_text.size());
        writeRetry(STDERR_FILENO, "\n");
        writeRetry(STDERR_FILENO, failure_description.data(), failure_description.size());
        writeRetry(STDERR_FILENO, "\n");
    }
}
75

76
/// Delivers one message to every sink: all registered child channels and,
/// unless built with WITHOUT_TEXT_LOG, the current thread's internal
/// text-logs queue and the text_log queue.
void OwnSplitChannel::logSplit(const Poco::Message & msg)
{
    ExtendedLogMessage extended_msg = ExtendedLogMessage::getFrom(msg);

    /// Child channels: use the extended interface when the child has one,
    /// otherwise fall back to the plain Poco interface.
    for (auto & entry : channels)
    {
        auto & channel_pair = entry.second;
        if (auto * extended_child = channel_pair.second)
            extended_child->logExtended(extended_msg);
        else
            channel_pair.first->log(msg);
    }

#ifndef WITHOUT_TEXT_LOG
    /// Log to "TCP queue" if the message is not too noisy.
    auto thread_logs_queue = CurrentThread::getInternalTextLogsQueue();
    if (thread_logs_queue && thread_logs_queue->isNeeded(msg.getPriority(), msg.getSource()))
    {
        MutableColumns row = InternalTextLogsQueue::getSampleColumns();

        /// Columns are filled positionally; the order below is assumed to
        /// match the layout returned by getSampleColumns().
        size_t column_idx = 0;
        row[column_idx++]->insert(extended_msg.time_seconds);
        row[column_idx++]->insert(extended_msg.time_microseconds);
        row[column_idx++]->insert(DNSResolver::instance().getHostName());
        row[column_idx++]->insert(extended_msg.query_id);
        row[column_idx++]->insert(extended_msg.thread_id);
        row[column_idx++]->insert(static_cast<Int64>(msg.getPriority()));
        row[column_idx++]->insert(msg.getSource());
        row[column_idx++]->insert(msg.getText());

        /// The result of the push is deliberately not acted upon.
        [[maybe_unused]] bool pushed = thread_logs_queue->emplace(std::move(row));
    }

    /// Also log to system.text_log table, if the message is not too noisy.
    /// A zero maximum priority means nothing is written there.
    auto max_text_log_priority = text_log_max_priority.load(std::memory_order_relaxed);
    if (max_text_log_priority && msg.getPriority() <= max_text_log_priority)
    {
        TextLogElement record;

        record.event_time = extended_msg.time_seconds;
        record.event_time_microseconds = extended_msg.time_in_microseconds;

        record.thread_name = getThreadName();
        record.thread_id = extended_msg.thread_id;
        record.query_id = extended_msg.query_id;

        record.message = msg.getText();
        record.logger_name = msg.getSource();
        record.level = msg.getPriority();

        /// The source file may be a null pointer; leave the field untouched in that case.
        if (const auto * source_file = msg.getSourceFile(); source_file != nullptr)
            record.source_file = source_file;

        record.source_line = msg.getSourceLine();
        record.message_format_string = msg.getFormatString();

        /// The queue is held weakly; the record is dropped if the queue no longer exists.
        std::shared_ptr<SystemLogQueue<TextLogElement>> text_log_queue = text_log.lock();
        if (text_log_queue)
            text_log_queue->push(std::move(record));
    }
#endif
}
141

142

143
/// Registers a child channel under the given name.
///
/// Alongside the owning pointer, a pointer to the channel's
/// ExtendedLogChannel interface is stored; it is null when the channel does
/// not implement that interface.
void OwnSplitChannel::addChannel(Poco::AutoPtr<Poco::Channel> channel, const std::string & name)
{
    /// Resolve the extended interface before `channel` is moved from, so that
    /// the result does not depend on how or when the pair constructor
    /// consumes its arguments.
    auto * extended_channel = dynamic_cast<ExtendedLogChannel *>(channel.get());
    channels.emplace(name, ExtendedChannelPtrPair(std::move(channel), extended_channel));
}
147

148
#ifndef WITHOUT_TEXT_LOG
149
/// Attaches the queue that receives text_log records and sets the maximum
/// message priority that is written to it.
void OwnSplitChannel::addTextLog(std::shared_ptr<SystemLogQueue<TextLogElement>> log_queue, int max_priority)
{
    /// The queue is assigned before the priority is stored.
    text_log = log_queue;

    text_log_max_priority.store(max_priority, std::memory_order_relaxed);
}
154
#endif
155

156
void OwnSplitChannel::setLevel(const std::string & name, int level)
157
{
158
     auto it = channels.find(name);
159
     if (it != channels.end())
160
     {
161
         if (auto * channel = dynamic_cast<DB::OwnFormattingChannel *>(it->second.first.get()))
162
            channel->setLevel(level);
163
     }
164
}
165

166
void OwnSplitChannel::setChannelProperty(const std::string& channel_name, const std::string& name, const std::string& value)
167
{
168
    auto it = channels.find(channel_name);
169
    if (it != channels.end())
170
    {
171
        if (auto * channel = dynamic_cast<DB::OwnFormattingChannel *>(it->second.first.get()))
172
            channel->setProperty(name, value);
173
    }
174
}
175

176
}
177

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.