cubefs
196 строк · 5.0 Кб
1package hystrix
2
3import (
4"fmt"
5"sync"
6"sync/atomic"
7"time"
8)
9
10// CircuitBreaker is created for each ExecutorPool to track whether requests
11// should be attempted, or rejected if the Health of the circuit is too low.
12type CircuitBreaker struct {
13Name string
14open bool
15forceOpen bool
16mutex *sync.RWMutex
17openedOrLastTestedTime int64
18
19executorPool *executorPool
20metrics *metricExchange
21}
22
23var (
24circuitBreakersMutex *sync.RWMutex
25circuitBreakers map[string]*CircuitBreaker
26)
27
28func init() {
29circuitBreakersMutex = &sync.RWMutex{}
30circuitBreakers = make(map[string]*CircuitBreaker)
31}
32
33// GetCircuit returns the circuit for the given command and whether this call created it.
34func GetCircuit(name string) (*CircuitBreaker, bool, error) {
35circuitBreakersMutex.RLock()
36_, ok := circuitBreakers[name]
37if !ok {
38circuitBreakersMutex.RUnlock()
39circuitBreakersMutex.Lock()
40defer circuitBreakersMutex.Unlock()
41// because we released the rlock before we obtained the exclusive lock,
42// we need to double check that some other thread didn't beat us to
43// creation.
44if cb, ok := circuitBreakers[name]; ok {
45return cb, false, nil
46}
47circuitBreakers[name] = newCircuitBreaker(name)
48} else {
49defer circuitBreakersMutex.RUnlock()
50}
51
52return circuitBreakers[name], !ok, nil
53}
54
55// Flush purges all circuit and metric information from memory.
56func Flush() {
57circuitBreakersMutex.Lock()
58defer circuitBreakersMutex.Unlock()
59
60for name, cb := range circuitBreakers {
61cb.metrics.Reset()
62cb.executorPool.Metrics.Reset()
63delete(circuitBreakers, name)
64}
65}
66
67// newCircuitBreaker creates a CircuitBreaker with associated Health
68func newCircuitBreaker(name string) *CircuitBreaker {
69c := &CircuitBreaker{}
70c.Name = name
71c.metrics = newMetricExchange(name)
72c.executorPool = newExecutorPool(name)
73c.mutex = &sync.RWMutex{}
74
75return c
76}
77
78// toggleForceOpen allows manually causing the fallback logic for all instances
79// of a given command.
80func (circuit *CircuitBreaker) toggleForceOpen(toggle bool) error {
81circuit, _, err := GetCircuit(circuit.Name)
82if err != nil {
83return err
84}
85
86circuit.forceOpen = toggle
87return nil
88}
89
90// IsOpen is called before any Command execution to check whether or
91// not it should be attempted. An "open" circuit means it is disabled.
92func (circuit *CircuitBreaker) IsOpen() bool {
93circuit.mutex.RLock()
94o := circuit.forceOpen || circuit.open
95circuit.mutex.RUnlock()
96
97if o {
98return true
99}
100
101if uint64(circuit.metrics.Requests().Sum(time.Now())) < getSettings(circuit.Name).RequestVolumeThreshold {
102return false
103}
104
105if !circuit.metrics.IsHealthy(time.Now()) {
106// too many failures, open the circuit
107circuit.setOpen()
108return true
109}
110
111return false
112}
113
114// AllowRequest is checked before a command executes, ensuring that circuit state and metric health allow it.
115// When the circuit is open, this call will occasionally return true to measure whether the external service
116// has recovered.
117func (circuit *CircuitBreaker) AllowRequest() bool {
118return !circuit.IsOpen() || circuit.allowSingleTest()
119}
120
121func (circuit *CircuitBreaker) allowSingleTest() bool {
122circuit.mutex.RLock()
123defer circuit.mutex.RUnlock()
124
125now := time.Now().UnixNano()
126openedOrLastTestedTime := atomic.LoadInt64(&circuit.openedOrLastTestedTime)
127if circuit.open && now > openedOrLastTestedTime+getSettings(circuit.Name).SleepWindow.Nanoseconds() {
128swapped := atomic.CompareAndSwapInt64(&circuit.openedOrLastTestedTime, openedOrLastTestedTime, now)
129if swapped {
130log.Printf("hystrix-go: allowing single test to possibly close circuit %v", circuit.Name)
131}
132return swapped
133}
134
135return false
136}
137
138func (circuit *CircuitBreaker) setOpen() {
139circuit.mutex.Lock()
140defer circuit.mutex.Unlock()
141
142if circuit.open {
143return
144}
145
146log.Printf("hystrix-go: opening circuit %v", circuit.Name)
147
148circuit.openedOrLastTestedTime = time.Now().UnixNano()
149circuit.open = true
150}
151
152func (circuit *CircuitBreaker) setClose() {
153circuit.mutex.Lock()
154defer circuit.mutex.Unlock()
155
156if !circuit.open {
157return
158}
159
160log.Printf("hystrix-go: closing circuit %v", circuit.Name)
161
162circuit.open = false
163circuit.metrics.Reset()
164}
165
166// ReportEvent records command metrics for tracking recent error rates and exposing data to the dashboard.
167func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time, runDuration time.Duration) error {
168if len(eventTypes) == 0 {
169return fmt.Errorf("no event types sent for metrics")
170}
171
172circuit.mutex.RLock()
173o := circuit.open
174circuit.mutex.RUnlock()
175if eventTypes[0] == "success" && o {
176circuit.setClose()
177}
178
179var concurrencyInUse float64
180if circuit.executorPool.Max > 0 {
181concurrencyInUse = float64(circuit.executorPool.ActiveCount()) / float64(circuit.executorPool.Max)
182}
183
184select {
185case circuit.metrics.Updates <- &commandExecution{
186Types: eventTypes,
187Start: start,
188RunDuration: runDuration,
189ConcurrencyInUse: concurrencyInUse,
190}:
191default:
192return CircuitError{Message: fmt.Sprintf("metrics channel (%v) is at capacity", circuit.Name)}
193}
194
195return nil
196}
197