cubefs

Форк
0
150 строк · 3.5 Кб
1
package hystrix
2

3
import (
4
	"sync"
5
	"time"
6

7
	"github.com/afex/hystrix-go/hystrix/metric_collector"
8
	"github.com/afex/hystrix-go/hystrix/rolling"
9
)
10

11
type commandExecution struct {
12
	Types            []string      `json:"types"`
13
	Start            time.Time     `json:"start_time"`
14
	RunDuration      time.Duration `json:"run_duration"`
15
	ConcurrencyInUse float64       `json:"concurrency_inuse"`
16
}
17

18
type metricExchange struct {
19
	Name    string
20
	Updates chan *commandExecution
21
	Mutex   *sync.RWMutex
22

23
	metricCollectors []metricCollector.MetricCollector
24
}
25

26
func newMetricExchange(name string) *metricExchange {
27
	m := &metricExchange{}
28
	m.Name = name
29

30
	m.Updates = make(chan *commandExecution, 2000)
31
	m.Mutex = &sync.RWMutex{}
32
	m.metricCollectors = metricCollector.Registry.InitializeMetricCollectors(name)
33
	m.Reset()
34

35
	go m.Monitor()
36

37
	return m
38
}
39

40
// The Default Collector function will panic if collectors are not setup to specification.
41
func (m *metricExchange) DefaultCollector() *metricCollector.DefaultMetricCollector {
42
	if len(m.metricCollectors) < 1 {
43
		panic("No Metric Collectors Registered.")
44
	}
45
	collection, ok := m.metricCollectors[0].(*metricCollector.DefaultMetricCollector)
46
	if !ok {
47
		panic("Default metric collector is not registered correctly. The default metric collector must be registered first.")
48
	}
49
	return collection
50
}
51

52
func (m *metricExchange) Monitor() {
53
	for update := range m.Updates {
54
		// we only grab a read lock to make sure Reset() isn't changing the numbers.
55
		m.Mutex.RLock()
56

57
		totalDuration := time.Since(update.Start)
58
		wg := &sync.WaitGroup{}
59
		for _, collector := range m.metricCollectors {
60
			wg.Add(1)
61
			go m.IncrementMetrics(wg, collector, update, totalDuration)
62
		}
63
		wg.Wait()
64

65
		m.Mutex.RUnlock()
66
	}
67
}
68

69
func (m *metricExchange) IncrementMetrics(wg *sync.WaitGroup, collector metricCollector.MetricCollector, update *commandExecution, totalDuration time.Duration) {
70
	// granular metrics
71
	r := metricCollector.MetricResult{
72
		Attempts:         1,
73
		TotalDuration:    totalDuration,
74
		RunDuration:      update.RunDuration,
75
		ConcurrencyInUse: update.ConcurrencyInUse,
76
	}
77

78
	switch update.Types[0] {
79
	case "success":
80
		r.Successes = 1
81
	case "failure":
82
		r.Failures = 1
83
		r.Errors = 1
84
	case "rejected":
85
		r.Rejects = 1
86
		r.Errors = 1
87
	case "short-circuit":
88
		r.ShortCircuits = 1
89
		r.Errors = 1
90
	case "timeout":
91
		r.Timeouts = 1
92
		r.Errors = 1
93
	case "context_canceled":
94
		r.ContextCanceled = 1
95
	case "context_deadline_exceeded":
96
		r.ContextDeadlineExceeded = 1
97
	}
98

99
	if len(update.Types) > 1 {
100
		// fallback metrics
101
		if update.Types[1] == "fallback-success" {
102
			r.FallbackSuccesses = 1
103
		}
104
		if update.Types[1] == "fallback-failure" {
105
			r.FallbackFailures = 1
106
		}
107
	}
108

109
	collector.Update(r)
110

111
	wg.Done()
112
}
113

114
func (m *metricExchange) Reset() {
115
	m.Mutex.Lock()
116
	defer m.Mutex.Unlock()
117

118
	for _, collector := range m.metricCollectors {
119
		collector.Reset()
120
	}
121
}
122

123
func (m *metricExchange) Requests() *rolling.Number {
124
	m.Mutex.RLock()
125
	defer m.Mutex.RUnlock()
126
	return m.requestsLocked()
127
}
128

129
func (m *metricExchange) requestsLocked() *rolling.Number {
130
	return m.DefaultCollector().NumRequests()
131
}
132

133
func (m *metricExchange) ErrorPercent(now time.Time) int {
134
	m.Mutex.RLock()
135
	defer m.Mutex.RUnlock()
136

137
	var errPct float64
138
	reqs := m.requestsLocked().Sum(now)
139
	errs := m.DefaultCollector().Errors().Sum(now)
140

141
	if reqs > 0 {
142
		errPct = (float64(errs) / float64(reqs)) * 100
143
	}
144

145
	return int(errPct + 0.5)
146
}
147

148
func (m *metricExchange) IsHealthy(now time.Time) bool {
149
	return m.ErrorPercent(now) < getSettings(m.Name).ErrorPercentThreshold
150
}
151

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.