moira

Форк
0
80 строк · 2.0 Кб
1
package matchedmetrics
2

3
import (
4
	"sync"
5
	"time"
6

7
	"go.avito.ru/DO/moira"
8
	"go.avito.ru/DO/moira/filter"
9
	"go.avito.ru/DO/moira/metrics"
10
)
11

12
const (
13
	cacheCapacity = 1000
14
	flushInterval = 5 * time.Second
15
)
16

17
// MetricsMatcher make buffer of metrics and save it
18
type MetricsMatcher struct {
19
	logger       moira.Logger
20
	metrics      *metrics.FilterMetrics
21
	database     moira.Database
22
	cacheStorage *filter.Storage
23
	waitGroup    *sync.WaitGroup
24
}
25

26
// NewMetricsMatcher creates new MetricsMatcher
27
func NewMetricsMatcher(metrics *metrics.FilterMetrics, logger moira.Logger, database moira.Database, cacheStorage *filter.Storage) *MetricsMatcher {
28
	return &MetricsMatcher{
29
		metrics:      metrics,
30
		logger:       logger,
31
		database:     database,
32
		cacheStorage: cacheStorage,
33
		waitGroup:    &sync.WaitGroup{},
34
	}
35
}
36

37
// Start process matched metrics from channel and save it in cache storage
38
func (matcher *MetricsMatcher) Start(channel chan *moira.MatchedMetric) {
39
	matcher.waitGroup.Add(1)
40
	go func() {
41
		defer matcher.waitGroup.Done()
42

43
		buffer := make(map[string]*moira.MatchedMetric, cacheCapacity)
44
		for {
45
			select {
46
			case metric, ok := <-channel:
47
				if !ok {
48
					matcher.logger.Info("Moira Filter Metrics Matcher stopped")
49
					return
50
				}
51
				matcher.cacheStorage.EnrichMatchedMetric(buffer, metric)
52
				if len(buffer) < cacheCapacity {
53
					continue
54
				}
55
			case <-time.After(flushInterval):
56
			}
57

58
			if len(buffer) == 0 {
59
				continue
60
			}
61

62
			timer := time.Now()
63
			matcher.save(buffer)
64
			matcher.metrics.SavingTimer.UpdateSince(timer)
65
			buffer = make(map[string]*moira.MatchedMetric, cacheCapacity)
66
		}
67
	}()
68
	matcher.logger.Info("Moira Filter Metrics Matcher started")
69
}
70

71
// Wait waits for metric matcher instance will stop
72
func (matcher *MetricsMatcher) Wait() {
73
	matcher.waitGroup.Wait()
74
}
75

76
func (matcher *MetricsMatcher) save(buffer map[string]*moira.MatchedMetric) {
77
	if err := matcher.database.SaveMetrics(buffer); err != nil {
78
		matcher.logger.InfoF("Failed to save value in cache storage: %s", err.Error())
79
	}
80
}
81

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.