cubefs

Форк
0
196 строк · 5.0 Кб
1
package hystrix
2

3
import (
4
	"fmt"
5
	"sync"
6
	"sync/atomic"
7
	"time"
8
)
9

10
// CircuitBreaker is created for each ExecutorPool to track whether requests
11
// should be attempted, or rejected if the Health of the circuit is too low.
12
type CircuitBreaker struct {
13
	Name                   string
14
	open                   bool
15
	forceOpen              bool
16
	mutex                  *sync.RWMutex
17
	openedOrLastTestedTime int64
18

19
	executorPool *executorPool
20
	metrics      *metricExchange
21
}
22

23
var (
24
	circuitBreakersMutex *sync.RWMutex
25
	circuitBreakers      map[string]*CircuitBreaker
26
)
27

28
func init() {
29
	circuitBreakersMutex = &sync.RWMutex{}
30
	circuitBreakers = make(map[string]*CircuitBreaker)
31
}
32

33
// GetCircuit returns the circuit for the given command and whether this call created it.
34
func GetCircuit(name string) (*CircuitBreaker, bool, error) {
35
	circuitBreakersMutex.RLock()
36
	_, ok := circuitBreakers[name]
37
	if !ok {
38
		circuitBreakersMutex.RUnlock()
39
		circuitBreakersMutex.Lock()
40
		defer circuitBreakersMutex.Unlock()
41
		// because we released the rlock before we obtained the exclusive lock,
42
		// we need to double check that some other thread didn't beat us to
43
		// creation.
44
		if cb, ok := circuitBreakers[name]; ok {
45
			return cb, false, nil
46
		}
47
		circuitBreakers[name] = newCircuitBreaker(name)
48
	} else {
49
		defer circuitBreakersMutex.RUnlock()
50
	}
51

52
	return circuitBreakers[name], !ok, nil
53
}
54

55
// Flush purges all circuit and metric information from memory.
56
func Flush() {
57
	circuitBreakersMutex.Lock()
58
	defer circuitBreakersMutex.Unlock()
59

60
	for name, cb := range circuitBreakers {
61
		cb.metrics.Reset()
62
		cb.executorPool.Metrics.Reset()
63
		delete(circuitBreakers, name)
64
	}
65
}
66

67
// newCircuitBreaker creates a CircuitBreaker with associated Health
68
func newCircuitBreaker(name string) *CircuitBreaker {
69
	c := &CircuitBreaker{}
70
	c.Name = name
71
	c.metrics = newMetricExchange(name)
72
	c.executorPool = newExecutorPool(name)
73
	c.mutex = &sync.RWMutex{}
74

75
	return c
76
}
77

78
// toggleForceOpen allows manually causing the fallback logic for all instances
79
// of a given command.
80
func (circuit *CircuitBreaker) toggleForceOpen(toggle bool) error {
81
	circuit, _, err := GetCircuit(circuit.Name)
82
	if err != nil {
83
		return err
84
	}
85

86
	circuit.forceOpen = toggle
87
	return nil
88
}
89

90
// IsOpen is called before any Command execution to check whether or
91
// not it should be attempted. An "open" circuit means it is disabled.
92
func (circuit *CircuitBreaker) IsOpen() bool {
93
	circuit.mutex.RLock()
94
	o := circuit.forceOpen || circuit.open
95
	circuit.mutex.RUnlock()
96

97
	if o {
98
		return true
99
	}
100

101
	if uint64(circuit.metrics.Requests().Sum(time.Now())) < getSettings(circuit.Name).RequestVolumeThreshold {
102
		return false
103
	}
104

105
	if !circuit.metrics.IsHealthy(time.Now()) {
106
		// too many failures, open the circuit
107
		circuit.setOpen()
108
		return true
109
	}
110

111
	return false
112
}
113

114
// AllowRequest is checked before a command executes, ensuring that circuit state and metric health allow it.
115
// When the circuit is open, this call will occasionally return true to measure whether the external service
116
// has recovered.
117
func (circuit *CircuitBreaker) AllowRequest() bool {
118
	return !circuit.IsOpen() || circuit.allowSingleTest()
119
}
120

121
// allowSingleTest reports whether one trial request may pass through an open
// circuit. It returns true only when the circuit is open, the configured
// SleepWindow has elapsed since the circuit opened or was last tested, and
// this caller wins the compare-and-swap that advances the timestamp.
func (circuit *CircuitBreaker) allowSingleTest() bool {
	circuit.mutex.RLock()
	defer circuit.mutex.RUnlock()

	now := time.Now().UnixNano()
	lastTested := atomic.LoadInt64(&circuit.openedOrLastTestedTime)

	if !circuit.open || now <= lastTested+getSettings(circuit.Name).SleepWindow.Nanoseconds() {
		return false
	}

	// Only the goroutine whose swap succeeds gets to run the trial.
	if !atomic.CompareAndSwapInt64(&circuit.openedOrLastTestedTime, lastTested, now) {
		return false
	}

	log.Printf("hystrix-go: allowing single test to possibly close circuit %v", circuit.Name)
	return true
}
137

138
// setOpen marks the circuit as open and records the current time as the
// moment it opened. It does nothing when the circuit is already open.
func (circuit *CircuitBreaker) setOpen() {
	circuit.mutex.Lock()
	defer circuit.mutex.Unlock()

	if !circuit.open {
		log.Printf("hystrix-go: opening circuit %v", circuit.Name)

		circuit.openedOrLastTestedTime = time.Now().UnixNano()
		circuit.open = true
	}
}
151

152
// setClose marks the circuit as closed and resets its metrics. It does
// nothing when the circuit is already closed.
func (circuit *CircuitBreaker) setClose() {
	circuit.mutex.Lock()
	defer circuit.mutex.Unlock()

	if circuit.open {
		log.Printf("hystrix-go: closing circuit %v", circuit.Name)

		circuit.open = false
		circuit.metrics.Reset()
	}
}
165

166
// ReportEvent records command metrics for tracking recent error rates and exposing data to the dashboard.
167
func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time, runDuration time.Duration) error {
168
	if len(eventTypes) == 0 {
169
		return fmt.Errorf("no event types sent for metrics")
170
	}
171

172
	circuit.mutex.RLock()
173
	o := circuit.open
174
	circuit.mutex.RUnlock()
175
	if eventTypes[0] == "success" && o {
176
		circuit.setClose()
177
	}
178

179
	var concurrencyInUse float64
180
	if circuit.executorPool.Max > 0 {
181
		concurrencyInUse = float64(circuit.executorPool.ActiveCount()) / float64(circuit.executorPool.Max)
182
	}
183

184
	select {
185
	case circuit.metrics.Updates <- &commandExecution{
186
		Types:            eventTypes,
187
		Start:            start,
188
		RunDuration:      runDuration,
189
		ConcurrencyInUse: concurrencyInUse,
190
	}:
191
	default:
192
		return CircuitError{Message: fmt.Sprintf("metrics channel (%v) is at capacity", circuit.Name)}
193
	}
194

195
	return nil
196
}
197

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.