moira

Форк
0
/
escalation.go 
331 строка · 9.5 Кб
1
package redis
2

3
import (
4
	"encoding/json"
5
	"fmt"
6
	"strings"
7

8
	"github.com/garyburd/redigo/redis"
9

10
	"go.avito.ru/DO/moira"
11
	"go.avito.ru/DO/moira/database/redis/reply"
12
)
13

14
// Redis key names used by the escalation storage.
const (
	// keyScheduledEscalations is the sorted set holding serialized scheduled
	// escalation events, scored by the moment they become due.
	keyScheduledEscalations = "moira-notifier-scheduled-escalations"

	// prefixPendingEscalation prefixes the per-trigger/per-metric marker key
	// of pending (non-resolution) escalations.
	prefixPendingEscalation = "moira-trigger-pending-escalations"

	// prefixPendingEscalationResolution prefixes the per-trigger/per-metric
	// marker key of pending resolution escalations.
	prefixPendingEscalationResolution = "moira-trigger-pending-escalations-res"

	// prefixProcessedEscalation prefixes the per-trigger/per-metric list of
	// escalation IDs that have already been processed.
	prefixProcessedEscalation = "moira-trigger-processed-escalations"
)
20

21
func (connector *DbConnector) AddEscalations(
22
	ts int64, event moira.NotificationEvent, trigger moira.TriggerData, escalations []moira.EscalationData,
23
) error {
24
	c := connector.pool.Get()
25
	defer c.Close()
26

27
	var (
28
		err    error
29
		offset int64
30
	)
31

32
	// if event is about resolution then it's necessary to remove escalations
33
	// and vice versa: if event is about an error then it's necessary to remove resolutions
34
	isResolution := event.State == moira.OK
35
	_ = connector.ackTriggerEscalations(trigger.ID, event.Metric, !isResolution)
36

37
	c.Send("MULTI")
38
	c.Send("SET", triggerPendingEscalationsKey(trigger.ID, event.Metric, isResolution), "")
39

40
	// resolution-type escalations must be filtered by those ones which were sent
41
	if isResolution {
42
		if escalations, err = connector.filterEscalationsByRegistered(escalations, event.Metric, trigger.ID); err != nil {
43
			return err
44
		}
45
	}
46

47
	lastIndex := len(escalations) - 1
48
	for i, e := range escalations {
49
		scheduledEvent := &moira.ScheduledEscalationEvent{
50
			Escalation:   e,
51
			Event:        event,
52
			Trigger:      trigger,
53
			IsFinal:      i == lastIndex,
54
			IsResolution: isResolution,
55
		}
56
		bytes, err := json.Marshal(scheduledEvent)
57
		if err != nil {
58
			return err
59
		}
60

61
		if !isResolution {
62
			// if event is not about problem being resolved then escalation is needed according to the schedule
63
			offset = e.OffsetInMinutes * 60
64
		} else {
65
			// otherwise (if problem is resolved) escalation can be sent right now
66
			offset = 0
67
		}
68
		c.Send("ZADD", keyScheduledEscalations, ts+offset, bytes)
69
	}
70

71
	if _, err := c.Do("EXEC"); err != nil {
72
		return fmt.Errorf("Failed to EXEC: %s", err.Error())
73
	}
74
	return nil
75
}
76

77
// TriggerHasPendingEscalations reports whether any metric of the trigger has
// a pending escalation marker. Resolution markers are checked as well when
// withResolutions is true.
//
// NOTE(review): the lookup uses the KEYS command with a "*" metric pattern,
// which Redis documents as O(N) over the whole keyspace — consider an
// alternative if this runs against a large production database.
func (connector *DbConnector) TriggerHasPendingEscalations(triggerID string, withResolutions bool) (bool, error) {
	conn := connector.pool.Get()
	defer conn.Close()

	// Problem escalations are always checked; resolutions only on request.
	kinds := []bool{false}
	if withResolutions {
		kinds = append(kinds, true)
	}

	for _, isResolution := range kinds {
		pattern := triggerPendingEscalationsKey(triggerID, "*", isResolution)
		keys, err := redis.Strings(conn.Do("KEYS", pattern))
		if err != nil && err != redis.ErrNil {
			return false, err
		}
		if len(keys) > 0 {
			return true, nil
		}
	}

	return false, nil
}
97

