cubefs
299 строк · 8.0 Кб
1package hystrix
2
3import (
4"context"
5"fmt"
6"sync"
7"time"
8)
9
// Callback signatures accepted by the package entry points. The "C" variants
// additionally receive the context.Context supplied by the caller.
type (
	// runFunc is the unit of work to protect.
	runFunc func() error
	// fallbackFunc is handed the error that caused the fallback to run.
	fallbackFunc func(error) error
	// runFuncC is the context-aware form of runFunc.
	runFuncC func(context.Context) error
	// fallbackFuncC is the context-aware form of fallbackFunc.
	fallbackFuncC func(context.Context, error) error
)
14
// CircuitError is the error type used for failure states of an execution
// itself, such as the circuit being open or a timeout.
type CircuitError struct {
	// Message is the human-readable description, without the package prefix.
	Message string
}

// Error implements the error interface; the result is Message prefixed with "hystrix: ".
func (e CircuitError) Error() string {
	const prefix = "hystrix: "
	return prefix + e.Message
}
24
25// command models the state used for a single execution on a circuit. "hystrix command" is commonly
26// used to describe the pairing of your run/fallback functions with a circuit.
27type command struct {
28sync.Mutex
29
30ticket *struct{}
31start time.Time
32errChan chan error
33finished chan bool
34circuit *CircuitBreaker
35run runFuncC
36fallback fallbackFuncC
37runDuration time.Duration
38events []string
39}
40
41var (
42// ErrMaxConcurrency occurs when too many of the same named command are executed at the same time.
43ErrMaxConcurrency = CircuitError{Message: "max concurrency"}
44// ErrCircuitOpen returns when an execution attempt "short circuits". This happens due to the circuit being measured as unhealthy.
45ErrCircuitOpen = CircuitError{Message: "circuit open"}
46// ErrTimeout occurs when the provided function takes too long to execute.
47ErrTimeout = CircuitError{Message: "timeout"}
48)
49
50// Go runs your function while tracking the health of previous calls to it.
51// If your function begins slowing down or failing repeatedly, we will block
52// new calls to it for you to give the dependent service time to repair.
53//
54// Define a fallback function if you want to define some code to execute during outages.
55func Go(name string, run runFunc, fallback fallbackFunc) chan error {
56runC := func(ctx context.Context) error {
57return run()
58}
59var fallbackC fallbackFuncC
60if fallback != nil {
61fallbackC = func(ctx context.Context, err error) error {
62return fallback(err)
63}
64}
65return GoC(context.Background(), name, runC, fallbackC)
66}
67
68// GoC runs your function while tracking the health of previous calls to it.
69// If your function begins slowing down or failing repeatedly, we will block
70// new calls to it for you to give the dependent service time to repair.
71//
72// Define a fallback function if you want to define some code to execute during outages.
73func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error {
74cmd := &command{
75run: run,
76fallback: fallback,
77start: time.Now(),
78errChan: make(chan error, 1),
79finished: make(chan bool, 1),
80}
81
82// dont have methods with explicit params and returns
83// let data come in and out naturally, like with any closure
84// explicit error return to give place for us to kill switch the operation (fallback)
85
86circuit, _, err := GetCircuit(name)
87if err != nil {
88cmd.errChan <- err
89return cmd.errChan
90}
91cmd.circuit = circuit
92ticketCond := sync.NewCond(cmd)
93ticketChecked := false
94// When the caller extracts error from returned errChan, it's assumed that
95// the ticket's been returned to executorPool. Therefore, returnTicket() can
96// not run after cmd.errorWithFallback().
97returnTicket := func() {
98cmd.Lock()
99// Avoid releasing before a ticket is acquired.
100for !ticketChecked {
101ticketCond.Wait()
102}
103cmd.circuit.executorPool.Return(cmd.ticket)
104cmd.Unlock()
105}
106// Shared by the following two goroutines. It ensures only the faster
107// goroutine runs errWithFallback() and reportAllEvent().
108returnOnce := &sync.Once{}
109reportAllEvent := func() {
110err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)
111if err != nil {
112log.Printf(err.Error())
113}
114}
115
116go func() {
117defer func() { cmd.finished <- true }()
118
119// Circuits get opened when recent executions have shown to have a high error rate.
120// Rejecting new executions allows backends to recover, and the circuit will allow
121// new traffic when it feels a healthly state has returned.
122if !cmd.circuit.AllowRequest() {
123cmd.Lock()
124// It's safe for another goroutine to go ahead releasing a nil ticket.
125ticketChecked = true
126ticketCond.Signal()
127cmd.Unlock()
128returnOnce.Do(func() {
129returnTicket()
130cmd.errorWithFallback(ctx, ErrCircuitOpen)
131reportAllEvent()
132})
133return
134}
135
136// As backends falter, requests take longer but don't always fail.
137//
138// When requests slow down but the incoming rate of requests stays the same, you have to
139// run more at a time to keep up. By controlling concurrency during these situations, you can
140// shed load which accumulates due to the increasing ratio of active commands to incoming requests.
141cmd.Lock()
142select {
143case cmd.ticket = <-circuit.executorPool.Tickets:
144ticketChecked = true
145ticketCond.Signal()
146cmd.Unlock()
147default:
148ticketChecked = true
149ticketCond.Signal()
150cmd.Unlock()
151returnOnce.Do(func() {
152returnTicket()
153cmd.errorWithFallback(ctx, ErrMaxConcurrency)
154reportAllEvent()
155})
156return
157}
158
159runStart := time.Now()
160runErr := run(ctx)
161returnOnce.Do(func() {
162defer reportAllEvent()
163cmd.runDuration = time.Since(runStart)
164returnTicket()
165if runErr != nil {
166cmd.errorWithFallback(ctx, runErr)
167return
168}
169cmd.reportEvent("success")
170})
171}()
172
173go func() {
174timer := time.NewTimer(getSettings(name).Timeout)
175defer timer.Stop()
176
177select {
178case <-cmd.finished:
179// returnOnce has been executed in another goroutine
180case <-ctx.Done():
181returnOnce.Do(func() {
182returnTicket()
183cmd.errorWithFallback(ctx, ctx.Err())
184reportAllEvent()
185})
186return
187case <-timer.C:
188returnOnce.Do(func() {
189returnTicket()
190cmd.errorWithFallback(ctx, ErrTimeout)
191reportAllEvent()
192})
193return
194}
195}()
196
197return cmd.errChan
198}
199
200// Do runs your function in a synchronous manner, blocking until either your function succeeds
201// or an error is returned, including hystrix circuit errors
202func Do(name string, run runFunc, fallback fallbackFunc) error {
203runC := func(ctx context.Context) error {
204return run()
205}
206var fallbackC fallbackFuncC
207if fallback != nil {
208fallbackC = func(ctx context.Context, err error) error {
209return fallback(err)
210}
211}
212return DoC(context.Background(), name, runC, fallbackC)
213}
214
215// DoC runs your function in a synchronous manner, blocking until either your function succeeds
216// or an error is returned, including hystrix circuit errors
217func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error {
218done := make(chan struct{}, 1)
219
220r := func(ctx context.Context) error {
221err := run(ctx)
222if err != nil {
223return err
224}
225
226done <- struct{}{}
227return nil
228}
229
230f := func(ctx context.Context, e error) error {
231err := fallback(ctx, e)
232if err != nil {
233return err
234}
235
236done <- struct{}{}
237return nil
238}
239
240var errChan chan error
241if fallback == nil {
242errChan = GoC(ctx, name, r, nil)
243} else {
244errChan = GoC(ctx, name, r, f)
245}
246
247select {
248case <-done:
249return nil
250case err := <-errChan:
251return err
252}
253}
254
255func (c *command) reportEvent(eventType string) {
256c.Lock()
257defer c.Unlock()
258
259c.events = append(c.events, eventType)
260}
261
262// errorWithFallback triggers the fallback while reporting the appropriate metric events.
263func (c *command) errorWithFallback(ctx context.Context, err error) {
264eventType := "failure"
265if err == ErrCircuitOpen {
266eventType = "short-circuit"
267} else if err == ErrMaxConcurrency {
268eventType = "rejected"
269} else if err == ErrTimeout {
270eventType = "timeout"
271} else if err == context.Canceled {
272eventType = "context_canceled"
273} else if err == context.DeadlineExceeded {
274eventType = "context_deadline_exceeded"
275}
276
277c.reportEvent(eventType)
278fallbackErr := c.tryFallback(ctx, err)
279if fallbackErr != nil {
280c.errChan <- fallbackErr
281}
282}
283
284func (c *command) tryFallback(ctx context.Context, err error) error {
285if c.fallback == nil {
286// If we don't have a fallback return the original error.
287return err
288}
289
290fallbackErr := c.fallback(ctx, err)
291if fallbackErr != nil {
292c.reportEvent("fallback-failure")
293return fmt.Errorf("fallback failed with '%v'. run error was '%v'", fallbackErr, err)
294}
295
296c.reportEvent("fallback-success")
297
298return nil
299}
300