moira

Форк
0
/
check.go 
481 строка · 17.1 Кб
1
package checker
2

3
import (
4
	"fmt"
5
	"time"
6

7
	"go.avito.ru/DO/moira"
8
	"go.avito.ru/DO/moira/target"
9
)
10

11
// chunkSize is the maximum number of forced-notification names passed to a single
// Database.DeleteTriggerForcedNotifications call.
const chunkSize = 20

// checkPointGap is the gap passed to MetricState.GetCheckPoint when computing the
// checkpoint before which time series values are not re-evaluated.
// NOTE(review): presumably seconds — confirm against GetCheckPoint.
const checkPointGap = int64(120)
15

16
// ErrTriggerHasNoTimeSeries is the checking error used when the trigger target
// yields no main time series.
type ErrTriggerHasNoTimeSeries struct{}

// Error implements the error interface with a constant message.
func (ErrTriggerHasNoTimeSeries) Error() string {
	return "Trigger has no metrics, check your target"
}
23

24
// ErrTriggerHasOnlyWildcards is the checking error used when every time series of
// the trigger is a wildcard, i.e. no real metric has been received.
type ErrTriggerHasOnlyWildcards struct{}

// Error implements the error interface with a constant message.
func (ErrTriggerHasOnlyWildcards) Error() string {
	return "Trigger never received metrics"
}
31

32
// ErrTriggerHasSameTimeSeriesNames is the checking error used when two main time
// series of the trigger share the same name.
type ErrTriggerHasSameTimeSeriesNames struct{}

// Error implements the error interface with a constant message.
func (ErrTriggerHasSameTimeSeriesNames) Error() string {
	return "Trigger has same timeseries names"
}
39

40
// Check handle trigger and last check and write new state of trigger, if state were change then write new NotificationEvent
41
func (triggerChecker *TriggerChecker) Check() error {
42
	triggerChecker.logger.DebugF("Checking trigger %s", triggerChecker.TriggerID)
43

44
	// don't handle trigger at all if it is disabled by maintenance/schedule/etc.
45
	if triggerChecker.isHandleMetricDisabled(triggerChecker.CheckStarted, moira.WildcardMetric, "Basic check") {
46
		return nil
47
	}
48

49
	// log call quantity stats
50
	defer triggerChecker.logger.TraceSelfStats(triggerChecker.TriggerID, time.Now())
51

52
	checkData, err := triggerChecker.handleTrigger()
53
	if err != nil {
54
		triggerChecker.logger.ErrorE(fmt.Sprintf("Error while handleTrigger: %v", err), map[string]interface{}{
55
			"trigger_id": triggerChecker.TriggerID,
56
		})
57
		checkData, err = triggerChecker.handleErrorCheck(checkData, err)
58
		if err != nil {
59
			triggerChecker.logger.ErrorE(fmt.Sprintf("Error while handleErrorCheck: %v", err), map[string]interface{}{
60
				"trigger_id": triggerChecker.TriggerID,
61
			})
62
			return err
63
		}
64
	}
65

66
	checkData.UpdateScore()
67
	err = triggerChecker.Database.SetTriggerLastCheck(triggerChecker.TriggerID, &checkData)
68

69
	return err
70
}
71