98
// MetricHasPendingEscalations reports whether the pending escalation marker
// exists for the given trigger and metric. The resolution marker is checked
// as well when withResolutions is true. The first lookup error stops the
// check and is returned.
func (connector *DbConnector) MetricHasPendingEscalations(triggerID, metric string, withResolutions bool) (bool, error) {
	conn := connector.pool.Get()
	defer conn.Close()

	// Problem escalations are always checked; resolutions only on request.
	kinds := []bool{false}
	if withResolutions {
		kinds = append(kinds, true)
	}

	for _, isResolution := range kinds {
		key := triggerPendingEscalationsKey(triggerID, metric, isResolution)
		exists, err := redis.Bool(conn.Do("EXISTS", key))
		if err != nil || exists {
			return exists, err
		}
	}

	return false, nil
}
112

113
// AckEscalations acknowledges pending escalations of the given trigger and
// metric. Pending resolutions are acknowledged too when withResolutions is
// true. The first failure is returned and stops further processing.
func (connector *DbConnector) AckEscalations(triggerID, metric string, withResolutions bool) error {
	if err := connector.ackTriggerEscalations(triggerID, metric, false); err != nil {
		return err
	}
	if !withResolutions {
		return nil
	}
	return connector.ackTriggerEscalations(triggerID, metric, true)
}
124

125
// AckEscalationsBatch acknowledges pending escalations of the given trigger
// for every listed metric. Pending resolutions are acknowledged too when
// withResolutions is true. The first failure is returned and stops further
// processing.
func (connector *DbConnector) AckEscalationsBatch(triggerID string, metrics []string, withResolutions bool) error {
	if err := connector.ackTriggerEscalationsBatch(triggerID, metrics, false); err != nil {
		return err
	}
	if !withResolutions {
		return nil
	}
	return connector.ackTriggerEscalationsBatch(triggerID, metrics, true)
}
136

137
// FetchScheduledEscalationEvents reads all scheduled escalation events whose
// score is not greater than to and removes them from the schedule. Reading
// and removal happen in one MULTI/EXEC transaction. An empty, non-nil slice
// is returned when the transaction yields no replies.
func (connector *DbConnector) FetchScheduledEscalationEvents(to int64) ([]*moira.ScheduledEscalationEvent, error) {
	conn := connector.pool.Get()
	defer conn.Close()

	conn.Send("MULTI")
	conn.Send("ZRANGEBYSCORE", keyScheduledEscalations, "-inf", to)
	conn.Send("ZREMRANGEBYSCORE", keyScheduledEscalations, "-inf", to)

	replies, err := redis.Values(conn.Do("EXEC"))
	switch {
	case err != nil:
		return nil, fmt.Errorf("Failed to EXEC: %s", err)
	case len(replies) == 0:
		return []*moira.ScheduledEscalationEvent{}, nil
	}

	// The first reply belongs to ZRANGEBYSCORE and carries the events.
	return reply.ScheduledEscalationEvents(replies[0], nil)
}
153

154
// RegisterProcessedEscalation stores data of escalation (but not resolution) event that was processed (sent)
155
func (connector *DbConnector) RegisterProcessedEscalationID(escalationID, triggerID, metric string) error {
156
	c := connector.pool.Get()
157
	defer c.Close()
158

159
	_, err := c.Do("LPUSH", triggerProcessedEscalationsKey(triggerID, metric), escalationID)
160
	return err
161
}
162

