moira

Форк
0
/
worker.go 
265 строк · 5.5 Кб
1
package logging
2

3
import (
4
	"bytes"
5
	"encoding/json"
6
	"fmt"
7
	"math/rand"
8
	"net"
9
	"os"
10
	"os/signal"
11
	"strconv"
12
	"syscall"
13
	"time"
14

15
	"github.com/aristanetworks/goarista/monotime"
16
	"gopkg.in/tomb.v2"
17
)
18

19
// bufferSize is the capacity of the buffered channel on which log messages
// wait to be picked up by a consumer.
const bufferSize = 65536
22

23
type loggingWorker struct {
24
	conn  []net.Conn
25
	queue chan *logDelayedMessage
26
	tomb  tomb.Tomb
27
}
28

29
func newLoggingWorker() (*loggingWorker, error) {
30
	var (
31
		conn = make([]net.Conn, cfg.limits.ThreadsQty)
32
		err  error
33
	)
34

35
	if cfg.enabled {
36
		for i := 0; i < cfg.limits.ThreadsQty; i++ {
37
			conn[i], err = dial()
38
			if err != nil {
39
				return nil, ErrFailedTransport{reason: err}
40
			}
41
		}
42
	}
43

44
	result := &loggingWorker{
45
		conn:  conn,
46
		queue: make(chan *logDelayedMessage, bufferSize),
47
	}
48
	return result, nil
49
}
50

51
func (worker *loggingWorker) consumer(id int) error {
52
	var (
53
		delayedMessage *logDelayedMessage
54
		skip           bool
55
		level          string
56
		weight         int
57

58
		data  []byte
59
		err   error
60
		extra string
61
	)
62

63
	for {
64
		select {
65
		case <-worker.tomb.Dying():
66
			return nil
67
		case delayedMessage = <-worker.queue:
68
			// pass
69
		}
70

71
		// level check
72
		level, weight = parseLogLevel(delayedMessage.level)
73
		if weight < cfg.weight {
74
			continue
75
		}
76

77
		// rate limit validation
78
		skip = !cfg.enabled || (cfg.limits.AcceptRate != 1 && cfg.limits.AcceptRate < rand.Float64())
79
		if cfg.debug || skip {
80
			useFallbackLogger(level, delayedMessage.message, delayedMessage.extraData)
81
			if skip {
82
				continue
83
			}
84
		}
85

86
		// it is needed 2 formats of dateTime
87
		dateTimeRFC := delayedMessage.dateTime.Format("2006-01-02T15:04:05.999999999")
88
		dateTimeStamp := delayedMessage.dateTime.Format(time.Stamp)
89

90
		// serialized extra data if provided
91
		extra = ""
92
		if delayedMessage.extraData != nil {
93
			data, err = json.Marshal(delayedMessage.extraData)
94
			if err != nil {
95
				handleError(delayedMessage.message, ErrFailedSerialize{reason: err})
96
				continue
97
			}
98
			extra = string(data)
99
		}
100

101
		// serialize full log message
102
		entry := logEntry{
103
			Component:     delayedMessage.component,
104
			ContextID:     delayedMessage.contextId,
105
			EventDateTime: dateTimeRFC,
106
			EventDate:     dateTimeRFC[:10],
107
			Extra:         extra,
108
			Level:         level,
109
			Message:       delayedMessage.message,
110
			Path:          delayedMessage.path,
111
		}
112
		data, err = json.Marshal(entry)
113
		if err != nil {
114
			handleError(delayedMessage.message, ErrFailedSerialize{reason: err})
115
			continue
116
		}
117

118
		// write to buffer and send to connection
119
		buffer := bytes.Buffer{}
120
		buffer.Grow(len(data) + 128)
121
		buffer.WriteByte('<')
122
		buffer.WriteString(strconv.Itoa(facilityPriority))
123
		buffer.WriteByte('>')
124
		buffer.WriteString(dateTimeStamp)
125
		buffer.WriteByte(' ')
126
		buffer.WriteString(cfg.this)
127
		buffer.WriteString(" moira: ")
128
		buffer.Write(data)
129
		buffer.WriteByte('\n')
130

131
		data = buffer.Bytes()
132
		statsd.MsgSize.Timing(int64(len(data)))
133

134
		err = worker.write(id, data)
135
		if err != nil {
136
			handleError(delayedMessage.message, ErrFailedTransport{reason: err})
137
			continue
138
		}
139
	}
140
}
141

