moira

Форк
0
/
patterns_storage.go 
340 строк · 8.2 Кб
1
package filter
2

3
import (
4
	"fmt"
5
	"path"
6
	"strconv"
7
	"strings"
8
	"sync"
9
	"time"
10
	"unicode"
11

12
	"github.com/aristanetworks/goarista/monotime"
13
	"github.com/segmentio/fasthash/fnv1a"
14

15
	"go.avito.ru/DO/moira"
16
	"go.avito.ru/DO/moira/metrics"
17
)
18

19
const (
	// bufferAlloc is the initial capacity of each node-level slice
	// held by a pooled matcherBuffer.
	bufferAlloc = 256
	// heartBeatCap is the buffer size of the heartbeat channel.
	heartBeatCap = 10000
	// heartBeatDrop is the heartbeat channel length at which new
	// heartbeats stop being sent.
	heartBeatDrop = 8000

	// processThreshold is the matching duration above which a warning
	// is logged for the processed metric.
	processThreshold = 10 * time.Millisecond
)
26

27
var (
28
	asteriskHash = fnv1a.HashString64("*")
29
)
30

31
// PatternStorage contains pattern tree
32
type PatternStorage struct {
33
	database moira.Database
34
	logger   moira.Logger
35
	metrics  *metrics.FilterMetrics
36

37
	heartbeat   chan bool
38
	matcherPool sync.Pool
39
	PatternTree *patternNode
40
}
41

42
// matcherBuffer is operative buffer for PatternStorage.matchPattern method
43
// it helps to avoid redundant allocations
44
type matcherBuffer struct {
45
	curr []*patternNode // curr is tree's nodes (horizontal) level which is being currently processed
46
	next []*patternNode // next is tree's nodes (horizontal) level which is supposed to be processed after current
47
}
48

49
// patternNode is a single node of the pattern tree; it corresponds to one
// dot-separated part of a pattern.
type patternNode struct {
	// Children are the nodes for the parts that may follow this one.
	Children []*patternNode
	// Part is the raw pattern part this node was built from.
	Part string
	// Hash is the hash of Part; it is set only for literal parts and for
	// the bare "*" wildcard, and left zero otherwise.
	Hash uint64
	// Prefix is the dot-joined sequence of parts from the root to this node.
	Prefix string
	// InnerParts holds the glob expressions tried against a metric part when
	// the hash does not match; a brace group is expanded into one entry per
	// alternative.
	InnerParts []string
}
57

58
// NewPatternStorage creates new PatternStorage struct
59
func NewPatternStorage(
60
	database moira.Database,
61
	metrics *metrics.FilterMetrics,
62
	logger moira.Logger,
63
) (*PatternStorage, error) {
64
	storage := &PatternStorage{
65
		database:  database,
66
		logger:    logger,
67
		metrics:   metrics,
68
		heartbeat: make(chan bool, heartBeatCap),
69
	}
70
	storage.matcherPool = sync.Pool{
71
		New: func() interface{} {
72
			return &matcherBuffer{
73
				curr: make([]*patternNode, 0, bufferAlloc),
74
				next: make([]*patternNode, 0, bufferAlloc),
75
			}
76
		},
77
	}
78
	err := storage.RefreshTree()
79
	return storage, err
80
}
81

82
// GetHeartbeat returns heartbeat chan
83
func (storage *PatternStorage) GetHeartbeat() chan bool {
84
	return storage.heartbeat
85
}
86

87
// ProcessIncomingMetric validates, parses and matches incoming raw string
88
func (storage *PatternStorage) ProcessIncomingMetric(line []byte) *moira.MatchedMetric {
89
	if len(storage.heartbeat) < heartBeatDrop {
90
		storage.heartbeat <- true
91
	}
92
	storage.metrics.TotalMetricsReceived.Increment()
93

94
	metric, value, timestamp, err := parseMetricFromString(line)
95
	if err != nil {
96
		storage.logger.InfoF("cannot parse input: %v", err)
97
		return nil
98
	}
99
	storage.metrics.ValidMetricsReceived.Increment()
100

101
	matchingStart := monotime.Now()
102
	matched := storage.matchPattern(metric)
103
	matchingDuration := monotime.Since(matchingStart)
104

105
	storage.metrics.MatchingTimer.Timing(int64(matchingDuration))
106
	if matchingDuration > processThreshold {
107
		storage.logger.WarnF("[Attempt 2] It took too long (%s) to process metric: %s", matchingDuration.String(), line)
108
	}
109

110
	if len(matched) > 0 {
111
		storage.metrics.MatchingMetricsReceived.Increment()
112
		return &moira.MatchedMetric{
113
			Metric:             metric,
114
			Patterns:           matched,
115
			Value:              value,
116
			Timestamp:          timestamp,
117
			RetentionTimestamp: timestamp,
118
			Retention:          60,
119
		}
120
	}
121

122
	return nil
123
}
124

