moira

Форк
0
/
event.go 
622 строки · 18.4 Кб
1
package events
2

3
import (
4
	"fmt"
5
	"strings"
6
	"time"
7

8
	"gopkg.in/tomb.v2"
9

10
	"go.avito.ru/DO/moira"
11
	"go.avito.ru/DO/moira/database"
12
	"go.avito.ru/DO/moira/fan"
13
	"go.avito.ru/DO/moira/logging"
14
	"go.avito.ru/DO/moira/metrics"
15
	"go.avito.ru/DO/moira/notifier"
16
)
17

18
const (
	// FanDelay is the number of seconds an event is postponed for before
	// its saturation progress is looked at again.
	FanDelay = 10

	// FanMaxWait is the number of seconds after which waiting for fan is
	// abandoned and fallbacks are applied. It should be greater than the
	// task TTL used in service-fan (currently 120 seconds).
	FanMaxWait = 150
)
24

25
// FetchEventsWorker checks for new events and new notifications based on it
26
type FetchEventsWorker struct {
27
	Logger                     moira.Logger
28
	Database                   moira.Database
29
	TriggerInheritanceDatabase moira.TriggerInheritanceDatabase
30
	Scheduler                  notifier.Scheduler
31
	Metrics                    *metrics.NotifierMetrics
32
	tomb                       tomb.Tomb
33
	Fetcher                    func() (moira.NotificationEvents, error)
34
	Fan                        fan.Client
35
}
36

37
// notificationEscalationFlag stores information about whether subscription which caused this notification has escalations
38
type notificationEscalationFlag struct {
39
	HasEscalations bool
40
	Notification   *moira.ScheduledNotification
41
}
42

43
// Start is a cycle that fetches events from database
44
func (worker *FetchEventsWorker) Start() {
45
	worker.tomb.Go(func() error {
46
		for {
47
			select {
48
			case <-worker.tomb.Dying():
49
				{
50
					worker.Logger.Info("Moira Notifier Fetching events stopped")
51
					return nil
52
				}
53
			default:
54
				{
55
					events, err := worker.Fetcher()
56
					if err != nil {
57
						if err != database.ErrNil {
58
							worker.Metrics.EventsMalformed.Increment()
59
							worker.Logger.Warn(err.Error())
60
							time.Sleep(time.Second * 5)
61
						}
62
						continue
63
					}
64
					for _, event := range events {
65
						worker.Metrics.EventsReceived.Increment()
66
						if err := worker.processEvent(event); err != nil {
67
							worker.Metrics.EventsProcessingFailed.Increment()
68
							worker.Logger.ErrorF("Failed processEvent. %v", err)
69
						}
70
					}
71
				}
72
			}
73
		}
74
	})
75
	worker.Logger.Info("Moira Notifier Fetching events started")
76
}
77

78
// Stop stops new event fetching and wait for finish
79
func (worker *FetchEventsWorker) Stop() error {
80
	worker.tomb.Kill(nil)
81
	return worker.tomb.Wait()
82
}
83

