12
"github.com/aristanetworks/goarista/monotime"
13
"github.com/segmentio/fasthash/fnv1a"
15
"go.avito.ru/DO/moira"
16
"go.avito.ru/DO/moira/metrics"
24
processThreshold = 10 * time.Millisecond
28
asteriskHash = fnv1a.HashString64("*")
31
// PatternStorage holds the pattern tree together with the database handle
// and the filter metrics that its methods use.
// NOTE(review): methods in this file also access storage.logger,
// storage.heartbeat and storage.matcherPool; those field declarations are
// not visible in this view.
32
type PatternStorage struct {
33
database moira.Database
35
metrics *metrics.FilterMetrics
39
// PatternTree is the root node; buildTree replaces it with a freshly built tree.
PatternTree *patternNode
42
// matcherBuffer is operative buffer for PatternStorage.matchPattern method
43
// it helps to avoid redundant allocations
44
type matcherBuffer struct {
45
curr []*patternNode // curr is tree's nodes (horizontal) level which is being currently processed
46
next []*patternNode // next is tree's nodes (horizontal) level which is supposed to be processed after current
49
// patternNode is a single node of the pattern tree.
// NOTE(review): other code in this file also uses the Part, Prefix, Hash and
// InnerParts fields of a node; their declarations are not visible in this view.
50
type patternNode struct {
51
// Children are the nodes one level below this one.
Children []*patternNode
58
// NewPatternStorage creates a new PatternStorage: it allocates the heartbeat
// channel, sets up the pool of matcher buffers and triggers an initial
// RefreshTree.
// NOTE(review): parts of the parameter list and of the body (including how the
// RefreshTree error is handled) are not visible in this view.
59
func NewPatternStorage(
60
database moira.Database,
61
metrics *metrics.FilterMetrics,
63
) (*PatternStorage, error) {
64
storage := &PatternStorage{
68
// Buffered channel with capacity heartBeatCap.
heartbeat: make(chan bool, heartBeatCap),
70
// The pool hands out matcherBuffers whose two levels are pre-sized to bufferAlloc.
storage.matcherPool = sync.Pool{
71
New: func() interface{} {
72
return &matcherBuffer{
73
curr: make([]*patternNode, 0, bufferAlloc),
74
next: make([]*patternNode, 0, bufferAlloc),
78
// Build the initial pattern tree.
err := storage.RefreshTree()
82
// GetHeartbeat returns heartbeat chan
83
func (storage *PatternStorage) GetHeartbeat() chan bool {
84
return storage.heartbeat
87
// ProcessIncomingMetric validates, parses and matches an incoming raw metric line.
// When at least one pattern matches, it builds a *moira.MatchedMetric whose
// Timestamp and RetentionTimestamp are both set to the parsed timestamp.
// NOTE(review): several lines of this function (including the parse-error and
// no-match returns) are not visible in this view.
88
func (storage *PatternStorage) ProcessIncomingMetric(line []byte) *moira.MatchedMetric {
89
// Send a heartbeat only while fewer than heartBeatDrop signals are queued.
// NOTE(review): the length check and the send are separate steps — confirm
// this cannot block when called from several goroutines at once.
if len(storage.heartbeat) < heartBeatDrop {
90
storage.heartbeat <- true
92
storage.metrics.TotalMetricsReceived.Increment()
94
metric, value, timestamp, err := parseMetricFromString(line)
96
storage.logger.InfoF("cannot parse input: %v", err)
99
storage.metrics.ValidMetricsReceived.Increment()
101
// Measure how long pattern matching takes, using the monotime package.
matchingStart := monotime.Now()
102
matched := storage.matchPattern(metric)
103
matchingDuration := monotime.Since(matchingStart)
105
storage.metrics.MatchingTimer.Timing(int64(matchingDuration))
106
// Log a warning for matches slower than processThreshold.
if matchingDuration > processThreshold {
107
storage.logger.WarnF("[Attempt 2] It took too long (%s) to process metric: %s", matchingDuration.String(), line)
110
if len(matched) > 0 {
111
storage.metrics.MatchingMetricsReceived.Increment()
112
return &moira.MatchedMetric{
116
Timestamp: timestamp,
117
RetentionTimestamp: timestamp,
125
// RefreshTree fetches the patterns via storage.database.GetPatterns and
// rebuilds the pattern tree from them with buildTree.
// NOTE(review): the handling of the GetPatterns error is not visible in this view.
126
func (storage *PatternStorage) RefreshTree() error {
127
patterns, err := storage.database.GetPatterns()
132
return storage.buildTree(patterns)
135
// matchPattern returns the Prefix of every childless tree node reached after
// feeding the parts of metric through findPart, level by level.
// NOTE(review): the separator test and the early exits taken when findPart
// returns false are not visible in this view.
136
func (storage *PatternStorage) matchPattern(metric string) []string {
137
// The buffer comes from the pool and is handed back when the function returns.
buff := storage.matchedBufferAcquire()
138
defer storage.matchedBufferRelease(buff)
141
for i, c := range metric {
143
// part is the slice of metric from index up to (not including) position i.
part := metric[index:i]
149
if !buff.findPart(part) {
155
// The remainder after the loop is the last part.
part := metric[index:]
156
if !buff.findPart(part) {
160
// Only nodes without children contribute a matched pattern.
matched := make([]string, 0, len(buff.curr))
161
for _, node := range buff.curr {
162
if len(node.Children) == 0 {
163
matched = append(matched, node.Prefix)
170
// buildTree builds a new pattern tree from patterns and stores it in
// storage.PatternTree. Each pattern is split on "." and every part becomes a
// node below the node of the preceding part.
// NOTE(review): several lines (what happens for patterns with empty parts, the
// handling of an already existing child, the final return) are not visible in
// this view.
func (storage *PatternStorage) buildTree(patterns []string) error {
171
newTree := &patternNode{}
173
for _, pattern := range patterns {
174
currentNode := newTree
175
parts := strings.Split(pattern, ".")
176
if hasEmptyParts(parts) {
180
for _, part := range parts {
182
// Look for a child that already represents this part.
for _, child := range currentNode.Children {
183
if part == child.Part {
191
newNode := &patternNode{Part: part}
193
// Prefix is the dot-joined path from the root down to this node.
if currentNode.Prefix == "" {
194
newNode.Prefix = part
196
newNode.Prefix = fmt.Sprintf("%s.%s", currentNode.Prefix, part)
199
// A lone "*" and parts without any of '{', '*', '?' are identified by hash.
// NOTE(review): a part whose only glob syntax is a '[' class also takes
// this branch and is hashed as a literal — confirm that is intended.
if part == "*" || !strings.ContainsAny(part, "{*?") {
200
newNode.Hash = fnv1a.HashString64(part)
201
} else if strings.Contains(part, "{") && strings.Contains(part, "}") {
202
// Expand "prefix{a,b}suffix" into one candidate per alternative, splitting
// at the first '{' and the first '}' that follows it.
prefix, bigSuffix := split2(part, "{")
203
inner, suffix := split2(bigSuffix, "}")
204
innerParts := strings.Split(inner, ",")
206
newNode.InnerParts = make([]string, 0, len(innerParts))
207
for _, innerPart := range innerParts {
208
newNode.InnerParts = append(newNode.InnerParts, fmt.Sprintf("%s%s%s", prefix, innerPart, suffix))
211
// Any other part is kept as its own single candidate.
newNode.InnerParts = []string{part}
214
currentNode.Children = append(currentNode.Children, newNode)
215
currentNode = newNode
220
// Replace the tree held by the storage with the newly built one.
storage.PatternTree = newTree
224
// matchedBufferAcquire acquires buffer from pool and initializes it with tree's root
225
func (storage *PatternStorage) matchedBufferAcquire() *matcherBuffer {
226
buff := storage.matcherPool.Get().(*matcherBuffer)
227
buff.curr = append(buff.curr, storage.PatternTree)
231
// matchedBufferRelease returns buffer to pool as well as clears its nodes levels
232
func (storage *PatternStorage) matchedBufferRelease(buff *matcherBuffer) {
233
buff.curr = buff.curr[:0]
234
buff.next = buff.next[:0]
235
storage.matcherPool.Put(buff)
238
// findPart looks for the given part among the children of the current nodes
// level while collecting the next level. It reports whether the level that
// becomes current after the swap is non-empty.
// NOTE(review): the control flow between the hash comparison, the InnerParts
// loop and the append is not fully visible in this view.
240
func (buff *matcherBuffer) findPart(part string) bool {
241
// Start collecting the next level from scratch, reusing its backing array.
buff.next = buff.next[:0]
243
hash := fnv1a.HashString64(part)
246
for _, node := range buff.curr {
247
for _, child := range node.Children {
250
// A child qualifies by hash when it is the "*" node or hashes the same as part.
if child.Hash == asteriskHash || child.Hash == hash {
253
for _, innerPart := range child.InnerParts {
254
// NOTE(review): the error from path.Match is discarded here.
innerMatch, _ := path.Match(innerPart, part)
263
buff.next = append(buff.next, child)
268
// swap current and the next level: the next one will be used at the next iteration
269
buff.curr, buff.next = buff.next, buff.curr
271
// it should be len(buff.next), but curr and next have just been swapped
272
return len(buff.curr) > 0
275
// hasEmptyParts iterates over parts and returns a bool.
// NOTE(review): presumably it reports whether any element is the empty
// string — the loop body is not visible in this view, confirm.
func hasEmptyParts(parts []string) bool {
276
for _, part := range parts {
284
// parseMetricFromString parses a metric from a raw line.
// Supported format: "<metricString> <valueFloat64> <timestampInt64>".
// It returns the metric name, its value and its timestamp, or an error that
// describes why the line was rejected.
// NOTE(review): several lines (declarations of parts, partIndex, partOffset,
// metric and r, and the conditions guarding most error returns) are not
// visible in this view.
286
func parseMetricFromString(line []byte) (string, float64, int64, error) {
290
for i, b := range line {
292
// Reject characters that are outside ASCII or not printable.
if r > unicode.MaxASCII || !strconv.IsPrint(r) {
293
return "", 0, 0, fmt.Errorf("non-ascii or non-printable chars in metric name: '%s'", line)
296
parts[partIndex] = line[partOffset:i]
301
return "", 0, 0, fmt.Errorf("too many space-separated items: '%s'", line)
306
return "", 0, 0, fmt.Errorf("too few space-separated items: '%s'", line)
309
// The last item runs to the end of the line.
parts[partIndex] = line[partOffset:]
313
return "", 0, 0, fmt.Errorf("metric name is empty: '%s'", line)
316
value, err := strconv.ParseFloat(string(parts[1]), 64)
318
return "", 0, 0, fmt.Errorf("cannot parse value: '%s' (%s)", line, err)
321
timestamp, err := parseTimestamp(string(parts[2]))
322
// A zero timestamp is rejected as well as an unparsable one.
// NOTE(review): when timestamp == 0 and err is nil, the message below formats
// a nil error with %s.
if err != nil || timestamp == 0 {
323
return "", 0, 0, fmt.Errorf("cannot parse timestamp: '%s' (%s)", line, err)
326
return string(metric), value, timestamp, nil
329
// parseTimestamp converts a textual unix timestamp into whole seconds.
// Fractional input is accepted and truncated toward zero. It returns an error
// when the text is not a number, or when the number is NaN, infinite or
// outside the int64 range: converting such a float to int64 has an
// implementation-defined result and must not be passed on as a timestamp.
func parseTimestamp(unixTimestamp string) (int64, error) {
	timestamp, err := strconv.ParseFloat(unixTimestamp, 64)
	if err != nil {
		return 0, err
	}
	// The bounds are -2^63 (inclusive) and 2^63 (exclusive). NaN fails every
	// comparison, so the negated range check rejects it too.
	if !(timestamp >= -9223372036854775808.0 && timestamp < 9223372036854775808.0) {
		return 0, fmt.Errorf("timestamp out of range: %q", unixTimestamp)
	}
	return int64(timestamp), nil
}
334
// split2 splits s around the first occurrence of sep and returns the text
// before and after it. When sep does not occur in s, it returns s and "".
// When the split yields nothing at all (s and sep both empty), it returns
// two empty strings instead of indexing into an empty slice.
func split2(s, sep string) (string, string) {
	pieces := strings.SplitN(s, sep, 2)
	switch len(pieces) {
	case 0:
		return "", ""
	case 1:
		return pieces[0], ""
	default:
		return pieces[0], pieces[1]
	}
}