moira / worker.go
package metrics

import (
	"fmt"
	"math/rand"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"gopkg.in/alexcesaro/statsd.v2"
	"gopkg.in/tomb.v2"
)

const (
	bufferSize      = 1048576          // capacity of the delayed-call queue
	bufferThreshold = 786432           // queue length above which incoming calls are dropped
	connectionsQty  = 16               // number of statsd clients used to emit aggregated data
	flushInterval   = 10 * time.Second // period between flushes of the metric caches
)

const (
	ctCount callType = iota // counter increment
	ctHistogram             // histogram value
	ctTiming                // timing value
)

// callType identifies the kind of a delayed metric call.
type callType int

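// delayedCall is a single metric update waiting in the queue to be processed by a consumer.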
type delayedCall struct {
	callType callType
	bucket   string
	value    int64
}

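// metricsWorker accepts metric updates, accumulates them in per-consumer caches
// and periodically flushes the aggregated values to statsd.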
type metricsWorker struct {
	caches  []*metricsCache
	clients []*statsd.Client
	locks   []sync.Mutex
	queue   chan delayedCall
	rate    float64
	threads int
	tomb    tomb.Tomb

	// simple map-based counter for test purposes only
	counter map[string]int64
	sync    chan bool
}

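// sample holds the aggregated state of a single bucket collected during flush:
// the meter count, the histogram count and the histogram's sampled values.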
type sample struct {
	name   string
	countM int64
	countH int64
	values []int64
}

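// newMetricsWorker creates the statsd clients and per-thread caches according to cfg.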
func newMetricsWorker() (*metricsWorker, error) {
	var (
		err     error
		threads int
		options []statsd.Option
	)

	options = []statsd.Option{
		statsd.Address(fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)),
		statsd.FlushPeriod(1 * time.Second),
		statsd.Prefix(cfg.Prefix),
		statsd.Network("udp"),
		statsd.Mute(!cfg.Enabled),
	}
	threads = cfg.Limits.ThreadsQty

	caches := make([]*metricsCache, threads)
	for i := 0; i < threads; i++ {
		caches[i] = newMetricsCache()
	}

	clients := make([]*statsd.Client, connectionsQty)
	for i := 0; i < connectionsQty; i++ {
		clients[i], err = statsd.New(options...)
		if err != nil {
			return nil, err
		}
	}

	result := &metricsWorker{
		caches:  caches,
		clients: clients,
		locks:   make([]sync.Mutex, threads),
		queue:   make(chan delayedCall, bufferSize),
		rate:    cfg.Limits.AcceptRate,
		threads: threads,
		counter: make(map[string]int64),
		sync:    make(chan bool, 1),
	}
	return result, nil
}

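// addCall enqueues a metric update; in test mode it blocks until the consumer has handled it.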
func (worker *metricsWorker) addCall(delayedCall *delayedCall) {
	worker.queue <- *delayedCall

	if cfg.IsTest {
		<-worker.sync
	}
}

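// consumer drains the shared queue and records incoming calls into the cache owned by this consumer id.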
func (worker *metricsWorker) consumer(id int) error {
	var (
		call      delayedCall
		cache     *metricsCache
		pair      pair
		dropped   int64
		queueSize int
		skip      bool
	)

	for {
		select {
		case <-worker.tomb.Dying():
			return nil
		case call = <-worker.queue:
			// pass
		}

		skip = !cfg.Enabled || (worker.rate != 1 && worker.rate < rand.Float64())
		if skip {
			if cfg.IsTest {
				worker.counter[call.bucket] += call.value
				worker.sync <- true
			}
			continue
		}

		// drop metrics if queue is about to overflow
		queueSize = len(worker.queue)
		if queueSize > bufferThreshold {
			dropped++
			continue
		}

		worker.locks[id].Lock()
		cache = worker.caches[id]

		// record self metrics
		cache.queueSize.meter.Update(int64(queueSize))
		if dropped > 0 {
			cache.droppedCalls.meter.Update(dropped)
		}
		dropped = 0

		// record requested metric
		pair = cache.getOrCreate(call.bucket)
		switch call.callType {
		case ctCount:
			pair.meter.Update(call.value)
		case ctHistogram, ctTiming:
			// histogram and timing calls feed the same histogram; its sample is emitted on flush
			pair.histogram.Update(call.value)
		}
		worker.locks[id].Unlock()
	}
}

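// flush swaps the per-thread caches with empty ones, aggregates the collected
// data and emits it to statsd through the pool of clients.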
func (worker *metricsWorker) flush() {
	started := time.Now()

	// swap current metric caches with new ones (which are empty)
	caches := make([]*metricsCache, worker.threads)
	for i := 0; i < worker.threads; i++ {
		caches[i] = newMetricsCache()
		worker.locks[i].Lock()
		caches[i], worker.caches[i] = worker.caches[i], caches[i]
		worker.locks[i].Unlock()
	}

	// aggregate metric caches
	total := make(map[string]*sample)
	for i := 0; i < worker.threads; i++ {
		for name, pair := range caches[i].data {
			meterCount := pair.meter.Count()
			histogramCount := pair.histogram.Count()
			if meterCount == 0 && histogramCount == 0 {
				continue
			}
			histogramValues := pair.histogram.Sample().Values()

			data, ok := total[name]
			if !ok {
				data = &sample{name: name}
				total[name] = data
			}

			data.countM += meterCount
			data.countH += histogramCount
			data.values = append(data.values, histogramValues...)
		}
	}

	// emit aggregated data
	source := make(chan *delayedCall, connectionsQty)
	wg := sync.WaitGroup{}
	wg.Add(connectionsQty)
	for i := 0; i < connectionsQty; i++ {
		go func(id int, wg *sync.WaitGroup) {
			defer wg.Done()

			client := worker.clients[id]
			for data := range source {
				switch data.callType {
				case ctCount:
					client.Count(data.bucket, data.value)
				case ctHistogram:
					client.Timing(data.bucket, data.value)
				}
			}
		}(i, &wg)
	}

	for bucket, data := range total {
		if data.countM > 0 { // counter is emitted as is
			source <- &delayedCall{
				callType: ctCount,
				bucket:   bucket,
				value:    data.countM,
			}
		}
		if data.countH > 0 {
			for _, value := range data.values { // each value of the histogram's sample is emitted separately
				source <- &delayedCall{
					callType: ctHistogram,
					bucket:   bucket,
					value:    value,
				}
			}

			// the histogram keeps only a limited sample of events, but its count is real,
			// so if the sample has been truncated, the real count is emitted under a `.count` suffix
			if data.countH != int64(len(data.values)) {
				source <- &delayedCall{
					callType: ctCount,
					bucket:   bucket + ".count",
					value:    data.countH,
				}
			}
		}
	}
	close(source)
	wg.Wait()

	worker.clients[0].Timing(sdFlushTime, int64(time.Since(started)))
	worker.counter = make(map[string]int64)
}

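// flushTicker triggers flush every flushInterval until the worker is stopped.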
func (worker *metricsWorker) flushTicker() error {
	if cfg.IsTest { // there is no periodic flushing in testing mode
		return nil
	}

	ticker := time.NewTicker(flushInterval)
	for {
		select {
		case <-worker.tomb.Dying():
			return nil
		case <-ticker.C:
			worker.flush()
		}
	}
}

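// getCount returns the test-mode counter value accumulated for the given bucket.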
func (worker *metricsWorker) getCount(bucket string) int64 {
	return worker.counter[bucket]
}

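// lifeCycle starts the worker and blocks until SIGINT or SIGTERM is received, then stops it.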
func (worker *metricsWorker) lifeCycle() {
	worker.start()
	defer worker.stop()

	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
	<-ch
}

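// start launches one consumer goroutine per configured thread plus the flush ticker.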
func (worker *metricsWorker) start() {
	for i := 0; i < worker.threads; i++ {
		func(id int) {
			worker.tomb.Go(func() error {
				return worker.consumer(id)
			})
		}(i)
	}
	worker.tomb.Go(worker.flushTicker)
}

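// stop kills the tomb, performs a final flush and waits for all goroutines to exit.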
func (worker *metricsWorker) stop() {
	worker.tomb.Kill(nil)
	worker.flush()
	_ = worker.tomb.Wait()
}