84
func (worker *FetchEventsWorker) processEvent(event moira.NotificationEvent) error {
85
	var (
86
		subscriptions []*moira.SubscriptionData
87
		tags          []string
88
		triggerData   moira.TriggerData
89
	)
90

91
	logger := logging.GetLogger(event.TriggerID)
92
	isTest := event.State == moira.TEST
93

94
	if !isTest {
95
		worker.Logger.DebugF(
96
			"Processing trigger id %s for metric %s == %f, %s -> %s",
97
			event.TriggerID, event.Metric, moira.UseFloat64(event.Value),
98
			event.OldState, event.State,
99
		)
100

101
		trigger, err := worker.Database.GetTrigger(event.TriggerID)
102
		if err != nil {
103
			return err
104
		}
105
		if len(trigger.Tags) == 0 {
106
			return fmt.Errorf("No tags found for trigger id %s", event.TriggerID)
107
		}
108

109
		if event.State == moira.OK && len(trigger.Parents) > 0 {
110
			// OKs should never get delayed
111
			// so we set DelayedForAncestor (for the code below) but don't actually delay the event
112
			event.DelayedForAncestor = true
113
		}
114

115
		if !event.DelayedForAncestor {
116
			var (
117
				depth       = 0
118
				err   error = nil
119
			)
120

121
			if worker.TriggerInheritanceDatabase != nil {
122
				depth, err = worker.TriggerInheritanceDatabase.GetMaxDepthInGraph(event.TriggerID)
123
			}
124

125
			if err != nil {
126
				logger.ErrorE("Disabling trigger inheritance", map[string]interface{}{
127
					"TriggerID": event.TriggerID,
128
					"Error":     err.Error(),
129
				})
130
			} else {
131
				var delay = int64(depth) * 60
132
				if delay > 0 {
133
					delay += 60
134
				}
135

136
				if delay > 0 {
137
					event.DelayedForAncestor = true
138
					if err := worker.delayEventAndLog(&event, delay); err == nil {
139
						return nil
140
					}
141
				}
142
			}
143
		}
144

145
		if event.DelayedForAncestor && event.AncestorTriggerID == "" {
146
			events, ancestors, err := worker.processTriggerAncestors(&event)
147
			switch {
148
			case err != nil:
149
				logger.ErrorF(
150
					"Could not process ancestors for trigger %s: %v, disabling trigger inheritance!",
151
					event.TriggerID, err,
152
				)
153
			case len(events) > 0:
154
				logger.InfoE("found ancestors for event", map[string]interface{}{
155
					"Event":     event,
156
					"Ancestors": ancestors,
157
				})
158
				for _, event := range events {
159
					if err := worker.processEvent(event); err != nil {
160
						return err
161
					}
162
				}
163
				return nil
164
			}
165
		}
166

167
		triggerData = moira.TriggerData{
168
			ID:         trigger.ID,
169
			Name:       trigger.Name,
170
			Desc:       moira.UseString(trigger.Desc),
171
			Targets:    trigger.Targets,
172
			Parents:    trigger.Parents,
173
			WarnValue:  moira.UseFloat64(trigger.WarnValue),
174
			ErrorValue: moira.UseFloat64(trigger.ErrorValue),
175
			Tags:       trigger.Tags,
176
			Dashboard:  trigger.Dashboard,
177
			Saturation: worker.filterSaturationForEvent(&event, trigger.Saturation),
178
		}
179

180
		if len(triggerData.Saturation) > 0 {
181
			saturationResult := worker.saturate(&event, &triggerData)
182
			if saturationResult.Done {
183
				event = *saturationResult.Event
184
				triggerData = *saturationResult.TriggerData
185
			} else {
186
				return nil
187
			}
188
		}
189

190
		tags = append(triggerData.Tags, event.GetEventTags()...)
191
		worker.Logger.DebugF("Getting subscriptions for tags %v", tags)
192
		subscriptions, err = worker.Database.GetTagsSubscriptions(tags)
193
		if err != nil {
194
			return err
195
		}
196
	} else {
197
		sub, err := worker.getNotificationSubscriptions(event)
198
		if err != nil {
199
			return err
200
		}
201
		subscriptions = []*moira.SubscriptionData{sub}
202
	}
203

204
	notificationSet := make(map[string]notificationEscalationFlag)
205
	for _, subscription := range subscriptions {
206
		if subscription == nil {
207
			worker.Logger.Debug("Subscription is nil")
208
			continue
209
		}
210
		if !isTest {
211
			if !subscription.Enabled {
212
				worker.Logger.DebugF("Subscription %s is disabled", subscription.ID)
213
				continue
214
			}
215
			if !subset(subscription.Tags, tags) {
216
				worker.Logger.DebugF("Subscription %s has extra tags", subscription.ID)
217
				continue
218
			}
219
		}
220

221
		next, throttled := worker.Scheduler.GetDeliveryInfo(time.Now(), event, false, 0)
222
		hasEscalations := len(subscription.Escalations) > 0
223
		needAck := false
224
		if hasEscalations {
225
			if err := worker.Database.MaybeUpdateEscalationsOfSubscription(subscription); err != nil {
226
				worker.Logger.ErrorF("Failed to update old-style escalations: %v", err)
227
				continue
228
			} else if err := worker.Database.AddEscalations(next.Unix(), event, triggerData, subscription.Escalations); err != nil {
229
				worker.Logger.ErrorF("Failed to save escalations: %v", err)
230
				continue
231
			}
232

233
			needAck = event.State == moira.ERROR || event.State == moira.WARN
234
		}
235

236
		worker.Logger.DebugF("Processing contact ids %v for subscription %s", subscription.Contacts, subscription.ID)
237
		for _, contactID := range subscription.Contacts {
238
			contact, err := worker.Database.GetContact(contactID)
239
			if err != nil {
240
				worker.Logger.WarnF("Failed to get contact: %s, skip handling it, error: %v", contactID, err)
241
				continue
242
			}
243
			event.SubscriptionID = &subscription.ID
244

245
			notification := worker.Scheduler.ScheduleNotification(next, throttled, event, triggerData, contact, 0, needAck)
246
			notificationKey := notification.GetKey()
247

248
			// notifications with escalations are preferable
249
			if value, exists := notificationSet[notificationKey]; exists {
250
				if !hasEscalations || value.HasEscalations {
251
					// either current notification has no escalations or already existing one has any - no need to replace - skipping current
252
					worker.Logger.DebugF("Skip duplicated notification for contact %s", notification.Contact)
253
				} else {
254
					// current notification has escalations and already existing one has none - replacing with current and skipping existing
255
					notificationSet[notificationKey] = notificationEscalationFlag{
256
						HasEscalations: true,
257
						Notification:   notification,
258
					}
259
					worker.Logger.DebugF("Skip duplicated notification for contact %s", value.Notification.Contact)
260
				}
261
			} else {
262
				notificationSet[notificationKey] = notificationEscalationFlag{
263
					HasEscalations: hasEscalations,
264
					Notification:   notification,
265
				}
266
			}
267
		}
268
	}
269

270
	// adding unique notifications (with escalations as priority)
271
	for _, value := range notificationSet {
272
		if err := worker.Database.AddNotification(value.Notification); err != nil {
273
			worker.Logger.ErrorF("Failed to save scheduled notification: %s", err)
274
			logger.ErrorE(fmt.Sprintf("Failed to save scheduled notification: %s", err), value.Notification)
275
		} else {
276
			logger.InfoE("Trace added notification", value.Notification)
277
		}
278
	}
279

280
	return nil
281
}
282