163
// ackTriggerEscalations deletes the pending escalation marker of the given
// kind for a trigger/metric pair. When a resolution is acknowledged, the set
// of unacknowledged messages of that metric is deleted as well. All deletions
// run in one MULTI/EXEC transaction.
func (connector *DbConnector) ackTriggerEscalations(triggerID, metric string, isResolution bool) error {
	conn := connector.pool.Get()
	defer conn.Close()

	keys := []string{triggerPendingEscalationsKey(triggerID, metric, isResolution)}
	if isResolution {
		// the metric is now OK, so we can delete all unacked messages
		keys = append(keys, unacknowledgedMessagesKey(triggerID, metric))
	}

	conn.Send("MULTI")
	for _, key := range keys {
		conn.Send("DEL", key)
	}
	_, err := conn.Do("EXEC")
	return err
}
176

177
// ackTriggerEscalationsBatch deletes the pending escalation markers of the
// given kind for every listed metric of a trigger. When resolutions are
// acknowledged, the unacknowledged-message sets of those metrics are deleted
// too. Keys are grouped into DEL commands of at most chunkSize keys, and all
// commands run in one MULTI/EXEC transaction.
func (connector *DbConnector) ackTriggerEscalationsBatch(triggerID string, metrics []string, isResolution bool) error {
	// cutting these into chunks so that Redis doesn't choke
	const chunkSize = 25

	conn := connector.pool.Get()
	defer conn.Close()

	conn.Send("MULTI")

	for start := 0; start < len(metrics); start += chunkSize {
		end := start + chunkSize
		if end > len(metrics) {
			end = len(metrics)
		}
		chunk := metrics[start:end]

		pendingKeys := make([]interface{}, 0, len(chunk))
		for _, metric := range chunk {
			pendingKeys = append(pendingKeys, triggerPendingEscalationsKey(triggerID, metric, isResolution))
		}
		conn.Send("DEL", pendingKeys...)

		if !isResolution {
			continue
		}

		// the metric is now OK, so we can delete all unacked messages
		unackedKeys := make([]interface{}, 0, len(chunk))
		for _, metric := range chunk {
			unackedKeys = append(unackedKeys, unacknowledgedMessagesKey(triggerID, metric))
		}
		conn.Send("DEL", unackedKeys...)
	}

	_, err := conn.Do("EXEC")
	return err
}
212

213
// fetchRegisteredEscalationIDs returns every escalation ID stored in the
// processed-escalations list of the given trigger and metric, and deletes
// that list in the same MULTI/EXEC transaction.
func (connector *DbConnector) fetchRegisteredEscalationIDs(triggerID, metric string) ([]string, error) {
	conn := connector.pool.Get()
	defer conn.Close()

	key := triggerProcessedEscalationsKey(triggerID, metric)

	conn.Send("MULTI")
	conn.Send("LRANGE", key, 0, -1)
	conn.Send("DEL", key)

	replies, err := redis.Values(conn.Do("EXEC"))
	if err != nil {
		return nil, fmt.Errorf("Failed to EXEC: %s", err.Error())
	}

	// The first reply belongs to LRANGE and carries the stored IDs.
	return redis.Strings(replies[0], nil)
}
227

228
// filterEscalationsByRegistered keeps only those escalations whose ID is
// present in the processed-escalations list of the given trigger and metric.
// The original order is preserved. Fetching the registered IDs also clears
// the stored list (see fetchRegisteredEscalationIDs).
func (connector *DbConnector) filterEscalationsByRegistered(escalations []moira.EscalationData, triggerID, metric string) ([]moira.EscalationData, error) {
	registeredIDs, err := connector.fetchRegisteredEscalationIDs(triggerID, metric)
	if err != nil {
		return nil, err
	}

	// index registered ids for constant-time membership checks
	registered := make(map[string]struct{}, len(registeredIDs))
	for _, id := range registeredIDs {
		registered[id] = struct{}{}
	}

	// keep only registered escalations
	kept := make([]moira.EscalationData, 0, len(escalations))
	for _, escalation := range escalations {
		if _, found := registered[escalation.ID]; !found {
			continue
		}
		kept = append(kept, escalation)
	}

	return kept, nil
}
250