72
func (triggerChecker *TriggerChecker) handleTrigger() (moira.CheckData, error) {
73
	var (
74
		checkingError, err error
75
		triggerTimeSeries  *triggerTimeSeries
76
	)
77

78
	triggerChecker.cleanupMaintenanceMetrics()
79
	checkData := moira.CheckData{
80
		State:             moira.OK,
81
		Timestamp:         triggerChecker.Until,
82
		EventTimestamp:    triggerChecker.lastCheck.EventTimestamp,
83
		Score:             triggerChecker.lastCheck.Score,
84
		Maintenance:       triggerChecker.lastCheck.Maintenance,
85
		MaintenanceMetric: triggerChecker.lastCheck.MaintenanceMetric,
86
		Metrics:           triggerChecker.lastCheck.Metrics,
87
		Version:           triggerChecker.lastCheck.Version,
88
	}
89

90
	// getting time series: either from graphite or from DB
91
	if triggerChecker.trigger.IsPullType {
92
		triggerTimeSeries, err = triggerChecker.getRemoteTimeSeries(triggerChecker.From, triggerChecker.Until)
93
		if err != nil {
94
			return checkData, err
95
		}
96
	} else {
97
		triggerTimeSeries, _, err = triggerChecker.getTimeSeries(triggerChecker.From, triggerChecker.Until)
98
		if err != nil {
99
			return checkData, err
100
		}
101
	}
102

103
	hasOnlyWildcards := triggerTimeSeries.hasOnlyWildcards()
104
	triggerChecker.traceHandledData(triggerTimeSeries, hasOnlyWildcards)
105
	if hasOnlyWildcards {
106
		return checkData, ErrTriggerHasOnlyWildcards{}
107
	}
108

109
	deleteForcedNotifications := make([]string, 0, len(triggerTimeSeries.Main))
110
	timeSeriesNamesMap := make(map[string]bool, len(triggerTimeSeries.Main))
111
	patternMetricsRemoved := false
112
	if len(triggerTimeSeries.Main) > 0 {
113
		for _, timeSeries := range triggerTimeSeries.Main {
114
			triggerChecker.logger.DebugF("[TriggerID:%s] Checking timeSeries %s: %v", triggerChecker.TriggerID, timeSeries.Name, timeSeries.Values)
115
			triggerChecker.logger.DebugF("[TriggerID:%s][TimeSeries:%s] Checking interval: %v - %v (%vs), step: %v", triggerChecker.TriggerID, timeSeries.Name, timeSeries.StartTime, timeSeries.StopTime, timeSeries.StepTime, timeSeries.StopTime-timeSeries.StartTime)
116

117
			if triggerChecker.isHandleMetricDisabled(triggerChecker.CheckStarted, timeSeries.Name, "handleTrigger:checkMainMetrics") {
118
				continue
119
			}
120

121
			if _, ok := timeSeriesNamesMap[timeSeries.Name]; ok {
122
				triggerChecker.logger.InfoF("[TriggerID:%s][TimeSeries:%s] Trigger has same time series names", triggerChecker.TriggerID, timeSeries.Name)
123
				checkingError = ErrTriggerHasSameTimeSeriesNames{}
124
				continue
125
			}
126

127
			timeSeriesNamesMap[timeSeries.Name] = true
128
			metricState, deleteMetric, deleteForced, err := triggerChecker.checkTimeSeries(timeSeries, triggerTimeSeries)
129

130
			if deleteMetric {
131
				triggerChecker.logger.InfoF("[TriggerID:%s] Remove metric: '%s'", triggerChecker.TriggerID, timeSeries.Name)
132
				delete(checkData.Metrics, timeSeries.Name)
133

134
				if !patternMetricsRemoved {
135
					err = triggerChecker.Database.RemovePatternsMetrics(triggerChecker.trigger.Patterns)
136
					patternMetricsRemoved = true
137
				}
138
			} else {
139
				checkData.Metrics[timeSeries.Name] = metricState
140
			}
141

142
			if deleteForced {
143
				deleteForcedNotifications = append(deleteForcedNotifications, timeSeries.Name)
144
			}
145

146
			if err != nil {
147
				return checkData, err
148
			}
149
		}
150
	} else {
151
		checkingError = ErrTriggerHasNoTimeSeries{}
152
	}
153

154
	// there might be metrics which were present in lastCheck (after prior checks) but are not present at the moment
155
	// if there are then there is a special action for them
156
	metricsReplacement := make(map[string]*moira.MetricState, len(checkData.Metrics))
157
	metricsToDelete := make([]string, 0, len(checkData.Metrics))
158
	for metricName, metricState := range checkData.Metrics {
159
		if _, ok := timeSeriesNamesMap[metricName]; ok {
160
			continue
161
		}
162

163
		// forced notification should be deleted since metricName is not present in triggerTimeSeries.Main
164
		isForced := triggerChecker.forced[metricName]
165
		// find out if this metric is to be deleted according to trigger ttlState setting
166
		noDataState, deleteMetric := triggerChecker.checkForNoData(metricState)
167
		// find out if its state differs from nodata
168
		stateDiffers := noDataState != nil && (!metricState.IsNoData || metricState.State != noDataState.State)
169

170
		triggerChecker.logger.InfoE(
171
			fmt.Sprintf("[TriggerID:%s] Processing obsolete metric %s", triggerChecker.TriggerID, metricName),
172
			map[string]interface{}{
173
				"no_data_state": noDataState,
174
				"metric_state":  metricState,
175
				"state_differs": stateDiffers,
176
				"metric_name":   metricName,
177
				"is_forced":     isForced,
178
				"to_delete":     deleteMetric,
179
			},
180
		)
181

182
		if isForced {
183
			deleteForcedNotifications = append(deleteForcedNotifications, metricName)
184
		}
185

186
		if deleteMetric {
187
			metricsToDelete = append(metricsToDelete, metricName)
188
			continue
189
		}
190

191
		// `noDataState != nil condition` is here so that IDE could be happy
192
		if noDataState != nil && stateDiffers {
193
			if triggerChecker.isHandleMetricDisabled(triggerChecker.CheckStarted, metricName, "handleTrigger:checkMissingMetrics") {
194
				continue
195
			}
196

197
			metricNewState, err := triggerChecker.compareStates(metricName, noDataState, metricState, isForced)
198
			if err == nil {
199
				metricsReplacement[metricName] = metricNewState
200
			}
201
		}
202
	}
203

204
	// and now apply this special action if there are some metrics lost
205
	for metricName, metricState := range metricsReplacement {
206
		checkData.Metrics[metricName] = metricState
207
	}
208
	for _, metricName := range metricsToDelete {
209
		delete(checkData.Metrics, metricName)
210
	}
211

212
	// deleting forced notifications which have been sent
213
	for len(deleteForcedNotifications) > 0 {
214
		// cutting these into chunks so that Redis doesn't choke
215
		var chunk, rest []string = deleteForcedNotifications, nil
216
		if len(chunk) > chunkSize {
217
			chunk, rest = chunk[:chunkSize], chunk[chunkSize:]
218
		}
219
		if err := triggerChecker.Database.DeleteTriggerForcedNotifications(triggerChecker.TriggerID, chunk); err != nil {
220
			triggerChecker.logger.ErrorE("Could not delete forced notification for trigger", map[string]interface{}{
221
				"TriggerID": triggerChecker.TriggerID,
222
				"Error":     err.Error(),
223
			})
224
		}
225
		deleteForcedNotifications = rest
226
	}
227

228
	return checkData, checkingError
229
}
230