283
// filterSaturationForEvent applies some internal notifier-side logic
284
// and (probably) reduces the list of saturation methods applicable
285
// for this moira.NotificationEvent
286
func (worker *FetchEventsWorker) filterSaturationForEvent(event *moira.NotificationEvent, saturation []moira.Saturation) []moira.Saturation {
287
	if saturation == nil {
288
		return nil
289
	}
290

291
	result := make([]moira.Saturation, 0, len(saturation))
292
	for _, sat := range saturation {
293
		// don't take screenshots for OK events
294
		if sat.Type == moira.SatTakeScreen && event.State == moira.OK {
295
			continue
296
		}
297

298
		result = append(result, sat)
299
	}
300

301
	return result
302
}
303

304
type saturationResult struct {
305
	Done        bool
306
	Event       *moira.NotificationEvent
307
	TriggerData *moira.TriggerData
308
}
309

310
func (worker *FetchEventsWorker) saturate(event *moira.NotificationEvent, triggerData *moira.TriggerData) saturationResult {
311
	// done == True means the event is ready and can be used
312
	// done == False means that saturation is not yet complete, this event should be delayed
313

314
	logger := logging.GetLogger(event.TriggerID)
315

316
	if event.FanTaskID != "" {
317
		// a request to fan has already been made
318

319
		response, err := worker.Fan.CheckProgress(event.FanTaskID)
320

321
		// a retry should happen even if CheckProgress returned an error
322
		// because the error may be temporary
323
		var shouldRetry bool
324
		var result saturationResult
325
		if err != nil {
326
			// if Fan returned an error, don't use the data it returns
327
			shouldRetry = true
328
			result = saturationResult{
329
				Event:       event,
330
				TriggerData: triggerData,
331
			}
332
			logger.ErrorE("failed to check fan progress", map[string]interface{}{
333
				"Error":   err.Error(),
334
				"Event":   event,
335
				"Trigger": triggerData,
336
			})
337
		} else {
338
			shouldRetry = !response.Done
339
			// use the data returned by Fan even if it's not done yet
340
			result = saturationResult{
341
				Done:        response.Done,
342
				Event:       response.Event,
343
				TriggerData: response.TriggerData,
344
			}
345
		}
346

347
		if shouldRetry {
348
			// checking for timeouts
349
			if time.Now().Unix()-event.WaitingForFanSince > FanMaxWait {
350
				logger.ErrorE("fan task timed out, abandoned", map[string]interface{}{
351
					"Event":   result.Event,
352
					"Trigger": result.TriggerData,
353
				})
354
				worker.Fan.ApplyFallbacks(result.Event, result.TriggerData, result.TriggerData.Saturation)
355
				result.Done = true
356
				return result
357
			}
358
			// not timed out yet, retrying
359
			if err := worker.delayEventAndLog(event, FanDelay); err != nil {
360
				worker.Fan.ApplyFallbacks(result.Event, result.TriggerData, result.TriggerData.Saturation)
361
				result.Done = true
362
				return result
363
			}
364
			// successfully scheduled a retry
365
			return saturationResult{
366
				Done: false,
367
			}
368
		}
369

370
		// fan returned OK
371
		return result
372

373
	} else {
374
		// no request made yet
375

376
		request := fan.Request{
377
			Event:       *event,
378
			TriggerData: *triggerData,
379
		}
380
		taskID, err := worker.Fan.SendRequest(request)
381
		if event.WaitingForFanSince == 0 {
382
			event.WaitingForFanSince = time.Now().Unix()
383
		}
384
		if err != nil {
385
			// some error happened when requesting ventilation
386
			// normally we should retry the request
387
			// however, if we have been retrying for `FanMaxWait`, then it's time to give up
388
			// and use the fallbacks
389
			if event.WaitingForFanSince != 0 {
390
				alreadyWaitedFor := time.Now().Unix() - event.WaitingForFanSince
391
				if alreadyWaitedFor > FanMaxWait {
392
					worker.Fan.ApplyFallbacks(event, triggerData, triggerData.Saturation)
393
					return saturationResult{
394
						Done:        true,
395
						Event:       event,
396
						TriggerData: triggerData,
397
					}
398
				}
399
			}
400
		}
401

402
		event.FanTaskID = taskID
403
		// wait for saturation to finish
404
		if err := worker.delayEventAndLog(event, FanDelay); err != nil {
405
			worker.Fan.ApplyFallbacks(event, triggerData, triggerData.Saturation)
406
			return saturationResult{
407
				Done:        true,
408
				Event:       event,
409
				TriggerData: triggerData,
410
			}
411
		}
412
		return saturationResult{
413
			Done: false,
414
		}
415
	}
416
}
417

