cubefs
326 строк · 10.9 Кб
1package hystrix2
3import (4"bytes"5"encoding/json"6"net/http"7"sync"8"time"9
10"github.com/afex/hystrix-go/hystrix/rolling"11)
12
// streamEventBufferSize is the capacity of the buffered event channel that
// register creates for every connected HTTP request.
const streamEventBufferSize = 10
16
17// NewStreamHandler returns a server capable of exposing dashboard metrics via HTTP.
18func NewStreamHandler() *StreamHandler {19return &StreamHandler{}20}
21
// StreamHandler publishes metrics for each command and each pool once a
// second to all connected HTTP clients.
type StreamHandler struct {
	// requests maps every registered request to the channel its events are
	// delivered on. It is allocated by Start.
	requests map[*http.Request]chan []byte
	// mu guards requests.
	mu sync.RWMutex
	// done is closed by Stop to terminate the publishing loop.
	done chan struct{}
}
28
29// Start begins watching the in-memory circuit breakers for metrics
30func (sh *StreamHandler) Start() {31sh.requests = make(map[*http.Request]chan []byte)32sh.done = make(chan struct{})33go sh.loop()34}
35
36// Stop shuts down the metric collection routine
37func (sh *StreamHandler) Stop() {38close(sh.done)39}
40
41var _ http.Handler = (*StreamHandler)(nil)42
43func (sh *StreamHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {44// Make sure that the writer supports flushing.45f, ok := rw.(http.Flusher)46if !ok {47http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)48return49}50events := sh.register(req)51defer sh.unregister(req)52
53notify := rw.(http.CloseNotifier).CloseNotify()54
55rw.Header().Add("Content-Type", "text/event-stream")56rw.Header().Set("Cache-Control", "no-cache")57rw.Header().Set("Connection", "keep-alive")58for {59select {60case <-notify:61// client is gone62return63case event := <-events:64_, err := rw.Write(event)65if err != nil {66return67}68f.Flush()69}70}71}
72
73func (sh *StreamHandler) loop() {74tick := time.Tick(1 * time.Second)75for {76select {77case <-tick:78circuitBreakersMutex.RLock()79for _, cb := range circuitBreakers {80sh.publishMetrics(cb)81sh.publishThreadPools(cb.executorPool)82}83circuitBreakersMutex.RUnlock()84case <-sh.done:85return86}87}88}
89
90func (sh *StreamHandler) publishMetrics(cb *CircuitBreaker) error {91now := time.Now()92reqCount := cb.metrics.Requests().Sum(now)93errCount := cb.metrics.DefaultCollector().Errors().Sum(now)94errPct := cb.metrics.ErrorPercent(now)95
96eventBytes, err := json.Marshal(&streamCmdMetric{97Type: "HystrixCommand",98Name: cb.Name,99Group: cb.Name,100Time: currentTime(),101ReportingHosts: 1,102
103RequestCount: uint32(reqCount),104ErrorCount: uint32(errCount),105ErrorPct: uint32(errPct),106CircuitBreakerOpen: cb.IsOpen(),107
108RollingCountSuccess: uint32(cb.metrics.DefaultCollector().Successes().Sum(now)),109RollingCountFailure: uint32(cb.metrics.DefaultCollector().Failures().Sum(now)),110RollingCountThreadPoolRejected: uint32(cb.metrics.DefaultCollector().Rejects().Sum(now)),111RollingCountShortCircuited: uint32(cb.metrics.DefaultCollector().ShortCircuits().Sum(now)),112RollingCountTimeout: uint32(cb.metrics.DefaultCollector().Timeouts().Sum(now)),113RollingCountFallbackSuccess: uint32(cb.metrics.DefaultCollector().FallbackSuccesses().Sum(now)),114RollingCountFallbackFailure: uint32(cb.metrics.DefaultCollector().FallbackFailures().Sum(now)),115
116LatencyTotal: generateLatencyTimings(cb.metrics.DefaultCollector().TotalDuration()),117LatencyTotalMean: cb.metrics.DefaultCollector().TotalDuration().Mean(),118LatencyExecute: generateLatencyTimings(cb.metrics.DefaultCollector().RunDuration()),119LatencyExecuteMean: cb.metrics.DefaultCollector().RunDuration().Mean(),120
121// TODO: all hard-coded values should become configurable settings, per circuit122
123RollingStatsWindow: 10000,124ExecutionIsolationStrategy: "THREAD",125
126CircuitBreakerEnabled: true,127CircuitBreakerForceClosed: false,128CircuitBreakerForceOpen: cb.forceOpen,129CircuitBreakerErrorThresholdPercent: uint32(getSettings(cb.Name).ErrorPercentThreshold),130CircuitBreakerSleepWindow: uint32(getSettings(cb.Name).SleepWindow.Seconds() * 1000),131CircuitBreakerRequestVolumeThreshold: uint32(getSettings(cb.Name).RequestVolumeThreshold),132})133if err != nil {134return err135}136err = sh.writeToRequests(eventBytes)137if err != nil {138return err139}140
141return nil142}
143
144func (sh *StreamHandler) publishThreadPools(pool *executorPool) error {145now := time.Now()146
147eventBytes, err := json.Marshal(&streamThreadPoolMetric{148Type: "HystrixThreadPool",149Name: pool.Name,150ReportingHosts: 1,151
152CurrentActiveCount: uint32(pool.ActiveCount()),153CurrentTaskCount: 0,154CurrentCompletedTaskCount: 0,155
156RollingCountThreadsExecuted: uint32(pool.Metrics.Executed.Sum(now)),157RollingMaxActiveThreads: uint32(pool.Metrics.MaxActiveRequests.Max(now)),158
159CurrentPoolSize: uint32(pool.Max),160CurrentCorePoolSize: uint32(pool.Max),161CurrentLargestPoolSize: uint32(pool.Max),162CurrentMaximumPoolSize: uint32(pool.Max),163
164RollingStatsWindow: 10000,165QueueSizeRejectionThreshold: 0,166CurrentQueueSize: 0,167})168if err != nil {169return err170}171err = sh.writeToRequests(eventBytes)172
173return nil174}
175
176func (sh *StreamHandler) writeToRequests(eventBytes []byte) error {177var b bytes.Buffer178_, err := b.Write([]byte("data:"))179if err != nil {180return err181}182
183_, err = b.Write(eventBytes)184if err != nil {185return err186}187_, err = b.Write([]byte("\n\n"))188if err != nil {189return err190}191dataBytes := b.Bytes()192sh.mu.RLock()193
194for _, requestEvents := range sh.requests {195select {196case requestEvents <- dataBytes:197default:198}199}200sh.mu.RUnlock()201
202return nil203}
204
205func (sh *StreamHandler) register(req *http.Request) <-chan []byte {206sh.mu.RLock()207events, ok := sh.requests[req]208sh.mu.RUnlock()209if ok {210return events211}212
213events = make(chan []byte, streamEventBufferSize)214sh.mu.Lock()215sh.requests[req] = events216sh.mu.Unlock()217return events218}
219
220func (sh *StreamHandler) unregister(req *http.Request) {221sh.mu.Lock()222delete(sh.requests, req)223sh.mu.Unlock()224}
225
226func generateLatencyTimings(r *rolling.Timing) streamCmdLatency {227return streamCmdLatency{228Timing0: r.Percentile(0),229Timing25: r.Percentile(25),230Timing50: r.Percentile(50),231Timing75: r.Percentile(75),232Timing90: r.Percentile(90),233Timing95: r.Percentile(95),234Timing99: r.Percentile(99),235Timing995: r.Percentile(99.5),236Timing100: r.Percentile(100),237}238}
239
240type streamCmdMetric struct {241Type string `json:"type"`242Name string `json:"name"`243Group string `json:"group"`244Time int64 `json:"currentTime"`245ReportingHosts uint32 `json:"reportingHosts"`246
247// Health248RequestCount uint32 `json:"requestCount"`249ErrorCount uint32 `json:"errorCount"`250ErrorPct uint32 `json:"errorPercentage"`251CircuitBreakerOpen bool `json:"isCircuitBreakerOpen"`252
253RollingCountCollapsedRequests uint32 `json:"rollingCountCollapsedRequests"`254RollingCountExceptionsThrown uint32 `json:"rollingCountExceptionsThrown"`255RollingCountFailure uint32 `json:"rollingCountFailure"`256RollingCountFallbackFailure uint32 `json:"rollingCountFallbackFailure"`257RollingCountFallbackRejection uint32 `json:"rollingCountFallbackRejection"`258RollingCountFallbackSuccess uint32 `json:"rollingCountFallbackSuccess"`259RollingCountResponsesFromCache uint32 `json:"rollingCountResponsesFromCache"`260RollingCountSemaphoreRejected uint32 `json:"rollingCountSemaphoreRejected"`261RollingCountShortCircuited uint32 `json:"rollingCountShortCircuited"`262RollingCountSuccess uint32 `json:"rollingCountSuccess"`263RollingCountThreadPoolRejected uint32 `json:"rollingCountThreadPoolRejected"`264RollingCountTimeout uint32 `json:"rollingCountTimeout"`265
266CurrentConcurrentExecutionCount uint32 `json:"currentConcurrentExecutionCount"`267
268LatencyExecuteMean uint32 `json:"latencyExecute_mean"`269LatencyExecute streamCmdLatency `json:"latencyExecute"`270LatencyTotalMean uint32 `json:"latencyTotal_mean"`271LatencyTotal streamCmdLatency `json:"latencyTotal"`272
273// Properties274CircuitBreakerRequestVolumeThreshold uint32 `json:"propertyValue_circuitBreakerRequestVolumeThreshold"`275CircuitBreakerSleepWindow uint32 `json:"propertyValue_circuitBreakerSleepWindowInMilliseconds"`276CircuitBreakerErrorThresholdPercent uint32 `json:"propertyValue_circuitBreakerErrorThresholdPercentage"`277CircuitBreakerForceOpen bool `json:"propertyValue_circuitBreakerForceOpen"`278CircuitBreakerForceClosed bool `json:"propertyValue_circuitBreakerForceClosed"`279CircuitBreakerEnabled bool `json:"propertyValue_circuitBreakerEnabled"`280ExecutionIsolationStrategy string `json:"propertyValue_executionIsolationStrategy"`281ExecutionIsolationThreadTimeout uint32 `json:"propertyValue_executionIsolationThreadTimeoutInMilliseconds"`282ExecutionIsolationThreadInterruptOnTimeout bool `json:"propertyValue_executionIsolationThreadInterruptOnTimeout"`283ExecutionIsolationThreadPoolKeyOverride string `json:"propertyValue_executionIsolationThreadPoolKeyOverride"`284ExecutionIsolationSemaphoreMaxConcurrentRequests uint32 `json:"propertyValue_executionIsolationSemaphoreMaxConcurrentRequests"`285FallbackIsolationSemaphoreMaxConcurrentRequests uint32 `json:"propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests"`286RollingStatsWindow uint32 `json:"propertyValue_metricsRollingStatisticalWindowInMilliseconds"`287RequestCacheEnabled bool `json:"propertyValue_requestCacheEnabled"`288RequestLogEnabled bool `json:"propertyValue_requestLogEnabled"`289}
290
// streamCmdLatency holds one latency distribution as a set of percentile
// samples. Each field is serialised under its percentile as the JSON key
// ("0", "25", ... "99.5", "100").
type streamCmdLatency struct {
	Timing0   uint32 `json:"0"`
	Timing25  uint32 `json:"25"`
	Timing50  uint32 `json:"50"`
	Timing75  uint32 `json:"75"`
	Timing90  uint32 `json:"90"`
	Timing95  uint32 `json:"95"`
	Timing99  uint32 `json:"99"`
	Timing995 uint32 `json:"99.5"`
	Timing100 uint32 `json:"100"`
}
302
// streamThreadPoolMetric is the JSON payload of a "HystrixThreadPool" event
// as built by publishThreadPools.
type streamThreadPoolMetric struct {
	// Identification of the event and of the pool it describes.
	Type           string `json:"type"`
	Name           string `json:"name"`
	ReportingHosts uint32 `json:"reportingHosts"`

	// Point-in-time pool gauges.
	CurrentActiveCount        uint32 `json:"currentActiveCount"`
	CurrentCompletedTaskCount uint32 `json:"currentCompletedTaskCount"`
	CurrentCorePoolSize       uint32 `json:"currentCorePoolSize"`
	CurrentLargestPoolSize    uint32 `json:"currentLargestPoolSize"`
	CurrentMaximumPoolSize    uint32 `json:"currentMaximumPoolSize"`
	CurrentPoolSize           uint32 `json:"currentPoolSize"`
	CurrentQueueSize          uint32 `json:"currentQueueSize"`
	CurrentTaskCount          uint32 `json:"currentTaskCount"`

	// Rolling-window statistics.
	RollingMaxActiveThreads     uint32 `json:"rollingMaxActiveThreads"`
	RollingCountThreadsExecuted uint32 `json:"rollingCountThreadsExecuted"`

	// Properties.
	RollingStatsWindow          uint32 `json:"propertyValue_metricsRollingStatisticalWindowInMilliseconds"`
	QueueSizeRejectionThreshold uint32 `json:"propertyValue_queueSizeRejectionThreshold"`
}
323
// currentTime returns the current wall-clock time as milliseconds since the
// Unix epoch.
func currentTime() int64 {
	nanos := time.Now().UnixNano()
	return nanos / int64(time.Millisecond)
}
327