cubefs
150 lines · 3.5 KB
1package hystrix
2
3import (
4"sync"
5"time"
6
7"github.com/afex/hystrix-go/hystrix/metric_collector"
8"github.com/afex/hystrix-go/hystrix/rolling"
9)
10
// commandExecution is a single event sent through metricExchange.Updates,
// recording the outcome labels of one command run plus its timing data.
type commandExecution struct {
	Types            []string      `json:"types"`             // outcome labels (e.g. "success", "timeout"); index 1, when present, carries a fallback result
	Start            time.Time     `json:"start_time"`        // when the command run began
	RunDuration      time.Duration `json:"run_duration"`      // time spent in the command's run function
	ConcurrencyInUse float64       `json:"concurrency_inuse"` // concurrency in use when the command ran — presumably a 0..1 fraction of the pool; set by the caller
}
17
// metricExchange aggregates execution results for one command (Name) and
// fans each update out to all registered metric collectors.
type metricExchange struct {
	Name    string                 // command name these metrics belong to
	Updates chan *commandExecution // buffered feed of execution results, drained by Monitor
	Mutex   *sync.RWMutex          // read-held while incrementing metrics, write-held by Reset

	// metricCollectors receives every update; the first entry must be the
	// DefaultMetricCollector (DefaultCollector panics otherwise).
	metricCollectors []metricCollector.MetricCollector
}
25
26func newMetricExchange(name string) *metricExchange {
27m := &metricExchange{}
28m.Name = name
29
30m.Updates = make(chan *commandExecution, 2000)
31m.Mutex = &sync.RWMutex{}
32m.metricCollectors = metricCollector.Registry.InitializeMetricCollectors(name)
33m.Reset()
34
35go m.Monitor()
36
37return m
38}
39
40// The Default Collector function will panic if collectors are not setup to specification.
41func (m *metricExchange) DefaultCollector() *metricCollector.DefaultMetricCollector {
42if len(m.metricCollectors) < 1 {
43panic("No Metric Collectors Registered.")
44}
45collection, ok := m.metricCollectors[0].(*metricCollector.DefaultMetricCollector)
46if !ok {
47panic("Default metric collector is not registered correctly. The default metric collector must be registered first.")
48}
49return collection
50}
51
52func (m *metricExchange) Monitor() {
53for update := range m.Updates {
54// we only grab a read lock to make sure Reset() isn't changing the numbers.
55m.Mutex.RLock()
56
57totalDuration := time.Since(update.Start)
58wg := &sync.WaitGroup{}
59for _, collector := range m.metricCollectors {
60wg.Add(1)
61go m.IncrementMetrics(wg, collector, update, totalDuration)
62}
63wg.Wait()
64
65m.Mutex.RUnlock()
66}
67}
68
69func (m *metricExchange) IncrementMetrics(wg *sync.WaitGroup, collector metricCollector.MetricCollector, update *commandExecution, totalDuration time.Duration) {
70// granular metrics
71r := metricCollector.MetricResult{
72Attempts: 1,
73TotalDuration: totalDuration,
74RunDuration: update.RunDuration,
75ConcurrencyInUse: update.ConcurrencyInUse,
76}
77
78switch update.Types[0] {
79case "success":
80r.Successes = 1
81case "failure":
82r.Failures = 1
83r.Errors = 1
84case "rejected":
85r.Rejects = 1
86r.Errors = 1
87case "short-circuit":
88r.ShortCircuits = 1
89r.Errors = 1
90case "timeout":
91r.Timeouts = 1
92r.Errors = 1
93case "context_canceled":
94r.ContextCanceled = 1
95case "context_deadline_exceeded":
96r.ContextDeadlineExceeded = 1
97}
98
99if len(update.Types) > 1 {
100// fallback metrics
101if update.Types[1] == "fallback-success" {
102r.FallbackSuccesses = 1
103}
104if update.Types[1] == "fallback-failure" {
105r.FallbackFailures = 1
106}
107}
108
109collector.Update(r)
110
111wg.Done()
112}
113
114func (m *metricExchange) Reset() {
115m.Mutex.Lock()
116defer m.Mutex.Unlock()
117
118for _, collector := range m.metricCollectors {
119collector.Reset()
120}
121}
122
123func (m *metricExchange) Requests() *rolling.Number {
124m.Mutex.RLock()
125defer m.Mutex.RUnlock()
126return m.requestsLocked()
127}
128
129func (m *metricExchange) requestsLocked() *rolling.Number {
130return m.DefaultCollector().NumRequests()
131}
132
133func (m *metricExchange) ErrorPercent(now time.Time) int {
134m.Mutex.RLock()
135defer m.Mutex.RUnlock()
136
137var errPct float64
138reqs := m.requestsLocked().Sum(now)
139errs := m.DefaultCollector().Errors().Sum(now)
140
141if reqs > 0 {
142errPct = (float64(errs) / float64(reqs)) * 100
143}
144
145return int(errPct + 0.5)
146}
147
148func (m *metricExchange) IsHealthy(now time.Time) bool {
149return m.ErrorPercent(now) < getSettings(m.Name).ErrorPercentThreshold
150}
151