moira

Форк
0
/
metric.go 
261 строка · 7.5 Кб
1
package redis
2

3
import (
4
	"encoding/json"
5
	"fmt"
6

7
	"github.com/garyburd/redigo/redis"
8
	"gopkg.in/tomb.v2"
9

10
	"go.avito.ru/DO/moira"
11
	"go.avito.ru/DO/moira/database"
12
	"go.avito.ru/DO/moira/database/redis/reply"
13
)
14

15
// GetPatterns gets updated patterns array
16
func (connector *DbConnector) GetPatterns() ([]string, error) {
17
	c := connector.pool.Get()
18
	defer c.Close()
19
	patterns, err := redis.Strings(c.Do("SMEMBERS", patternsListKey))
20
	if err != nil {
21
		return nil, fmt.Errorf("Failed to get moira patterns, error: %v", err)
22
	}
23
	return patterns, nil
24
}
25

26
// GetMetricsValues gets metrics values for given interval
27
func (connector *DbConnector) GetMetricsValues(metrics []string, from int64, until int64) (map[string][]*moira.MetricValue, error) {
28
	c := connector.pool.Get()
29
	defer c.Close()
30

31
	for _, metric := range metrics {
32
		c.Send("ZRANGEBYSCORE", metricDataKey(metric), from, until, "WITHSCORES")
33
	}
34
	resultByMetrics, err := redis.Values(c.Do(""))
35
	if err != nil {
36
		return nil, fmt.Errorf("Failed to get metric values: %v", err)
37
	}
38

39
	res := make(map[string][]*moira.MetricValue, len(resultByMetrics))
40

41
	for i, resultByMetric := range resultByMetrics {
42
		metric := metrics[i]
43
		metricsValues, err := reply.MetricValues(resultByMetric)
44
		if err != nil {
45
			return nil, err
46
		}
47
		res[metric] = metricsValues
48
	}
49
	return res, nil
50
}
51

52
// GetMetricRetention gets given metric retention, if retention is empty then return default retention value(60)
53
func (connector *DbConnector) GetMetricRetention(metric string) (int64, error) {
54
	retention, ok := connector.getCachedRetention(metric)
55
	if ok {
56
		return retention, nil
57
	}
58
	retention, err := connector.getMetricRetention(metric)
59
	if err != nil {
60
		if err == database.ErrNil {
61
			return retention, nil
62
		}
63
		return retention, err
64
	}
65
	connector.retentionCache.Set(metric, retention, 0)
66
	return retention, nil
67
}
68

69
func (connector *DbConnector) getCachedRetention(metric string) (int64, bool) {
70
	value, ok := connector.retentionCache.Get(metric)
71
	if !ok {
72
		return 0, false
73
	}
74
	retention, ok := value.(int64)
75
	return retention, ok
76
}
77

78
func (connector *DbConnector) getMetricRetention(metric string) (int64, error) {
79
	c := connector.pool.Get()
80
	defer c.Close()
81

82
	retention, err := redis.Int64(c.Do("GET", metricRetentionKey(metric)))
83
	if err != nil {
84
		if err == redis.ErrNil {
85
			return 60, database.ErrNil
86
		}
87
		return 0, fmt.Errorf("Failed GET metric retention:%s, error: %v", metric, err)
88
	}
89
	return retention, nil
90
}
91

92
// SaveMetrics saves new metrics
93
func (connector *DbConnector) SaveMetrics(metrics map[string]*moira.MatchedMetric) error {
94
	c := connector.pool.Get()
95
	defer c.Close()
96
	for _, metric := range metrics {
97
		metricValue := fmt.Sprintf("%v %v", metric.Timestamp, metric.Value)
98
		c.Send("ZADD", metricDataKey(metric.Metric), metric.RetentionTimestamp, metricValue)
99
		c.Send("SET", metricRetentionKey(metric.Metric), metric.Retention)
100

101
		for _, pattern := range metric.Patterns {
102
			c.Send("SADD", patternMetricsKey(pattern), metric.Metric)
103
			event, err := json.Marshal(&moira.MetricEvent{
104
				Metric:  metric.Metric,
105
				Pattern: pattern,
106
			})
107
			if err != nil {
108
				continue
109
			}
110
			c.Send("PUBLISH", metricEventKey, event)
111
		}
112
	}
113
	return c.Flush()
114
}
115

116
// SubscribeMetricEvents creates subscription for new metrics and return channel for this events
117
func (connector *DbConnector) SubscribeMetricEvents(tomb *tomb.Tomb) (<-chan *moira.MetricEvent, error) {
118
	metricsChannel := make(chan *moira.MetricEvent, 100)
119
	dataChannel, err := connector.manageSubscriptions(tomb, metricEventKey)
120
	if err != nil {
121
		return nil, err
122
	}
123

124
	go func() {
125
		for {
126
			data, ok := <-dataChannel
127
			if !ok {
128
				connector.logger.Info("No more subscriptions, channel is closed. Stop process data...")
129
				close(metricsChannel)
130
				return
131
			}
132
			metricEvent := &moira.MetricEvent{}
133
			if err := json.Unmarshal(data, metricEvent); err != nil {
134
				connector.logger.ErrorF("Failed to parse MetricEvent: %s, error : %v", string(data), err)
135
				continue
136
			}
137
			metricsChannel <- metricEvent
138
		}
139
	}()
140

141
	return metricsChannel, nil
142
}
143