125
// RefreshTree builds pattern tree from redis data
126
func (storage *PatternStorage) RefreshTree() error {
127
	patterns, err := storage.database.GetPatterns()
128
	if err != nil {
129
		return err
130
	}
131

132
	return storage.buildTree(patterns)
133
}
134

135
// matchPattern returns array of matched patterns
136
func (storage *PatternStorage) matchPattern(metric string) []string {
137
	buff := storage.matchedBufferAcquire()
138
	defer storage.matchedBufferRelease(buff)
139

140
	index := 0
141
	for i, c := range metric {
142
		if c == '.' {
143
			part := metric[index:i]
144
			if len(part) == 0 {
145
				return []string{}
146
			}
147
			index = i + 1
148

149
			if !buff.findPart(part) {
150
				return []string{}
151
			}
152
		}
153
	}
154

155
	part := metric[index:]
156
	if !buff.findPart(part) {
157
		return []string{}
158
	}
159

160
	matched := make([]string, 0, len(buff.curr))
161
	for _, node := range buff.curr {
162
		if len(node.Children) == 0 {
163
			matched = append(matched, node.Prefix)
164
		}
165
	}
166

167
	return matched
168
}
169

170
// buildTree constructs a fresh pattern tree from the given patterns and
// installs it as storage.PatternTree. Patterns containing an empty
// dot-separated part are skipped. The returned error is always nil.
//
// Every part becomes a node classified as one of:
//   - literal or bare "*": matched by hash;
//   - contains both "{" and "}": the first brace group is expanded into one
//     glob per comma-separated alternative;
//   - anything else containing "{", "*" or "?": kept as a single glob.
//
// NOTE(review): "[" is not among the characters that mark a part as a glob,
// so a part using only a character class is hashed as a literal — confirm
// whether character classes are meant to be supported.
func (storage *PatternStorage) buildTree(patterns []string) error {
	root := &patternNode{}

	for _, pattern := range patterns {
		parts := strings.Split(pattern, ".")
		if hasEmptyParts(parts) {
			continue
		}

		node := root
	descend:
		for _, part := range parts {
			// Reuse an existing child for this part when there is one.
			for _, child := range node.Children {
				if child.Part == part {
					node = child
					continue descend
				}
			}

			created := &patternNode{Part: part, Prefix: part}
			if node.Prefix != "" {
				created.Prefix = node.Prefix + "." + part
			}

			switch {
			case part == "*" || !strings.ContainsAny(part, "{*?"):
				created.Hash = fnv1a.HashString64(part)
			case strings.Contains(part, "{") && strings.Contains(part, "}"):
				head, rest := split2(part, "{")
				body, tail := split2(rest, "}")
				alternatives := strings.Split(body, ",")

				created.InnerParts = make([]string, len(alternatives))
				for i, alternative := range alternatives {
					created.InnerParts[i] = head + alternative + tail
				}
			default:
				created.InnerParts = []string{part}
			}

			node.Children = append(node.Children, created)
			node = created
		}
	}

	storage.PatternTree = root
	return nil
}
223

224
// matchedBufferAcquire acquires buffer from pool and initializes it with tree's root
225
func (storage *PatternStorage) matchedBufferAcquire() *matcherBuffer {
226
	buff := storage.matcherPool.Get().(*matcherBuffer)
227
	buff.curr = append(buff.curr, storage.PatternTree)
228
	return buff
229
}
230

231
// matchedBufferRelease returns buffer to pool as well as clears its nodes levels
232
func (storage *PatternStorage) matchedBufferRelease(buff *matcherBuffer) {
233
	buff.curr = buff.curr[:0]
234
	buff.next = buff.next[:0]
235
	storage.matcherPool.Put(buff)
236
}
237

