15
"github.com/aristanetworks/goarista/monotime"
23
type loggingWorker struct {
25
queue chan *logDelayedMessage
29
func newLoggingWorker() (*loggingWorker, error) {
31
conn = make([]net.Conn, cfg.limits.ThreadsQty)
36
for i := 0; i < cfg.limits.ThreadsQty; i++ {
39
return nil, ErrFailedTransport{reason: err}
44
result := &loggingWorker{
46
queue: make(chan *logDelayedMessage, bufferSize),
51
func (worker *loggingWorker) consumer(id int) error {
53
delayedMessage *logDelayedMessage
65
case <-worker.tomb.Dying():
67
case delayedMessage = <-worker.queue:
72
level, weight = parseLogLevel(delayedMessage.level)
73
if weight < cfg.weight {
77
// rate limit validation
78
skip = !cfg.enabled || (cfg.limits.AcceptRate != 1 && cfg.limits.AcceptRate < rand.Float64())
79
if cfg.debug || skip {
80
useFallbackLogger(level, delayedMessage.message, delayedMessage.extraData)
86
// it is needed 2 formats of dateTime
87
dateTimeRFC := delayedMessage.dateTime.Format("2006-01-02T15:04:05.999999999")
88
dateTimeStamp := delayedMessage.dateTime.Format(time.Stamp)
90
// serialized extra data if provided
92
if delayedMessage.extraData != nil {
93
data, err = json.Marshal(delayedMessage.extraData)
95
handleError(delayedMessage.message, ErrFailedSerialize{reason: err})
101
// serialize full log message
103
Component: delayedMessage.component,
104
ContextID: delayedMessage.contextId,
105
EventDateTime: dateTimeRFC,
106
EventDate: dateTimeRFC[:10],
109
Message: delayedMessage.message,
110
Path: delayedMessage.path,
112
data, err = json.Marshal(entry)
114
handleError(delayedMessage.message, ErrFailedSerialize{reason: err})
118
// write to buffer and send to connection
119
buffer := bytes.Buffer{}
120
buffer.Grow(len(data) + 128)
121
buffer.WriteByte('<')
122
buffer.WriteString(strconv.Itoa(facilityPriority))
123
buffer.WriteByte('>')
124
buffer.WriteString(dateTimeStamp)
125
buffer.WriteByte(' ')
126
buffer.WriteString(cfg.this)
127
buffer.WriteString(" moira: ")
129
buffer.WriteByte('\n')
131
data = buffer.Bytes()
132
statsd.MsgSize.Timing(int64(len(data)))
134
err = worker.write(id, data)
136
handleError(delayedMessage.message, ErrFailedTransport{reason: err})
142
func (worker *loggingWorker) lifeCycle() {
146
ch := make(chan os.Signal, 1)
147
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
151
func (worker *loggingWorker) start() {
152
for i := 0; i < cfg.limits.ThreadsQty; i++ {
154
worker.tomb.Go(func() error {
155
return worker.consumer(id)
161
func (worker *loggingWorker) stop() {
162
worker.tomb.Kill(nil)
163
_ = worker.tomb.Wait()
166
// write sends the whole message body
167
func (worker *loggingWorker) write(id int, data []byte) error {
177
conn = worker.conn[id]
178
started = monotime.Now()
181
statsd.MsgTotal.Increment()
182
statsd.Write.Timing(int64(monotime.Since(started)))
185
// send data in one piece
186
n, err = conn.Write(data)
188
// try to redial and retry writing in case of error
189
reconnect = monotime.Now()
191
statsd.Reconnect.Timing(int64(monotime.Since(reconnect)))
196
// replace connection if redial was successful
197
worker.conn[id] = conn
198
n, err = conn.Write(data)
205
return ErrIncompleteWrite{written: n, total: total}
211
func dial() (net.Conn, error) {
212
return net.Dial("tcp", fmt.Sprintf("%s:%d", cfg.host, cfg.port))
215
func handleError(message string, err error) {
216
useFallbackLogger(logLevelError, fmt.Sprintf("Failed to log: %v", err), nil)
217
useFallbackLogger(logLevelError, fmt.Sprintf("Message was:\n%s", message), nil)
220
statsd.Errors.Increment()
224
func parseLogLevel(level string) (string, int) {
225
weight, ok := logLevelWeights[level]
227
level = logLevelDefault
228
weight = logLevelWeights[level]
233
func useFallbackLogger(level, message string, extra interface{}) {
234
if extra != nil { // try to serialize extra
240
if cfg.debug { // pretty print for debug mode
241
extraBytes, err := json.MarshalIndent(extra, "", " ")
243
extraStr = string(extraBytes)
248
extraStr = fmt.Sprintf("%v", extra)
251
message = fmt.Sprintf("%s\nExtra: %s", message, extraStr)
254
if level == logLevelDebug {
255
fallback.Debug(message)
256
} else if level == logLevelInfo {
257
fallback.Info(message)
258
} else if level == logLevelWarn {
259
fallback.Warning(message)
260
} else if level == logLevelError {
261
fallback.Error(message)
262
} else if level == logLevelFatal {
263
fallback.Fatal(message)