cubefs

Форк
0
326 строк · 10.9 Кб
1
package hystrix
2

3
import (
4
	"bytes"
5
	"encoding/json"
6
	"net/http"
7
	"sync"
8
	"time"
9

10
	"github.com/afex/hystrix-go/hystrix/rolling"
11
)
12

13
// streamEventBufferSize is the capacity of each connected client's event
// channel. Once a client's buffer is full, writeToRequests drops further
// events for that client instead of blocking the publisher.
const streamEventBufferSize = 10
16

17
// NewStreamHandler returns a server capable of exposing dashboard metrics via HTTP.
18
func NewStreamHandler() *StreamHandler {
19
	return &StreamHandler{}
20
}
21

22
// StreamHandler publishes metrics for each command and each pool once a second
// to all connected HTTP clients, framed as a text/event-stream.
type StreamHandler struct {
	// requests maps each connected client's request to its buffered event channel.
	requests map[*http.Request]chan []byte
	// mu guards reads and writes of requests by the registry and publisher.
	mu       sync.RWMutex
	// done is created by Start and closed by Stop to end the publishing loop.
	done     chan struct{}
}
28

29
// Start begins watching the in-memory circuit breakers for metrics
30
func (sh *StreamHandler) Start() {
31
	sh.requests = make(map[*http.Request]chan []byte)
32
	sh.done = make(chan struct{})
33
	go sh.loop()
34
}
35

36
// Stop shuts down the metric collection routine
37
func (sh *StreamHandler) Stop() {
38
	close(sh.done)
39
}
40

41
// Compile-time check that *StreamHandler implements http.Handler.
var _ http.Handler = (*StreamHandler)(nil)
42

43
func (sh *StreamHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
44
	// Make sure that the writer supports flushing.
45
	f, ok := rw.(http.Flusher)
46
	if !ok {
47
		http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
48
		return
49
	}
50
	events := sh.register(req)
51
	defer sh.unregister(req)
52

53
	notify := rw.(http.CloseNotifier).CloseNotify()
54

55
	rw.Header().Add("Content-Type", "text/event-stream")
56
	rw.Header().Set("Cache-Control", "no-cache")
57
	rw.Header().Set("Connection", "keep-alive")
58
	for {
59
		select {
60
		case <-notify:
61
			// client is gone
62
			return
63
		case event := <-events:
64
			_, err := rw.Write(event)
65
			if err != nil {
66
				return
67
			}
68
			f.Flush()
69
		}
70
	}
71
}
72

73
func (sh *StreamHandler) loop() {
74
	tick := time.Tick(1 * time.Second)
75
	for {
76
		select {
77
		case <-tick:
78
			circuitBreakersMutex.RLock()
79
			for _, cb := range circuitBreakers {
80
				sh.publishMetrics(cb)
81
				sh.publishThreadPools(cb.executorPool)
82
			}
83
			circuitBreakersMutex.RUnlock()
84
		case <-sh.done:
85
			return
86
		}
87
	}
88
}
89