251
func triggerPendingEscalationsKey(triggerID, metric string, isResolution bool) string {
252
	var prefix string
253
	if !isResolution {
254
		prefix = prefixPendingEscalation
255
	} else {
256
		prefix = prefixPendingEscalationResolution
257
	}
258

259
	return fmt.Sprintf("%s:%s:%s", prefix, triggerID, metric)
260
}
261

262
func triggerProcessedEscalationsKey(triggerID, metric string) string {
263
	return fmt.Sprintf("%s:%s:%s", prefixProcessedEscalation, triggerID, metric)
264
}
265

266
func (connector *DbConnector) AddUnacknowledgedMessage(
267
	triggerID string, metric string, link moira.MessageLink,
268
) error {
269
	c := connector.pool.Get()
270
	defer c.Close()
271

272
	c.Send("MULTI")
273
	key := unacknowledgedMessagesKey(triggerID, metric)
274
	c.Send("SADD", key, link.StorageKey())
275
	_, err := redis.Values(c.Do("EXEC"))
276
	if err != nil {
277
		return fmt.Errorf("Failed to EXEC: %s", err)
278
	}
279
	return nil
280
}
281

282
// GetUnacknowledgedMessages returns the links of all unacknowledged messages
// stored for the given trigger and metric.
//
// Every member of the stored set is parsed with parseMessageLink; if the set
// cannot be read or any member cannot be parsed, no links are returned and
// the error describes the failure.
func (connector *DbConnector) GetUnacknowledgedMessages(triggerID, metric string) ([]moira.MessageLink, error) {
	c := connector.pool.Get()
	defer c.Close()

	key := unacknowledgedMessagesKey(triggerID, metric)
	members, err := redis.Strings(c.Do("SMEMBERS", key))
	if err != nil {
		return nil, fmt.Errorf("could not get unacknowledged messages: %s", err.Error())
	}

	links := make([]moira.MessageLink, 0, len(members))
	for _, member := range members {
		link, err := parseMessageLink(member)
		if err != nil {
			// a single malformed member invalidates the whole result
			return nil, fmt.Errorf("could not get unacknowledged messages: %s", err.Error())
		}
		links = append(links, link)
	}
	return links, nil
}
301

302
// AckUnacknowledgedMessages deletes the whole set of unacknowledged messages
// stored for the given trigger and metric.
func (connector *DbConnector) AckUnacknowledgedMessages(triggerID, metric string) error {
	conn := connector.pool.Get()
	defer conn.Close()

	key := unacknowledgedMessagesKey(triggerID, metric)
	if _, err := conn.Do("DEL", key); err != nil {
		return err
	}
	return nil
}
309

310
func parseMessageLink(s string) (moira.MessageLink, error) {
311
	split := strings.SplitN(s, ":", 2)
312
	if len(split) < 2 {
313
		return nil, fmt.Errorf("%s is not a valid MessageLink", s)
314
	}
315

316
	switch split[0] {
317
	case "slack":
318
		link := new(moira.SlackThreadLink)
319
		if err := link.FromString(split[1]); err != nil {
320
			return nil, err
321
		} else {
322
			return link, nil
323
		}
324
	default:
325
		return nil, fmt.Errorf("%s is not a known MessageLink type", split[0])
326
	}
327
}
328

329
// unacknowledgedMessagesKey builds the key
// "moira-unacknowledged-messages:<triggerID>:<metric>" of the set holding
// unacknowledged message links.
func unacknowledgedMessagesKey(triggerID, metric string) string {
	const layout = "moira-unacknowledged-messages:%s:%s"
	return fmt.Sprintf(layout, triggerID, metric)
}
332

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.