238
// findPart seeks for the given part among current nodes level
239
// while preparing the next level
240
func (buff *matcherBuffer) findPart(part string) bool {
241
	buff.next = buff.next[:0]
242

243
	hash := fnv1a.HashString64(part)
244
	match := false
245

246
	for _, node := range buff.curr {
247
		for _, child := range node.Children {
248
			match = false
249

250
			if child.Hash == asteriskHash || child.Hash == hash {
251
				match = true
252
			} else {
253
				for _, innerPart := range child.InnerParts {
254
					innerMatch, _ := path.Match(innerPart, part)
255
					if innerMatch {
256
						match = true
257
						break
258
					}
259
				}
260
			}
261

262
			if match {
263
				buff.next = append(buff.next, child)
264
			}
265
		}
266
	}
267

268
	// swap current and the next level: the next one will be used at the next iteration
269
	buff.curr, buff.next = buff.next, buff.curr
270

271
	// it should be len(buff.next), but curr and next have just been swapped
272
	return len(buff.curr) > 0
273
}
274

275
// hasEmptyParts reports whether at least one of the given parts is an empty
// string. It returns false for a nil or empty slice.
func hasEmptyParts(parts []string) bool {
	for i := 0; i < len(parts); i++ {
		if len(parts[i]) == 0 {
			return true
		}
	}
	return false
}
283

284
// parseMetricFromString parses a single plain-text metric line.
// Supported format: "<metricString> <valueFloat64> <timestampInt64>"
//
// It returns the metric name, its value and its unix timestamp. An error is
// returned when the line contains non-ASCII or non-printable bytes, does not
// consist of exactly three space-separated items, has an empty metric name,
// or carries a value or timestamp that cannot be parsed. A timestamp of zero
// is rejected as well.
func parseMetricFromString(line []byte) (string, float64, int64, error) {
	var parts [3][]byte
	partIndex := 0
	partOffset := 0
	for i, b := range line {
		r := rune(b)
		if r > unicode.MaxASCII || !strconv.IsPrint(r) {
			return "", 0, 0, fmt.Errorf("non-ascii or non-printable chars in metric name: '%s'", line)
		}
		if b == ' ' {
			parts[partIndex] = line[partOffset:i]
			partOffset = i + 1
			partIndex++
		}
		// A third space means a fourth item; bail out before parts overflows.
		if partIndex > 2 {
			return "", 0, 0, fmt.Errorf("too many space-separated items: '%s'", line)
		}
	}

	if partIndex < 2 {
		return "", 0, 0, fmt.Errorf("too few space-separated items: '%s'", line)
	}

	parts[partIndex] = line[partOffset:]

	metric := parts[0]
	if len(metric) < 1 {
		return "", 0, 0, fmt.Errorf("metric name is empty: '%s'", line)
	}

	value, err := strconv.ParseFloat(string(parts[1]), 64)
	if err != nil {
		return "", 0, 0, fmt.Errorf("cannot parse value: '%s' (%s)", line, err)
	}

	timestamp, err := parseTimestamp(string(parts[2]))
	if err != nil {
		return "", 0, 0, fmt.Errorf("cannot parse timestamp: '%s' (%s)", line, err)
	}
	// A zero timestamp parses without an error, so it needs its own message
	// instead of formatting a nil error.
	if timestamp == 0 {
		return "", 0, 0, fmt.Errorf("cannot parse timestamp: '%s' (timestamp is zero)", line)
	}

	return string(metric), value, timestamp, nil
}

// parseTimestamp converts a textual unix timestamp into int64. Fractional
// timestamps are accepted and truncated towards zero.
//
// An error is returned when the text is not a number, or when the number is
// NaN, infinite or outside the int64 range; the returned value is 0 then.
func parseTimestamp(unixTimestamp string) (int64, error) {
	// int64Limit is 2^63: the first float64 above the int64 range.
	const int64Limit = float64(1 << 63)

	timestamp, err := strconv.ParseFloat(unixTimestamp, 64)
	if err != nil {
		return 0, err
	}
	// The negated form is deliberate: NaN fails every comparison, so it is
	// rejected here together with ±Inf and out-of-range values, whose
	// conversion to int64 would be implementation-defined.
	if !(timestamp >= -int64Limit && timestamp < int64Limit) {
		return 0, fmt.Errorf("timestamp %q does not fit into int64", unixTimestamp)
	}

	return int64(timestamp), nil
}
333

334
// split2 cuts s around the first occurrence of sep and returns the text
// before and after it. When sep does not occur in s, it returns s and an
// empty string.
func split2(s, sep string) (string, string) {
	pieces := strings.SplitN(s, sep, 2)

	head, tail := pieces[0], ""
	if len(pieces) > 1 {
		tail = pieces[1]
	}
	return head, tail
}
341

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.