144
// AddPatternMetric adds new metrics by given pattern
145
func (connector *DbConnector) AddPatternMetric(pattern, metric string) error {
146
	c := connector.pool.Get()
147
	defer c.Close()
148
	if _, err := c.Do("SADD", patternMetricsKey(pattern), metric); err != nil {
149
		return fmt.Errorf("Failed to SADD pattern-metrics, pattern: %s, metric: %s, error: %v", pattern, metric, err)
150
	}
151
	return nil
152
}
153

154
// GetPatternMetrics gets all metrics by given pattern
155
func (connector *DbConnector) GetPatternMetrics(pattern string) ([]string, error) {
156
	c := connector.pool.Get()
157
	defer c.Close()
158

159
	metrics, err := redis.Strings(c.Do("SMEMBERS", patternMetricsKey(pattern)))
160
	if err != nil {
161
		if err == redis.ErrNil {
162
			return make([]string, 0), nil
163
		}
164
		return nil, fmt.Errorf("Failed to get pattern metrics for pattern %s, error: %v", pattern, err)
165
	}
166
	return metrics, nil
167
}
168

169
// RemovePattern removes pattern from patterns list
170
func (connector *DbConnector) RemovePattern(pattern string) error {
171
	c := connector.pool.Get()
172
	defer c.Close()
173
	if _, err := c.Do("SREM", patternsListKey, pattern); err != nil {
174
		return fmt.Errorf("Failed to remove pattern: %s, error: %v", pattern, err)
175
	}
176
	return nil
177
}
178

179
// RemovePatternsMetrics removes metrics by given patterns
180
func (connector *DbConnector) RemovePatternsMetrics(patterns []string) error {
181
	c := connector.pool.Get()
182
	defer c.Close()
183
	c.Send("MULTI")
184
	for _, pattern := range patterns {
185
		c.Send("DEL", patternMetricsKey(pattern))
186
	}
187
	if _, err := c.Do("EXEC"); err != nil {
188
		return fmt.Errorf("Failed to EXEC: %v", err)
189
	}
190
	return nil
191
}
192

193
// RemovePatternWithMetrics removes pattern metrics with data and given pattern
194
func (connector *DbConnector) RemovePatternWithMetrics(pattern string) error {
195
	metrics, err := connector.GetPatternMetrics(pattern)
196
	if err != nil {
197
		return err
198
	}
199
	c := connector.pool.Get()
200
	defer c.Close()
201
	c.Send("MULTI")
202
	c.Send("SREM", patternsListKey, pattern)
203
	for _, metric := range metrics {
204
		c.Send("DEL", metricDataKey(metric))
205
	}
206
	c.Send("DEL", patternMetricsKey(pattern))
207
	if _, err = c.Do("EXEC"); err != nil {
208
		return fmt.Errorf("Failed to EXEC: %v", err)
209
	}
210
	return nil
211
}
212

213
// RemoveMetricValues remove metric timestamps values from 0 to given time
214
func (connector *DbConnector) RemoveMetricValues(metric string, toTime int64) error {
215
	if !connector.needRemoveMetrics(metric) {
216
		return nil
217
	}
218
	c := connector.pool.Get()
219
	defer c.Close()
220
	if _, err := c.Do("ZREMRANGEBYSCORE", metricDataKey(metric), "-inf", toTime); err != nil {
221
		return fmt.Errorf("Failed to remove metrics from -inf to %v, error: %v", toTime, err)
222
	}
223
	return nil
224
}
225

226
// RemoveMetricsValues remove metrics timestamps values from 0 to given time
227
func (connector *DbConnector) RemoveMetricsValues(metrics []string, toTime int64) error {
228
	c := connector.pool.Get()
229
	defer c.Close()
230

231
	c.Send("MULTI")
232
	for _, metric := range metrics {
233
		if connector.needRemoveMetrics(metric) {
234
			c.Send("ZREMRANGEBYSCORE", metricDataKey(metric), "-inf", toTime)
235
		}
236
	}
237
	if _, err := c.Do("EXEC"); err != nil {
238
		return fmt.Errorf("Failed to EXEC remove metrics: %v", err)
239
	}
240
	return nil
241
}
242

243
func (connector *DbConnector) needRemoveMetrics(metric string) bool {
244
	err := connector.metricsCache.Add(metric, true, 0)
245
	return err == nil
246
}
247

248
var patternsListKey = "moira-pattern-list"
249
var metricEventKey = "metric-event"
250

251
func patternMetricsKey(pattern string) string {
252
	return fmt.Sprintf("moira-pattern-metrics:%s", pattern)
253
}
254

255
func metricDataKey(metric string) string {
256
	return fmt.Sprintf("moira-metric-data:%s", metric)
257
}
258

259
func metricRetentionKey(metric string) string {
260
	return fmt.Sprintf("moira-metric-retention:%s", metric)
261
}
262

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.