231
func (triggerChecker *TriggerChecker) handleErrorCheck(checkData moira.CheckData, checkingError error) (moira.CheckData, error) {
232
	switch checkingError.(type) {
233
	case ErrTriggerHasNoTimeSeries:
234
		triggerChecker.logger.DebugF("Trigger %s: %s", triggerChecker.TriggerID, checkingError.Error())
235
		checkData.State = triggerChecker.ttlState
236
		checkData.Message = checkingError.Error()
237
		if triggerChecker.ttl == 0 || triggerChecker.ttlState == moira.DEL {
238
			return checkData, nil
239
		}
240
	case ErrTriggerHasOnlyWildcards:
241
		triggerChecker.logger.DebugF("Trigger %s: %s", triggerChecker.TriggerID, checkingError.Error())
242
		if len(checkData.Metrics) == 0 && triggerChecker.ttlState != moira.OK && triggerChecker.ttlState != moira.DEL {
243
			checkData.State = moira.NODATA
244
			checkData.Message = checkingError.Error()
245
			if triggerChecker.ttl == 0 || triggerChecker.ttlState == moira.DEL {
246
				return checkData, nil
247
			}
248
		}
249
	case target.ErrUnknownFunction:
250
		triggerChecker.logger.WarnF("Trigger %s: %s", triggerChecker.TriggerID, checkingError.Error())
251
		checkData.State = moira.EXCEPTION
252
		checkData.Message = checkingError.Error()
253
	case ErrWrongTriggerTarget, ErrTriggerHasSameTimeSeriesNames:
254
		checkData.State = moira.EXCEPTION
255
		checkData.Message = checkingError.Error()
256
	default:
257
		triggerChecker.Statsd.CheckError.Increment()
258
		triggerChecker.logger.ErrorF("Trigger %s check failed: %s", triggerChecker.TriggerID, checkingError.Error())
259
		checkData.State = moira.EXCEPTION
260
		checkData.Message = checkingError.Error()
261
	}
262

263
	return triggerChecker.compareChecks(checkData, triggerChecker.forced[moira.WildcardMetric])
264
}
265

