10
"go.avito.ru/DO/moira"
13
// defaultRetention is the value getRetention falls back to when none of the
// configured retention matchers matches the metric name.
// NOTE(review): presumably expressed in seconds, like the values produced by
// rawRetentionToSeconds — confirm.
const defaultRetention = 60
15
// retentionMatcher pairs a compiled metric-name pattern with the retention
// that getRetention returns for metrics matching it.
type retentionMatcher struct {
16
// pattern is tested against the metric name with MatchString.
pattern *regexp.Regexp
20
// retentionCacheItem is one entry of Storage.retentionsCache: getRetention
// stores the matched retention in value and the timestamp of the metric that
// produced the match in timestamp.
type retentionCacheItem struct {
25
// Storage holds the configured retention matchers together with two
// per-metric-name caches used while enriching matched metrics.
27
// retentions is the ordered matcher list filled by buildRetentions;
// getRetention uses the first matcher whose pattern matches.
retentions []retentionMatcher
28
// retentionsCache maps a metric name to the retention last resolved for it.
retentionsCache map[string]*retentionCacheItem
29
// metricsCache maps a metric name to the metric last stored for it by
// EnrichMatchedMetric.
metricsCache map[string]*moira.MatchedMetric
32
// NewCacheStorage creates a Storage with empty retention and metric caches and
// loads its retention matchers from reader via buildRetentions.
// NOTE(review): the return statements are not visible in this excerpt —
// presumably a buildRetentions failure is returned to the caller; confirm.
33
func NewCacheStorage(reader io.Reader) (*Storage, error) {
35
retentionsCache: make(map[string]*retentionCacheItem),
36
metricsCache: make(map[string]*moira.MatchedMetric),
39
// The reader is consumed line by line through a bufio.Scanner.
if err := storage.buildRetentions(bufio.NewScanner(reader)); err != nil {
45
// EnrichMatchedMetric fills in m.Retention (from getRetention) and
// m.RetentionTimestamp (m.Timestamp rounded by roundToNearestRetention), checks
// m against the metric cached under the same name, and stores m in
// metricsCache.
// NOTE(review): the body of the duplicate check and any use of buffer are not
// visible in this excerpt — presumably repeated values are skipped and new
// ones are added to buffer; confirm against the full file.
46
func (storage *Storage) EnrichMatchedMetric(buffer map[string]*moira.MatchedMetric, m *moira.MatchedMetric) {
47
m.Retention = storage.getRetention(m)
48
m.RetentionTimestamp = roundToNearestRetention(m.Timestamp, int64(m.Retention))
49
// A repeat is a cached metric with the same name, the same rounded timestamp
// and the same value.
if ex, ok := storage.metricsCache[m.Metric]; ok && ex.RetentionTimestamp == m.RetentionTimestamp && ex.Value == m.Value {
52
storage.metricsCache[m.Metric] = m
56
// getRetention returns the retention of the first matcher in
// storage.retentions whose pattern matches m.Metric, caching that result per
// metric name, and falls back to defaultRetention when no matcher matches.
57
func (storage *Storage) getRetention(m *moira.MatchedMetric) int {
58
// A cached entry is only considered while item.timestamp+60 is still greater
// than the timestamp of the incoming metric.
// NOTE(review): the statement inside this branch is not visible here —
// presumably it returns item.value; confirm.
if item, ok := storage.retentionsCache[m.Metric]; ok && item.timestamp+60 > m.Timestamp {
61
for _, matcher := range storage.retentions {
62
if matcher.pattern.MatchString(m.Metric) {
63
// Remember the match together with the timestamp of the metric that
// triggered the lookup.
storage.retentionsCache[m.Metric] = &retentionCacheItem{
64
value: matcher.retention,
65
timestamp: m.Timestamp,
67
return matcher.retention
70
return defaultRetention
73
// buildRetentions resets storage.retentions and refills it from the scanned
// input. Entries are read as two consecutive lines: the text after "=" on the
// first line is compiled as a regular expression, and on the following line
// the text after "=" up to the first ":" is converted with
// rawRetentionToSeconds. It returns the scanner's error, if any, once the
// input is exhausted.
// NOTE(review): the bodies of the error checks are not visible in this
// excerpt — presumably compile/parse errors are returned; confirm.
func (storage *Storage) buildRetentions(retentionScanner *bufio.Scanner) error {
74
storage.retentions = make([]retentionMatcher, 0, 100)
76
for retentionScanner.Scan() {
77
line := retentionScanner.Text()
78
// Lines starting with "#" or not containing exactly one "=" take this branch.
// NOTE(review): branch body not visible — presumably such lines are skipped.
if strings.HasPrefix(line, "#") || strings.Count(line, "=") != 1 {
82
pattern, err := regexp.Compile(strings.TrimSpace(strings.Split(line, "=")[1]))
87
// NOTE(review): the result of this Scan is not checked, so the next line is
// used even if the input has already ended.
retentionScanner.Scan()
88
line = retentionScanner.Text()
89
// NOTE(review): unlike the pattern line, this line is not checked for "=";
// indexing [1] panics when the line contains none.
retentions := strings.TrimSpace(strings.Split(line, "=")[1])
90
// NOTE(review): strings.Index returns -1 when there is no ":", which makes
// this slice expression panic.
retention, err := rawRetentionToSeconds(retentions[0:strings.Index(retentions, ":")])
95
storage.retentions = append(storage.retentions, retentionMatcher{
100
return retentionScanner.Err()
103
// rawRetentionToSeconds converts a retention spec such as "60", "10s", "5m",
// "2h", "1d", "1w" or "1y" into a number of seconds.
//
// A plain integer is returned unchanged. Otherwise the last byte is treated as
// a unit suffix and the remainder is parsed as an integer count: m, h, d, w
// and y scale the count by minutes, hours, days, weeks and 365-day years; any
// other suffix (including "s") leaves the count unscaled.
//
// An error is returned when the numeric part cannot be parsed. This includes
// the empty string, which previously caused a slice-bounds panic.
func rawRetentionToSeconds(rawRetention string) (int, error) {
	retention, err := strconv.Atoi(rawRetention)
	if err == nil {
		return retention, nil
	}
	// An empty spec has no suffix byte to strip; report the parse error
	// instead of slicing with a negative upper bound.
	if rawRetention == "" {
		return 0, err
	}

	last := len(rawRetention) - 1
	multiplier := 1
	switch rawRetention[last] {
	case 'm':
		multiplier = 60
	case 'h':
		multiplier = 60 * 60
	case 'd':
		multiplier = 60 * 60 * 24
	case 'w':
		multiplier = 60 * 60 * 24 * 7
	case 'y':
		multiplier = 60 * 60 * 24 * 365
	}

	retention, err = strconv.Atoi(rawRetention[:last])
	if err != nil {
		return 0, err
	}

	return retention * multiplier, nil
}
131
// roundToNearestRetention rounds ts to the nearest multiple of retention,
// with exact halves rounding up (for non-negative ts).
//
// A zero retention has no multiples to round to, so ts is returned unchanged
// rather than panicking on integer division by zero.
func roundToNearestRetention(ts, retention int64) int64 {
	if retention == 0 {
		return ts
	}
	return (ts + retention/2) / retention * retention
}