418
func (worker *FetchEventsWorker) delayEventAndLog(event *moira.NotificationEvent, delay int64) error {
419
	logger := logging.GetLogger(event.TriggerID)
420
	logger.InfoE("Delaying event", map[string]interface{}{
421
		"Event":        event,
422
		"DelaySeconds": delay,
423
	})
424

425
	if err := worker.Database.AddDelayedNotificationEvent(*event, time.Now().Unix()+delay); err != nil {
426
		logger.ErrorE("Could not delay event", map[string]interface{}{
427
			"Event": event,
428
			"Error": err.Error(),
429
		})
430
		return err
431
	}
432
	return nil
433
}
434

435
func (worker *FetchEventsWorker) processTriggerAncestors(event *moira.NotificationEvent) ([]moira.NotificationEvent, []string, error) {
436
	ancestors, err := worker.TriggerInheritanceDatabase.GetAllAncestors(event.TriggerID)
437
	if err != nil {
438
		return nil, nil, err
439
	}
440

441
	type triggerMetric struct {
442
		TriggerID string
443
		Metric    string
444
	}
445
	ancestorsOfEvent := make([]triggerMetric, 0)
446
	ancestorsForLog := make([]string, 0)
447

448
	if event.State == moira.OK {
449
		parentEvents, err := worker.Database.GetParentEvents(event.TriggerID, event.Metric)
450
		if err != nil {
451
			worker.Logger.ErrorF(
452
				"Could not get parent events for event [%s:%s:%s]: %v. Disabling trigger inheritance!",
453
				event.TriggerID, event.Metric, event.State, err,
454
			)
455
		}
456
		for parentTriggerID, parentMetrics := range parentEvents {
457
			for _, parentMetric := range parentMetrics {
458
				ancestorsOfEvent = append(ancestorsOfEvent, triggerMetric{parentTriggerID, parentMetric})
459
				ancestorsForLog = append(ancestorsForLog, parentTriggerID+":"+parentMetric)
460
			}
461
		}
462

463
		for _, ancestor := range ancestorsOfEvent {
464
			_ = worker.Database.DeleteChildEvents(
465
				ancestor.TriggerID, ancestor.Metric,
466
				event.TriggerID, []string{event.Metric},
467
			)
468
		}
469
	} else {
470
		for _, ancestorChain := range ancestors {
471
			ancestorID, ancestorMetric, err := worker.processChainOfAncestors(event, ancestorChain)
472
			switch {
473
			case err != nil:
474
				worker.Logger.ErrorF(
475
					"Could not get ancestors of trigger [%s]: %v. Disabling trigger inheritance!",
476
					event.TriggerID, err,
477
				)
478
				return nil, nil, err
479
			case ancestorID != "":
480
				ancestorsOfEvent = append(ancestorsOfEvent, triggerMetric{ancestorID, ancestorMetric})
481
				ancestorsForLog = append(ancestorsForLog, ancestorID+":"+ancestorMetric)
482
			}
483
		}
484

485
		for _, ancestor := range ancestorsOfEvent {
486
			_ = worker.Database.AddChildEvents(
487
				ancestor.TriggerID, ancestor.Metric,
488
				event.TriggerID, []string{event.Metric},
489
			)
490
		}
491

492
		if event.IsForceSent {
493
			parents, err := worker.Database.GetParentEvents(event.TriggerID, event.Metric)
494
			if err != nil {
495
				worker.Logger.ErrorF(
496
					"Could not get parent events for event [%s:%s:%s]: %v. Forced notifications may be broken",
497
					event.TriggerID, event.Metric, event.State, err,
498
				)
499
			}
500
			for parentTriggerID, parentMetrics := range parents {
501
				for _, parentMetric := range parentMetrics {
502
					_ = worker.Database.DeleteChildEvents(
503
						parentTriggerID, parentMetric,
504
						event.TriggerID, []string{event.Metric},
505
					)
506
				}
507
			}
508
		}
509
	}
510
	// ancestorsOfEvent now contains all triggers that have changed state (TODO: rewrite this comment)
511

512
	result := make([]moira.NotificationEvent, len(ancestorsOfEvent))
513
	for i, ancestor := range ancestorsOfEvent {
514
		newEvent := *event
515
		newEvent.OverriddenByAncestor = true
516
		newEvent.AncestorTriggerID = ancestor.TriggerID
517
		newEvent.AncestorMetric = ancestor.Metric
518
		result[i] = newEvent
519
	}
520
	return result, ancestorsForLog, nil
521
}
522