90
func (sh *StreamHandler) publishMetrics(cb *CircuitBreaker) error {
91
	now := time.Now()
92
	reqCount := cb.metrics.Requests().Sum(now)
93
	errCount := cb.metrics.DefaultCollector().Errors().Sum(now)
94
	errPct := cb.metrics.ErrorPercent(now)
95

96
	eventBytes, err := json.Marshal(&streamCmdMetric{
97
		Type:           "HystrixCommand",
98
		Name:           cb.Name,
99
		Group:          cb.Name,
100
		Time:           currentTime(),
101
		ReportingHosts: 1,
102

103
		RequestCount:       uint32(reqCount),
104
		ErrorCount:         uint32(errCount),
105
		ErrorPct:           uint32(errPct),
106
		CircuitBreakerOpen: cb.IsOpen(),
107

108
		RollingCountSuccess:            uint32(cb.metrics.DefaultCollector().Successes().Sum(now)),
109
		RollingCountFailure:            uint32(cb.metrics.DefaultCollector().Failures().Sum(now)),
110
		RollingCountThreadPoolRejected: uint32(cb.metrics.DefaultCollector().Rejects().Sum(now)),
111
		RollingCountShortCircuited:     uint32(cb.metrics.DefaultCollector().ShortCircuits().Sum(now)),
112
		RollingCountTimeout:            uint32(cb.metrics.DefaultCollector().Timeouts().Sum(now)),
113
		RollingCountFallbackSuccess:    uint32(cb.metrics.DefaultCollector().FallbackSuccesses().Sum(now)),
114
		RollingCountFallbackFailure:    uint32(cb.metrics.DefaultCollector().FallbackFailures().Sum(now)),
115

116
		LatencyTotal:       generateLatencyTimings(cb.metrics.DefaultCollector().TotalDuration()),
117
		LatencyTotalMean:   cb.metrics.DefaultCollector().TotalDuration().Mean(),
118
		LatencyExecute:     generateLatencyTimings(cb.metrics.DefaultCollector().RunDuration()),
119
		LatencyExecuteMean: cb.metrics.DefaultCollector().RunDuration().Mean(),
120

121
		// TODO: all hard-coded values should become configurable settings, per circuit
122

123
		RollingStatsWindow:         10000,
124
		ExecutionIsolationStrategy: "THREAD",
125

126
		CircuitBreakerEnabled:                true,
127
		CircuitBreakerForceClosed:            false,
128
		CircuitBreakerForceOpen:              cb.forceOpen,
129
		CircuitBreakerErrorThresholdPercent:  uint32(getSettings(cb.Name).ErrorPercentThreshold),
130
		CircuitBreakerSleepWindow:            uint32(getSettings(cb.Name).SleepWindow.Seconds() * 1000),
131
		CircuitBreakerRequestVolumeThreshold: uint32(getSettings(cb.Name).RequestVolumeThreshold),
132
	})
133
	if err != nil {
134
		return err
135
	}
136
	err = sh.writeToRequests(eventBytes)
137
	if err != nil {
138
		return err
139
	}
140

141
	return nil
142
}
143

144
func (sh *StreamHandler) publishThreadPools(pool *executorPool) error {
145
	now := time.Now()
146

147
	eventBytes, err := json.Marshal(&streamThreadPoolMetric{
148
		Type:           "HystrixThreadPool",
149
		Name:           pool.Name,
150
		ReportingHosts: 1,
151

152
		CurrentActiveCount:        uint32(pool.ActiveCount()),
153
		CurrentTaskCount:          0,
154
		CurrentCompletedTaskCount: 0,
155

156
		RollingCountThreadsExecuted: uint32(pool.Metrics.Executed.Sum(now)),
157
		RollingMaxActiveThreads:     uint32(pool.Metrics.MaxActiveRequests.Max(now)),
158

159
		CurrentPoolSize:        uint32(pool.Max),
160
		CurrentCorePoolSize:    uint32(pool.Max),
161
		CurrentLargestPoolSize: uint32(pool.Max),
162
		CurrentMaximumPoolSize: uint32(pool.Max),
163

164
		RollingStatsWindow:          10000,
165
		QueueSizeRejectionThreshold: 0,
166
		CurrentQueueSize:            0,
167
	})
168
	if err != nil {
169
		return err
170
	}
171
	err = sh.writeToRequests(eventBytes)
172

173
	return nil
174
}
175

176
func (sh *StreamHandler) writeToRequests(eventBytes []byte) error {
177
	var b bytes.Buffer
178
	_, err := b.Write([]byte("data:"))
179
	if err != nil {
180
		return err
181
	}
182

183
	_, err = b.Write(eventBytes)
184
	if err != nil {
185
		return err
186
	}
187
	_, err = b.Write([]byte("\n\n"))
188
	if err != nil {
189
		return err
190
	}
191
	dataBytes := b.Bytes()
192
	sh.mu.RLock()
193

194
	for _, requestEvents := range sh.requests {
195
		select {
196
		case requestEvents <- dataBytes:
197
		default:
198
		}
199
	}
200
	sh.mu.RUnlock()
201

202
	return nil
203
}
204

