10
"go.avito.ru/DO/moira"
11
"go.avito.ru/DO/moira/database"
12
"go.avito.ru/DO/moira/fan"
13
"go.avito.ru/DO/moira/logging"
14
"go.avito.ru/DO/moira/metrics"
15
"go.avito.ru/DO/moira/notifier"
20
// this should be greater than the task TTL used in service-fan
21
// (currently 120 seconds)
25
// FetchEventsWorker fetches new events and schedules notifications based on them
26
type FetchEventsWorker struct {
28
Database moira.Database
29
TriggerInheritanceDatabase moira.TriggerInheritanceDatabase
30
Scheduler notifier.Scheduler
31
Metrics *metrics.NotifierMetrics
33
Fetcher func() (moira.NotificationEvents, error)
37
// notificationEscalationFlag records whether the subscription that caused this notification has escalations
38
type notificationEscalationFlag struct {
40
Notification *moira.ScheduledNotification
43
// Start runs the loop that fetches events from the database and processes them
44
func (worker *FetchEventsWorker) Start() {
45
worker.tomb.Go(func() error {
48
case <-worker.tomb.Dying():
50
worker.Logger.Info("Moira Notifier Fetching events stopped")
55
events, err := worker.Fetcher()
57
if err != database.ErrNil {
58
worker.Metrics.EventsMalformed.Increment()
59
worker.Logger.Warn(err.Error())
60
time.Sleep(time.Second * 5)
64
for _, event := range events {
65
worker.Metrics.EventsReceived.Increment()
66
if err := worker.processEvent(event); err != nil {
67
worker.Metrics.EventsProcessingFailed.Increment()
68
worker.Logger.ErrorF("Failed processEvent. %v", err)
75
worker.Logger.Info("Moira Notifier Fetching events started")
78
// Stop stops fetching new events and waits for the worker to finish
79
func (worker *FetchEventsWorker) Stop() error {
81
return worker.tomb.Wait()
84
func (worker *FetchEventsWorker) processEvent(event moira.NotificationEvent) error {
86
subscriptions []*moira.SubscriptionData
88
triggerData moira.TriggerData
91
logger := logging.GetLogger(event.TriggerID)
92
isTest := event.State == moira.TEST
96
"Processing trigger id %s for metric %s == %f, %s -> %s",
97
event.TriggerID, event.Metric, moira.UseFloat64(event.Value),
98
event.OldState, event.State,
101
trigger, err := worker.Database.GetTrigger(event.TriggerID)
105
if len(trigger.Tags) == 0 {
106
return fmt.Errorf("No tags found for trigger id %s", event.TriggerID)
109
if event.State == moira.OK && len(trigger.Parents) > 0 {
110
// OKs should never get delayed
111
// so we set DelayedForAncestor (for the code below) but don't actually delay the event
112
event.DelayedForAncestor = true
115
if !event.DelayedForAncestor {
121
if worker.TriggerInheritanceDatabase != nil {
122
depth, err = worker.TriggerInheritanceDatabase.GetMaxDepthInGraph(event.TriggerID)
126
logger.ErrorE("Disabling trigger inheritance", map[string]interface{}{
127
"TriggerID": event.TriggerID,
128
"Error": err.Error(),
131
var delay = int64(depth) * 60
137
event.DelayedForAncestor = true
138
if err := worker.delayEventAndLog(&event, delay); err == nil {
145
if event.DelayedForAncestor && event.AncestorTriggerID == "" {
146
events, ancestors, err := worker.processTriggerAncestors(&event)
150
"Could not process ancestors for trigger %s: %v, disabling trigger inheritance!",
151
event.TriggerID, err,
153
case len(events) > 0:
154
logger.InfoE("found ancestors for event", map[string]interface{}{
156
"Ancestors": ancestors,
158
for _, event := range events {
159
if err := worker.processEvent(event); err != nil {
167
triggerData = moira.TriggerData{
170
Desc: moira.UseString(trigger.Desc),
171
Targets: trigger.Targets,
172
Parents: trigger.Parents,
173
WarnValue: moira.UseFloat64(trigger.WarnValue),
174
ErrorValue: moira.UseFloat64(trigger.ErrorValue),
176
Dashboard: trigger.Dashboard,
177
Saturation: worker.filterSaturationForEvent(&event, trigger.Saturation),
180
if len(triggerData.Saturation) > 0 {
181
saturationResult := worker.saturate(&event, &triggerData)
182
if saturationResult.Done {
183
event = *saturationResult.Event
184
triggerData = *saturationResult.TriggerData
190
tags = append(triggerData.Tags, event.GetEventTags()...)
191
worker.Logger.DebugF("Getting subscriptions for tags %v", tags)
192
subscriptions, err = worker.Database.GetTagsSubscriptions(tags)
197
sub, err := worker.getNotificationSubscriptions(event)
201
subscriptions = []*moira.SubscriptionData{sub}
204
notificationSet := make(map[string]notificationEscalationFlag)
205
for _, subscription := range subscriptions {
206
if subscription == nil {
207
worker.Logger.Debug("Subscription is nil")
211
if !subscription.Enabled {
212
worker.Logger.DebugF("Subscription %s is disabled", subscription.ID)
215
if !subset(subscription.Tags, tags) {
216
worker.Logger.DebugF("Subscription %s has extra tags", subscription.ID)
221
next, throttled := worker.Scheduler.GetDeliveryInfo(time.Now(), event, false, 0)
222
hasEscalations := len(subscription.Escalations) > 0
225
if err := worker.Database.MaybeUpdateEscalationsOfSubscription(subscription); err != nil {
226
worker.Logger.ErrorF("Failed to update old-style escalations: %v", err)
228
} else if err := worker.Database.AddEscalations(next.Unix(), event, triggerData, subscription.Escalations); err != nil {
229
worker.Logger.ErrorF("Failed to save escalations: %v", err)
233
needAck = event.State == moira.ERROR || event.State == moira.WARN
236
worker.Logger.DebugF("Processing contact ids %v for subscription %s", subscription.Contacts, subscription.ID)
237
for _, contactID := range subscription.Contacts {
238
contact, err := worker.Database.GetContact(contactID)
240
worker.Logger.WarnF("Failed to get contact: %s, skip handling it, error: %v", contactID, err)
243
event.SubscriptionID = &subscription.ID
245
notification := worker.Scheduler.ScheduleNotification(next, throttled, event, triggerData, contact, 0, needAck)
246
notificationKey := notification.GetKey()
248
// notifications with escalations are preferable
249
if value, exists := notificationSet[notificationKey]; exists {
250
if !hasEscalations || value.HasEscalations {
251
// either the current notification has no escalations or the existing one already has some - no need to replace, skip the current one
252
worker.Logger.DebugF("Skip duplicated notification for contact %s", notification.Contact)
254
// the current notification has escalations and the existing one has none - replace the existing one with the current one
255
notificationSet[notificationKey] = notificationEscalationFlag{
256
HasEscalations: true,
257
Notification: notification,
259
worker.Logger.DebugF("Skip duplicated notification for contact %s", value.Notification.Contact)
262
notificationSet[notificationKey] = notificationEscalationFlag{
263
HasEscalations: hasEscalations,
264
Notification: notification,
270
// adding unique notifications (with escalations as priority)
271
for _, value := range notificationSet {
272
if err := worker.Database.AddNotification(value.Notification); err != nil {
273
worker.Logger.ErrorF("Failed to save scheduled notification: %s", err)
274
logger.ErrorE(fmt.Sprintf("Failed to save scheduled notification: %s", err), value.Notification)
276
logger.InfoE("Trace added notification", value.Notification)
283
// filterSaturationForEvent applies some internal notifier-side logic
284
// and (probably) reduces the list of saturation methods applicable
285
// for this moira.NotificationEvent
286
func (worker *FetchEventsWorker) filterSaturationForEvent(event *moira.NotificationEvent, saturation []moira.Saturation) []moira.Saturation {
287
if saturation == nil {
291
result := make([]moira.Saturation, 0, len(saturation))
292
for _, sat := range saturation {
293
// don't take screenshots for OK events
294
if sat.Type == moira.SatTakeScreen && event.State == moira.OK {
298
result = append(result, sat)
304
type saturationResult struct {
306
Event *moira.NotificationEvent
307
TriggerData *moira.TriggerData
310
func (worker *FetchEventsWorker) saturate(event *moira.NotificationEvent, triggerData *moira.TriggerData) saturationResult {
311
// Done == true means the event is ready and can be used
312
// Done == false means that saturation is not yet complete, this event should be delayed
314
logger := logging.GetLogger(event.TriggerID)
316
if event.FanTaskID != "" {
317
// a request to fan has already been made
319
response, err := worker.Fan.CheckProgress(event.FanTaskID)
321
// a retry should happen even if CheckProgress returned an error
322
// because the error may be temporary
324
var result saturationResult
326
// if Fan returned an error, don't use the data it returns
328
result = saturationResult{
330
TriggerData: triggerData,
332
logger.ErrorE("failed to check fan progress", map[string]interface{}{
333
"Error": err.Error(),
335
"Trigger": triggerData,
338
shouldRetry = !response.Done
339
// use the data returned by Fan even if it's not done yet
340
result = saturationResult{
342
Event: response.Event,
343
TriggerData: response.TriggerData,
348
// checking for timeouts
349
if time.Now().Unix()-event.WaitingForFanSince > FanMaxWait {
350
logger.ErrorE("fan task timed out, abandoned", map[string]interface{}{
351
"Event": result.Event,
352
"Trigger": result.TriggerData,
354
worker.Fan.ApplyFallbacks(result.Event, result.TriggerData, result.TriggerData.Saturation)
358
// not timed out yet, retrying
359
if err := worker.delayEventAndLog(event, FanDelay); err != nil {
360
worker.Fan.ApplyFallbacks(result.Event, result.TriggerData, result.TriggerData.Saturation)
364
// successfully scheduled a retry
365
return saturationResult{
374
// no request made yet
376
request := fan.Request{
378
TriggerData: *triggerData,
380
taskID, err := worker.Fan.SendRequest(request)
381
if event.WaitingForFanSince == 0 {
382
event.WaitingForFanSince = time.Now().Unix()
385
// some error happened when requesting ventilation
386
// normally we should retry the request
387
// however, if we have been retrying for `FanMaxWait`, then it's time to give up
388
// and use the fallbacks
389
if event.WaitingForFanSince != 0 {
390
alreadyWaitedFor := time.Now().Unix() - event.WaitingForFanSince
391
if alreadyWaitedFor > FanMaxWait {
392
worker.Fan.ApplyFallbacks(event, triggerData, triggerData.Saturation)
393
return saturationResult{
396
TriggerData: triggerData,
402
event.FanTaskID = taskID
403
// wait for saturation to finish
404
if err := worker.delayEventAndLog(event, FanDelay); err != nil {
405
worker.Fan.ApplyFallbacks(event, triggerData, triggerData.Saturation)
406
return saturationResult{
409
TriggerData: triggerData,
412
return saturationResult{
418
func (worker *FetchEventsWorker) delayEventAndLog(event *moira.NotificationEvent, delay int64) error {
419
logger := logging.GetLogger(event.TriggerID)
420
logger.InfoE("Delaying event", map[string]interface{}{
422
"DelaySeconds": delay,
425
if err := worker.Database.AddDelayedNotificationEvent(*event, time.Now().Unix()+delay); err != nil {
426
logger.ErrorE("Could not delay event", map[string]interface{}{
428
"Error": err.Error(),
435
func (worker *FetchEventsWorker) processTriggerAncestors(event *moira.NotificationEvent) ([]moira.NotificationEvent, []string, error) {
436
ancestors, err := worker.TriggerInheritanceDatabase.GetAllAncestors(event.TriggerID)
441
type triggerMetric struct {
445
ancestorsOfEvent := make([]triggerMetric, 0)
446
ancestorsForLog := make([]string, 0)
448
if event.State == moira.OK {
449
parentEvents, err := worker.Database.GetParentEvents(event.TriggerID, event.Metric)
451
worker.Logger.ErrorF(
452
"Could not get parent events for event [%s:%s:%s]: %v. Disabling trigger inheritance!",
453
event.TriggerID, event.Metric, event.State, err,
456
for parentTriggerID, parentMetrics := range parentEvents {
457
for _, parentMetric := range parentMetrics {
458
ancestorsOfEvent = append(ancestorsOfEvent, triggerMetric{parentTriggerID, parentMetric})
459
ancestorsForLog = append(ancestorsForLog, parentTriggerID+":"+parentMetric)
463
for _, ancestor := range ancestorsOfEvent {
464
_ = worker.Database.DeleteChildEvents(
465
ancestor.TriggerID, ancestor.Metric,
466
event.TriggerID, []string{event.Metric},
470
for _, ancestorChain := range ancestors {
471
ancestorID, ancestorMetric, err := worker.processChainOfAncestors(event, ancestorChain)
474
worker.Logger.ErrorF(
475
"Could not get ancestors of trigger [%s]: %v. Disabling trigger inheritance!",
476
event.TriggerID, err,
479
case ancestorID != "":
480
ancestorsOfEvent = append(ancestorsOfEvent, triggerMetric{ancestorID, ancestorMetric})
481
ancestorsForLog = append(ancestorsForLog, ancestorID+":"+ancestorMetric)
485
for _, ancestor := range ancestorsOfEvent {
486
_ = worker.Database.AddChildEvents(
487
ancestor.TriggerID, ancestor.Metric,
488
event.TriggerID, []string{event.Metric},
492
if event.IsForceSent {
493
parents, err := worker.Database.GetParentEvents(event.TriggerID, event.Metric)
495
worker.Logger.ErrorF(
496
"Could not get parent events for event [%s:%s:%s]: %v. Forced notifications may be broken",
497
event.TriggerID, event.Metric, event.State, err,
500
for parentTriggerID, parentMetrics := range parents {
501
for _, parentMetric := range parentMetrics {
502
_ = worker.Database.DeleteChildEvents(
503
parentTriggerID, parentMetric,
504
event.TriggerID, []string{event.Metric},
510
// ancestorsOfEvent now contains every ancestor trigger/metric pair collected above; a new event marked OverriddenByAncestor is produced for each of them
512
result := make([]moira.NotificationEvent, len(ancestorsOfEvent))
513
for i, ancestor := range ancestorsOfEvent {
515
newEvent.OverriddenByAncestor = true
516
newEvent.AncestorTriggerID = ancestor.TriggerID
517
newEvent.AncestorMetric = ancestor.Metric
520
return result, ancestorsForLog, nil
523
func (worker *FetchEventsWorker) processChainOfAncestors(event *moira.NotificationEvent, ancestorChain []string) (string, string, error) {
524
ancestorStates, err := worker.Database.GetTriggerLastChecks(ancestorChain)
529
metricTag := getMetricTag(event.Metric)
530
for _, ancestorID := range ancestorChain {
531
state := ancestorStates[ancestorID]
535
ancestorMetrics := ancestorStates[ancestorID].Metrics
536
for ancestorMetric, ancestorMetricState := range ancestorMetrics {
537
if isAncestorEvent(ancestorMetricState, event) {
538
if len(ancestorMetrics) == 1 {
539
// all metrics of the descendant are overridden
540
return ancestorID, ancestorMetric, nil
542
// only matching metrics are overridden
543
if metricTag == getMetricTag(ancestorMetric) {
544
return ancestorID, ancestorMetric, nil
551
// no suitable ancestor found
555
func isAncestorEvent(ancestorMetricState *moira.MetricState, currentEvent *moira.NotificationEvent) bool {
556
// if the child event and the ancestor event happened within `timeGap` seconds,
557
// Moira decides the child event is caused by the ancestor event
558
const timeGap = 5 * 60
559
if int64Abs(currentEvent.Timestamp-ancestorMetricState.EventTimestamp) <= timeGap {
560
if ancestorMetricState.State == currentEvent.State {
567
func getMetricTag(metric string) string {
568
return strings.SplitN(metric, ".", 2)[0]
571
func int64Abs(val int64) int64 {
579
func (worker *FetchEventsWorker) getNotificationSubscriptions(event moira.NotificationEvent) (*moira.SubscriptionData, error) {
580
if event.SubscriptionID != nil {
581
worker.Logger.DebugF("Getting subscriptionID %s for test message", *event.SubscriptionID)
582
sub, err := worker.Database.GetSubscription(*event.SubscriptionID)
584
worker.Metrics.SubsMalformed.Increment()
585
return nil, fmt.Errorf("Error while read subscription %s: %s", *event.SubscriptionID, err.Error())
588
} else if event.ContactID != "" {
589
worker.Logger.DebugF("Getting contactID %s for test message", event.ContactID)
590
contact, err := worker.Database.GetContact(event.ContactID)
592
return nil, fmt.Errorf("Error while read contact %s: %s", event.ContactID, err.Error())
594
sub := &moira.SubscriptionData{
595
ID: "testSubscription",
597
ThrottlingEnabled: false,
599
Tags: make([]string, 0),
600
Contacts: []string{contact.ID},
601
Schedule: moira.ScheduleData{},
609
func subset(first, second []string) bool {
610
set := make(map[string]bool)
611
for _, value := range second {
615
for _, value := range first {