523
// processChainOfAncestors walks one chain of ancestor trigger IDs in order
// and returns the ID and metric of the first ancestor metric that overrides
// the event. A metric qualifies when isAncestorEvent accepts its state and
// either it is the ancestor's only metric or its metric tag equals the
// event's metric tag. Two empty strings are returned when nothing in the
// chain qualifies; an error is returned when the last checks cannot be
// loaded.
func (worker *FetchEventsWorker) processChainOfAncestors(event *moira.NotificationEvent, ancestorChain []string) (string, string, error) {
	lastChecks, err := worker.Database.GetTriggerLastChecks(ancestorChain)
	if err != nil {
		return "", "", err
	}

	eventTag := getMetricTag(event.Metric)
	for _, ancestorID := range ancestorChain {
		check := lastChecks[ancestorID]
		if check == nil {
			// no last check known for this ancestor
			continue
		}

		metrics := check.Metrics
		for metricName, metricState := range metrics {
			if !isAncestorEvent(metricState, event) {
				continue
			}
			// a single-metric ancestor overrides all metrics of the
			// descendant; otherwise only matching metrics are overridden
			if len(metrics) == 1 || eventTag == getMetricTag(metricName) {
				return ancestorID, metricName, nil
			}
		}
	}

	// no suitable ancestor found
	return "", "", nil
}
554

