cubefs

Форк
0
299 строк · 8.0 Кб
1
package hystrix
2

3
import (
4
	"context"
5
	"fmt"
6
	"sync"
7
	"time"
8
)
9

10
type runFunc func() error
11
type fallbackFunc func(error) error
12
type runFuncC func(context.Context) error
13
type fallbackFuncC func(context.Context, error) error
14

15
// A CircuitError is an error which models various failure states of execution,
16
// such as the circuit being open or a timeout.
17
type CircuitError struct {
18
	Message string
19
}
20

21
func (e CircuitError) Error() string {
22
	return "hystrix: " + e.Message
23
}
24

25
// command models the state used for a single execution on a circuit. "hystrix command" is commonly
26
// used to describe the pairing of your run/fallback functions with a circuit.
27
type command struct {
28
	sync.Mutex
29

30
	ticket      *struct{}
31
	start       time.Time
32
	errChan     chan error
33
	finished    chan bool
34
	circuit     *CircuitBreaker
35
	run         runFuncC
36
	fallback    fallbackFuncC
37
	runDuration time.Duration
38
	events      []string
39
}
40

41
var (
42
	// ErrMaxConcurrency occurs when too many of the same named command are executed at the same time.
43
	ErrMaxConcurrency = CircuitError{Message: "max concurrency"}
44
	// ErrCircuitOpen returns when an execution attempt "short circuits". This happens due to the circuit being measured as unhealthy.
45
	ErrCircuitOpen = CircuitError{Message: "circuit open"}
46
	// ErrTimeout occurs when the provided function takes too long to execute.
47
	ErrTimeout = CircuitError{Message: "timeout"}
48
)
49

50
// Go runs your function while tracking the health of previous calls to it.
51
// If your function begins slowing down or failing repeatedly, we will block
52
// new calls to it for you to give the dependent service time to repair.
53
//
54
// Define a fallback function if you want to define some code to execute during outages.
55
func Go(name string, run runFunc, fallback fallbackFunc) chan error {
56
	runC := func(ctx context.Context) error {
57
		return run()
58
	}
59
	var fallbackC fallbackFuncC
60
	if fallback != nil {
61
		fallbackC = func(ctx context.Context, err error) error {
62
			return fallback(err)
63
		}
64
	}
65
	return GoC(context.Background(), name, runC, fallbackC)
66
}
67

68
// GoC runs your function while tracking the health of previous calls to it.
69
// If your function begins slowing down or failing repeatedly, we will block
70
// new calls to it for you to give the dependent service time to repair.
71
//
72
// Define a fallback function if you want to define some code to execute during outages.
73
func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error {
74
	cmd := &command{
75
		run:      run,
76
		fallback: fallback,
77
		start:    time.Now(),
78
		errChan:  make(chan error, 1),
79
		finished: make(chan bool, 1),
80
	}
81

82
	// dont have methods with explicit params and returns
83
	// let data come in and out naturally, like with any closure
84
	// explicit error return to give place for us to kill switch the operation (fallback)
85

86
	circuit, _, err := GetCircuit(name)
87
	if err != nil {
88
		cmd.errChan <- err
89
		return cmd.errChan
90
	}
91
	cmd.circuit = circuit
92
	ticketCond := sync.NewCond(cmd)
93
	ticketChecked := false
94
	// When the caller extracts error from returned errChan, it's assumed that
95
	// the ticket's been returned to executorPool. Therefore, returnTicket() can
96
	// not run after cmd.errorWithFallback().
97
	returnTicket := func() {
98
		cmd.Lock()
99
		// Avoid releasing before a ticket is acquired.
100
		for !ticketChecked {
101
			ticketCond.Wait()
102
		}
103
		cmd.circuit.executorPool.Return(cmd.ticket)
104
		cmd.Unlock()
105
	}
106
	// Shared by the following two goroutines. It ensures only the faster
107
	// goroutine runs errWithFallback() and reportAllEvent().
108
	returnOnce := &sync.Once{}
109
	reportAllEvent := func() {
110
		err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)
111
		if err != nil {
112
			log.Printf(err.Error())
113
		}
114
	}
115

