8
"github.com/garyburd/redigo/redis"
10
"go.avito.ru/DO/moira"
11
"go.avito.ru/DO/moira/database/redis/reply"
15
// keyScheduledEscalations is the Redis sorted-set key that holds scheduled
// escalation events (written with ZADD, drained with ZRANGEBYSCORE /
// ZREMRANGEBYSCORE in this file).
keyScheduledEscalations = "moira-notifier-scheduled-escalations"
16
// prefixPendingEscalation prefixes the per trigger/metric marker key used when
// isResolution is false (see triggerPendingEscalationsKey).
prefixPendingEscalation = "moira-trigger-pending-escalations"
17
// prefixPendingEscalationResolution prefixes the per trigger/metric marker key
// used for resolutions (see triggerPendingEscalationsKey).
prefixPendingEscalationResolution = "moira-trigger-pending-escalations-res"
18
// prefixProcessedEscalation prefixes the per trigger/metric list of processed
// escalation IDs (written with LPUSH, read with LRANGE in this file).
prefixProcessedEscalation = "moira-trigger-processed-escalations"
21
// AddEscalations schedules escalation events for one trigger/metric pair.
//
// From the lines visible here it:
//   - treats the event as a resolution when event.State == moira.OK;
//   - removes the pending marker of the opposite kind via ackTriggerEscalations;
//   - SETs the pending marker of the current kind;
//   - narrows escalations with filterEscalationsByRegistered;
//   - ZADDs one JSON-encoded moira.ScheduledEscalationEvent per escalation
//     into keyScheduledEscalations with score ts+offset;
//   - finishes with EXEC and wraps an EXEC failure into an error.
//
// NOTE(review): several lines of this function are not visible in this view
// (result type, whatever opens the transaction and releases the connection,
// the branch conditions around the filter and the offset) — confirm against
// the full file before relying on this description.
func (connector *DbConnector) AddEscalations(
22
ts int64, event moira.NotificationEvent, trigger moira.TriggerData, escalations []moira.EscalationData,
24
c := connector.pool.Get()
32
// An OK event is a resolution, so the pending escalations marker has to go;
33
// any other event is a problem, so the pending resolutions marker has to go.
34
isResolution := event.State == moira.OK
35
// The error returned by this cleanup is discarded.
_ = connector.ackTriggerEscalations(trigger.ID, event.Metric, !isResolution)
38
// Mark the trigger/metric as having pending events of the current kind; the stored value is an empty string.
c.Send("SET", triggerPendingEscalationsKey(trigger.ID, event.Metric, isResolution), "")
40
// Resolution-type escalations are narrowed down to the ones that were actually sent.
42
// NOTE(review): filterEscalationsByRegistered is declared as
// (escalations, triggerID, metric) but is called here with
// (escalations, event.Metric, trigger.ID). The arguments look swapped, which
// would make it read a different key than RegisterProcessedEscalationID
// writes — verify and fix the argument order.
if escalations, err = connector.filterEscalationsByRegistered(escalations, event.Metric, trigger.ID); err != nil {
47
// Index of the last escalation, used to flag the final scheduled event.
lastIndex := len(escalations) - 1
48
for i, e := range escalations {
49
scheduledEvent := &moira.ScheduledEscalationEvent{
53
IsFinal: i == lastIndex,
54
IsResolution: isResolution,
56
bytes, err := json.Marshal(scheduledEvent)
62
// A problem event is escalated according to the schedule:
63
// the offset in minutes is multiplied by 60 (presumably to get seconds matching ts — TODO confirm).
offset = e.OffsetInMinutes * 60
65
// Otherwise (the problem is resolved) the escalation can be sent right away.
68
// The sorted-set score is the moment the event becomes due.
c.Send("ZADD", keyScheduledEscalations, ts+offset, bytes)
71
if _, err := c.Do("EXEC"); err != nil {
72
return fmt.Errorf("Failed to EXEC: %s", err.Error())
77
// TriggerHasPendingEscalations looks for pending escalation markers of any
// metric of the given trigger. The resolution markers are only consulted when
// withResolutions is true.
// NOTE(review): the lines that evaluate the result and return are not visible
// in this view.
func (connector *DbConnector) TriggerHasPendingEscalations(triggerID string, withResolutions bool) (bool, error) {
78
c := connector.pool.Get()
81
// First pass checks escalation markers, second pass checks resolution markers.
for _, isResolution := range []bool{false, true} {
82
if isResolution && !withResolutions {
85
// "*" is used as the metric part, so KEYS matches markers of every metric of the trigger.
// NOTE(review): Redis documents KEYS as a command to use with care in
// production because it walks the whole keyspace — consider SCAN.
value, err := redis.Strings(c.Do("KEYS", triggerPendingEscalationsKey(triggerID, "*", isResolution)))
86
// redis.ErrNil is not treated as a failure.
if err != nil && err != redis.ErrNil {
98
// MetricHasPendingEscalations checks with EXISTS whether a pending marker is
// stored for the given trigger/metric pair. The resolution marker is only
// consulted when withResolutions is true.
// NOTE(review): the return statements are not visible in this view.
func (connector *DbConnector) MetricHasPendingEscalations(triggerID, metric string, withResolutions bool) (bool, error) {
99
c := connector.pool.Get()
102
// First pass checks the escalation marker, second pass checks the resolution marker.
for _, isResolution := range []bool{false, true} {
103
if isResolution && !withResolutions {
105
// This branch is taken either on a Redis error or when the marker exists.
} else if value, err := redis.Bool(c.Do("EXISTS", triggerPendingEscalationsKey(triggerID, metric, isResolution))); err != nil || value {
113
// AckEscalations acknowledges pending escalations of one trigger/metric pair
// by calling ackTriggerEscalations, and does the same for pending resolutions
// when withResolutions is true.
// NOTE(review): the bodies of both branches are not visible in this view.
func (connector *DbConnector) AckEscalations(triggerID, metric string, withResolutions bool) error {
114
// First pass handles escalations, second pass handles resolutions.
for _, isResolution := range []bool{false, true} {
115
if isResolution && !withResolutions {
117
} else if err := connector.ackTriggerEscalations(triggerID, metric, isResolution); err != nil {
125
// AckEscalationsBatch acknowledges pending escalations for several metrics of
// one trigger by calling ackTriggerEscalationsBatch, and does the same for
// pending resolutions when withResolutions is true.
// NOTE(review): the bodies of both branches are not visible in this view.
func (connector *DbConnector) AckEscalationsBatch(triggerID string, metrics []string, withResolutions bool) error {
126
// First pass handles escalations, second pass handles resolutions.
for _, isResolution := range []bool{false, true} {
127
if isResolution && !withResolutions {
129
} else if err := connector.ackTriggerEscalationsBatch(triggerID, metrics, isResolution); err != nil {
137
// FetchScheduledEscalationEvents reads every scheduled escalation event whose
// score is at most `to` from keyScheduledEscalations and removes the same
// score range from the sorted set. The first EXEC reply is decoded with
// reply.ScheduledEscalationEvents.
// NOTE(review): the line that opens the transaction and the error check
// header are not visible in this view.
func (connector *DbConnector) FetchScheduledEscalationEvents(to int64) ([]*moira.ScheduledEscalationEvent, error) {
138
c := connector.pool.Get()
142
// Read the due events ...
c.Send("ZRANGEBYSCORE", keyScheduledEscalations, "-inf", to)
143
// ... and drop the same score range.
c.Send("ZREMRANGEBYSCORE", keyScheduledEscalations, "-inf", to)
144
response, err := redis.Values(c.Do("EXEC"))
146
return nil, fmt.Errorf("Failed to EXEC: %s", err)
148
// An empty EXEC reply yields an empty, non-nil slice.
if len(response) == 0 {
149
return make([]*moira.ScheduledEscalationEvent, 0), nil
151
// response[0] is the reply of the first queued command (ZRANGEBYSCORE).
return reply.ScheduledEscalationEvents(response[0], nil)
154
// RegisterProcessedEscalationID records the ID of an escalation (but not
// resolution) event that was processed (sent) by pushing it onto the
// processed-escalations list of the given trigger/metric pair.
func (connector *DbConnector) RegisterProcessedEscalationID(escalationID, triggerID, metric string) error {
156
c := connector.pool.Get()
159
// LPUSH prepends the ID to the list stored under the trigger/metric key.
_, err := c.Do("LPUSH", triggerProcessedEscalationsKey(triggerID, metric), escalationID)
163
// ackTriggerEscalations deletes the pending marker of the requested kind
// (escalation or resolution) for one trigger/metric pair and also deletes the
// set of unacknowledged messages of that pair, then issues EXEC.
// NOTE(review): the line that opens the transaction and the return statement
// are not visible in this view.
func (connector *DbConnector) ackTriggerEscalations(triggerID, metric string, isResolution bool) error {
164
c := connector.pool.Get()
168
c.Send("DEL", triggerPendingEscalationsKey(triggerID, metric, isResolution))
170
// the metric is now OK, so we can delete all unacked messages
// NOTE(review): AddEscalations calls this function for every event, problem
// events included, so the "metric is now OK" assumption does not hold for all
// callers — confirm the unconditional delete is intended.
171
c.Send("DEL", unacknowledgedMessagesKey(triggerID, metric))
173
_, err := c.Do("EXEC")
177
// ackTriggerEscalationsBatch is the multi-metric variant of
// ackTriggerEscalations: for each chunk of metrics it queues one DEL for the
// pending markers of the requested kind and one DEL for the unacknowledged
// message sets, then issues EXEC.
// NOTE(review): the definition of chunkSize, the statement that advances
// `metrics` to `rest`, and the return statement are not visible in this view.
func (connector *DbConnector) ackTriggerEscalationsBatch(triggerID string, metrics []string, isResolution bool) error {
178
c := connector.pool.Get()
183
// The metrics are processed in chunks so that a single DEL does not get too many keys.
184
for len(metrics) > 0 {
186
// Take at most chunkSize metrics; the remainder is kept in rest.
var chunk, rest []string = metrics, nil
187
if len(chunk) > chunkSize {
188
chunk, rest = chunk[:chunkSize], chunk[chunkSize:]
191
// Keys of the pending markers for this chunk; []interface{} is needed for the variadic Send.
peKeys := make([]interface{}, len(chunk))
192
for i, metric := range chunk {
193
peKeys[i] = triggerPendingEscalationsKey(triggerID, metric, isResolution)
195
c.Send("DEL", peKeys...)
198
// the metric is now OK, so we can delete all unacked messages
199
umKeys := make([]interface{}, len(chunk))
200
for i, metric := range chunk {
201
umKeys[i] = unacknowledgedMessagesKey(triggerID, metric)
203
c.Send("DEL", umKeys...)
209
_, err := c.Do("EXEC")
213
// fetchRegisteredEscalationIDs returns every escalation ID stored in the
// processed-escalations list of the given trigger/metric pair and deletes
// that list, so the IDs are handed out once.
// NOTE(review): the line that opens the transaction and the branch structure
// around the EXEC result are not visible in this view.
func (connector *DbConnector) fetchRegisteredEscalationIDs(triggerID, metric string) ([]string, error) {
214
c := connector.pool.Get()
218
// 0..-1 selects the whole list.
c.Send("LRANGE", triggerProcessedEscalationsKey(triggerID, metric), 0, -1)
219
c.Send("DEL", triggerProcessedEscalationsKey(triggerID, metric))
221
if rawResponse, err := redis.Values(c.Do("EXEC")); err != nil {
222
return nil, fmt.Errorf("Failed to EXEC: %s", err.Error())
224
// rawResponse[0] is the reply of the first queued command (LRANGE).
return redis.Strings(rawResponse[0], nil)
228
// filterEscalationsByRegistered returns the escalations whose ID is present
// in the list obtained from fetchRegisteredEscalationIDs for the given
// trigger/metric pair. The input order is preserved.
// NOTE(review): the error branch of the initial lookup is not visible in this
// view.
func (connector *DbConnector) filterEscalationsByRegistered(escalations []moira.EscalationData, triggerID, metric string) ([]moira.EscalationData, error) {
229
if registeredEscalationIDs, err := connector.fetchRegisteredEscalationIDs(triggerID, metric); err != nil {
232
// Capacity is the upper bound: at most every escalation is kept.
filteredEscalations := make([]moira.EscalationData, 0, len(escalations))
233
registeredEscalationSet := make(map[string]bool)
235
// Build a lookup set of the registered escalation IDs.
236
for _, id := range registeredEscalationIDs {
237
registeredEscalationSet[id] = true
240
// Keep only the escalations whose ID is registered.
241
for _, escalation := range escalations {
242
if _, ok := registeredEscalationSet[escalation.ID]; ok {
243
filteredEscalations = append(filteredEscalations, escalation)
247
return filteredEscalations, nil
251
func triggerPendingEscalationsKey(triggerID, metric string, isResolution bool) string {
254
prefix = prefixPendingEscalation
256
prefix = prefixPendingEscalationResolution
259
return fmt.Sprintf("%s:%s:%s", prefix, triggerID, metric)
262
func triggerProcessedEscalationsKey(triggerID, metric string) string {
263
return fmt.Sprintf("%s:%s:%s", prefixProcessedEscalation, triggerID, metric)
266
// AddUnacknowledgedMessage adds the storage key of the given message link to
// the set of unacknowledged messages of a trigger/metric pair.
// NOTE(review): the result type, the line that opens the transaction and the
// error check header are not visible in this view.
func (connector *DbConnector) AddUnacknowledgedMessage(
267
triggerID string, metric string, link moira.MessageLink,
269
c := connector.pool.Get()
273
key := unacknowledgedMessagesKey(triggerID, metric)
274
// The link is stored in its StorageKey() string form; parseMessageLink reads it back.
c.Send("SADD", key, link.StorageKey())
275
_, err := redis.Values(c.Do("EXEC"))
277
return fmt.Errorf("Failed to EXEC: %s", err)
282
// GetUnacknowledgedMessages returns every message link stored in the set of
// unacknowledged messages of a trigger/metric pair. Each set member is
// decoded with parseMessageLink; the first member that cannot be decoded
// aborts the call with an error.
// NOTE(review): both error messages below say "acknowledged" although the
// function reads unacknowledged messages — the wording looks like a typo.
func (connector *DbConnector) GetUnacknowledgedMessages(triggerID, metric string) ([]moira.MessageLink, error) {
283
c := connector.pool.Get()
286
key := unacknowledgedMessagesKey(triggerID, metric)
287
response, err := redis.Strings(c.Do("SMEMBERS", key))
289
return nil, fmt.Errorf("could not get acknowledged messages: %s", err.Error())
291
// One parsed link per set member, in the order SMEMBERS returned them.
parsedResponse := make([]moira.MessageLink, len(response))
292
for i, item := range response {
293
link, err := parseMessageLink(item)
295
return nil, fmt.Errorf("could not get acknowledged messages: %s", err.Error())
297
parsedResponse[i] = link
299
return parsedResponse, nil
302
// AckUnacknowledgedMessages deletes the whole set of unacknowledged messages
// of a trigger/metric pair.
func (connector *DbConnector) AckUnacknowledgedMessages(triggerID, metric string) error {
303
c := connector.pool.Get()
306
_, err := c.Do("DEL", unacknowledgedMessagesKey(triggerID, metric))
310
// parseMessageLink decodes a stored message link of the form
// "<type>:<payload>". The payload is handed to the FromString method of the
// link type; a string without both parts, or with an unknown type, yields an
// error.
// NOTE(review): the length check and the type dispatch (which type tag maps
// to moira.SlackThreadLink) are not visible in this view.
func parseMessageLink(s string) (moira.MessageLink, error) {
311
// Split only on the first ":" so the payload may itself contain colons.
split := strings.SplitN(s, ":", 2)
313
return nil, fmt.Errorf("%s is not a valid MessageLink", s)
318
link := new(moira.SlackThreadLink)
319
if err := link.FromString(split[1]); err != nil {
325
return nil, fmt.Errorf("%s is not a known MessageLink type", split[0])
329
// unacknowledgedMessagesKey builds the Redis key of the set of unacknowledged
// messages for a trigger/metric pair in the form
// "moira-unacknowledged-messages:<triggerID>:<metric>".
func unacknowledgedMessagesKey(triggerID, metric string) string {
	return fmt.Sprintf("moira-unacknowledged-messages:%s:%s", triggerID, metric)
}