555
// isAncestorEvent reports whether the child event is considered to be caused
// by the ancestor's metric state: the two must have happened within
// `timeGap` seconds of each other and be in the same state.
func isAncestorEvent(ancestorMetricState *moira.MetricState, currentEvent *moira.NotificationEvent) bool {
	// maximum distance in seconds between the two timestamps
	const timeGap = 5 * 60

	distance := int64Abs(currentEvent.Timestamp - ancestorMetricState.EventTimestamp)
	return distance <= timeGap && ancestorMetricState.State == currentEvent.State
}
566

567
// getMetricTag returns the part of the metric name that precedes the first
// dot, or the whole name when it contains no dot.
func getMetricTag(metric string) string {
	if dot := strings.IndexByte(metric, '.'); dot >= 0 {
		return metric[:dot]
	}
	return metric
}
570

571
// int64Abs returns the absolute value of val.
func int64Abs(val int64) int64 {
	if val < 0 {
		return -val
	}
	return val
}
578

579
func (worker *FetchEventsWorker) getNotificationSubscriptions(event moira.NotificationEvent) (*moira.SubscriptionData, error) {
580
	if event.SubscriptionID != nil {
581
		worker.Logger.DebugF("Getting subscriptionID %s for test message", *event.SubscriptionID)
582
		sub, err := worker.Database.GetSubscription(*event.SubscriptionID)
583
		if err != nil {
584
			worker.Metrics.SubsMalformed.Increment()
585
			return nil, fmt.Errorf("Error while read subscription %s: %s", *event.SubscriptionID, err.Error())
586
		}
587
		return &sub, nil
588
	} else if event.ContactID != "" {
589
		worker.Logger.DebugF("Getting contactID %s for test message", event.ContactID)
590
		contact, err := worker.Database.GetContact(event.ContactID)
591
		if err != nil {
592
			return nil, fmt.Errorf("Error while read contact %s: %s", event.ContactID, err.Error())
593
		}
594
		sub := &moira.SubscriptionData{
595
			ID:                "testSubscription",
596
			User:              contact.User,
597
			ThrottlingEnabled: false,
598
			Enabled:           true,
599
			Tags:              make([]string, 0),
600
			Contacts:          []string{contact.ID},
601
			Schedule:          moira.ScheduleData{},
602
		}
603
		return sub, nil
604
	}
605

606
	return nil, nil
607
}
608

609
// subset reports whether every element of first is also present in second.
// An empty first is a subset of anything.
func subset(first, second []string) bool {
	known := make(map[string]struct{}, len(second))
	for _, item := range second {
		known[item] = struct{}{}
	}

	for _, item := range first {
		if _, found := known[item]; !found {
			return false
		}
	}
	return true
}
623

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.