116
	go func() {
117
		defer func() { cmd.finished <- true }()
118

119
		// Circuits get opened when recent executions have shown to have a high error rate.
120
		// Rejecting new executions allows backends to recover, and the circuit will allow
121
		// new traffic when it feels a healthly state has returned.
122
		if !cmd.circuit.AllowRequest() {
123
			cmd.Lock()
124
			// It's safe for another goroutine to go ahead releasing a nil ticket.
125
			ticketChecked = true
126
			ticketCond.Signal()
127
			cmd.Unlock()
128
			returnOnce.Do(func() {
129
				returnTicket()
130
				cmd.errorWithFallback(ctx, ErrCircuitOpen)
131
				reportAllEvent()
132
			})
133
			return
134
		}
135

136
		// As backends falter, requests take longer but don't always fail.
137
		//
138
		// When requests slow down but the incoming rate of requests stays the same, you have to
139
		// run more at a time to keep up. By controlling concurrency during these situations, you can
140
		// shed load which accumulates due to the increasing ratio of active commands to incoming requests.
141
		cmd.Lock()
142
		select {
143
		case cmd.ticket = <-circuit.executorPool.Tickets:
144
			ticketChecked = true
145
			ticketCond.Signal()
146
			cmd.Unlock()
147
		default:
148
			ticketChecked = true
149
			ticketCond.Signal()
150
			cmd.Unlock()
151
			returnOnce.Do(func() {
152
				returnTicket()
153
				cmd.errorWithFallback(ctx, ErrMaxConcurrency)
154
				reportAllEvent()
155
			})
156
			return
157
		}
158

159
		runStart := time.Now()
160
		runErr := run(ctx)
161
		returnOnce.Do(func() {
162
			defer reportAllEvent()
163
			cmd.runDuration = time.Since(runStart)
164
			returnTicket()
165
			if runErr != nil {
166
				cmd.errorWithFallback(ctx, runErr)
167
				return
168
			}
169
			cmd.reportEvent("success")
170
		})
171
	}()
172

173
	go func() {
174
		timer := time.NewTimer(getSettings(name).Timeout)
175
		defer timer.Stop()
176

177
		select {
178
		case <-cmd.finished:
179
			// returnOnce has been executed in another goroutine
180
		case <-ctx.Done():
181
			returnOnce.Do(func() {
182
				returnTicket()
183
				cmd.errorWithFallback(ctx, ctx.Err())
184
				reportAllEvent()
185
			})
186
			return
187
		case <-timer.C:
188
			returnOnce.Do(func() {
189
				returnTicket()
190
				cmd.errorWithFallback(ctx, ErrTimeout)
191
				reportAllEvent()
192
			})
193
			return
194
		}
195
	}()
196

197
	return cmd.errChan
198
}
199

200
// Do runs your function in a synchronous manner, blocking until either your function succeeds
201
// or an error is returned, including hystrix circuit errors
202
func Do(name string, run runFunc, fallback fallbackFunc) error {
203
	runC := func(ctx context.Context) error {
204
		return run()
205
	}
206
	var fallbackC fallbackFuncC
207
	if fallback != nil {
208
		fallbackC = func(ctx context.Context, err error) error {
209
			return fallback(err)
210
		}
211
	}
212
	return DoC(context.Background(), name, runC, fallbackC)
213
}
214

215
// DoC runs your function in a synchronous manner, blocking until either your function succeeds
216
// or an error is returned, including hystrix circuit errors
217
func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error {
218
	done := make(chan struct{}, 1)
219

220
	r := func(ctx context.Context) error {
221
		err := run(ctx)
222
		if err != nil {
223
			return err
224
		}
225

226
		done <- struct{}{}
227
		return nil
228
	}
229

230
	f := func(ctx context.Context, e error) error {
231
		err := fallback(ctx, e)
232
		if err != nil {
233
			return err
234
		}
235

236
		done <- struct{}{}
237
		return nil
238
	}
239

240
	var errChan chan error
241
	if fallback == nil {
242
		errChan = GoC(ctx, name, r, nil)
243
	} else {
244
		errChan = GoC(ctx, name, r, f)
245
	}
246

247
	select {
248
	case <-done:
249
		return nil
250
	case err := <-errChan:
251
		return err
252
	}
253
}
254

255
func (c *command) reportEvent(eventType string) {
256
	c.Lock()
257
	defer c.Unlock()
258

259
	c.events = append(c.events, eventType)
260
}
261

262
// errorWithFallback triggers the fallback while reporting the appropriate metric events.
263
func (c *command) errorWithFallback(ctx context.Context, err error) {
264
	eventType := "failure"
265
	if err == ErrCircuitOpen {
266
		eventType = "short-circuit"
267
	} else if err == ErrMaxConcurrency {
268
		eventType = "rejected"
269
	} else if err == ErrTimeout {
270
		eventType = "timeout"
271
	} else if err == context.Canceled {
272
		eventType = "context_canceled"
273
	} else if err == context.DeadlineExceeded {
274
		eventType = "context_deadline_exceeded"
275
	}
276

277
	c.reportEvent(eventType)
278
	fallbackErr := c.tryFallback(ctx, err)
279
	if fallbackErr != nil {
280
		c.errChan <- fallbackErr
281
	}
282
}
283

284
func (c *command) tryFallback(ctx context.Context, err error) error {
285
	if c.fallback == nil {
286
		// If we don't have a fallback return the original error.
287
		return err
288
	}
289

290
	fallbackErr := c.fallback(ctx, err)
291
	if fallbackErr != nil {
292
		c.reportEvent("fallback-failure")
293
		return fmt.Errorf("fallback failed with '%v'. run error was '%v'", fallbackErr, err)
294
	}
295

296
	c.reportEvent("fallback-success")
297

298
	return nil
299
}
300

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.