7
"github.com/garyburd/redigo/redis"
10
"go.avito.ru/DO/moira"
11
"go.avito.ru/DO/moira/database"
12
"go.avito.ru/DO/moira/database/redis/reply"
16
func (connector *DbConnector) GetPatterns() ([]string, error) {
17
c := connector.pool.Get()
19
patterns, err := redis.Strings(c.Do("SMEMBERS", patternsListKey))
21
return nil, fmt.Errorf("Failed to get moira patterns, error: %v", err)
27
func (connector *DbConnector) GetMetricsValues(metrics []string, from int64, until int64) (map[string][]*moira.MetricValue, error) {
28
c := connector.pool.Get()
31
for _, metric := range metrics {
32
c.Send("ZRANGEBYSCORE", metricDataKey(metric), from, until, "WITHSCORES")
34
resultByMetrics, err := redis.Values(c.Do(""))
36
return nil, fmt.Errorf("Failed to get metric values: %v", err)
39
res := make(map[string][]*moira.MetricValue, len(resultByMetrics))
41
for i, resultByMetric := range resultByMetrics {
43
metricsValues, err := reply.MetricValues(resultByMetric)
47
res[metric] = metricsValues
53
func (connector *DbConnector) GetMetricRetention(metric string) (int64, error) {
54
retention, ok := connector.getCachedRetention(metric)
58
retention, err := connector.getMetricRetention(metric)
60
if err == database.ErrNil {
65
connector.retentionCache.Set(metric, retention, 0)
69
func (connector *DbConnector) getCachedRetention(metric string) (int64, bool) {
70
value, ok := connector.retentionCache.Get(metric)
74
retention, ok := value.(int64)
78
func (connector *DbConnector) getMetricRetention(metric string) (int64, error) {
79
c := connector.pool.Get()
82
retention, err := redis.Int64(c.Do("GET", metricRetentionKey(metric)))
84
if err == redis.ErrNil {
85
return 60, database.ErrNil
87
return 0, fmt.Errorf("Failed GET metric retention:%s, error: %v", metric, err)
93
func (connector *DbConnector) SaveMetrics(metrics map[string]*moira.MatchedMetric) error {
94
c := connector.pool.Get()
96
for _, metric := range metrics {
97
metricValue := fmt.Sprintf("%v %v", metric.Timestamp, metric.Value)
98
c.Send("ZADD", metricDataKey(metric.Metric), metric.RetentionTimestamp, metricValue)
99
c.Send("SET", metricRetentionKey(metric.Metric), metric.Retention)
101
for _, pattern := range metric.Patterns {
102
c.Send("SADD", patternMetricsKey(pattern), metric.Metric)
103
event, err := json.Marshal(&moira.MetricEvent{
104
Metric: metric.Metric,
110
c.Send("PUBLISH", metricEventKey, event)
117
func (connector *DbConnector) SubscribeMetricEvents(tomb *tomb.Tomb) (<-chan *moira.MetricEvent, error) {
118
metricsChannel := make(chan *moira.MetricEvent, 100)
119
dataChannel, err := connector.manageSubscriptions(tomb, metricEventKey)
126
data, ok := <-dataChannel
128
connector.logger.Info("No more subscriptions, channel is closed. Stop process data...")
129
close(metricsChannel)
132
metricEvent := &moira.MetricEvent{}
133
if err := json.Unmarshal(data, metricEvent); err != nil {
134
connector.logger.ErrorF("Failed to parse MetricEvent: %s, error : %v", string(data), err)
137
metricsChannel <- metricEvent
141
return metricsChannel, nil
145
func (connector *DbConnector) AddPatternMetric(pattern, metric string) error {
146
c := connector.pool.Get()
148
if _, err := c.Do("SADD", patternMetricsKey(pattern), metric); err != nil {
149
return fmt.Errorf("Failed to SADD pattern-metrics, pattern: %s, metric: %s, error: %v", pattern, metric, err)
155
func (connector *DbConnector) GetPatternMetrics(pattern string) ([]string, error) {
156
c := connector.pool.Get()
159
metrics, err := redis.Strings(c.Do("SMEMBERS", patternMetricsKey(pattern)))
161
if err == redis.ErrNil {
162
return make([]string, 0), nil
164
return nil, fmt.Errorf("Failed to get pattern metrics for pattern %s, error: %v", pattern, err)
170
func (connector *DbConnector) RemovePattern(pattern string) error {
171
c := connector.pool.Get()
173
if _, err := c.Do("SREM", patternsListKey, pattern); err != nil {
174
return fmt.Errorf("Failed to remove pattern: %s, error: %v", pattern, err)
180
func (connector *DbConnector) RemovePatternsMetrics(patterns []string) error {
181
c := connector.pool.Get()
184
for _, pattern := range patterns {
185
c.Send("DEL", patternMetricsKey(pattern))
187
if _, err := c.Do("EXEC"); err != nil {
188
return fmt.Errorf("Failed to EXEC: %v", err)
194
func (connector *DbConnector) RemovePatternWithMetrics(pattern string) error {
195
metrics, err := connector.GetPatternMetrics(pattern)
199
c := connector.pool.Get()
202
c.Send("SREM", patternsListKey, pattern)
203
for _, metric := range metrics {
204
c.Send("DEL", metricDataKey(metric))
206
c.Send("DEL", patternMetricsKey(pattern))
207
if _, err = c.Do("EXEC"); err != nil {
208
return fmt.Errorf("Failed to EXEC: %v", err)
214
func (connector *DbConnector) RemoveMetricValues(metric string, toTime int64) error {
215
if !connector.needRemoveMetrics(metric) {
218
c := connector.pool.Get()
220
if _, err := c.Do("ZREMRANGEBYSCORE", metricDataKey(metric), "-inf", toTime); err != nil {
221
return fmt.Errorf("Failed to remove metrics from -inf to %v, error: %v", toTime, err)
227
func (connector *DbConnector) RemoveMetricsValues(metrics []string, toTime int64) error {
228
c := connector.pool.Get()
232
for _, metric := range metrics {
233
if connector.needRemoveMetrics(metric) {
234
c.Send("ZREMRANGEBYSCORE", metricDataKey(metric), "-inf", toTime)
237
if _, err := c.Do("EXEC"); err != nil {
238
return fmt.Errorf("Failed to EXEC remove metrics: %v", err)
243
func (connector *DbConnector) needRemoveMetrics(metric string) bool {
244
err := connector.metricsCache.Add(metric, true, 0)
248
var patternsListKey = "moira-pattern-list"
249
var metricEventKey = "metric-event"
251
func patternMetricsKey(pattern string) string {
252
return fmt.Sprintf("moira-pattern-metrics:%s", pattern)
255
func metricDataKey(metric string) string {
256
return fmt.Sprintf("moira-metric-data:%s", metric)
259
func metricRetentionKey(metric string) string {
260
return fmt.Sprintf("moira-metric-retention:%s", metric)