142
// lifeCycle starts the consumers, blocks until the process receives SIGINT
// or SIGTERM, and then stops the consumers and waits for them to finish.
func (worker *loggingWorker) lifeCycle() {
	worker.start()
	defer worker.stop()

	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
	// Unregister the channel on the way out; otherwise it would keep
	// receiving (and silently dropping) these signals after this function
	// has returned.
	defer signal.Stop(ch)

	<-ch
}
150

151
// start launches cfg.limits.ThreadsQty consumer goroutines under the
// worker's tomb, numbering them from 0.
func (worker *loggingWorker) start() {
	for i := 0; i < cfg.limits.ThreadsQty; i++ {
		id := i // per-iteration copy so every goroutine keeps its own id
		worker.tomb.Go(func() error {
			return worker.consumer(id)
		})
	}
}
160

161
// stop asks the consumers to terminate by killing the tomb with a nil
// reason, then waits on the tomb before returning.
func (worker *loggingWorker) stop() {
	worker.tomb.Kill(nil)
	// The result of Wait is deliberately discarded: consumers return nil
	// once the tomb is dying.
	_ = worker.tomb.Wait()
}
165

166
// write sends the whole message body
167
func (worker *loggingWorker) write(id int, data []byte) error {
168
	var (
169
		conn      net.Conn
170
		started   uint64
171
		reconnect uint64
172
		n, total  int
173
		err       error
174
	)
175

176
	total = len(data)
177
	conn = worker.conn[id]
178
	started = monotime.Now()
179

180
	defer func() {
181
		statsd.MsgTotal.Increment()
182
		statsd.Write.Timing(int64(monotime.Since(started)))
183
	}()
184

185
	// send data in one piece
186
	n, err = conn.Write(data)
187
	if err != nil {
188
		// try to redial and retry writing in case of error
189
		reconnect = monotime.Now()
190
		conn, err = dial()
191
		statsd.Reconnect.Timing(int64(monotime.Since(reconnect)))
192
		if err != nil {
193
			return err
194
		}
195

196
		// replace connection if redial was successful
197
		worker.conn[id] = conn
198
		n, err = conn.Write(data)
199
	}
200

201
	if err != nil {
202
		return err
203
	}
204
	if n != total {
205
		return ErrIncompleteWrite{written: n, total: total}
206
	}
207

208
	return nil
209
}
210

211
func dial() (net.Conn, error) {
212
	return net.Dial("tcp", fmt.Sprintf("%s:%d", cfg.host, cfg.port))
213
}
214

215
func handleError(message string, err error) {
216
	useFallbackLogger(logLevelError, fmt.Sprintf("Failed to log: %v", err), nil)
217
	useFallbackLogger(logLevelError, fmt.Sprintf("Message was:\n%s", message), nil)
218

219
	if statsd != nil {
220
		statsd.Errors.Increment()
221
	}
222
}
223

224
func parseLogLevel(level string) (string, int) {
225
	weight, ok := logLevelWeights[level]
226
	if !ok {
227
		level = logLevelDefault
228
		weight = logLevelWeights[level]
229
	}
230
	return level, weight
231
}
232

233
func useFallbackLogger(level, message string, extra interface{}) {
234
	if extra != nil { // try to serialize extra
235
		var (
236
			extraStr string
237
			success  bool
238
		)
239

240
		if cfg.debug { // pretty print for debug mode
241
			extraBytes, err := json.MarshalIndent(extra, "", "  ")
242
			if err == nil {
243
				extraStr = string(extraBytes)
244
				success = true
245
			}
246
		}
247
		if !success {
248
			extraStr = fmt.Sprintf("%v", extra)
249
		}
250

251
		message = fmt.Sprintf("%s\nExtra: %s", message, extraStr)
252
	}
253

254
	if level == logLevelDebug {
255
		fallback.Debug(message)
256
	} else if level == logLevelInfo {
257
		fallback.Info(message)
258
	} else if level == logLevelWarn {
259
		fallback.Warning(message)
260
	} else if level == logLevelError {
261
		fallback.Error(message)
262
	} else if level == logLevelFatal {
263
		fallback.Fatal(message)
264
	}
265
}
266

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.