moira
80 строк · 2.0 Кб
1package matchedmetrics
2
3import (
4"sync"
5"time"
6
7"go.avito.ru/DO/moira"
8"go.avito.ru/DO/moira/filter"
9"go.avito.ru/DO/moira/metrics"
10)
11
const (
	// cacheCapacity is both the initial size hint of the matched-metrics
	// buffer and the number of buffered entries at which Start saves it.
	cacheCapacity = 1000

	// flushInterval is the wait Start uses to trigger saving a non-empty
	// buffer that has not reached cacheCapacity.
	flushInterval = time.Second * 5
)
16
17// MetricsMatcher make buffer of metrics and save it
18type MetricsMatcher struct {
19logger moira.Logger
20metrics *metrics.FilterMetrics
21database moira.Database
22cacheStorage *filter.Storage
23waitGroup *sync.WaitGroup
24}
25
26// NewMetricsMatcher creates new MetricsMatcher
27func NewMetricsMatcher(metrics *metrics.FilterMetrics, logger moira.Logger, database moira.Database, cacheStorage *filter.Storage) *MetricsMatcher {
28return &MetricsMatcher{
29metrics: metrics,
30logger: logger,
31database: database,
32cacheStorage: cacheStorage,
33waitGroup: &sync.WaitGroup{},
34}
35}
36
37// Start process matched metrics from channel and save it in cache storage
38func (matcher *MetricsMatcher) Start(channel chan *moira.MatchedMetric) {
39matcher.waitGroup.Add(1)
40go func() {
41defer matcher.waitGroup.Done()
42
43buffer := make(map[string]*moira.MatchedMetric, cacheCapacity)
44for {
45select {
46case metric, ok := <-channel:
47if !ok {
48matcher.logger.Info("Moira Filter Metrics Matcher stopped")
49return
50}
51matcher.cacheStorage.EnrichMatchedMetric(buffer, metric)
52if len(buffer) < cacheCapacity {
53continue
54}
55case <-time.After(flushInterval):
56}
57
58if len(buffer) == 0 {
59continue
60}
61
62timer := time.Now()
63matcher.save(buffer)
64matcher.metrics.SavingTimer.UpdateSince(timer)
65buffer = make(map[string]*moira.MatchedMetric, cacheCapacity)
66}
67}()
68matcher.logger.Info("Moira Filter Metrics Matcher started")
69}
70
71// Wait waits for metric matcher instance will stop
72func (matcher *MetricsMatcher) Wait() {
73matcher.waitGroup.Wait()
74}
75
76func (matcher *MetricsMatcher) save(buffer map[string]*moira.MatchedMetric) {
77if err := matcher.database.SaveMetrics(buffer); err != nil {
78matcher.logger.InfoF("Failed to save value in cache storage: %s", err.Error())
79}
80}
81