205
// register returns the event channel for req. On first use it creates a
// channel buffered to streamEventBufferSize and records it in the registry.
//
// Start must have been called first. Otherwise the registry map is nil and
// the insert panics.
func (sh *StreamHandler) register(req *http.Request) <-chan []byte {
	// Do the lookup and the insert under one write lock. A separate
	// RLock-check followed by a Lock-insert lets two concurrent callers for
	// the same request each create a channel, leaving one of them orphaned.
	sh.mu.Lock()
	defer sh.mu.Unlock()

	if events, ok := sh.requests[req]; ok {
		return events
	}
	events := make(chan []byte, streamEventBufferSize)
	sh.requests[req] = events
	return events
}
219

220
// unregister removes req from the client registry so that writeToRequests no
// longer sends it events. Removing a request that is not registered is a no-op.
func (sh *StreamHandler) unregister(req *http.Request) {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	delete(sh.requests, req)
}
225

226
func generateLatencyTimings(r *rolling.Timing) streamCmdLatency {
227
	return streamCmdLatency{
228
		Timing0:   r.Percentile(0),
229
		Timing25:  r.Percentile(25),
230
		Timing50:  r.Percentile(50),
231
		Timing75:  r.Percentile(75),
232
		Timing90:  r.Percentile(90),
233
		Timing95:  r.Percentile(95),
234
		Timing99:  r.Percentile(99),
235
		Timing995: r.Percentile(99.5),
236
		Timing100: r.Percentile(100),
237
	}
238
}
239

240
type streamCmdMetric struct {
241
	Type           string `json:"type"`
242
	Name           string `json:"name"`
243
	Group          string `json:"group"`
244
	Time           int64  `json:"currentTime"`
245
	ReportingHosts uint32 `json:"reportingHosts"`
246

247
	// Health
248
	RequestCount       uint32 `json:"requestCount"`
249
	ErrorCount         uint32 `json:"errorCount"`
250
	ErrorPct           uint32 `json:"errorPercentage"`
251
	CircuitBreakerOpen bool   `json:"isCircuitBreakerOpen"`
252

253
	RollingCountCollapsedRequests  uint32 `json:"rollingCountCollapsedRequests"`
254
	RollingCountExceptionsThrown   uint32 `json:"rollingCountExceptionsThrown"`
255
	RollingCountFailure            uint32 `json:"rollingCountFailure"`
256
	RollingCountFallbackFailure    uint32 `json:"rollingCountFallbackFailure"`
257
	RollingCountFallbackRejection  uint32 `json:"rollingCountFallbackRejection"`
258
	RollingCountFallbackSuccess    uint32 `json:"rollingCountFallbackSuccess"`
259
	RollingCountResponsesFromCache uint32 `json:"rollingCountResponsesFromCache"`
260
	RollingCountSemaphoreRejected  uint32 `json:"rollingCountSemaphoreRejected"`
261
	RollingCountShortCircuited     uint32 `json:"rollingCountShortCircuited"`
262
	RollingCountSuccess            uint32 `json:"rollingCountSuccess"`
263
	RollingCountThreadPoolRejected uint32 `json:"rollingCountThreadPoolRejected"`
264
	RollingCountTimeout            uint32 `json:"rollingCountTimeout"`
265

266
	CurrentConcurrentExecutionCount uint32 `json:"currentConcurrentExecutionCount"`
267

268
	LatencyExecuteMean uint32           `json:"latencyExecute_mean"`
269
	LatencyExecute     streamCmdLatency `json:"latencyExecute"`
270
	LatencyTotalMean   uint32           `json:"latencyTotal_mean"`
271
	LatencyTotal       streamCmdLatency `json:"latencyTotal"`
272

273
	// Properties
274
	CircuitBreakerRequestVolumeThreshold             uint32 `json:"propertyValue_circuitBreakerRequestVolumeThreshold"`
275
	CircuitBreakerSleepWindow                        uint32 `json:"propertyValue_circuitBreakerSleepWindowInMilliseconds"`
276
	CircuitBreakerErrorThresholdPercent              uint32 `json:"propertyValue_circuitBreakerErrorThresholdPercentage"`
277
	CircuitBreakerForceOpen                          bool   `json:"propertyValue_circuitBreakerForceOpen"`
278
	CircuitBreakerForceClosed                        bool   `json:"propertyValue_circuitBreakerForceClosed"`
279
	CircuitBreakerEnabled                            bool   `json:"propertyValue_circuitBreakerEnabled"`
280
	ExecutionIsolationStrategy                       string `json:"propertyValue_executionIsolationStrategy"`
281
	ExecutionIsolationThreadTimeout                  uint32 `json:"propertyValue_executionIsolationThreadTimeoutInMilliseconds"`
282
	ExecutionIsolationThreadInterruptOnTimeout       bool   `json:"propertyValue_executionIsolationThreadInterruptOnTimeout"`
283
	ExecutionIsolationThreadPoolKeyOverride          string `json:"propertyValue_executionIsolationThreadPoolKeyOverride"`
284
	ExecutionIsolationSemaphoreMaxConcurrentRequests uint32 `json:"propertyValue_executionIsolationSemaphoreMaxConcurrentRequests"`
285
	FallbackIsolationSemaphoreMaxConcurrentRequests  uint32 `json:"propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests"`
286
	RollingStatsWindow                               uint32 `json:"propertyValue_metricsRollingStatisticalWindowInMilliseconds"`
287
	RequestCacheEnabled                              bool   `json:"propertyValue_requestCacheEnabled"`
288
	RequestLogEnabled                                bool   `json:"propertyValue_requestLogEnabled"`
289
}
290

