ClickHouse
176 строк · 5.7 Кб
1#include "OwnSplitChannel.h"2#include "OwnFormattingChannel.h"3
4#include <Interpreters/InternalTextLogsQueue.h>5#include <Interpreters/TextLog.h>6#include <IO/WriteBufferFromFileDescriptor.h>7#include <Poco/Message.h>8#include <Common/CurrentThread.h>9#include <Common/DNSResolver.h>10#include <Common/setThreadName.h>11#include <Common/LockMemoryExceptionInThread.h>12#include <Common/SensitiveDataMasker.h>13#include <Common/IO.h>14
15
16namespace DB17{
18
19void OwnSplitChannel::log(const Poco::Message & msg)20{
21#ifndef WITHOUT_TEXT_LOG22auto logs_queue = CurrentThread::getInternalTextLogsQueue();23
24if (channels.empty() && (logs_queue == nullptr || !logs_queue->isNeeded(msg.getPriority(), msg.getSource())))25return;26#endif27
28if (auto masker = SensitiveDataMasker::getInstance())29{30auto message_text = msg.getText();31auto matches = masker->wipeSensitiveData(message_text);32if (matches > 0)33{34tryLogSplit({msg, message_text}); // we will continue with the copy of original message with text modified35return;36}37
38}39
40tryLogSplit(msg);41}
42
43
44void OwnSplitChannel::tryLogSplit(const Poco::Message & msg)45{
46LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);47
48try49{50logSplit(msg);51}52/// It is better to catch the errors here in order to avoid53/// breaking some functionality because of unexpected "File not54/// found" (or similar) error.55///56/// For example DistributedAsyncInsertDirectoryQueue will mark batch57/// as broken, some MergeTree code can also be affected.58///59/// Also note, that we cannot log the exception here, since this60/// will lead to recursion, using regular tryLogCurrentException().61/// but let's log it into the stderr at least.62catch (...)63{64const std::string & exception_message = getCurrentExceptionMessage(true);65const std::string & message = msg.getText();66
67/// NOTE: errors are ignored, since nothing can be done.68writeRetry(STDERR_FILENO, "Cannot add message to the log: ");69writeRetry(STDERR_FILENO, message.data(), message.size());70writeRetry(STDERR_FILENO, "\n");71writeRetry(STDERR_FILENO, exception_message.data(), exception_message.size());72writeRetry(STDERR_FILENO, "\n");73}74}
75
76void OwnSplitChannel::logSplit(const Poco::Message & msg)77{
78ExtendedLogMessage msg_ext = ExtendedLogMessage::getFrom(msg);79
80/// Log data to child channels81for (auto & [name, channel] : channels)82{83if (channel.second)84channel.second->logExtended(msg_ext); // extended child85else86channel.first->log(msg); // ordinary child87}88
89#ifndef WITHOUT_TEXT_LOG90auto logs_queue = CurrentThread::getInternalTextLogsQueue();91
92/// Log to "TCP queue" if message is not too noisy93if (logs_queue && logs_queue->isNeeded(msg.getPriority(), msg.getSource()))94{95MutableColumns columns = InternalTextLogsQueue::getSampleColumns();96
97size_t i = 0;98columns[i++]->insert(msg_ext.time_seconds);99columns[i++]->insert(msg_ext.time_microseconds);100columns[i++]->insert(DNSResolver::instance().getHostName());101columns[i++]->insert(msg_ext.query_id);102columns[i++]->insert(msg_ext.thread_id);103columns[i++]->insert(static_cast<Int64>(msg.getPriority()));104columns[i++]->insert(msg.getSource());105columns[i++]->insert(msg.getText());106
107[[maybe_unused]] bool push_result = logs_queue->emplace(std::move(columns));108}109
110/// Also log to system.text_log table, if message is not too noisy111auto text_log_max_priority_loaded = text_log_max_priority.load(std::memory_order_relaxed);112if (text_log_max_priority_loaded && msg.getPriority() <= text_log_max_priority_loaded)113{114TextLogElement elem;115
116elem.event_time = msg_ext.time_seconds;117elem.event_time_microseconds = msg_ext.time_in_microseconds;118
119elem.thread_name = getThreadName();120elem.thread_id = msg_ext.thread_id;121
122elem.query_id = msg_ext.query_id;123
124elem.message = msg.getText();125elem.logger_name = msg.getSource();126elem.level = msg.getPriority();127
128if (msg.getSourceFile() != nullptr)129elem.source_file = msg.getSourceFile();130
131elem.source_line = msg.getSourceLine();132elem.message_format_string = msg.getFormatString();133
134std::shared_ptr<SystemLogQueue<TextLogElement>> text_log_locked{};135text_log_locked = text_log.lock();136if (text_log_locked)137text_log_locked->push(std::move(elem));138}139#endif140}
141
142
143void OwnSplitChannel::addChannel(Poco::AutoPtr<Poco::Channel> channel, const std::string & name)144{
145channels.emplace(name, ExtendedChannelPtrPair(std::move(channel), dynamic_cast<ExtendedLogChannel *>(channel.get())));146}
147
148#ifndef WITHOUT_TEXT_LOG149void OwnSplitChannel::addTextLog(std::shared_ptr<SystemLogQueue<TextLogElement>> log_queue, int max_priority)150{
151text_log = log_queue;152text_log_max_priority.store(max_priority, std::memory_order_relaxed);153}
154#endif155
156void OwnSplitChannel::setLevel(const std::string & name, int level)157{
158auto it = channels.find(name);159if (it != channels.end())160{161if (auto * channel = dynamic_cast<DB::OwnFormattingChannel *>(it->second.first.get()))162channel->setLevel(level);163}164}
165
166void OwnSplitChannel::setChannelProperty(const std::string& channel_name, const std::string& name, const std::string& value)167{
168auto it = channels.find(channel_name);169if (it != channels.end())170{171if (auto * channel = dynamic_cast<DB::OwnFormattingChannel *>(it->second.first.get()))172channel->setProperty(name, value);173}174}
175
176}
177