12
"gopkg.in/alexcesaro/statsd.v2"
18
bufferThreshold = 786432
20
flushInterval = 10 * time.Second
24
ctCount callType = iota
31
type delayedCall struct {
37
type metricsWorker struct {
38
caches []*metricsCache
39
clients []*statsd.Client
41
queue chan delayedCall
46
// simple map-based counter for test purposes only
47
counter map[string]int64
58
func newMetricsWorker() (*metricsWorker, error) {
62
options []statsd.Option
65
options = []statsd.Option{
66
statsd.Address(fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)),
67
statsd.FlushPeriod(1 * time.Second),
68
statsd.Prefix(cfg.Prefix),
69
statsd.Network("udp"),
70
statsd.Mute(!cfg.Enabled),
72
threads = cfg.Limits.ThreadsQty
74
caches := make([]*metricsCache, threads)
75
for i := 0; i < threads; i++ {
76
caches[i] = newMetricsCache()
79
clients := make([]*statsd.Client, connectionsQty)
80
for i := 0; i < connectionsQty; i++ {
81
clients[i], err = statsd.New(options...)
87
result := &metricsWorker{
90
locks: make([]sync.Mutex, threads),
91
queue: make(chan delayedCall, bufferSize),
92
rate: cfg.Limits.AcceptRate,
94
counter: make(map[string]int64),
95
sync: make(chan bool, 1),
100
func (worker *metricsWorker) addCall(delayedCall *delayedCall) {
101
worker.queue <- *delayedCall
108
func (worker *metricsWorker) consumer(id int) error {
120
case <-worker.tomb.Dying():
122
case call = <-worker.queue:
126
skip = !cfg.Enabled || (worker.rate != 1 && worker.rate < rand.Float64())
129
worker.counter[call.bucket] += call.value
135
// drop metrics if the queue is about to overflow
136
queueSize = len(worker.queue)
137
if queueSize > bufferThreshold {
142
worker.locks[id].Lock()
143
cache = worker.caches[id]
145
// record self metrics
146
cache.queueSize.meter.Update(int64(queueSize))
148
cache.droppedCalls.meter.Update(dropped)
152
// record requested metric
153
pair = cache.getOrCreate(call.bucket)
154
switch call.callType {
156
pair.meter.Update(call.value)
159
pair.histogram.Update(call.value)
161
worker.locks[id].Unlock()
165
func (worker *metricsWorker) flush() {
166
started := time.Now()
168
// swap current metric caches with new ones (which are empty)
169
caches := make([]*metricsCache, worker.threads)
170
for i := 0; i < worker.threads; i++ {
171
caches[i] = newMetricsCache()
172
worker.locks[i].Lock()
173
caches[i], worker.caches[i] = worker.caches[i], caches[i]
174
worker.locks[i].Unlock()
177
// aggregate metric caches
178
total := make(map[string]*sample)
179
for i := 0; i < worker.threads; i++ {
180
for name, pair := range caches[i].data {
181
meterCount := pair.meter.Count()
182
histogramCount := pair.histogram.Count()
183
if meterCount == 0 && histogramCount == 0 {
186
histogramValues := pair.histogram.Sample().Values()
188
data, ok := total[name]
190
data = &sample{name: name}
194
data.countM += meterCount
195
data.countH += histogramCount
196
data.values = append(data.values, histogramValues...)
200
// emit aggregated data
201
source := make(chan *delayedCall, connectionsQty)
202
wg := sync.WaitGroup{}
203
wg.Add(connectionsQty)
204
for i := 0; i < connectionsQty; i++ {
205
go func(id int, wg *sync.WaitGroup) {
208
client := worker.clients[id]
209
for data := range source {
210
switch data.callType {
212
client.Count(data.bucket, data.value)
214
client.Timing(data.bucket, data.value)
220
for bucket, data := range total {
221
if data.countM > 0 { // counter is emitted as is
222
source <- &delayedCall{
228
if data.countH > 0 { //
229
for _, value := range data.values { // each value of histogram's sample is emitted separately
230
source <- &delayedCall{
231
callType: ctHistogram,
237
// a histogram keeps only a limited buffer of events, but its count is the real one
238
// so if the buffer has been truncated, an increasing `.count`-suffixed metric must be emitted as well
239
if data.countH != int64(len(data.values)) {
240
source <- &delayedCall{
242
bucket: bucket + ".count",
251
worker.clients[0].Timing(sdFlushTime, int64(time.Since(started)))
252
worker.counter = make(map[string]int64)
255
func (worker *metricsWorker) flushTicker() error {
256
if cfg.IsTest { // there is no periodical flushing in testing mode
260
ticker := time.NewTicker(flushInterval)
263
case <-worker.tomb.Dying():
271
func (worker *metricsWorker) getCount(bucket string) int64 {
272
return worker.counter[bucket]
275
func (worker *metricsWorker) lifeCycle() {
279
ch := make(chan os.Signal, 1)
280
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
284
func (worker *metricsWorker) start() {
285
for i := 0; i < worker.threads; i++ {
287
worker.tomb.Go(func() error {
288
return worker.consumer(id)
292
worker.tomb.Go(worker.flushTicker)
295
func (worker *metricsWorker) stop() {
296
worker.tomb.Kill(nil)
298
_ = worker.tomb.Wait()