291
// streamCmdLatency holds one latency sample per percentile. Each JSON key is
// the percentile the value was taken at (filled by generateLatencyTimings).
type streamCmdLatency struct {
	// Percentiles 0 through 100, in ascending order.
	Timing0   uint32 `json:"0"`
	Timing25  uint32 `json:"25"`
	Timing50  uint32 `json:"50"`
	Timing75  uint32 `json:"75"`
	Timing90  uint32 `json:"90"`
	Timing95  uint32 `json:"95"`
	Timing99  uint32 `json:"99"`
	Timing995 uint32 `json:"99.5"`
	Timing100 uint32 `json:"100"`
}
302

303
// streamThreadPoolMetric is the JSON payload of a "HystrixThreadPool" event.
// Field order and JSON tags define the wire format.
type streamThreadPoolMetric struct {
	// Identity of the reporting pool.
	Type           string `json:"type"`
	Name           string `json:"name"`
	ReportingHosts uint32 `json:"reportingHosts"`

	// Point-in-time pool state.
	CurrentActiveCount        uint32 `json:"currentActiveCount"`
	CurrentCompletedTaskCount uint32 `json:"currentCompletedTaskCount"`
	CurrentCorePoolSize       uint32 `json:"currentCorePoolSize"`
	CurrentLargestPoolSize    uint32 `json:"currentLargestPoolSize"`
	CurrentMaximumPoolSize    uint32 `json:"currentMaximumPoolSize"`
	CurrentPoolSize           uint32 `json:"currentPoolSize"`
	CurrentQueueSize          uint32 `json:"currentQueueSize"`
	CurrentTaskCount          uint32 `json:"currentTaskCount"`

	// Rolling-window statistics.
	RollingMaxActiveThreads     uint32 `json:"rollingMaxActiveThreads"`
	RollingCountThreadsExecuted uint32 `json:"rollingCountThreadsExecuted"`

	// Properties.
	RollingStatsWindow          uint32 `json:"propertyValue_metricsRollingStatisticalWindowInMilliseconds"`
	QueueSizeRejectionThreshold uint32 `json:"propertyValue_queueSizeRejectionThreshold"`
}
323

324
// currentTime returns the current wall-clock time as whole milliseconds since
// the Unix epoch (nanoseconds divided by one million, truncated).
func currentTime() int64 {
	nanos := time.Now().UnixNano()
	return nanos / int64(time.Millisecond)
}
327

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.