266
func (triggerChecker *TriggerChecker) checkTimeSeries(
267
	timeSeries *target.TimeSeries, triggerTimeSeries *triggerTimeSeries,
268
) (lastState *moira.MetricState, deleteMetric bool, deleteForced bool, err error) {
269
	emptyTimestampValue := int64(timeSeries.StartTime) - moira.MaxI64(triggerChecker.ttl, 3600)
270
	lastState = triggerChecker.lastCheck.GetOrCreateMetricState(timeSeries.Name, emptyTimestampValue)
271

272
	metricStates, err := triggerChecker.getTimeSeriesStepsStates(triggerTimeSeries, timeSeries, lastState)
273
	if err != nil {
274
		triggerChecker.logger.ErrorF(
275
			"getTimeSeriesStepsStates for trigger_id %s caused an error %v, time_series = %s",
276
			triggerChecker.TriggerID, err, timeSeries.Name,
277
		)
278
		return
279
	}
280
	triggerChecker.traceMetricStates(metricStates, lastState, timeSeries)
281

282
	needForceSend := triggerChecker.forced[timeSeries.Name]
283
	deleteForced = needForceSend
284

285
	for _, currentState := range metricStates {
286
		triggerChecker.logger.InfoE("About to launch compareStates", map[string]interface{}{
287
			"trigger_id":    triggerChecker.TriggerID,
288
			"time_series":   timeSeries.Name,
289
			"current_state": currentState,
290
			"last_state":    lastState,
291
		})
292
		lastState, err = triggerChecker.compareStates(timeSeries.Name, currentState, lastState, needForceSend)
293
		if err != nil {
294
			return
295
		}
296
		if lastState.IsForced {
297
			needForceSend = false
298
		}
299
	}
300

301
	noDataState, deleteMetric := triggerChecker.checkForNoData(lastState)
302
	if deleteMetric {
303
		return
304
	}
305

306
	if noDataState != nil {
307
		triggerChecker.logger.InfoE("Also compare with noDataState", map[string]interface{}{
308
			"trigger_id":  triggerChecker.TriggerID,
309
			"time_series": timeSeries.Name,
310
			"noDataState": *noDataState,
311
			"lastState":   lastState,
312
		})
313
		lastState, err = triggerChecker.compareStates(timeSeries.Name, noDataState, lastState, needForceSend)
314
	}
315

316
	return
317
}
318

319
func (triggerChecker *TriggerChecker) checkForNoData(lastState *moira.MetricState) (noDataState *moira.MetricState, deleteMetric bool) {
320
	if triggerChecker.ttl == 0 {
321
		return nil, false
322
	}
323

324
	if lastState.Timestamp+triggerChecker.ttl >= triggerChecker.lastCheck.Timestamp {
325
		return nil, false
326
	}
327

328
	if triggerChecker.ttlState == moira.DEL && (lastState.EventTimestamp != 0 || lastState.IsNoData) {
329
		return nil, true
330
	}
331

332
	noDataState = &moira.MetricState{
333
		State:      triggerChecker.getNullState(),
334
		Timestamp:  triggerChecker.CheckStarted,
335
		Value:      nil,
336
		Suppressed: lastState.Suppressed,
337
		IsNoData:   true,
338
	}
339
	return noDataState, false
340
}
341

342
// cleanupMaintenanceMetrics leaves only those of maintenance metrics which haven't expired yet
343
func (triggerChecker *TriggerChecker) cleanupMaintenanceMetrics() {
344
	maintenanceMetrics := make(map[string]int64, len(triggerChecker.lastCheck.MaintenanceMetric))
345
	now := time.Now().Unix()
346
	for k, v := range triggerChecker.lastCheck.MaintenanceMetric {
347
		if v > now {
348
			maintenanceMetrics[k] = v
349
		}
350
	}
351
	triggerChecker.lastCheck.MaintenanceMetric = maintenanceMetrics
352
}
353

354
// cleanupMetrics leaves only those metrics which haven't expired yet
355
func (triggerChecker *TriggerChecker) cleanupMetrics(metrics []string, until int64) {
356
	if len(metrics) > 0 {
357
		if err := triggerChecker.Database.RemoveMetricsValues(metrics, until-triggerChecker.Config.MetricsTTLSeconds); err != nil {
358
			triggerChecker.logger.ErrorF("Failed to remove metric values: metrics = %v, err = %v", metrics, err)
359
		}
360
	}
361
}
362

363
// getNullState returns state which is suitable to create null moira.MetricState (if it doesn't exist)
364
func (triggerChecker *TriggerChecker) getNullState() string {
365
	if triggerChecker.ttlState == moira.DEL {
366
		return moira.NODATA
367
	}
368
	return triggerChecker.ttlState
369
}
370

371
func (triggerChecker *TriggerChecker) getTimeSeriesState(triggerTimeSeries *triggerTimeSeries, timeSeries *target.TimeSeries, lastState *moira.MetricState, valueTimestamp, checkPoint int64) (*moira.MetricState, error) {
372
	if valueTimestamp <= checkPoint {
373
		return nil, nil
374
	}
375
	triggerExpression, noEmptyValues := triggerTimeSeries.getExpressionValues(timeSeries, valueTimestamp)
376
	if !noEmptyValues {
377
		return nil, nil
378
	}
379
	triggerChecker.logger.DebugF(
380
		"[TriggerID:%s][TimeSeries:%s] Values for ts %v: MainTargetValue: %v, additionalTargetValues: %v",
381
		triggerChecker.TriggerID, timeSeries.Name, valueTimestamp,
382
		triggerExpression.MainTargetValue, triggerExpression.AdditionalTargetsValues,
383
	)
384

385
	triggerExpression.WarnValue = triggerChecker.trigger.WarnValue
386
	triggerExpression.ErrorValue = triggerChecker.trigger.ErrorValue
387
	triggerExpression.PreviousState = lastState.State
388
	triggerExpression.Expression = triggerChecker.trigger.Expression
389

390
	expressionState, err := triggerExpression.Evaluate()
391
	if err != nil {
392
		return nil, err
393
	}
394

395
	return &moira.MetricState{
396
		State:      expressionState,
397
		Timestamp:  valueTimestamp,
398
		Value:      &triggerExpression.MainTargetValue,
399
		Suppressed: lastState.Suppressed,
400
	}, nil
401
}
402

403
func (triggerChecker *TriggerChecker) getTimeSeriesStepsStates(
404
	triggerTimeSeries *triggerTimeSeries,
405
	timeSeries *target.TimeSeries,
406
	metricLastState *moira.MetricState,
407
) ([]*moira.MetricState, error) {
408
	startTime := int64(timeSeries.StartTime)
409
	stepTime := int64(timeSeries.StepTime)
410

411
	checkPoint := metricLastState.GetCheckPoint(checkPointGap)
412
	triggerChecker.logger.DebugF("[TriggerID:%s][TimeSeries:%s] Checkpoint: %v", triggerChecker.TriggerID, timeSeries.Name, checkPoint)
413

414
	metricStates := make([]*moira.MetricState, 0)
415
	for valueTimestamp := startTime; valueTimestamp < triggerChecker.Until+stepTime; valueTimestamp += stepTime {
416
		metricNewState, err := triggerChecker.getTimeSeriesState(triggerTimeSeries, timeSeries, metricLastState, valueTimestamp, checkPoint)
417
		if err != nil {
418
			return nil, err
419
		}
420
		if metricNewState == nil {
421
			continue
422
		}
423

424
		metricLastState = metricNewState
425
		metricStates = append(metricStates, metricNewState)
426
	}
427
	return metricStates, nil
428
}
429

430
// traceHandledData is separate function because it it too much to trace :)
431
func (triggerChecker *TriggerChecker) traceHandledData(triggerTimeSeries *triggerTimeSeries, hasOnlyWildcards bool) {
432
	forcedTotal := len(triggerChecker.forced)
433
	forced := make(map[string]bool, forcedTotal)
434

435
	for key, val := range triggerChecker.forced {
436
		if val {
437
			forced[key] = val
438
		}
439
	}
440

441
	triggerChecker.logger.InfoE(
442
		fmt.Sprintf("handleTrigger id %s", triggerChecker.TriggerID),
443
		map[string]interface{}{
444
			"trigger_id":       triggerChecker.TriggerID,
445
			"forced_total":     forcedTotal,
446
			"forced_true":      forced,
447
			"is_pull_type":     triggerChecker.trigger.IsPullType,
448
			"from":             triggerChecker.From,
449
			"until":            triggerChecker.Until,
450
			"only_wildcards":   hasOnlyWildcards,
451
			"series_qty":       len(triggerTimeSeries.Main),
452
			"ttl":              triggerChecker.ttl,
453
			"ttl_state":        triggerChecker.ttlState,
454
			"last_check_ts":    triggerChecker.lastCheck.Timestamp,
455
			"last_metrics_qty": len(triggerChecker.lastCheck.Metrics),
456
		},
457
	)
458
}
459

460
// traceMetricStates implements chunked metric states logging (it can be too many of them)
461
func (triggerChecker *TriggerChecker) traceMetricStates(
462
	currentStates []*moira.MetricState,
463
	lastState *moira.MetricState,
464
	timeSeries *target.TimeSeries,
465
) {
466
	statesTotal := len(currentStates)
467
	timeSeriesName := timeSeries.Name
468

469
	triggerChecker.logger.InfoE(
470
		fmt.Sprintf("Started checkTimeSeries for '%s'", timeSeriesName),
471
		map[string]interface{}{
472
			"trigger_id": triggerChecker.TriggerID,
473
			"ts_name":    timeSeriesName,
474
			"ts_start":   timeSeries.StartTime,
475
			"ts_stop":    timeSeries.StopTime,
476
			"last_state": lastState,
477
			"states_qty": statesTotal,
478
			"states":     currentStates,
479
		},
480
	)
481
}
482

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.