// prometheus scrape tests — 3778 lines · 108.6 KB
1// Copyright 2016 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package scrape
15
16import (
17"bytes"
18"compress/gzip"
19"context"
20"encoding/binary"
21"errors"
22"fmt"
23"io"
24"math"
25"net/http"
26"net/http/httptest"
27"net/url"
28"strconv"
29"strings"
30"sync"
31"testing"
32"time"
33
34"github.com/go-kit/log"
35"github.com/gogo/protobuf/proto"
36"github.com/google/go-cmp/cmp"
37"github.com/prometheus/client_golang/prometheus"
38dto "github.com/prometheus/client_model/go"
39config_util "github.com/prometheus/common/config"
40"github.com/prometheus/common/model"
41"github.com/stretchr/testify/require"
42
43"github.com/prometheus/prometheus/config"
44"github.com/prometheus/prometheus/discovery"
45"github.com/prometheus/prometheus/discovery/targetgroup"
46"github.com/prometheus/prometheus/model/exemplar"
47"github.com/prometheus/prometheus/model/histogram"
48"github.com/prometheus/prometheus/model/labels"
49"github.com/prometheus/prometheus/model/relabel"
50"github.com/prometheus/prometheus/model/textparse"
51"github.com/prometheus/prometheus/model/timestamp"
52"github.com/prometheus/prometheus/model/value"
53"github.com/prometheus/prometheus/storage"
54"github.com/prometheus/prometheus/tsdb/chunkenc"
55"github.com/prometheus/prometheus/util/pool"
56"github.com/prometheus/prometheus/util/teststorage"
57"github.com/prometheus/prometheus/util/testutil"
58)
59
// TestMain runs the package test suite through testutil's tolerant goroutine
// leak verifier, failing the run if unexpected goroutines remain afterwards.
func TestMain(m *testing.M) {
	testutil.TolerantVerifyLeak(m)
}
63
64func newTestScrapeMetrics(t testing.TB) *scrapeMetrics {
65reg := prometheus.NewRegistry()
66metrics, err := newScrapeMetrics(reg)
67require.NoError(t, err)
68return metrics
69}
70
71func TestNewScrapePool(t *testing.T) {
72var (
73app = &nopAppendable{}
74cfg = &config.ScrapeConfig{}
75sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
76)
77
78a, ok := sp.appendable.(*nopAppendable)
79require.True(t, ok, "Failure to append.")
80require.Equal(t, app, a, "Wrong sample appender.")
81require.Equal(t, cfg, sp.config, "Wrong scrape config.")
82require.NotNil(t, sp.newLoop, "newLoop function not initialized.")
83}
84
// TestDroppedTargetsList verifies that targets dropped by relabeling are
// recorded without duplication across repeated Syncs, and that the dropped
// count stays accurate even when KeepDroppedTargets truncates the list.
func TestDroppedTargetsList(t *testing.T) {
	var (
		app = &nopAppendable{}
		cfg = &config.ScrapeConfig{
			JobName:        "dropMe",
			ScrapeInterval: model.Duration(1),
			RelabelConfigs: []*relabel.Config{
				{
					Action:       relabel.Drop,
					Regex:        relabel.MustNewRegexp("dropMe"),
					SourceLabels: model.LabelNames{"job"},
				},
			},
		}
		tgs = []*targetgroup.Group{
			{
				Targets: []model.LabelSet{
					{model.AddressLabel: "127.0.0.1:9090"},
					{model.AddressLabel: "127.0.0.1:9091"},
				},
			},
		}
		sp, _                  = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
		expectedLabelSetString = "{__address__=\"127.0.0.1:9090\", __scrape_interval__=\"0s\", __scrape_timeout__=\"0s\", job=\"dropMe\"}"
		expectedLength         = 2
	)
	// Syncing the same groups twice must not double-count dropped targets.
	sp.Sync(tgs)
	sp.Sync(tgs)
	require.Len(t, sp.droppedTargets, expectedLength)
	require.Equal(t, expectedLength, sp.droppedTargetsCount)
	require.Equal(t, expectedLabelSetString, sp.droppedTargets[0].DiscoveredLabels().String())

	// Check that count is still correct when we don't retain all dropped targets.
	sp.config.KeepDroppedTargets = 1
	sp.Sync(tgs)
	require.Len(t, sp.droppedTargets, 1)
	require.Equal(t, expectedLength, sp.droppedTargetsCount)
}
123
// TestDiscoveredLabelsUpdate checks that DiscoveredLabels are updated
// even when new labels don't affect the target `hash`.
func TestDiscoveredLabelsUpdate(t *testing.T) {
	sp := &scrapePool{
		metrics: newTestScrapeMetrics(t),
	}

	// These are used when syncing so need this to avoid a panic.
	sp.config = &config.ScrapeConfig{
		ScrapeInterval: model.Duration(1),
		ScrapeTimeout:  model.Duration(1),
	}
	sp.activeTargets = make(map[uint64]*Target)
	t1 := &Target{
		discoveredLabels: labels.FromStrings("label", "name"),
	}
	sp.activeTargets[t1.hash()] = t1

	// t2 has different discovered labels but, with empty `labels`, the same
	// hash slot as t1, so sync must overwrite t1's discovered labels in place.
	t2 := &Target{
		discoveredLabels: labels.FromStrings("labelNew", "nameNew"),
	}
	sp.sync([]*Target{t2})

	require.Equal(t, t2.DiscoveredLabels(), sp.activeTargets[t1.hash()].DiscoveredLabels())
}
149
// testLoop is a configurable fake implementation of the loop interface,
// letting tests hook into the start and stop transitions.
type testLoop struct {
	startFunc func(interval, timeout time.Duration, errc chan<- error) // called by run
	stopFunc  func()                                                   // called by stop

	forcedErr    error      // guarded by forcedErrMtx
	forcedErrMtx sync.Mutex

	runOnce  bool // set by run; a second run call panics
	interval time.Duration
	timeout  time.Duration
}
159
// run invokes startFunc with the configured interval/timeout. Starting the
// same loop twice is a test bug and panics.
func (l *testLoop) run(errc chan<- error) {
	if l.runOnce {
		panic("loop must be started only once")
	}
	l.runOnce = true
	l.startFunc(l.interval, l.timeout, errc)
}
167
// disableEndOfRunStalenessMarkers is a no-op; present only to satisfy the
// loop interface.
func (l *testLoop) disableEndOfRunStalenessMarkers() {
}
170
171func (l *testLoop) setForcedError(err error) {
172l.forcedErrMtx.Lock()
173defer l.forcedErrMtx.Unlock()
174l.forcedErr = err
175}
176
177func (l *testLoop) getForcedError() error {
178l.forcedErrMtx.Lock()
179defer l.forcedErrMtx.Unlock()
180return l.forcedErr
181}
182
// stop delegates to the test-provided stopFunc hook.
func (l *testLoop) stop() {
	l.stopFunc()
}
186
// getCache always returns nil; the fake loop maintains no scrape cache.
func (l *testLoop) getCache() *scrapeCache {
	return nil
}
190
// TestScrapePoolStop verifies that stopping a scrape pool stops every loop,
// waits for all of them to finish, and clears the pool's target/loop maps.
func TestScrapePoolStop(t *testing.T) {
	sp := &scrapePool{
		activeTargets: map[uint64]*Target{},
		loops:         map[uint64]loop{},
		cancel:        func() {},
		client:        http.DefaultClient,
		metrics:       newTestScrapeMetrics(t),
	}
	var mtx sync.Mutex
	stopped := map[uint64]bool{}
	numTargets := 20

	// Stopping the scrape pool must call stop() on all scrape loops,
	// clean them and the respective targets up. It must wait until each loop's
	// stop function returned before returning itself.

	for i := 0; i < numTargets; i++ {
		t := &Target{
			labels: labels.FromStrings(model.AddressLabel, fmt.Sprintf("example.com:%d", i)),
		}
		l := &testLoop{}
		// Each loop sleeps progressively longer so the overall stop duration
		// is dominated by the slowest loop (numTargets*20ms).
		d := time.Duration((i+1)*20) * time.Millisecond
		l.stopFunc = func() {
			time.Sleep(d)

			mtx.Lock()
			stopped[t.hash()] = true
			mtx.Unlock()
		}

		sp.activeTargets[t.hash()] = t
		sp.loops[t.hash()] = l
	}

	done := make(chan struct{})
	stopTime := time.Now()

	go func() {
		sp.stop()
		close(done)
	}()

	select {
	case <-time.After(5 * time.Second):
		require.Fail(t, "scrapeLoop.stop() did not return as expected")
	case <-done:
		// This should have taken at least as long as the last target slept.
		require.GreaterOrEqual(t, time.Since(stopTime), time.Duration(numTargets*20)*time.Millisecond, "scrapeLoop.stop() exited before all targets stopped")
	}

	mtx.Lock()
	require.Len(t, stopped, numTargets, "Unexpected number of stopped loops")
	mtx.Unlock()

	require.Empty(t, sp.activeTargets, "Targets were not cleared on stopping: %d left", len(sp.activeTargets))
	require.Empty(t, sp.loops, "Loops were not cleared on stopping: %d left", len(sp.loops))
}
248
249func TestScrapePoolReload(t *testing.T) {
250var mtx sync.Mutex
251numTargets := 20
252
253stopped := map[uint64]bool{}
254
255reloadCfg := &config.ScrapeConfig{
256ScrapeInterval: model.Duration(3 * time.Second),
257ScrapeTimeout: model.Duration(2 * time.Second),
258}
259// On starting to run, new loops created on reload check whether their preceding
260// equivalents have been stopped.
261newLoop := func(opts scrapeLoopOptions) loop {
262l := &testLoop{interval: time.Duration(reloadCfg.ScrapeInterval), timeout: time.Duration(reloadCfg.ScrapeTimeout)}
263l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
264require.Equal(t, 3*time.Second, interval, "Unexpected scrape interval")
265require.Equal(t, 2*time.Second, timeout, "Unexpected scrape timeout")
266
267mtx.Lock()
268targetScraper := opts.scraper.(*targetScraper)
269require.True(t, stopped[targetScraper.hash()], "Scrape loop for %v not stopped yet", targetScraper)
270mtx.Unlock()
271}
272return l
273}
274
275sp := &scrapePool{
276appendable: &nopAppendable{},
277activeTargets: map[uint64]*Target{},
278loops: map[uint64]loop{},
279newLoop: newLoop,
280logger: nil,
281client: http.DefaultClient,
282metrics: newTestScrapeMetrics(t),
283symbolTable: labels.NewSymbolTable(),
284}
285
286// Reloading a scrape pool with a new scrape configuration must stop all scrape
287// loops and start new ones. A new loop must not be started before the preceding
288// one terminated.
289
290for i := 0; i < numTargets; i++ {
291labels := labels.FromStrings(model.AddressLabel, fmt.Sprintf("example.com:%d", i))
292t := &Target{
293labels: labels,
294discoveredLabels: labels,
295}
296l := &testLoop{}
297d := time.Duration((i+1)*20) * time.Millisecond
298l.stopFunc = func() {
299time.Sleep(d)
300
301mtx.Lock()
302stopped[t.hash()] = true
303mtx.Unlock()
304}
305
306sp.activeTargets[t.hash()] = t
307sp.loops[t.hash()] = l
308}
309done := make(chan struct{})
310
311beforeTargets := map[uint64]*Target{}
312for h, t := range sp.activeTargets {
313beforeTargets[h] = t
314}
315
316reloadTime := time.Now()
317
318go func() {
319sp.reload(reloadCfg)
320close(done)
321}()
322
323select {
324case <-time.After(5 * time.Second):
325require.FailNow(t, "scrapeLoop.reload() did not return as expected")
326case <-done:
327// This should have taken at least as long as the last target slept.
328require.GreaterOrEqual(t, time.Since(reloadTime), time.Duration(numTargets*20)*time.Millisecond, "scrapeLoop.stop() exited before all targets stopped")
329}
330
331mtx.Lock()
332require.Len(t, stopped, numTargets, "Unexpected number of stopped loops")
333mtx.Unlock()
334
335require.Equal(t, sp.activeTargets, beforeTargets, "Reloading affected target states unexpectedly")
336require.Len(t, sp.loops, numTargets, "Unexpected number of stopped loops after reload")
337}
338
339func TestScrapePoolReloadPreserveRelabeledIntervalTimeout(t *testing.T) {
340reloadCfg := &config.ScrapeConfig{
341ScrapeInterval: model.Duration(3 * time.Second),
342ScrapeTimeout: model.Duration(2 * time.Second),
343}
344newLoop := func(opts scrapeLoopOptions) loop {
345l := &testLoop{interval: opts.interval, timeout: opts.timeout}
346l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
347require.Equal(t, 5*time.Second, interval, "Unexpected scrape interval")
348require.Equal(t, 3*time.Second, timeout, "Unexpected scrape timeout")
349}
350return l
351}
352sp := &scrapePool{
353appendable: &nopAppendable{},
354activeTargets: map[uint64]*Target{
3551: {
356labels: labels.FromStrings(model.ScrapeIntervalLabel, "5s", model.ScrapeTimeoutLabel, "3s"),
357},
358},
359loops: map[uint64]loop{
3601: noopLoop(),
361},
362newLoop: newLoop,
363logger: nil,
364client: http.DefaultClient,
365metrics: newTestScrapeMetrics(t),
366symbolTable: labels.NewSymbolTable(),
367}
368
369err := sp.reload(reloadCfg)
370if err != nil {
371t.Fatalf("unable to reload configuration: %s", err)
372}
373}
374
// TestScrapePoolTargetLimit verifies that exceeding target_limit sets a
// forced error on every scrape loop, that the error clears when the limit is
// raised or the target count drops, and that duplicate targets are
// deduplicated before the limit check.
func TestScrapePoolTargetLimit(t *testing.T) {
	var wg sync.WaitGroup
	// On starting to run, new loops created on reload check whether their preceding
	// equivalents have been stopped.
	newLoop := func(opts scrapeLoopOptions) loop {
		wg.Add(1)
		l := &testLoop{
			startFunc: func(interval, timeout time.Duration, errc chan<- error) {
				wg.Done()
			},
			stopFunc: func() {},
		}
		return l
	}
	sp := &scrapePool{
		appendable:    &nopAppendable{},
		activeTargets: map[uint64]*Target{},
		loops:         map[uint64]loop{},
		newLoop:       newLoop,
		logger:        log.NewNopLogger(),
		client:        http.DefaultClient,
		metrics:       newTestScrapeMetrics(t),
		symbolTable:   labels.NewSymbolTable(),
	}

	// 50 distinct single-target groups on ports 9090..9139.
	tgs := []*targetgroup.Group{}
	for i := 0; i < 50; i++ {
		tgs = append(tgs,
			&targetgroup.Group{
				Targets: []model.LabelSet{
					{model.AddressLabel: model.LabelValue(fmt.Sprintf("127.0.0.1:%d", 9090+i))},
				},
			},
		)
	}

	var limit uint
	// reloadWithLimit reloads the pool with the given target_limit and
	// remembers it for the error-message assertion below.
	reloadWithLimit := func(l uint) {
		limit = l
		require.NoError(t, sp.reload(&config.ScrapeConfig{
			ScrapeInterval: model.Duration(3 * time.Second),
			ScrapeTimeout:  model.Duration(2 * time.Second),
			TargetLimit:    l,
		}))
	}

	var targets int
	// loadTargets syncs the first n target groups into the pool.
	loadTargets := func(n int) {
		targets = n
		sp.Sync(tgs[:n])
	}

	// validateIsRunning waits until every created loop has started.
	validateIsRunning := func() {
		wg.Wait()
		for _, l := range sp.loops {
			require.True(t, l.(*testLoop).runOnce, "loop should be running")
		}
	}

	// validateErrorMessage asserts that every loop either carries the exact
	// target_limit error or no forced error at all.
	validateErrorMessage := func(shouldErr bool) {
		for _, l := range sp.loops {
			lerr := l.(*testLoop).getForcedError()
			if shouldErr {
				require.Error(t, lerr, "error was expected for %d targets with a limit of %d", targets, limit)
				require.Equal(t, fmt.Sprintf("target_limit exceeded (number of targets: %d, limit: %d)", targets, limit), lerr.Error())
			} else {
				require.NoError(t, lerr)
			}
		}
	}

	reloadWithLimit(0)
	loadTargets(50)
	validateIsRunning()

	// Simulate an initial config with a limit.
	sp.config.TargetLimit = 30
	limit = 30
	loadTargets(50)
	validateIsRunning()
	validateErrorMessage(true)

	reloadWithLimit(50)
	validateIsRunning()
	validateErrorMessage(false)

	reloadWithLimit(40)
	validateIsRunning()
	validateErrorMessage(true)

	loadTargets(30)
	validateIsRunning()
	validateErrorMessage(false)

	loadTargets(40)
	validateIsRunning()
	validateErrorMessage(false)

	loadTargets(41)
	validateIsRunning()
	validateErrorMessage(true)

	reloadWithLimit(0)
	validateIsRunning()
	validateErrorMessage(false)

	reloadWithLimit(51)
	validateIsRunning()
	validateErrorMessage(false)

	// Two groups with the same address: they must collapse to one target, so
	// syncing all 52 groups stays within the limit of 51.
	tgs = append(tgs,
		&targetgroup.Group{
			Targets: []model.LabelSet{
				{model.AddressLabel: model.LabelValue("127.0.0.1:1090")},
			},
		},
		&targetgroup.Group{
			Targets: []model.LabelSet{
				{model.AddressLabel: model.LabelValue("127.0.0.1:1090")},
			},
		},
	)

	sp.Sync(tgs)
	validateIsRunning()
	validateErrorMessage(false)
}
502
503func TestScrapePoolAppender(t *testing.T) {
504cfg := &config.ScrapeConfig{}
505app := &nopAppendable{}
506sp, _ := newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
507
508loop := sp.newLoop(scrapeLoopOptions{
509target: &Target{},
510})
511appl, ok := loop.(*scrapeLoop)
512require.True(t, ok, "Expected scrapeLoop but got %T", loop)
513
514wrapped := appender(appl.appender(context.Background()), 0, 0, nativeHistogramMaxSchema)
515
516tl, ok := wrapped.(*timeLimitAppender)
517require.True(t, ok, "Expected timeLimitAppender but got %T", wrapped)
518
519_, ok = tl.Appender.(nopAppender)
520require.True(t, ok, "Expected base appender but got %T", tl.Appender)
521
522sampleLimit := 100
523loop = sp.newLoop(scrapeLoopOptions{
524target: &Target{},
525sampleLimit: sampleLimit,
526})
527appl, ok = loop.(*scrapeLoop)
528require.True(t, ok, "Expected scrapeLoop but got %T", loop)
529
530wrapped = appender(appl.appender(context.Background()), sampleLimit, 0, nativeHistogramMaxSchema)
531
532sl, ok := wrapped.(*limitAppender)
533require.True(t, ok, "Expected limitAppender but got %T", wrapped)
534
535tl, ok = sl.Appender.(*timeLimitAppender)
536require.True(t, ok, "Expected timeLimitAppender but got %T", sl.Appender)
537
538_, ok = tl.Appender.(nopAppender)
539require.True(t, ok, "Expected base appender but got %T", tl.Appender)
540
541wrapped = appender(appl.appender(context.Background()), sampleLimit, 100, nativeHistogramMaxSchema)
542
543bl, ok := wrapped.(*bucketLimitAppender)
544require.True(t, ok, "Expected bucketLimitAppender but got %T", wrapped)
545
546sl, ok = bl.Appender.(*limitAppender)
547require.True(t, ok, "Expected limitAppender but got %T", bl)
548
549tl, ok = sl.Appender.(*timeLimitAppender)
550require.True(t, ok, "Expected timeLimitAppender but got %T", sl.Appender)
551
552_, ok = tl.Appender.(nopAppender)
553require.True(t, ok, "Expected base appender but got %T", tl.Appender)
554
555wrapped = appender(appl.appender(context.Background()), sampleLimit, 100, 0)
556
557ml, ok := wrapped.(*maxSchemaAppender)
558require.True(t, ok, "Expected maxSchemaAppender but got %T", wrapped)
559
560bl, ok = ml.Appender.(*bucketLimitAppender)
561require.True(t, ok, "Expected bucketLimitAppender but got %T", wrapped)
562
563sl, ok = bl.Appender.(*limitAppender)
564require.True(t, ok, "Expected limitAppender but got %T", bl)
565
566tl, ok = sl.Appender.(*timeLimitAppender)
567require.True(t, ok, "Expected timeLimitAppender but got %T", sl.Appender)
568
569_, ok = tl.Appender.(nopAppender)
570require.True(t, ok, "Expected base appender but got %T", tl.Appender)
571}
572
573func TestScrapePoolRaces(t *testing.T) {
574interval, _ := model.ParseDuration("1s")
575timeout, _ := model.ParseDuration("500ms")
576newConfig := func() *config.ScrapeConfig {
577return &config.ScrapeConfig{ScrapeInterval: interval, ScrapeTimeout: timeout}
578}
579sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
580tgts := []*targetgroup.Group{
581{
582Targets: []model.LabelSet{
583{model.AddressLabel: "127.0.0.1:9090"},
584{model.AddressLabel: "127.0.0.2:9090"},
585{model.AddressLabel: "127.0.0.3:9090"},
586{model.AddressLabel: "127.0.0.4:9090"},
587{model.AddressLabel: "127.0.0.5:9090"},
588{model.AddressLabel: "127.0.0.6:9090"},
589{model.AddressLabel: "127.0.0.7:9090"},
590{model.AddressLabel: "127.0.0.8:9090"},
591},
592},
593}
594
595sp.Sync(tgts)
596active := sp.ActiveTargets()
597dropped := sp.DroppedTargets()
598expectedActive, expectedDropped := len(tgts[0].Targets), 0
599
600require.Len(t, active, expectedActive, "Invalid number of active targets")
601require.Len(t, dropped, expectedDropped, "Invalid number of dropped targets")
602
603for i := 0; i < 20; i++ {
604time.Sleep(10 * time.Millisecond)
605sp.reload(newConfig())
606}
607sp.stop()
608}
609
// TestScrapePoolScrapeLoopsStarted verifies that syncing two target groups
// containing the same address results in a single deduplicated loop, and
// that the loop actually starts running.
func TestScrapePoolScrapeLoopsStarted(t *testing.T) {
	var wg sync.WaitGroup
	newLoop := func(opts scrapeLoopOptions) loop {
		wg.Add(1)
		l := &testLoop{
			startFunc: func(interval, timeout time.Duration, errc chan<- error) {
				wg.Done()
			},
			stopFunc: func() {},
		}
		return l
	}
	sp := &scrapePool{
		appendable:    &nopAppendable{},
		activeTargets: map[uint64]*Target{},
		loops:         map[uint64]loop{},
		newLoop:       newLoop,
		logger:        nil,
		client:        http.DefaultClient,
		metrics:       newTestScrapeMetrics(t),
		symbolTable:   labels.NewSymbolTable(),
	}

	// Two groups pointing at the same address: the pool must deduplicate.
	tgs := []*targetgroup.Group{
		{
			Targets: []model.LabelSet{
				{model.AddressLabel: model.LabelValue("127.0.0.1:9090")},
			},
		},
		{
			Targets: []model.LabelSet{
				{model.AddressLabel: model.LabelValue("127.0.0.1:9090")},
			},
		},
	}

	require.NoError(t, sp.reload(&config.ScrapeConfig{
		ScrapeInterval: model.Duration(3 * time.Second),
		ScrapeTimeout:  model.Duration(2 * time.Second),
	}))
	sp.Sync(tgs)

	require.Len(t, sp.loops, 1)

	wg.Wait()
	for _, l := range sp.loops {
		require.True(t, l.(*testLoop).runOnce, "loop should be running")
	}
}
659
// newBasicScrapeLoop constructs a scrapeLoop with no-op mutators, mostly
// zero/default options, fresh test metrics, and the given scraper, appender
// factory, and scrape interval.
// NOTE(review): the positional arguments mirror newScrapeLoop's long
// parameter list; the time.Hour value following interval is presumably the
// scrape timeout — confirm against newScrapeLoop's signature.
func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app func(ctx context.Context) storage.Appender, interval time.Duration) *scrapeLoop {
	return newScrapeLoop(ctx,
		scraper,
		nil, nil,
		nopMutator,
		nopMutator,
		app,
		nil,
		labels.NewSymbolTable(),
		0,
		true,
		false,
		true,
		0, 0, nativeHistogramMaxSchema,
		nil,
		interval,
		time.Hour,
		false,
		false,
		false,
		false,
		false,
		nil,
		false,
		newTestScrapeMetrics(t),
		false,
	)
}
688
// TestScrapeLoopStopBeforeRun verifies that stop() on a loop that has not
// yet started blocks until run() has been called and exited, and that run()
// then exits without ever invoking the scraper.
func TestScrapeLoopStopBeforeRun(t *testing.T) {
	scraper := &testScraper{}
	sl := newBasicScrapeLoop(t, context.Background(), scraper, nil, 1)

	// The scrape pool synchronizes on stopping scrape loops. However, new scrape
	// loops are started asynchronously. Thus it's possible, that a loop is stopped
	// again before having started properly.
	// Stopping not-yet-started loops must block until the run method was called and exited.
	// The run method must exit immediately.

	stopDone := make(chan struct{})
	go func() {
		sl.stop()
		close(stopDone)
	}()

	// stop() must still be blocked after 500ms since run() has not happened.
	select {
	case <-stopDone:
		require.FailNow(t, "Stopping terminated before run exited successfully.")
	case <-time.After(500 * time.Millisecond):
	}

	// Running the scrape loop must exit before calling the scraper even once.
	scraper.scrapeFunc = func(context.Context, io.Writer) error {
		require.FailNow(t, "Scraper was called for terminated scrape loop.")
		return nil
	}

	runDone := make(chan struct{})
	go func() {
		sl.run(nil)
		close(runDone)
	}()

	select {
	case <-runDone:
	case <-time.After(1 * time.Second):
		require.FailNow(t, "Running terminated scrape loop did not exit.")
	}

	select {
	case <-stopDone:
	case <-time.After(1 * time.Second):
		require.FailNow(t, "Stopping did not terminate after running exited.")
	}
}
735
736func nopMutator(l labels.Labels) labels.Labels { return l }
737
// TestScrapeLoopStop verifies that stopping a running loop performs a final
// scrape cycle, that all samples within one scrape share a timestamp, and
// that the last scrape appends stale markers for the series.
func TestScrapeLoopStop(t *testing.T) {
	var (
		signal   = make(chan struct{}, 1)
		appender = &collectResultAppender{}
		scraper  = &testScraper{}
		app      = func(ctx context.Context) storage.Appender { return appender }
	)

	sl := newBasicScrapeLoop(t, context.Background(), scraper, app, 10*time.Millisecond)

	// Terminate loop after 2 scrapes.
	numScrapes := 0

	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
		numScrapes++
		if numScrapes == 2 {
			// Trigger the stop asynchronously and wait for the loop's context
			// to be canceled before finishing this scrape.
			go sl.stop()
			<-sl.ctx.Done()
		}
		w.Write([]byte("metric_a 42\n"))
		return ctx.Err()
	}

	go func() {
		sl.run(nil)
		signal <- struct{}{}
	}()

	select {
	case <-signal:
	case <-time.After(5 * time.Second):
		require.FailNow(t, "Scrape wasn't stopped.")
	}

	// We expected 1 actual sample for each scrape plus 5 for report samples.
	// At least 2 scrapes were made, plus the final stale markers.
	require.GreaterOrEqual(t, len(appender.resultFloats), 6*3, "Expected at least 3 scrapes with 6 samples each.")
	require.Zero(t, len(appender.resultFloats)%6, "There is a scrape with missing samples.")
	// All samples in a scrape must have the same timestamp.
	var ts int64
	for i, s := range appender.resultFloats {
		switch {
		case i%6 == 0:
			ts = s.t
		case s.t != ts:
			t.Fatalf("Unexpected multiple timestamps within single scrape")
		}
	}
	// All samples from the last scrape must be stale markers.
	for _, s := range appender.resultFloats[len(appender.resultFloats)-5:] {
		require.True(t, value.IsStaleNaN(s.f), "Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(s.f))
	}
}
791
// TestScrapeLoopRun verifies three run-loop behaviors: cancellation during
// the initial offset wait terminates the loop; the per-scrape timeout
// cancels the context handed to the scraper; and canceling the parent
// context terminates a loop that is otherwise scraping cleanly.
func TestScrapeLoopRun(t *testing.T) {
	var (
		signal = make(chan struct{}, 1)
		errc   = make(chan error)

		scraper       = &testScraper{}
		app           = func(ctx context.Context) storage.Appender { return &nopAppender{} }
		scrapeMetrics = newTestScrapeMetrics(t)
	)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newScrapeLoop(ctx,
		scraper,
		nil, nil,
		nopMutator,
		nopMutator,
		app,
		nil,
		nil,
		0,
		true,
		false,
		true,
		0, 0, nativeHistogramMaxSchema,
		nil,
		time.Second,
		time.Hour,
		false,
		false,
		false,
		false,
		false,
		nil,
		false,
		scrapeMetrics,
		false,
	)

	// The loop must terminate during the initial offset if the context
	// is canceled.
	scraper.offsetDur = time.Hour

	go func() {
		sl.run(errc)
		signal <- struct{}{}
	}()

	// Wait to make sure we are actually waiting on the offset.
	time.Sleep(1 * time.Second)

	cancel()
	select {
	case <-signal:
	case <-time.After(5 * time.Second):
		require.FailNow(t, "Cancellation during initial offset failed.")
	case err := <-errc:
		require.FailNow(t, "Unexpected error: %s", err)
	}

	// The provided timeout must cause cancellation of the context passed down to the
	// scraper. The scraper has to respect the context.
	scraper.offsetDur = 0

	// The scraper blocks until either the timeout context fires or the test
	// releases it via close(block).
	block := make(chan struct{})
	scraper.scrapeFunc = func(ctx context.Context, _ io.Writer) error {
		select {
		case <-block:
		case <-ctx.Done():
			return ctx.Err()
		}
		return nil
	}

	ctx, cancel = context.WithCancel(context.Background())
	sl = newBasicScrapeLoop(t, ctx, scraper, app, time.Second)
	sl.timeout = 100 * time.Millisecond

	go func() {
		sl.run(errc)
		signal <- struct{}{}
	}()

	select {
	case err := <-errc:
		require.ErrorIs(t, err, context.DeadlineExceeded)
	case <-time.After(3 * time.Second):
		require.FailNow(t, "Expected timeout error but got none.")
	}

	// We already caught the timeout error and are certainly in the loop.
	// Let the scrapes returns immediately to cause no further timeout errors
	// and check whether canceling the parent context terminates the loop.
	close(block)
	cancel()

	select {
	case <-signal:
		// Loop terminated as expected.
	case err := <-errc:
		require.FailNow(t, "Unexpected error: %s", err)
	case <-time.After(3 * time.Second):
		require.FailNow(t, "Loop did not terminate on context cancellation")
	}
}
896
897func TestScrapeLoopForcedErr(t *testing.T) {
898var (
899signal = make(chan struct{}, 1)
900errc = make(chan error)
901
902scraper = &testScraper{}
903app = func(ctx context.Context) storage.Appender { return &nopAppender{} }
904)
905
906ctx, cancel := context.WithCancel(context.Background())
907sl := newBasicScrapeLoop(t, ctx, scraper, app, time.Second)
908
909forcedErr := fmt.Errorf("forced err")
910sl.setForcedError(forcedErr)
911
912scraper.scrapeFunc = func(context.Context, io.Writer) error {
913require.FailNow(t, "Should not be scraped.")
914return nil
915}
916
917go func() {
918sl.run(errc)
919signal <- struct{}{}
920}()
921
922select {
923case err := <-errc:
924require.ErrorIs(t, err, forcedErr)
925case <-time.After(3 * time.Second):
926require.FailNow(t, "Expected forced error but got none.")
927}
928cancel()
929
930select {
931case <-signal:
932case <-time.After(5 * time.Second):
933require.FailNow(t, "Scrape not stopped.")
934}
935}
936
// TestScrapeLoopMetadata verifies that appending an OpenMetrics payload
// populates the scrape cache with per-metric TYPE/HELP/UNIT metadata,
// including metrics that have only a TYPE or only a HELP line.
func TestScrapeLoopMetadata(t *testing.T) {
	var (
		signal        = make(chan struct{})
		scraper       = &testScraper{}
		scrapeMetrics = newTestScrapeMetrics(t)
		cache         = newScrapeCache(scrapeMetrics)
	)
	defer close(signal)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newScrapeLoop(ctx,
		scraper,
		nil, nil,
		nopMutator,
		nopMutator,
		func(ctx context.Context) storage.Appender { return nopAppender{} },
		cache,
		labels.NewSymbolTable(),
		0,
		true,
		false,
		true,
		0, 0, nativeHistogramMaxSchema,
		nil,
		0,
		0,
		false,
		false,
		false,
		false,
		false,
		nil,
		false,
		scrapeMetrics,
		false,
	)
	defer cancel()

	slApp := sl.appender(ctx)
	total, _, _, err := sl.append(slApp, []byte(`# TYPE test_metric counter
# HELP test_metric some help text
# UNIT test_metric metric
test_metric 1
# TYPE test_metric_no_help gauge
# HELP test_metric_no_type other help text
# EOF`), "application/openmetrics-text", time.Now())
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())
	// Only test_metric has a sample line, so total is 1.
	require.Equal(t, 1, total)

	md, ok := cache.GetMetadata("test_metric")
	require.True(t, ok, "expected metadata to be present")
	require.Equal(t, model.MetricTypeCounter, md.Type, "unexpected metric type")
	require.Equal(t, "some help text", md.Help)
	require.Equal(t, "metric", md.Unit)

	// TYPE without HELP: help and unit stay empty.
	md, ok = cache.GetMetadata("test_metric_no_help")
	require.True(t, ok, "expected metadata to be present")
	require.Equal(t, model.MetricTypeGauge, md.Type, "unexpected metric type")
	require.Equal(t, "", md.Help)
	require.Equal(t, "", md.Unit)

	// HELP without TYPE: type falls back to unknown.
	md, ok = cache.GetMetadata("test_metric_no_type")
	require.True(t, ok, "expected metadata to be present")
	require.Equal(t, model.MetricTypeUnknown, md.Type, "unexpected metric type")
	require.Equal(t, "other help text", md.Help)
	require.Equal(t, "", md.Unit)
}
1005
1006func simpleTestScrapeLoop(t testing.TB) (context.Context, *scrapeLoop) {
1007// Need a full storage for correct Add/AddFast semantics.
1008s := teststorage.New(t)
1009t.Cleanup(func() { s.Close() })
1010
1011ctx, cancel := context.WithCancel(context.Background())
1012sl := newBasicScrapeLoop(t, ctx, &testScraper{}, s.Appender, 0)
1013t.Cleanup(func() { cancel() })
1014
1015return ctx, sl
1016}
1017
// TestScrapeLoopSeriesAdded verifies that a series counts toward seriesAdded
// only on its first appearance; re-appending the same series reports
// total/added but zero new series.
func TestScrapeLoopSeriesAdded(t *testing.T) {
	ctx, sl := simpleTestScrapeLoop(t)

	slApp := sl.appender(ctx)
	total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{})
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())
	require.Equal(t, 1, total)
	require.Equal(t, 1, added)
	require.Equal(t, 1, seriesAdded)

	// Same series again: still scraped and added, but not a new series.
	slApp = sl.appender(ctx)
	total, added, seriesAdded, err = sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{})
	require.NoError(t, slApp.Commit())
	require.NoError(t, err)
	require.Equal(t, 1, total)
	require.Equal(t, 1, added)
	require.Equal(t, 0, seriesAdded)
}
1037
// TestScrapeLoopFailWithInvalidLabelsAfterRelabel verifies that when
// relabeling produces an invalid label name, the append fails and no samples
// are committed.
func TestScrapeLoopFailWithInvalidLabelsAfterRelabel(t *testing.T) {
	s := teststorage.New(t)
	defer s.Close()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	target := &Target{
		labels: labels.FromStrings("pod_label_invalid_012", "test"),
	}
	// LabelMap with replacement "$1" maps pod_label_invalid_012 to the label
	// name "012", which is not a valid label name.
	relabelConfig := []*relabel.Config{{
		Action:      relabel.LabelMap,
		Regex:       relabel.MustNewRegexp("pod_label_invalid_(.+)"),
		Separator:   ";",
		Replacement: "$1",
	}}
	sl := newBasicScrapeLoop(t, ctx, &testScraper{}, s.Appender, 0)
	sl.sampleMutator = func(l labels.Labels) labels.Labels {
		return mutateSampleLabels(l, target, true, relabelConfig)
	}

	slApp := sl.appender(ctx)
	total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{})
	require.ErrorContains(t, err, "invalid metric name or label names")
	require.NoError(t, slApp.Rollback())
	// The sample was scraped (total) but nothing was added or created.
	require.Equal(t, 1, total)
	require.Equal(t, 0, added)
	require.Equal(t, 0, seriesAdded)
}
1066
// makeTestMetrics renders n gauge samples in OpenMetrics text format, each
// preceded by its TYPE and HELP lines, terminated by the "# EOF" marker.
func makeTestMetrics(n int) []byte {
	// Construct a metrics string to parse
	sb := bytes.Buffer{}
	for i := 0; i < n; i++ {
		// The metadata lines are constant; WriteString avoids fmt overhead.
		sb.WriteString("# TYPE metric_a gauge\n")
		sb.WriteString("# HELP metric_a help text\n")
		fmt.Fprintf(&sb, "metric_a{foo=\"%d\",bar=\"%d\"} 1\n", i, i*100)
	}
	sb.WriteString("# EOF\n")
	return sb.Bytes()
}
1078
// BenchmarkScrapeLoopAppend measures appending 100 Prometheus-text-format
// samples per iteration; the timestamp advances each iteration so samples
// are never rejected as duplicates. Append errors are deliberately ignored.
func BenchmarkScrapeLoopAppend(b *testing.B) {
	ctx, sl := simpleTestScrapeLoop(b)

	slApp := sl.appender(ctx)
	metrics := makeTestMetrics(100)
	ts := time.Time{}

	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		ts = ts.Add(time.Second)
		_, _, _, _ = sl.append(slApp, metrics, "", ts)
	}
}
1093
// BenchmarkScrapeLoopAppendOM is the OpenMetrics-content-type counterpart of
// BenchmarkScrapeLoopAppend, exercising the OpenMetrics parser path.
func BenchmarkScrapeLoopAppendOM(b *testing.B) {
	ctx, sl := simpleTestScrapeLoop(b)

	slApp := sl.appender(ctx)
	metrics := makeTestMetrics(100)
	ts := time.Time{}

	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		ts = ts.Add(time.Second)
		_, _, _, _ = sl.append(slApp, metrics, "application/openmetrics-text", ts)
	}
}
1108
// TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape verifies that after a
// successful scrape is followed by failures, the previously seen series gets
// a stale marker on the first failed scrape.
func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
	appender := &collectResultAppender{}
	var (
		signal  = make(chan struct{}, 1)
		scraper = &testScraper{}
		app     = func(ctx context.Context) storage.Appender { return appender }
	)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newBasicScrapeLoop(t, ctx, scraper, app, 10*time.Millisecond)
	// Succeed once, several failures, then stop.
	numScrapes := 0

	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
		numScrapes++

		switch numScrapes {
		case 1:
			w.Write([]byte("metric_a 42\n"))
			return nil
		case 5:
			// Cancel on the fifth scrape so the loop winds down.
			cancel()
		}
		return errors.New("scrape failed")
	}

	go func() {
		sl.run(nil)
		signal <- struct{}{}
	}()

	select {
	case <-signal:
	case <-time.After(5 * time.Second):
		require.FailNow(t, "Scrape wasn't stopped.")
	}

	// 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for
	// each scrape successful or not.
	require.Len(t, appender.resultFloats, 27, "Appended samples not as expected:\n%s", appender)
	require.Equal(t, 42.0, appender.resultFloats[0].f, "Appended first sample not as expected")
	require.True(t, value.IsStaleNaN(appender.resultFloats[6].f),
		"Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.resultFloats[6].f))
}
1153
// TestScrapeLoopRunCreatesStaleMarkersOnParseFailure verifies that a scrape
// whose body fails to parse is treated like a failed scrape: the series from
// the last good scrape receives a StaleNaN marker.
func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
	appender := &collectResultAppender{}
	var (
		signal     = make(chan struct{}, 1)
		scraper    = &testScraper{}
		app        = func(ctx context.Context) storage.Appender { return appender }
		numScrapes = 0
	)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newBasicScrapeLoop(t, ctx, scraper, app, 10*time.Millisecond)

	// Succeed once, then deliver unparsable data, then stop.
	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
		numScrapes++
		switch numScrapes {
		case 1:
			w.Write([]byte("metric_a 42\n"))
			return nil
		case 2:
			// "7&-" is not valid exposition format and triggers a parse error.
			w.Write([]byte("7&-\n"))
			return nil
		case 3:
			cancel()
		}
		return errors.New("scrape failed")
	}

	go func() {
		sl.run(nil)
		signal <- struct{}{}
	}()

	select {
	case <-signal:
	case <-time.After(5 * time.Second):
		require.FailNow(t, "Scrape wasn't stopped.")
	}

	// 1 successfully scraped sample, 1 stale marker after the parse failure,
	// and 5 report samples for each of the 3 scrapes: 1 + 1 + 3*5 = 17.
	require.Len(t, appender.resultFloats, 17, "Appended samples not as expected:\n%s", appender)
	require.Equal(t, 42.0, appender.resultFloats[0].f, "Appended first sample not as expected")
	// Index 6 is the first sample of the second scrape (after 1 sample plus
	// 5 report samples from the first), i.e. the stale marker for metric_a.
	require.True(t, value.IsStaleNaN(appender.resultFloats[6].f),
		"Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.resultFloats[6].f))
}
1200
1201func TestScrapeLoopCache(t *testing.T) {
1202s := teststorage.New(t)
1203defer s.Close()
1204
1205appender := &collectResultAppender{}
1206var (
1207signal = make(chan struct{}, 1)
1208scraper = &testScraper{}
1209app = func(ctx context.Context) storage.Appender { appender.next = s.Appender(ctx); return appender }
1210)
1211
1212ctx, cancel := context.WithCancel(context.Background())
1213// Decreasing the scrape interval could make the test fail, as multiple scrapes might be initiated at identical millisecond timestamps.
1214// See https://github.com/prometheus/prometheus/issues/12727.
1215sl := newBasicScrapeLoop(t, ctx, scraper, app, 100*time.Millisecond)
1216
1217numScrapes := 0
1218
1219scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
1220switch numScrapes {
1221case 1, 2:
1222_, ok := sl.cache.series["metric_a"]
1223require.True(t, ok, "metric_a missing from cache after scrape %d", numScrapes)
1224_, ok = sl.cache.series["metric_b"]
1225require.True(t, ok, "metric_b missing from cache after scrape %d", numScrapes)
1226case 3:
1227_, ok := sl.cache.series["metric_a"]
1228require.True(t, ok, "metric_a missing from cache after scrape %d", numScrapes)
1229_, ok = sl.cache.series["metric_b"]
1230require.False(t, ok, "metric_b present in cache after scrape %d", numScrapes)
1231}
1232
1233numScrapes++
1234switch numScrapes {
1235case 1:
1236w.Write([]byte("metric_a 42\nmetric_b 43\n"))
1237return nil
1238case 3:
1239w.Write([]byte("metric_a 44\n"))
1240return nil
1241case 4:
1242cancel()
1243}
1244return fmt.Errorf("scrape failed")
1245}
1246
1247go func() {
1248sl.run(nil)
1249signal <- struct{}{}
1250}()
1251
1252select {
1253case <-signal:
1254case <-time.After(5 * time.Second):
1255require.FailNow(t, "Scrape wasn't stopped.")
1256}
1257
1258// 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for
1259// each scrape successful or not.
1260require.Len(t, appender.resultFloats, 26, "Appended samples not as expected:\n%s", appender)
1261}
1262
1263func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) {
1264s := teststorage.New(t)
1265defer s.Close()
1266
1267sapp := s.Appender(context.Background())
1268
1269appender := &collectResultAppender{next: sapp}
1270var (
1271signal = make(chan struct{}, 1)
1272scraper = &testScraper{}
1273app = func(ctx context.Context) storage.Appender { return appender }
1274)
1275
1276ctx, cancel := context.WithCancel(context.Background())
1277sl := newBasicScrapeLoop(t, ctx, scraper, app, 10*time.Millisecond)
1278
1279numScrapes := 0
1280
1281scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
1282numScrapes++
1283if numScrapes < 5 {
1284s := ""
1285for i := 0; i < 500; i++ {
1286s = fmt.Sprintf("%smetric_%d_%d 42\n", s, i, numScrapes)
1287}
1288w.Write([]byte(s + "&"))
1289} else {
1290cancel()
1291}
1292return nil
1293}
1294
1295go func() {
1296sl.run(nil)
1297signal <- struct{}{}
1298}()
1299
1300select {
1301case <-signal:
1302case <-time.After(5 * time.Second):
1303require.FailNow(t, "Scrape wasn't stopped.")
1304}
1305
1306require.LessOrEqual(t, len(sl.cache.series), 2000, "More than 2000 series cached.")
1307}
1308
// TestScrapeLoopAppend covers label mutation during append: honor_labels
// handling, the exported_ prefix on label-name collisions, removal of
// empty labels, and normalization of NaN sample values.
func TestScrapeLoopAppend(t *testing.T) {
	tests := []struct {
		title           string
		honorLabels     bool
		scrapeLabels    string
		discoveryLabels []string
		expLset         labels.Labels
		expValue        float64
	}{
		{
			// When "honor_labels" is not set,
			// a label name collision is handled by adding a prefix.
			title:           "Label name collision",
			honorLabels:     false,
			scrapeLabels:    `metric{n="1"} 0`,
			discoveryLabels: []string{"n", "2"},
			expLset:         labels.FromStrings("__name__", "metric", "exported_n", "1", "n", "2"),
			expValue:        0,
		}, {
			// When "honor_labels" is not set,
			// exported labels from discovery don't get overwritten.
			title:           "Label name collision",
			honorLabels:     false,
			scrapeLabels:    `metric 0`,
			discoveryLabels: []string{"n", "2", "exported_n", "2"},
			expLset:         labels.FromStrings("__name__", "metric", "n", "2", "exported_n", "2"),
			expValue:        0,
		}, {
			// Labels with no value need to be removed as these should not be ingested.
			title:           "Delete Empty labels",
			honorLabels:     false,
			scrapeLabels:    `metric{n=""} 0`,
			discoveryLabels: nil,
			expLset:         labels.FromStrings("__name__", "metric"),
			expValue:        0,
		}, {
			// Honor Labels should ignore labels with the same name.
			title:           "Honor Labels",
			honorLabels:     true,
			scrapeLabels:    `metric{n1="1", n2="2"} 0`,
			discoveryLabels: []string{"n1", "0"},
			expLset:         labels.FromStrings("__name__", "metric", "n1", "1", "n2", "2"),
			expValue:        0,
		}, {
			// A scraped NaN is normalized to the canonical NormalNaN bit pattern.
			title:           "Stale - NaN",
			honorLabels:     false,
			scrapeLabels:    `metric NaN`,
			discoveryLabels: nil,
			expLset:         labels.FromStrings("__name__", "metric"),
			expValue:        math.Float64frombits(value.NormalNaN),
		},
	}

	for _, test := range tests {
		app := &collectResultAppender{}

		discoveryLabels := &Target{
			labels: labels.FromStrings(test.discoveryLabels...),
		}

		// Wire the mutators the way the scrape pool would for this target.
		sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)
		sl.sampleMutator = func(l labels.Labels) labels.Labels {
			return mutateSampleLabels(l, discoveryLabels, test.honorLabels, nil)
		}
		sl.reportSampleMutator = func(l labels.Labels) labels.Labels {
			return mutateReportSampleLabels(l, discoveryLabels)
		}

		now := time.Now()

		slApp := sl.appender(context.Background())
		_, _, _, err := sl.append(slApp, []byte(test.scrapeLabels), "", now)
		require.NoError(t, err)
		require.NoError(t, slApp.Commit())

		expected := []floatSample{
			{
				metric: test.expLset,
				t:      timestamp.FromTime(now),
				f:      test.expValue,
			},
		}

		t.Logf("Test:%s", test.title)
		// requireEqual uses equalFloatSamples, so NaN expectations compare
		// correctly (plain == would fail for NaN).
		requireEqual(t, expected, app.resultFloats)
	}
}
1396
1397func requireEqual(t *testing.T, expected, actual interface{}, msgAndArgs ...interface{}) {
1398testutil.RequireEqualWithOptions(t, expected, actual,
1399[]cmp.Option{cmp.Comparer(equalFloatSamples), cmp.AllowUnexported(histogramSample{})},
1400msgAndArgs...)
1401}
1402
// TestScrapeLoopAppendForConflictingPrefixedLabels verifies how collisions
// between target labels and exposed labels are resolved by repeatedly
// prepending "exported_" until all label names are unique.
func TestScrapeLoopAppendForConflictingPrefixedLabels(t *testing.T) {
	testcases := map[string]struct {
		targetLabels  []string
		exposedLabels string
		expected      []string
	}{
		"One target label collides with existing label": {
			targetLabels:  []string{"foo", "2"},
			exposedLabels: `metric{foo="1"} 0`,
			expected:      []string{"__name__", "metric", "exported_foo", "1", "foo", "2"},
		},

		"One target label collides with existing label, plus target label already with prefix 'exported'": {
			targetLabels:  []string{"foo", "2", "exported_foo", "3"},
			exposedLabels: `metric{foo="1"} 0`,
			expected:      []string{"__name__", "metric", "exported_exported_foo", "1", "exported_foo", "3", "foo", "2"},
		},
		"One target label collides with existing label, plus existing label already with prefix 'exported": {
			targetLabels:  []string{"foo", "3"},
			exposedLabels: `metric{foo="1", exported_foo="2"} 0`,
			expected:      []string{"__name__", "metric", "exported_exported_foo", "1", "exported_foo", "2", "foo", "3"},
		},
		"One target label collides with existing label, both already with prefix 'exported'": {
			targetLabels:  []string{"exported_foo", "2"},
			exposedLabels: `metric{exported_foo="1"} 0`,
			expected:      []string{"__name__", "metric", "exported_exported_foo", "1", "exported_foo", "2"},
		},
		"Two target labels collide with existing labels, both with and without prefix 'exported'": {
			targetLabels:  []string{"foo", "3", "exported_foo", "4"},
			exposedLabels: `metric{foo="1", exported_foo="2"} 0`,
			expected: []string{
				"__name__", "metric", "exported_exported_foo", "1", "exported_exported_exported_foo",
				"2", "exported_foo", "4", "foo", "3",
			},
		},
		"Extreme example": {
			targetLabels:  []string{"foo", "0", "exported_exported_foo", "1", "exported_exported_exported_foo", "2"},
			exposedLabels: `metric{foo="3", exported_foo="4", exported_exported_exported_foo="5"} 0`,
			expected: []string{
				"__name__", "metric",
				"exported_exported_exported_exported_exported_foo", "5",
				"exported_exported_exported_exported_foo", "3",
				"exported_exported_exported_foo", "2",
				"exported_exported_foo", "1",
				"exported_foo", "4",
				"foo", "0",
			},
		},
	}

	for name, tc := range testcases {
		t.Run(name, func(t *testing.T) {
			app := &collectResultAppender{}
			sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)
			// honorLabels=false forces the exported_ prefixing path.
			sl.sampleMutator = func(l labels.Labels) labels.Labels {
				return mutateSampleLabels(l, &Target{labels: labels.FromStrings(tc.targetLabels...)}, false, nil)
			}
			slApp := sl.appender(context.Background())
			_, _, _, err := sl.append(slApp, []byte(tc.exposedLabels), "", time.Date(2000, 1, 1, 1, 0, 0, 0, time.UTC))
			require.NoError(t, err)

			require.NoError(t, slApp.Commit())

			requireEqual(t, []floatSample{
				{
					metric: labels.FromStrings(tc.expected...),
					t:      timestamp.FromTime(time.Date(2000, 1, 1, 1, 0, 0, 0, time.UTC)),
					f:      0,
				},
			}, app.resultFloats)
		})
	}
}
1476
// TestScrapeLoopAppendCacheEntryButErrNotFound verifies that when a cached
// series ref turns out to be unknown to the storage (ErrNotFound from
// AppendFast-style lookup), the sample is still appended via the slow path.
func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
	// collectResultAppender's AddFast always returns ErrNotFound if we don't give it a next.
	app := &collectResultAppender{}
	sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)

	fakeRef := storage.SeriesRef(1)
	expValue := float64(1)
	metric := []byte(`metric{n="1"} 1`)
	p, warning := textparse.New(metric, "", false, labels.NewSymbolTable())
	require.NoError(t, warning)

	// Parse the single, well-formed sample to obtain its label set; the
	// return values of Next/Metric are deliberately ignored here.
	var lset labels.Labels
	p.Next()
	p.Metric(&lset)
	hash := lset.Hash()

	// Create a fake entry in the cache pointing at a ref the storage has
	// never seen, so the append must recover via the ErrNotFound path.
	sl.cache.addRef(metric, fakeRef, lset, hash)
	now := time.Now()

	slApp := sl.appender(context.Background())
	_, _, _, err := sl.append(slApp, metric, "", now)
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	expected := []floatSample{
		{
			metric: lset,
			t:      timestamp.FromTime(now),
			f:      expValue,
		},
	}

	require.Equal(t, expected, app.resultFloats)
}
1512
1513func TestScrapeLoopAppendSampleLimit(t *testing.T) {
1514resApp := &collectResultAppender{}
1515app := &limitAppender{Appender: resApp, limit: 1}
1516
1517sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)
1518sl.sampleMutator = func(l labels.Labels) labels.Labels {
1519if l.Has("deleteme") {
1520return labels.EmptyLabels()
1521}
1522return l
1523}
1524sl.sampleLimit = app.limit
1525
1526// Get the value of the Counter before performing the append.
1527beforeMetric := dto.Metric{}
1528err := sl.metrics.targetScrapeSampleLimit.Write(&beforeMetric)
1529require.NoError(t, err)
1530
1531beforeMetricValue := beforeMetric.GetCounter().GetValue()
1532
1533now := time.Now()
1534slApp := sl.appender(context.Background())
1535total, added, seriesAdded, err := sl.append(app, []byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "", now)
1536require.ErrorIs(t, err, errSampleLimit)
1537require.NoError(t, slApp.Rollback())
1538require.Equal(t, 3, total)
1539require.Equal(t, 3, added)
1540require.Equal(t, 1, seriesAdded)
1541
1542// Check that the Counter has been incremented a single time for the scrape,
1543// not multiple times for each sample.
1544metric := dto.Metric{}
1545err = sl.metrics.targetScrapeSampleLimit.Write(&metric)
1546require.NoError(t, err)
1547
1548value := metric.GetCounter().GetValue()
1549change := value - beforeMetricValue
1550require.Equal(t, 1.0, change, "Unexpected change of sample limit metric: %f", change)
1551
1552// And verify that we got the samples that fit under the limit.
1553want := []floatSample{
1554{
1555metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
1556t: timestamp.FromTime(now),
1557f: 1,
1558},
1559}
1560requireEqual(t, want, resApp.rolledbackFloats, "Appended samples not as expected:\n%s", appender)
1561
1562now = time.Now()
1563slApp = sl.appender(context.Background())
1564total, added, seriesAdded, err = sl.append(slApp, []byte("metric_a 1\nmetric_b 1\nmetric_c{deleteme=\"yes\"} 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h{deleteme=\"yes\"} 1\nmetric_i{deleteme=\"yes\"} 1\n"), "", now)
1565require.ErrorIs(t, err, errSampleLimit)
1566require.NoError(t, slApp.Rollback())
1567require.Equal(t, 9, total)
1568require.Equal(t, 6, added)
1569require.Equal(t, 0, seriesAdded)
1570}
1571
1572func TestScrapeLoop_HistogramBucketLimit(t *testing.T) {
1573resApp := &collectResultAppender{}
1574app := &bucketLimitAppender{Appender: resApp, limit: 2}
1575
1576sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)
1577sl.enableNativeHistogramIngestion = true
1578sl.sampleMutator = func(l labels.Labels) labels.Labels {
1579if l.Has("deleteme") {
1580return labels.EmptyLabels()
1581}
1582return l
1583}
1584sl.sampleLimit = app.limit
1585
1586metric := dto.Metric{}
1587err := sl.metrics.targetScrapeNativeHistogramBucketLimit.Write(&metric)
1588require.NoError(t, err)
1589beforeMetricValue := metric.GetCounter().GetValue()
1590
1591nativeHistogram := prometheus.NewHistogramVec(
1592prometheus.HistogramOpts{
1593Namespace: "testing",
1594Name: "example_native_histogram",
1595Help: "This is used for testing",
1596ConstLabels: map[string]string{"some": "value"},
1597NativeHistogramBucketFactor: 1.1, // 10% increase from bucket to bucket
1598NativeHistogramMaxBucketNumber: 100, // intentionally higher than the limit we'll use in the scraper
1599},
1600[]string{"size"},
1601)
1602registry := prometheus.NewRegistry()
1603registry.Register(nativeHistogram)
1604nativeHistogram.WithLabelValues("S").Observe(1.0)
1605nativeHistogram.WithLabelValues("M").Observe(1.0)
1606nativeHistogram.WithLabelValues("L").Observe(1.0)
1607nativeHistogram.WithLabelValues("M").Observe(10.0)
1608nativeHistogram.WithLabelValues("L").Observe(10.0) // in different bucket since > 1*1.1
1609
1610gathered, err := registry.Gather()
1611require.NoError(t, err)
1612require.NotEmpty(t, gathered)
1613
1614histogramMetricFamily := gathered[0]
1615msg, err := MetricFamilyToProtobuf(histogramMetricFamily)
1616require.NoError(t, err)
1617
1618now := time.Now()
1619total, added, seriesAdded, err := sl.append(app, msg, "application/vnd.google.protobuf", now)
1620require.NoError(t, err)
1621require.Equal(t, 3, total)
1622require.Equal(t, 3, added)
1623require.Equal(t, 3, seriesAdded)
1624
1625err = sl.metrics.targetScrapeNativeHistogramBucketLimit.Write(&metric)
1626require.NoError(t, err)
1627metricValue := metric.GetCounter().GetValue()
1628require.Equal(t, beforeMetricValue, metricValue)
1629beforeMetricValue = metricValue
1630
1631nativeHistogram.WithLabelValues("L").Observe(100.0) // in different bucket since > 10*1.1
1632
1633gathered, err = registry.Gather()
1634require.NoError(t, err)
1635require.NotEmpty(t, gathered)
1636
1637histogramMetricFamily = gathered[0]
1638msg, err = MetricFamilyToProtobuf(histogramMetricFamily)
1639require.NoError(t, err)
1640
1641now = time.Now()
1642total, added, seriesAdded, err = sl.append(app, msg, "application/vnd.google.protobuf", now)
1643require.NoError(t, err)
1644require.Equal(t, 3, total)
1645require.Equal(t, 3, added)
1646require.Equal(t, 3, seriesAdded)
1647
1648err = sl.metrics.targetScrapeNativeHistogramBucketLimit.Write(&metric)
1649require.NoError(t, err)
1650metricValue = metric.GetCounter().GetValue()
1651require.Equal(t, beforeMetricValue, metricValue)
1652beforeMetricValue = metricValue
1653
1654nativeHistogram.WithLabelValues("L").Observe(100000.0) // in different bucket since > 10*1.1
1655
1656gathered, err = registry.Gather()
1657require.NoError(t, err)
1658require.NotEmpty(t, gathered)
1659
1660histogramMetricFamily = gathered[0]
1661msg, err = MetricFamilyToProtobuf(histogramMetricFamily)
1662require.NoError(t, err)
1663
1664now = time.Now()
1665total, added, seriesAdded, err = sl.append(app, msg, "application/vnd.google.protobuf", now)
1666if !errors.Is(err, errBucketLimit) {
1667t.Fatalf("Did not see expected histogram bucket limit error: %s", err)
1668}
1669require.NoError(t, app.Rollback())
1670require.Equal(t, 3, total)
1671require.Equal(t, 3, added)
1672require.Equal(t, 0, seriesAdded)
1673
1674err = sl.metrics.targetScrapeNativeHistogramBucketLimit.Write(&metric)
1675require.NoError(t, err)
1676metricValue = metric.GetCounter().GetValue()
1677require.Equal(t, beforeMetricValue+1, metricValue)
1678}
1679
1680func TestScrapeLoop_ChangingMetricString(t *testing.T) {
1681// This is a regression test for the scrape loop cache not properly maintaining
1682// IDs when the string representation of a metric changes across a scrape. Thus
1683// we use a real storage appender here.
1684s := teststorage.New(t)
1685defer s.Close()
1686
1687capp := &collectResultAppender{}
1688sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return capp }, 0)
1689
1690now := time.Now()
1691slApp := sl.appender(context.Background())
1692_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1`), "", now)
1693require.NoError(t, err)
1694require.NoError(t, slApp.Commit())
1695
1696slApp = sl.appender(context.Background())
1697_, _, _, err = sl.append(slApp, []byte(`metric_a{b="1",a="1"} 2`), "", now.Add(time.Minute))
1698require.NoError(t, err)
1699require.NoError(t, slApp.Commit())
1700
1701want := []floatSample{
1702{
1703metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
1704t: timestamp.FromTime(now),
1705f: 1,
1706},
1707{
1708metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
1709t: timestamp.FromTime(now.Add(time.Minute)),
1710f: 2,
1711},
1712}
1713require.Equal(t, want, capp.resultFloats, "Appended samples not as expected:\n%s", appender)
1714}
1715
1716func TestScrapeLoopAppendStaleness(t *testing.T) {
1717app := &collectResultAppender{}
1718
1719sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)
1720
1721now := time.Now()
1722slApp := sl.appender(context.Background())
1723_, _, _, err := sl.append(slApp, []byte("metric_a 1\n"), "", now)
1724require.NoError(t, err)
1725require.NoError(t, slApp.Commit())
1726
1727slApp = sl.appender(context.Background())
1728_, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second))
1729require.NoError(t, err)
1730require.NoError(t, slApp.Commit())
1731
1732want := []floatSample{
1733{
1734metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
1735t: timestamp.FromTime(now),
1736f: 1,
1737},
1738{
1739metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
1740t: timestamp.FromTime(now.Add(time.Second)),
1741f: math.Float64frombits(value.StaleNaN),
1742},
1743}
1744requireEqual(t, want, app.resultFloats, "Appended samples not as expected:\n%s", appender)
1745}
1746
1747func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
1748app := &collectResultAppender{}
1749sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)
1750now := time.Now()
1751slApp := sl.appender(context.Background())
1752_, _, _, err := sl.append(slApp, []byte("metric_a 1 1000\n"), "", now)
1753require.NoError(t, err)
1754require.NoError(t, slApp.Commit())
1755
1756slApp = sl.appender(context.Background())
1757_, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second))
1758require.NoError(t, err)
1759require.NoError(t, slApp.Commit())
1760
1761want := []floatSample{
1762{
1763metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
1764t: 1000,
1765f: 1,
1766},
1767}
1768require.Equal(t, want, app.resultFloats, "Appended samples not as expected:\n%s", appender)
1769}
1770
1771func TestScrapeLoopAppendStalenessIfTrackTimestampStaleness(t *testing.T) {
1772app := &collectResultAppender{}
1773sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)
1774sl.trackTimestampsStaleness = true
1775
1776now := time.Now()
1777slApp := sl.appender(context.Background())
1778_, _, _, err := sl.append(slApp, []byte("metric_a 1 1000\n"), "", now)
1779require.NoError(t, err)
1780require.NoError(t, slApp.Commit())
1781
1782slApp = sl.appender(context.Background())
1783_, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second))
1784require.NoError(t, err)
1785require.NoError(t, slApp.Commit())
1786
1787want := []floatSample{
1788{
1789metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
1790t: 1000,
1791f: 1,
1792},
1793{
1794metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
1795t: timestamp.FromTime(now.Add(time.Second)),
1796f: math.Float64frombits(value.StaleNaN),
1797},
1798}
1799requireEqual(t, want, app.resultFloats, "Appended samples not as expected:\n%s", appender)
1800}
1801
1802func TestScrapeLoopAppendExemplar(t *testing.T) {
1803tests := []struct {
1804title string
1805scrapeClassicHistograms bool
1806enableNativeHistogramsIngestion bool
1807scrapeText string
1808contentType string
1809discoveryLabels []string
1810floats []floatSample
1811histograms []histogramSample
1812exemplars []exemplar.Exemplar
1813}{
1814{
1815title: "Metric without exemplars",
1816scrapeText: "metric_total{n=\"1\"} 0\n# EOF",
1817contentType: "application/openmetrics-text",
1818discoveryLabels: []string{"n", "2"},
1819floats: []floatSample{{
1820metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"),
1821f: 0,
1822}},
1823},
1824{
1825title: "Metric with exemplars",
1826scrapeText: "metric_total{n=\"1\"} 0 # {a=\"abc\"} 1.0\n# EOF",
1827contentType: "application/openmetrics-text",
1828discoveryLabels: []string{"n", "2"},
1829floats: []floatSample{{
1830metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"),
1831f: 0,
1832}},
1833exemplars: []exemplar.Exemplar{
1834{Labels: labels.FromStrings("a", "abc"), Value: 1},
1835},
1836},
1837{
1838title: "Metric with exemplars and TS",
1839scrapeText: "metric_total{n=\"1\"} 0 # {a=\"abc\"} 1.0 10000\n# EOF",
1840contentType: "application/openmetrics-text",
1841discoveryLabels: []string{"n", "2"},
1842floats: []floatSample{{
1843metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"),
1844f: 0,
1845}},
1846exemplars: []exemplar.Exemplar{
1847{Labels: labels.FromStrings("a", "abc"), Value: 1, Ts: 10000000, HasTs: true},
1848},
1849},
1850{
1851title: "Two metrics and exemplars",
1852scrapeText: `metric_total{n="1"} 1 # {t="1"} 1.0 10000
1853metric_total{n="2"} 2 # {t="2"} 2.0 20000
1854# EOF`,
1855contentType: "application/openmetrics-text",
1856floats: []floatSample{{
1857metric: labels.FromStrings("__name__", "metric_total", "n", "1"),
1858f: 1,
1859}, {
1860metric: labels.FromStrings("__name__", "metric_total", "n", "2"),
1861f: 2,
1862}},
1863exemplars: []exemplar.Exemplar{
1864{Labels: labels.FromStrings("t", "1"), Value: 1, Ts: 10000000, HasTs: true},
1865{Labels: labels.FromStrings("t", "2"), Value: 2, Ts: 20000000, HasTs: true},
1866},
1867},
1868{
1869title: "Native histogram with three exemplars",
1870
1871enableNativeHistogramsIngestion: true,
1872scrapeText: `name: "test_histogram"
1873help: "Test histogram with many buckets removed to keep it manageable in size."
1874type: HISTOGRAM
1875metric: <
1876histogram: <
1877sample_count: 175
1878sample_sum: 0.0008280461746287094
1879bucket: <
1880cumulative_count: 2
1881upper_bound: -0.0004899999999999998
1882>
1883bucket: <
1884cumulative_count: 4
1885upper_bound: -0.0003899999999999998
1886exemplar: <
1887label: <
1888name: "dummyID"
1889value: "59727"
1890>
1891value: -0.00039
1892timestamp: <
1893seconds: 1625851155
1894nanos: 146848499
1895>
1896>
1897>
1898bucket: <
1899cumulative_count: 16
1900upper_bound: -0.0002899999999999998
1901exemplar: <
1902label: <
1903name: "dummyID"
1904value: "5617"
1905>
1906value: -0.00029
1907>
1908>
1909bucket: <
1910cumulative_count: 32
1911upper_bound: -0.0001899999999999998
1912exemplar: <
1913label: <
1914name: "dummyID"
1915value: "58215"
1916>
1917value: -0.00019
1918timestamp: <
1919seconds: 1625851055
1920nanos: 146848599
1921>
1922>
1923>
1924schema: 3
1925zero_threshold: 2.938735877055719e-39
1926zero_count: 2
1927negative_span: <
1928offset: -162
1929length: 1
1930>
1931negative_span: <
1932offset: 23
1933length: 4
1934>
1935negative_delta: 1
1936negative_delta: 3
1937negative_delta: -2
1938negative_delta: -1
1939negative_delta: 1
1940positive_span: <
1941offset: -161
1942length: 1
1943>
1944positive_span: <
1945offset: 8
1946length: 3
1947>
1948positive_delta: 1
1949positive_delta: 2
1950positive_delta: -1
1951positive_delta: -1
1952>
1953timestamp_ms: 1234568
1954>
1955
1956`,
1957contentType: "application/vnd.google.protobuf",
1958histograms: []histogramSample{{
1959t: 1234568,
1960h: &histogram.Histogram{
1961Count: 175,
1962ZeroCount: 2,
1963Sum: 0.0008280461746287094,
1964ZeroThreshold: 2.938735877055719e-39,
1965Schema: 3,
1966PositiveSpans: []histogram.Span{
1967{Offset: -161, Length: 1},
1968{Offset: 8, Length: 3},
1969},
1970NegativeSpans: []histogram.Span{
1971{Offset: -162, Length: 1},
1972{Offset: 23, Length: 4},
1973},
1974PositiveBuckets: []int64{1, 2, -1, -1},
1975NegativeBuckets: []int64{1, 3, -2, -1, 1},
1976},
1977}},
1978exemplars: []exemplar.Exemplar{
1979// Native histogram exemplars are arranged by timestamp, and those with missing timestamps are dropped.
1980{Labels: labels.FromStrings("dummyID", "58215"), Value: -0.00019, Ts: 1625851055146, HasTs: true},
1981{Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, Ts: 1625851155146, HasTs: true},
1982},
1983},
1984{
1985title: "Native histogram with three exemplars scraped as classic histogram",
1986
1987enableNativeHistogramsIngestion: true,
1988scrapeText: `name: "test_histogram"
1989help: "Test histogram with many buckets removed to keep it manageable in size."
1990type: HISTOGRAM
1991metric: <
1992histogram: <
1993sample_count: 175
1994sample_sum: 0.0008280461746287094
1995bucket: <
1996cumulative_count: 2
1997upper_bound: -0.0004899999999999998
1998>
1999bucket: <
2000cumulative_count: 4
2001upper_bound: -0.0003899999999999998
2002exemplar: <
2003label: <
2004name: "dummyID"
2005value: "59727"
2006>
2007value: -0.00039
2008timestamp: <
2009seconds: 1625851155
2010nanos: 146848499
2011>
2012>
2013>
2014bucket: <
2015cumulative_count: 16
2016upper_bound: -0.0002899999999999998
2017exemplar: <
2018label: <
2019name: "dummyID"
2020value: "5617"
2021>
2022value: -0.00029
2023>
2024>
2025bucket: <
2026cumulative_count: 32
2027upper_bound: -0.0001899999999999998
2028exemplar: <
2029label: <
2030name: "dummyID"
2031value: "58215"
2032>
2033value: -0.00019
2034timestamp: <
2035seconds: 1625851055
2036nanos: 146848599
2037>
2038>
2039>
2040schema: 3
2041zero_threshold: 2.938735877055719e-39
2042zero_count: 2
2043negative_span: <
2044offset: -162
2045length: 1
2046>
2047negative_span: <
2048offset: 23
2049length: 4
2050>
2051negative_delta: 1
2052negative_delta: 3
2053negative_delta: -2
2054negative_delta: -1
2055negative_delta: 1
2056positive_span: <
2057offset: -161
2058length: 1
2059>
2060positive_span: <
2061offset: 8
2062length: 3
2063>
2064positive_delta: 1
2065positive_delta: 2
2066positive_delta: -1
2067positive_delta: -1
2068>
2069timestamp_ms: 1234568
2070>
2071
2072`,
2073scrapeClassicHistograms: true,
2074contentType: "application/vnd.google.protobuf",
2075floats: []floatSample{
2076{metric: labels.FromStrings("__name__", "test_histogram_count"), t: 1234568, f: 175},
2077{metric: labels.FromStrings("__name__", "test_histogram_sum"), t: 1234568, f: 0.0008280461746287094},
2078{metric: labels.FromStrings("__name__", "test_histogram_bucket", "le", "-0.0004899999999999998"), t: 1234568, f: 2},
2079{metric: labels.FromStrings("__name__", "test_histogram_bucket", "le", "-0.0003899999999999998"), t: 1234568, f: 4},
2080{metric: labels.FromStrings("__name__", "test_histogram_bucket", "le", "-0.0002899999999999998"), t: 1234568, f: 16},
2081{metric: labels.FromStrings("__name__", "test_histogram_bucket", "le", "-0.0001899999999999998"), t: 1234568, f: 32},
2082{metric: labels.FromStrings("__name__", "test_histogram_bucket", "le", "+Inf"), t: 1234568, f: 175},
2083},
2084histograms: []histogramSample{{
2085t: 1234568,
2086h: &histogram.Histogram{
2087Count: 175,
2088ZeroCount: 2,
2089Sum: 0.0008280461746287094,
2090ZeroThreshold: 2.938735877055719e-39,
2091Schema: 3,
2092PositiveSpans: []histogram.Span{
2093{Offset: -161, Length: 1},
2094{Offset: 8, Length: 3},
2095},
2096NegativeSpans: []histogram.Span{
2097{Offset: -162, Length: 1},
2098{Offset: 23, Length: 4},
2099},
2100PositiveBuckets: []int64{1, 2, -1, -1},
2101NegativeBuckets: []int64{1, 3, -2, -1, 1},
2102},
2103}},
2104exemplars: []exemplar.Exemplar{
2105// Native histogram one is arranged by timestamp.
2106// Exemplars with missing timestamps are dropped for native histograms.
2107{Labels: labels.FromStrings("dummyID", "58215"), Value: -0.00019, Ts: 1625851055146, HasTs: true},
2108{Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, Ts: 1625851155146, HasTs: true},
2109// Classic histogram one is in order of appearance.
2110// Exemplars with missing timestamps are supported for classic histograms.
2111{Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, Ts: 1625851155146, HasTs: true},
2112{Labels: labels.FromStrings("dummyID", "5617"), Value: -0.00029, Ts: 1234568, HasTs: false},
2113{Labels: labels.FromStrings("dummyID", "58215"), Value: -0.00019, Ts: 1625851055146, HasTs: true},
2114},
2115},
2116}
2117
2118for _, test := range tests {
2119t.Run(test.title, func(t *testing.T) {
2120app := &collectResultAppender{}
2121
2122discoveryLabels := &Target{
2123labels: labels.FromStrings(test.discoveryLabels...),
2124}
2125
2126sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)
2127sl.enableNativeHistogramIngestion = test.enableNativeHistogramsIngestion
2128sl.sampleMutator = func(l labels.Labels) labels.Labels {
2129return mutateSampleLabels(l, discoveryLabels, false, nil)
2130}
2131sl.reportSampleMutator = func(l labels.Labels) labels.Labels {
2132return mutateReportSampleLabels(l, discoveryLabels)
2133}
2134sl.scrapeClassicHistograms = test.scrapeClassicHistograms
2135
2136now := time.Now()
2137
2138for i := range test.floats {
2139if test.floats[i].t != 0 {
2140continue
2141}
2142test.floats[i].t = timestamp.FromTime(now)
2143}
2144
2145// We need to set the timestamp for expected exemplars that does not have a timestamp.
2146for i := range test.exemplars {
2147if test.exemplars[i].Ts == 0 {
2148test.exemplars[i].Ts = timestamp.FromTime(now)
2149}
2150}
2151
2152buf := &bytes.Buffer{}
2153if test.contentType == "application/vnd.google.protobuf" {
2154// In case of protobuf, we have to create the binary representation.
2155pb := &dto.MetricFamily{}
2156// From text to proto message.
2157require.NoError(t, proto.UnmarshalText(test.scrapeText, pb))
2158// From proto message to binary protobuf.
2159protoBuf, err := proto.Marshal(pb)
2160require.NoError(t, err)
2161
2162// Write first length, then binary protobuf.
2163varintBuf := binary.AppendUvarint(nil, uint64(len(protoBuf)))
2164buf.Write(varintBuf)
2165buf.Write(protoBuf)
2166} else {
2167buf.WriteString(test.scrapeText)
2168}
2169
2170_, _, _, err := sl.append(app, buf.Bytes(), test.contentType, now)
2171require.NoError(t, err)
2172require.NoError(t, app.Commit())
2173requireEqual(t, test.floats, app.resultFloats)
2174requireEqual(t, test.histograms, app.resultHistograms)
2175requireEqual(t, test.exemplars, app.resultExemplars)
2176})
2177}
2178}
2179
// TestScrapeLoopAppendExemplarSeries appends two consecutive OpenMetrics
// scrapes of the same series, each carrying an exemplar, and verifies that
// both the float samples and both exemplars are committed in order.
func TestScrapeLoopAppendExemplarSeries(t *testing.T) {
	// Two scrapes of metric_total; the trailing `# {...}` is the exemplar.
	scrapeText := []string{`metric_total{n="1"} 1 # {t="1"} 1.0 10000
# EOF`, `metric_total{n="1"} 2 # {t="2"} 2.0 20000
# EOF`}
	// Expected float samples; timestamps are filled in below from `now`.
	samples := []floatSample{{
		metric: labels.FromStrings("__name__", "metric_total", "n", "1"),
		f:      1,
	}, {
		metric: labels.FromStrings("__name__", "metric_total", "n", "1"),
		f:      2,
	}}
	// Expected exemplars. The exemplar timestamps in the scrape text are in
	// seconds and are converted to milliseconds by the parser (10000 -> 10000000).
	exemplars := []exemplar.Exemplar{
		{Labels: labels.FromStrings("t", "1"), Value: 1, Ts: 10000000, HasTs: true},
		{Labels: labels.FromStrings("t", "2"), Value: 2, Ts: 20000000, HasTs: true},
	}
	discoveryLabels := &Target{
		labels: labels.FromStrings(),
	}

	app := &collectResultAppender{}

	sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)
	sl.sampleMutator = func(l labels.Labels) labels.Labels {
		return mutateSampleLabels(l, discoveryLabels, false, nil)
	}
	sl.reportSampleMutator = func(l labels.Labels) labels.Labels {
		return mutateReportSampleLabels(l, discoveryLabels)
	}

	now := time.Now()

	// Give each expected sample a distinct scrape timestamp, one second apart.
	for i := range samples {
		ts := now.Add(time.Second * time.Duration(i))
		samples[i].t = timestamp.FromTime(ts)
	}

	// We need to set the timestamp for expected exemplars that does not have a timestamp.
	// NOTE(review): both expected exemplars above carry explicit non-zero Ts
	// values, so this loop is currently a no-op; it only matters if cases
	// without timestamps are added.
	for i := range exemplars {
		if exemplars[i].Ts == 0 {
			ts := now.Add(time.Second * time.Duration(i))
			exemplars[i].Ts = timestamp.FromTime(ts)
		}
	}

	// Append each scrape at its corresponding sample timestamp and commit.
	for i, st := range scrapeText {
		_, _, _, err := sl.append(app, []byte(st), "application/openmetrics-text", timestamp.Time(samples[i].t))
		require.NoError(t, err)
		require.NoError(t, app.Commit())
	}

	requireEqual(t, samples, app.resultFloats)
	requireEqual(t, exemplars, app.resultExemplars)
}
2233
2234func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) {
2235var (
2236scraper = &testScraper{}
2237appender = &collectResultAppender{}
2238app = func(ctx context.Context) storage.Appender { return appender }
2239)
2240
2241ctx, cancel := context.WithCancel(context.Background())
2242sl := newBasicScrapeLoop(t, ctx, scraper, app, 10*time.Millisecond)
2243
2244scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
2245cancel()
2246return errors.New("scrape failed")
2247}
2248
2249sl.run(nil)
2250require.Equal(t, 0.0, appender.resultFloats[0].f, "bad 'up' value")
2251}
2252
2253func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) {
2254var (
2255scraper = &testScraper{}
2256appender = &collectResultAppender{}
2257app = func(ctx context.Context) storage.Appender { return appender }
2258)
2259
2260ctx, cancel := context.WithCancel(context.Background())
2261sl := newBasicScrapeLoop(t, ctx, scraper, app, 10*time.Millisecond)
2262
2263scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
2264cancel()
2265w.Write([]byte("a{l=\"\xff\"} 1\n"))
2266return nil
2267}
2268
2269sl.run(nil)
2270require.Equal(t, 0.0, appender.resultFloats[0].f, "bad 'up' value")
2271}
2272
// errorAppender wraps collectResultAppender and injects storage errors for
// samples with specially named metrics (see its Append method), so tests can
// exercise the scrape loop's error handling paths.
type errorAppender struct {
	collectResultAppender
}
2276
2277func (app *errorAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
2278switch lset.Get(model.MetricNameLabel) {
2279case "out_of_order":
2280return 0, storage.ErrOutOfOrderSample
2281case "amend":
2282return 0, storage.ErrDuplicateSampleForTimestamp
2283case "out_of_bounds":
2284return 0, storage.ErrOutOfBounds
2285default:
2286return app.collectResultAppender.Append(ref, lset, t, v)
2287}
2288}
2289
2290func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T) {
2291app := &errorAppender{}
2292sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)
2293
2294now := time.Unix(1, 0)
2295slApp := sl.appender(context.Background())
2296total, added, seriesAdded, err := sl.append(slApp, []byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), "", now)
2297require.NoError(t, err)
2298require.NoError(t, slApp.Commit())
2299
2300want := []floatSample{
2301{
2302metric: labels.FromStrings(model.MetricNameLabel, "normal"),
2303t: timestamp.FromTime(now),
2304f: 1,
2305},
2306}
2307requireEqual(t, want, app.resultFloats, "Appended samples not as expected:\n%s", appender)
2308require.Equal(t, 4, total)
2309require.Equal(t, 4, added)
2310require.Equal(t, 1, seriesAdded)
2311}
2312
2313func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
2314app := &collectResultAppender{}
2315sl := newBasicScrapeLoop(t, context.Background(), nil,
2316func(ctx context.Context) storage.Appender {
2317return &timeLimitAppender{
2318Appender: app,
2319maxTime: timestamp.FromTime(time.Now().Add(10 * time.Minute)),
2320}
2321},
23220,
2323)
2324
2325now := time.Now().Add(20 * time.Minute)
2326slApp := sl.appender(context.Background())
2327total, added, seriesAdded, err := sl.append(slApp, []byte("normal 1\n"), "", now)
2328require.NoError(t, err)
2329require.NoError(t, slApp.Commit())
2330require.Equal(t, 1, total)
2331require.Equal(t, 1, added)
2332require.Equal(t, 0, seriesAdded)
2333}
2334
// TestTargetScraperScrapeOK exercises a successful scrape against a local
// test server. It checks that the scraper sends the expected
// X-Prometheus-Scrape-Timeout-Seconds header, honors the configured Accept
// header (protobuf-first when requested), and returns the server's content
// type and body unchanged.
func TestTargetScraperScrapeOK(t *testing.T) {
	const (
		configTimeout   = 1500 * time.Millisecond
		expectedTimeout = "1.5" // configTimeout expressed in seconds.
	)

	// Toggled before the second runTest call; read by the handler closure.
	var protobufParsing bool

	server := httptest.NewServer(
		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			if protobufParsing {
				accept := r.Header.Get("Accept")
				require.True(t, strings.HasPrefix(accept, "application/vnd.google.protobuf;"),
					"Expected Accept header to prefer application/vnd.google.protobuf.")
			}

			timeout := r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds")
			require.Equal(t, expectedTimeout, timeout, "Expected scrape timeout header.")

			w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
			w.Write([]byte("metric_a 1\nmetric_b 2\n"))
		}),
	)
	defer server.Close()

	serverURL, err := url.Parse(server.URL)
	if err != nil {
		panic(err)
	}

	// runTest performs one scrape with the given Accept header and verifies
	// content type and body round-trip.
	runTest := func(acceptHeader string) {
		ts := &targetScraper{
			Target: &Target{
				labels: labels.FromStrings(
					model.SchemeLabel, serverURL.Scheme,
					model.AddressLabel, serverURL.Host,
				),
			},
			client:       http.DefaultClient,
			timeout:      configTimeout,
			acceptHeader: acceptHeader,
		}
		var buf bytes.Buffer

		resp, err := ts.scrape(context.Background())
		require.NoError(t, err)
		contentType, err := ts.readResponse(context.Background(), resp, &buf)
		require.NoError(t, err)
		require.Equal(t, "text/plain; version=0.0.4", contentType)
		require.Equal(t, "metric_a 1\nmetric_b 2\n", buf.String())
	}

	// First with the default (text-first) protocols, then protobuf-first.
	runTest(acceptHeader(config.DefaultScrapeProtocols))
	protobufParsing = true
	runTest(acceptHeader(config.DefaultProtoFirstScrapeProtocols))
}
2391
2392func TestTargetScrapeScrapeCancel(t *testing.T) {
2393block := make(chan struct{})
2394
2395server := httptest.NewServer(
2396http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2397<-block
2398}),
2399)
2400defer server.Close()
2401
2402serverURL, err := url.Parse(server.URL)
2403if err != nil {
2404panic(err)
2405}
2406
2407ts := &targetScraper{
2408Target: &Target{
2409labels: labels.FromStrings(
2410model.SchemeLabel, serverURL.Scheme,
2411model.AddressLabel, serverURL.Host,
2412),
2413},
2414client: http.DefaultClient,
2415acceptHeader: acceptHeader(config.DefaultGlobalConfig.ScrapeProtocols),
2416}
2417ctx, cancel := context.WithCancel(context.Background())
2418
2419errc := make(chan error, 1)
2420
2421go func() {
2422time.Sleep(1 * time.Second)
2423cancel()
2424}()
2425
2426go func() {
2427_, err := ts.scrape(ctx)
2428switch {
2429case err == nil:
2430errc <- errors.New("Expected error but got nil")
2431case !errors.Is(ctx.Err(), context.Canceled):
2432errc <- fmt.Errorf("Expected context cancellation error but got: %w", ctx.Err())
2433default:
2434close(errc)
2435}
2436}()
2437
2438select {
2439case <-time.After(5 * time.Second):
2440require.FailNow(t, "Scrape function did not return unexpectedly.")
2441case err := <-errc:
2442require.NoError(t, err)
2443}
2444// If this is closed in a defer above the function the test server
2445// doesn't terminate and the test doesn't complete.
2446close(block)
2447}
2448
2449func TestTargetScrapeScrapeNotFound(t *testing.T) {
2450server := httptest.NewServer(
2451http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2452w.WriteHeader(http.StatusNotFound)
2453}),
2454)
2455defer server.Close()
2456
2457serverURL, err := url.Parse(server.URL)
2458if err != nil {
2459panic(err)
2460}
2461
2462ts := &targetScraper{
2463Target: &Target{
2464labels: labels.FromStrings(
2465model.SchemeLabel, serverURL.Scheme,
2466model.AddressLabel, serverURL.Host,
2467),
2468},
2469client: http.DefaultClient,
2470acceptHeader: acceptHeader(config.DefaultGlobalConfig.ScrapeProtocols),
2471}
2472
2473resp, err := ts.scrape(context.Background())
2474require.NoError(t, err)
2475_, err = ts.readResponse(context.Background(), resp, io.Discard)
2476require.Contains(t, err.Error(), "404", "Expected \"404 NotFound\" error but got: %s", err)
2477}
2478
// TestTargetScraperBodySizeLimit verifies bodySizeLimit enforcement in four
// phases: (1) uncompressed response with a limit — truncated with
// errBodySizeLimit; (2) gzip response with a limit — same, the limit applies
// to the decompressed bytes; (3) uncompressed without a limit — full body;
// (4) gzip without a limit — full body.
func TestTargetScraperBodySizeLimit(t *testing.T) {
	const (
		bodySizeLimit = 15 // Smaller than len(responseBody) to force truncation.
		responseBody  = "metric_a 1\nmetric_b 2\n"
	)
	// Toggled between phases; read by the handler closure.
	var gzipResponse bool
	server := httptest.NewServer(
		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
			if gzipResponse {
				w.Header().Set("Content-Encoding", "gzip")
				gw := gzip.NewWriter(w)
				defer gw.Close()
				gw.Write([]byte(responseBody))
				return
			}
			w.Write([]byte(responseBody))
		}),
	)
	defer server.Close()

	serverURL, err := url.Parse(server.URL)
	if err != nil {
		panic(err)
	}

	ts := &targetScraper{
		Target: &Target{
			labels: labels.FromStrings(
				model.SchemeLabel, serverURL.Scheme,
				model.AddressLabel, serverURL.Host,
			),
		},
		client:        http.DefaultClient,
		bodySizeLimit: bodySizeLimit,
		acceptHeader:  acceptHeader(config.DefaultGlobalConfig.ScrapeProtocols),
		metrics:       newTestScrapeMetrics(t),
	}
	var buf bytes.Buffer

	// Target response uncompressed body, scrape with body size limit.
	resp, err := ts.scrape(context.Background())
	require.NoError(t, err)
	_, err = ts.readResponse(context.Background(), resp, &buf)
	require.ErrorIs(t, err, errBodySizeLimit)
	// Exactly bodySizeLimit bytes were written before truncation.
	require.Equal(t, bodySizeLimit, buf.Len())
	// Target response gzip compressed body, scrape with body size limit.
	gzipResponse = true
	buf.Reset()
	resp, err = ts.scrape(context.Background())
	require.NoError(t, err)
	_, err = ts.readResponse(context.Background(), resp, &buf)
	require.ErrorIs(t, err, errBodySizeLimit)
	require.Equal(t, bodySizeLimit, buf.Len())
	// Target response uncompressed body, scrape without body size limit.
	gzipResponse = false
	buf.Reset()
	ts.bodySizeLimit = 0
	resp, err = ts.scrape(context.Background())
	require.NoError(t, err)
	_, err = ts.readResponse(context.Background(), resp, &buf)
	require.NoError(t, err)
	// NOTE(review): require.Len here asserts len(responseBody) == buf.Len();
	// the (object, length) arguments read reversed from the usual usage but
	// the check is equivalent.
	require.Len(t, responseBody, buf.Len())
	// Target response gzip compressed body, scrape without body size limit.
	gzipResponse = true
	buf.Reset()
	resp, err = ts.scrape(context.Background())
	require.NoError(t, err)
	_, err = ts.readResponse(context.Background(), resp, &buf)
	require.NoError(t, err)
	require.Len(t, responseBody, buf.Len())
}
2551
// testScraper implements the scraper interface and allows setting values
// returned by its methods. It also allows setting a custom scrape function.
type testScraper struct {
	// offsetDur is returned verbatim by offset().
	offsetDur time.Duration

	// Last values recorded by Report().
	lastStart    time.Time
	lastDuration time.Duration
	lastError    error

	// scrapeErr is returned by scrape()/readResponse() when scrapeFunc is nil.
	scrapeErr  error
	// scrapeFunc, when set, is invoked by readResponse to produce the scrape body.
	scrapeFunc func(context.Context, io.Writer) error
}
2564
// offset always returns the configured offsetDur, ignoring its arguments.
func (ts *testScraper) offset(time.Duration, uint64) time.Duration {
	return ts.offsetDur
}
2568
// Report records the outcome of the last scrape so tests can inspect it.
func (ts *testScraper) Report(start time.Time, duration time.Duration, err error) {
	ts.lastStart = start
	ts.lastDuration = duration
	ts.lastError = err
}
2574
// scrape returns no response and the canned scrapeErr (nil for success).
func (ts *testScraper) scrape(ctx context.Context) (*http.Response, error) {
	return nil, ts.scrapeErr
}
2578
2579func (ts *testScraper) readResponse(ctx context.Context, resp *http.Response, w io.Writer) (string, error) {
2580if ts.scrapeFunc != nil {
2581return "", ts.scrapeFunc(ctx, w)
2582}
2583return "", ts.scrapeErr
2584}
2585
2586func TestScrapeLoop_RespectTimestamps(t *testing.T) {
2587s := teststorage.New(t)
2588defer s.Close()
2589
2590app := s.Appender(context.Background())
2591capp := &collectResultAppender{next: app}
2592sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return capp }, 0)
2593
2594now := time.Now()
2595slApp := sl.appender(context.Background())
2596_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now)
2597require.NoError(t, err)
2598require.NoError(t, slApp.Commit())
2599
2600want := []floatSample{
2601{
2602metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
2603t: 0,
2604f: 1,
2605},
2606}
2607require.Equal(t, want, capp.resultFloats, "Appended samples not as expected:\n%s", appender)
2608}
2609
2610func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
2611s := teststorage.New(t)
2612defer s.Close()
2613
2614app := s.Appender(context.Background())
2615
2616capp := &collectResultAppender{next: app}
2617
2618sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return capp }, 0)
2619sl.honorTimestamps = false
2620
2621now := time.Now()
2622slApp := sl.appender(context.Background())
2623_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now)
2624require.NoError(t, err)
2625require.NoError(t, slApp.Commit())
2626
2627want := []floatSample{
2628{
2629metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
2630t: timestamp.FromTime(now),
2631f: 1,
2632},
2633}
2634require.Equal(t, want, capp.resultFloats, "Appended samples not as expected:\n%s", appender)
2635}
2636
// TestScrapeLoopDiscardDuplicateLabels verifies that a scrape containing a
// metric with a duplicated label name fails the whole append (nothing is
// persisted, not even the valid metric), and that after rolling back and
// cycling the staleness cache a clean scrape succeeds.
func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
	s := teststorage.New(t)
	defer s.Close()

	ctx, cancel := context.WithCancel(context.Background())
	sl := newBasicScrapeLoop(t, ctx, &testScraper{}, s.Appender, 0)
	defer cancel()

	// We add a good and a bad metric to check that both are discarded.
	slApp := sl.appender(ctx)
	_, _, _, err := sl.append(slApp, []byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "", time.Time{})
	require.Error(t, err)
	require.NoError(t, slApp.Rollback())
	// We need to cycle staleness cache maps after a manual rollback. Otherwise they will have old entries in them,
	// which would cause ErrDuplicateSampleForTimestamp errors on the next append.
	sl.cache.iterDone(true)

	// Nothing must have been committed — not even the valid first metric.
	q, err := s.Querier(time.Time{}.UnixNano(), 0)
	require.NoError(t, err)
	series := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
	require.False(t, series.Next(), "series found in tsdb")
	require.NoError(t, series.Err())

	// We add a good metric to check that it is recorded.
	slApp = sl.appender(ctx)
	_, _, _, err = sl.append(slApp, []byte("test_metric{le=\"500\"} 1\n"), "", time.Time{})
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	// Exactly one series must now exist.
	q, err = s.Querier(time.Time{}.UnixNano(), 0)
	require.NoError(t, err)
	series = q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "le", "500"))
	require.True(t, series.Next(), "series not found in tsdb")
	require.NoError(t, series.Err())
	require.False(t, series.Next(), "more than one series found in tsdb")
}
2673
2674func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
2675s := teststorage.New(t)
2676defer s.Close()
2677
2678app := s.Appender(context.Background())
2679
2680ctx, cancel := context.WithCancel(context.Background())
2681sl := newBasicScrapeLoop(t, context.Background(), &testScraper{}, func(ctx context.Context) storage.Appender { return app }, 0)
2682sl.sampleMutator = func(l labels.Labels) labels.Labels {
2683if l.Has("drop") {
2684return labels.FromStrings("no", "name") // This label set will trigger an error.
2685}
2686return l
2687}
2688defer cancel()
2689
2690slApp := sl.appender(context.Background())
2691_, _, _, err := sl.append(slApp, []byte("nok 1\nnok2{drop=\"drop\"} 1\n"), "", time.Time{})
2692require.Error(t, err)
2693require.NoError(t, slApp.Rollback())
2694require.Equal(t, errNameLabelMandatory, err)
2695
2696q, err := s.Querier(time.Time{}.UnixNano(), 0)
2697require.NoError(t, err)
2698series := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
2699require.False(t, series.Next(), "series found in tsdb")
2700require.NoError(t, series.Err())
2701}
2702
// TestReusableConfig checks reusableCache's decision about whether a scrape
// cache can be carried over between two configs: pairs in `match` must be
// reusable (in both directions and reflexively), pairs in `noMatch` must not.
func TestReusableConfig(t *testing.T) {
	// Index into this slice is what `match`/`noMatch` below refer to.
	variants := []*config.ScrapeConfig{
		{
			JobName:       "prometheus",
			ScrapeTimeout: model.Duration(15 * time.Second),
		},
		{
			JobName:       "httpd",
			ScrapeTimeout: model.Duration(15 * time.Second),
		},
		{
			JobName:       "prometheus",
			ScrapeTimeout: model.Duration(5 * time.Second),
		},
		{
			JobName:     "prometheus",
			MetricsPath: "/metrics",
		},
		{
			JobName:     "prometheus",
			MetricsPath: "/metrics2",
		},
		{
			JobName:       "prometheus",
			ScrapeTimeout: model.Duration(5 * time.Second),
			MetricsPath:   "/metrics2",
		},
		{
			JobName:        "prometheus",
			ScrapeInterval: model.Duration(5 * time.Second),
			MetricsPath:    "/metrics2",
		},
		{
			JobName:        "prometheus",
			ScrapeInterval: model.Duration(5 * time.Second),
			SampleLimit:    1000,
			MetricsPath:    "/metrics2",
		},
	}

	// Pairs of variant indices expected to be cache-compatible.
	match := [][]int{
		{0, 2},
		{4, 5},
		{4, 6},
		{4, 7},
		{5, 6},
		{5, 7},
		{6, 7},
	}
	// Pairs expected to be incompatible.
	noMatch := [][]int{
		{1, 2},
		{0, 4},
		{3, 4},
	}

	for i, m := range match {
		// Check both argument orders plus reflexivity of each side.
		require.True(t, reusableCache(variants[m[0]], variants[m[1]]), "match test %d", i)
		require.True(t, reusableCache(variants[m[1]], variants[m[0]]), "match test %d", i)
		require.True(t, reusableCache(variants[m[1]], variants[m[1]]), "match test %d", i)
		require.True(t, reusableCache(variants[m[0]], variants[m[0]]), "match test %d", i)
	}
	for i, m := range noMatch {
		require.False(t, reusableCache(variants[m[0]], variants[m[1]]), "not match test %d", i)
		require.False(t, reusableCache(variants[m[1]], variants[m[0]]), "not match test %d", i)
	}
}
2769
// TestReuseScrapeCache reloads a live scrape pool through a sequence of
// config changes and asserts, via the cache's pointer identity, that the
// per-loop scrape cache is kept when the new config is cache-compatible
// (keep: true) and replaced when it is not (keep: false). It also checks
// that reloading the exact same config never invalidates the cache.
func TestReuseScrapeCache(t *testing.T) {
	var (
		app = &nopAppendable{}
		cfg = &config.ScrapeConfig{
			JobName:        "Prometheus",
			ScrapeTimeout:  model.Duration(5 * time.Second),
			ScrapeInterval: model.Duration(5 * time.Second),
			MetricsPath:    "/metrics",
		}
		sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
		t1    = &Target{
			discoveredLabels: labels.FromStrings("labelNew", "nameNew", "labelNew1", "nameNew1", "labelNew2", "nameNew2"),
		}
		proxyURL, _ = url.Parse("http://localhost:2128")
	)
	defer sp.stop()
	sp.sync([]*Target{t1})

	// Each step reloads the pool with newConfig; keep says whether the cache
	// must survive that reload.
	steps := []struct {
		keep      bool
		newConfig *config.ScrapeConfig
	}{
		{
			// Identical settings: cache kept.
			keep: true,
			newConfig: &config.ScrapeConfig{
				JobName:        "Prometheus",
				ScrapeInterval: model.Duration(5 * time.Second),
				ScrapeTimeout:  model.Duration(5 * time.Second),
				MetricsPath:    "/metrics",
			},
		},
		{
			// Changed timeout and metrics path: cache replaced.
			keep: false,
			newConfig: &config.ScrapeConfig{
				JobName:        "Prometheus",
				ScrapeInterval: model.Duration(5 * time.Second),
				ScrapeTimeout:  model.Duration(15 * time.Second),
				MetricsPath:    "/metrics2",
			},
		},
		{
			// Only SampleLimit added on top of the previous step: cache kept.
			keep: true,
			newConfig: &config.ScrapeConfig{
				JobName:        "Prometheus",
				SampleLimit:    400,
				ScrapeInterval: model.Duration(5 * time.Second),
				ScrapeTimeout:  model.Duration(15 * time.Second),
				MetricsPath:    "/metrics2",
			},
		},
		{
			// HonorTimestamps toggled: cache replaced.
			keep: false,
			newConfig: &config.ScrapeConfig{
				JobName:         "Prometheus",
				HonorTimestamps: true,
				SampleLimit:     400,
				ScrapeInterval:  model.Duration(5 * time.Second),
				ScrapeTimeout:   model.Duration(15 * time.Second),
				MetricsPath:     "/metrics2",
			},
		},
		{
			// Only the HTTP client proxy changed: cache kept.
			keep: true,
			newConfig: &config.ScrapeConfig{
				JobName:         "Prometheus",
				HonorTimestamps: true,
				SampleLimit:     400,
				HTTPClientConfig: config_util.HTTPClientConfig{
					ProxyConfig: config_util.ProxyConfig{ProxyURL: config_util.URL{URL: proxyURL}},
				},
				ScrapeInterval: model.Duration(5 * time.Second),
				ScrapeTimeout:  model.Duration(15 * time.Second),
				MetricsPath:    "/metrics2",
			},
		},
		{
			// HonorLabels toggled: cache replaced.
			keep: false,
			newConfig: &config.ScrapeConfig{
				JobName:         "Prometheus",
				HonorTimestamps: true,
				HonorLabels:     true,
				SampleLimit:     400,
				ScrapeInterval:  model.Duration(5 * time.Second),
				ScrapeTimeout:   model.Duration(15 * time.Second),
				MetricsPath:     "/metrics2",
			},
		},
		{
			// LabelLimit introduced: cache replaced.
			keep: false,
			newConfig: &config.ScrapeConfig{
				JobName:        "Prometheus",
				ScrapeInterval: model.Duration(5 * time.Second),
				ScrapeTimeout:  model.Duration(15 * time.Second),
				MetricsPath:    "/metrics",
				LabelLimit:     1,
			},
		},
		{
			// LabelLimit changed: cache replaced.
			keep: false,
			newConfig: &config.ScrapeConfig{
				JobName:        "Prometheus",
				ScrapeInterval: model.Duration(5 * time.Second),
				ScrapeTimeout:  model.Duration(15 * time.Second),
				MetricsPath:    "/metrics",
				LabelLimit:     15,
			},
		},
		{
			// LabelNameLengthLimit introduced: cache replaced.
			keep: false,
			newConfig: &config.ScrapeConfig{
				JobName:              "Prometheus",
				ScrapeInterval:       model.Duration(5 * time.Second),
				ScrapeTimeout:        model.Duration(15 * time.Second),
				MetricsPath:          "/metrics",
				LabelLimit:           15,
				LabelNameLengthLimit: 5,
			},
		},
		{
			// LabelValueLengthLimit introduced: cache replaced.
			keep: false,
			newConfig: &config.ScrapeConfig{
				JobName:               "Prometheus",
				ScrapeInterval:        model.Duration(5 * time.Second),
				ScrapeTimeout:         model.Duration(15 * time.Second),
				MetricsPath:           "/metrics",
				LabelLimit:            15,
				LabelNameLengthLimit:  5,
				LabelValueLengthLimit: 7,
			},
		},
	}

	// cacheAddr snapshots each loop's cache identity as a formatted pointer,
	// keyed by target fingerprint.
	cacheAddr := func(sp *scrapePool) map[uint64]string {
		r := make(map[uint64]string)
		for fp, l := range sp.loops {
			r[fp] = fmt.Sprintf("%p", l.getCache())
		}
		return r
	}

	for i, s := range steps {
		initCacheAddr := cacheAddr(sp)
		sp.reload(s.newConfig)
		for fp, newCacheAddr := range cacheAddr(sp) {
			if s.keep {
				require.Equal(t, initCacheAddr[fp], newCacheAddr, "step %d: old cache and new cache are not the same", i)
			} else {
				require.NotEqual(t, initCacheAddr[fp], newCacheAddr, "step %d: old cache and new cache are the same", i)
			}
		}
		// Reloading the identical config must always preserve the cache.
		initCacheAddr = cacheAddr(sp)
		sp.reload(s.newConfig)
		for fp, newCacheAddr := range cacheAddr(sp) {
			require.Equal(t, initCacheAddr[fp], newCacheAddr, "step %d: reloading the exact config invalidates the cache", i)
		}
	}
}
2927
// TestScrapeAddFast verifies that the fast path via cached series refs
// recovers gracefully when a cached ref no longer exists in storage: a
// second append with a poisoned (stale) ref must still succeed.
func TestScrapeAddFast(t *testing.T) {
	s := teststorage.New(t)
	defer s.Close()

	ctx, cancel := context.WithCancel(context.Background())
	sl := newBasicScrapeLoop(t, ctx, &testScraper{}, s.Appender, 0)
	defer cancel()

	slApp := sl.appender(ctx)
	_, _, _, err := sl.append(slApp, []byte("up 1\n"), "", time.Time{})
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	// Poison the cache. There is just one entry, and one series in the
	// storage. Changing the ref will create a 'not found' error.
	for _, v := range sl.getCache().series {
		v.ref++
	}

	// The append must succeed despite the stale cached ref.
	slApp = sl.appender(ctx)
	_, _, _, err = sl.append(slApp, []byte("up 1\n"), "", time.Time{}.Add(time.Second))
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())
}
2952
// TestReuseCacheRace hammers scrapePool.reload with config changes for up to
// five seconds while a target is synced, to surface data races between
// reloads and running loops (meaningful under `go test -race`).
func TestReuseCacheRace(t *testing.T) {
	var (
		app = &nopAppendable{}
		cfg = &config.ScrapeConfig{
			JobName:        "Prometheus",
			ScrapeTimeout:  model.Duration(5 * time.Second),
			ScrapeInterval: model.Duration(5 * time.Second),
			MetricsPath:    "/metrics",
		}
		buffers = pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
		sp, _   = newScrapePool(cfg, app, 0, nil, buffers, &Options{}, newTestScrapeMetrics(t))
		t1      = &Target{
			discoveredLabels: labels.FromStrings("labelNew", "nameNew"),
		}
	)
	defer sp.stop()
	sp.sync([]*Target{t1})

	start := time.Now()
	// Effectively an infinite loop (the uint condition only fails on
	// wrap-around); the time check below is the real termination condition.
	for i := uint(1); i > 0; i++ {
		if time.Since(start) > 5*time.Second {
			break
		}
		// Each iteration uses a different SampleLimit so every reload is a
		// genuine config change.
		sp.reload(&config.ScrapeConfig{
			JobName:        "Prometheus",
			ScrapeTimeout:  model.Duration(1 * time.Millisecond),
			ScrapeInterval: model.Duration(1 * time.Millisecond),
			MetricsPath:    "/metrics",
			SampleLimit:    i,
		})
	}
}
2985
2986func TestCheckAddError(t *testing.T) {
2987var appErrs appendErrors
2988sl := scrapeLoop{l: log.NewNopLogger(), metrics: newTestScrapeMetrics(t)}
2989sl.checkAddError(nil, storage.ErrOutOfOrderSample, nil, nil, &appErrs)
2990require.Equal(t, 1, appErrs.numOutOfOrder)
2991}
2992
2993func TestScrapeReportSingleAppender(t *testing.T) {
2994s := teststorage.New(t)
2995defer s.Close()
2996
2997var (
2998signal = make(chan struct{}, 1)
2999scraper = &testScraper{}
3000)
3001
3002ctx, cancel := context.WithCancel(context.Background())
3003sl := newBasicScrapeLoop(t, ctx, scraper, s.Appender, 10*time.Millisecond)
3004
3005numScrapes := 0
3006
3007scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
3008numScrapes++
3009if numScrapes%4 == 0 {
3010return fmt.Errorf("scrape failed")
3011}
3012w.Write([]byte("metric_a 44\nmetric_b 44\nmetric_c 44\nmetric_d 44\n"))
3013return nil
3014}
3015
3016go func() {
3017sl.run(nil)
3018signal <- struct{}{}
3019}()
3020
3021start := time.Now()
3022for time.Since(start) < 3*time.Second {
3023q, err := s.Querier(time.Time{}.UnixNano(), time.Now().UnixNano())
3024require.NoError(t, err)
3025series := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".+"))
3026
3027c := 0
3028for series.Next() {
3029i := series.At().Iterator(nil)
3030for i.Next() != chunkenc.ValNone {
3031c++
3032}
3033}
3034
3035require.Equal(t, 0, c%9, "Appended samples not as expected: %d", c)
3036q.Close()
3037}
3038cancel()
3039
3040select {
3041case <-signal:
3042case <-time.After(5 * time.Second):
3043require.FailNow(t, "Scrape wasn't stopped.")
3044}
3045}
3046
// TestScrapeReportLimit runs a scrape pool with a SampleLimit against a real
// HTTP target and verifies that the report series "up" is recorded with
// value 1. NOTE(review): the target exposes 4 samples against a limit of 5,
// presumably to show that report samples (up, scrape_* etc.) are not counted
// against the sample limit — confirm against the sample-limit implementation.
func TestScrapeReportLimit(t *testing.T) {
	s := teststorage.New(t)
	defer s.Close()

	cfg := &config.ScrapeConfig{
		JobName:        "test",
		SampleLimit:    5,
		Scheme:         "http",
		ScrapeInterval: model.Duration(100 * time.Millisecond),
		ScrapeTimeout:  model.Duration(100 * time.Millisecond),
	}

	var (
		scrapes      int
		scrapedTwice = make(chan bool)
	)

	// Test target exposing four metrics; signals after the second scrape.
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "metric_a 44\nmetric_b 44\nmetric_c 44\nmetric_d 44\n")
		scrapes++
		if scrapes == 2 {
			close(scrapedTwice)
		}
	}))
	defer ts.Close()

	sp, err := newScrapePool(cfg, s, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
	require.NoError(t, err)
	defer sp.stop()

	testURL, err := url.Parse(ts.URL)
	require.NoError(t, err)
	sp.Sync([]*targetgroup.Group{
		{
			Targets: []model.LabelSet{{model.AddressLabel: model.LabelValue(testURL.Host)}},
		},
	})

	select {
	case <-time.After(5 * time.Second):
		t.Fatalf("target was not scraped twice")
	case <-scrapedTwice:
		// If the target has been scraped twice, report samples from the first
		// scrape have been inserted in the database.
	}

	// Query the storage for "up" and assert every sample has value 1.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	q, err := s.Querier(time.Time{}.UnixNano(), time.Now().UnixNano())
	require.NoError(t, err)
	defer q.Close()
	series := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", "up"))

	var found bool
	for series.Next() {
		i := series.At().Iterator(nil)
		for i.Next() == chunkenc.ValFloat {
			_, v := i.At()
			require.Equal(t, 1.0, v)
			found = true
		}
	}

	require.True(t, found)
}
3112
3113func TestScrapeLoopLabelLimit(t *testing.T) {
3114tests := []struct {
3115title string
3116scrapeLabels string
3117discoveryLabels []string
3118labelLimits labelLimits
3119expectErr bool
3120}{
3121{
3122title: "Valid number of labels",
3123scrapeLabels: `metric{l1="1", l2="2"} 0`,
3124discoveryLabels: nil,
3125labelLimits: labelLimits{labelLimit: 5},
3126expectErr: false,
3127}, {
3128title: "Too many labels",
3129scrapeLabels: `metric{l1="1", l2="2", l3="3", l4="4", l5="5", l6="6"} 0`,
3130discoveryLabels: nil,
3131labelLimits: labelLimits{labelLimit: 5},
3132expectErr: true,
3133}, {
3134title: "Too many labels including discovery labels",
3135scrapeLabels: `metric{l1="1", l2="2", l3="3", l4="4"} 0`,
3136discoveryLabels: []string{"l5", "5", "l6", "6"},
3137labelLimits: labelLimits{labelLimit: 5},
3138expectErr: true,
3139}, {
3140title: "Valid labels name length",
3141scrapeLabels: `metric{l1="1", l2="2"} 0`,
3142discoveryLabels: nil,
3143labelLimits: labelLimits{labelNameLengthLimit: 10},
3144expectErr: false,
3145}, {
3146title: "Label name too long",
3147scrapeLabels: `metric{label_name_too_long="0"} 0`,
3148discoveryLabels: nil,
3149labelLimits: labelLimits{labelNameLengthLimit: 10},
3150expectErr: true,
3151}, {
3152title: "Discovery label name too long",
3153scrapeLabels: `metric{l1="1", l2="2"} 0`,
3154discoveryLabels: []string{"label_name_too_long", "0"},
3155labelLimits: labelLimits{labelNameLengthLimit: 10},
3156expectErr: true,
3157}, {
3158title: "Valid labels value length",
3159scrapeLabels: `metric{l1="1", l2="2"} 0`,
3160discoveryLabels: nil,
3161labelLimits: labelLimits{labelValueLengthLimit: 10},
3162expectErr: false,
3163}, {
3164title: "Label value too long",
3165scrapeLabels: `metric{l1="label_value_too_long"} 0`,
3166discoveryLabels: nil,
3167labelLimits: labelLimits{labelValueLengthLimit: 10},
3168expectErr: true,
3169}, {
3170title: "Discovery label value too long",
3171scrapeLabels: `metric{l1="1", l2="2"} 0`,
3172discoveryLabels: []string{"l1", "label_value_too_long"},
3173labelLimits: labelLimits{labelValueLengthLimit: 10},
3174expectErr: true,
3175},
3176}
3177
3178for _, test := range tests {
3179app := &collectResultAppender{}
3180
3181discoveryLabels := &Target{
3182labels: labels.FromStrings(test.discoveryLabels...),
3183}
3184
3185sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)
3186sl.sampleMutator = func(l labels.Labels) labels.Labels {
3187return mutateSampleLabels(l, discoveryLabels, false, nil)
3188}
3189sl.reportSampleMutator = func(l labels.Labels) labels.Labels {
3190return mutateReportSampleLabels(l, discoveryLabels)
3191}
3192sl.labelLimits = &test.labelLimits
3193
3194slApp := sl.appender(context.Background())
3195_, _, _, err := sl.append(slApp, []byte(test.scrapeLabels), "", time.Now())
3196
3197t.Logf("Test:%s", test.title)
3198if test.expectErr {
3199require.Error(t, err)
3200} else {
3201require.NoError(t, err)
3202require.NoError(t, slApp.Commit())
3203}
3204}
3205}
3206
3207func TestTargetScrapeIntervalAndTimeoutRelabel(t *testing.T) {
3208interval, _ := model.ParseDuration("2s")
3209timeout, _ := model.ParseDuration("500ms")
3210config := &config.ScrapeConfig{
3211ScrapeInterval: interval,
3212ScrapeTimeout: timeout,
3213RelabelConfigs: []*relabel.Config{
3214{
3215SourceLabels: model.LabelNames{model.ScrapeIntervalLabel},
3216Regex: relabel.MustNewRegexp("2s"),
3217Replacement: "3s",
3218TargetLabel: model.ScrapeIntervalLabel,
3219Action: relabel.Replace,
3220},
3221{
3222SourceLabels: model.LabelNames{model.ScrapeTimeoutLabel},
3223Regex: relabel.MustNewRegexp("500ms"),
3224Replacement: "750ms",
3225TargetLabel: model.ScrapeTimeoutLabel,
3226Action: relabel.Replace,
3227},
3228},
3229}
3230sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
3231tgts := []*targetgroup.Group{
3232{
3233Targets: []model.LabelSet{{model.AddressLabel: "127.0.0.1:9090"}},
3234},
3235}
3236
3237sp.Sync(tgts)
3238defer sp.stop()
3239
3240require.Equal(t, "3s", sp.ActiveTargets()[0].labels.Get(model.ScrapeIntervalLabel))
3241require.Equal(t, "750ms", sp.ActiveTargets()[0].labels.Get(model.ScrapeTimeoutLabel))
3242}
3243
3244// Testing whether we can remove trailing .0 from histogram 'le' and summary 'quantile' labels.
3245func TestLeQuantileReLabel(t *testing.T) {
3246simpleStorage := teststorage.New(t)
3247defer simpleStorage.Close()
3248
3249config := &config.ScrapeConfig{
3250JobName: "test",
3251MetricRelabelConfigs: []*relabel.Config{
3252{
3253SourceLabels: model.LabelNames{"le", "__name__"},
3254Regex: relabel.MustNewRegexp("(\\d+)\\.0+;.*_bucket"),
3255Replacement: relabel.DefaultRelabelConfig.Replacement,
3256Separator: relabel.DefaultRelabelConfig.Separator,
3257TargetLabel: "le",
3258Action: relabel.Replace,
3259},
3260{
3261SourceLabels: model.LabelNames{"quantile"},
3262Regex: relabel.MustNewRegexp("(\\d+)\\.0+"),
3263Replacement: relabel.DefaultRelabelConfig.Replacement,
3264Separator: relabel.DefaultRelabelConfig.Separator,
3265TargetLabel: "quantile",
3266Action: relabel.Replace,
3267},
3268},
3269SampleLimit: 100,
3270Scheme: "http",
3271ScrapeInterval: model.Duration(100 * time.Millisecond),
3272ScrapeTimeout: model.Duration(100 * time.Millisecond),
3273}
3274
3275metricsText := `
3276# HELP test_histogram This is a histogram with default buckets
3277# TYPE test_histogram histogram
3278test_histogram_bucket{address="0.0.0.0",port="5001",le="0.005"} 0
3279test_histogram_bucket{address="0.0.0.0",port="5001",le="0.01"} 0
3280test_histogram_bucket{address="0.0.0.0",port="5001",le="0.025"} 0
3281test_histogram_bucket{address="0.0.0.0",port="5001",le="0.05"} 0
3282test_histogram_bucket{address="0.0.0.0",port="5001",le="0.1"} 0
3283test_histogram_bucket{address="0.0.0.0",port="5001",le="0.25"} 0
3284test_histogram_bucket{address="0.0.0.0",port="5001",le="0.5"} 0
3285test_histogram_bucket{address="0.0.0.0",port="5001",le="1.0"} 0
3286test_histogram_bucket{address="0.0.0.0",port="5001",le="2.5"} 0
3287test_histogram_bucket{address="0.0.0.0",port="5001",le="5.0"} 0
3288test_histogram_bucket{address="0.0.0.0",port="5001",le="10.0"} 0
3289test_histogram_bucket{address="0.0.0.0",port="5001",le="+Inf"} 0
3290test_histogram_sum{address="0.0.0.0",port="5001"} 0
3291test_histogram_count{address="0.0.0.0",port="5001"} 0
3292# HELP test_summary Number of inflight requests sampled at a regular interval. Quantile buckets keep track of inflight requests over the last 60s.
3293# TYPE test_summary summary
3294test_summary{quantile="0.5"} 0
3295test_summary{quantile="0.9"} 0
3296test_summary{quantile="0.95"} 0
3297test_summary{quantile="0.99"} 0
3298test_summary{quantile="1.0"} 1
3299test_summary_sum 1
3300test_summary_count 199
3301`
3302
3303// The expected "le" values do not have the trailing ".0".
3304expectedLeValues := []string{"0.005", "0.01", "0.025", "0.05", "0.1", "0.25", "0.5", "1", "2.5", "5", "10", "+Inf"}
3305
3306// The expected "quantile" values do not have the trailing ".0".
3307expectedQuantileValues := []string{"0.5", "0.9", "0.95", "0.99", "1"}
3308
3309scrapeCount := 0
3310scraped := make(chan bool)
3311
3312ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
3313fmt.Fprint(w, metricsText)
3314scrapeCount++
3315if scrapeCount > 2 {
3316close(scraped)
3317}
3318}))
3319defer ts.Close()
3320
3321sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
3322require.NoError(t, err)
3323defer sp.stop()
3324
3325testURL, err := url.Parse(ts.URL)
3326require.NoError(t, err)
3327sp.Sync([]*targetgroup.Group{
3328{
3329Targets: []model.LabelSet{{model.AddressLabel: model.LabelValue(testURL.Host)}},
3330},
3331})
3332require.Len(t, sp.ActiveTargets(), 1)
3333
3334select {
3335case <-time.After(5 * time.Second):
3336t.Fatalf("target was not scraped")
3337case <-scraped:
3338}
3339
3340ctx, cancel := context.WithCancel(context.Background())
3341defer cancel()
3342q, err := simpleStorage.Querier(time.Time{}.UnixNano(), time.Now().UnixNano())
3343require.NoError(t, err)
3344defer q.Close()
3345
3346checkValues := func(labelName string, expectedValues []string, series storage.SeriesSet) {
3347foundLeValues := map[string]bool{}
3348
3349for series.Next() {
3350s := series.At()
3351v := s.Labels().Get(labelName)
3352require.NotContains(t, foundLeValues, v, "duplicate label value found")
3353foundLeValues[v] = true
3354}
3355
3356require.Equal(t, len(expectedValues), len(foundLeValues), "number of label values not as expected")
3357for _, v := range expectedValues {
3358require.Contains(t, foundLeValues, v, "label value not found")
3359}
3360}
3361
3362series := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", "test_histogram_bucket"))
3363checkValues("le", expectedLeValues, series)
3364
3365series = q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", "test_summary"))
3366checkValues("quantile", expectedQuantileValues, series)
3367}
3368
3369func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrapeForTimestampedMetrics(t *testing.T) {
3370appender := &collectResultAppender{}
3371var (
3372signal = make(chan struct{}, 1)
3373scraper = &testScraper{}
3374app = func(ctx context.Context) storage.Appender { return appender }
3375)
3376
3377ctx, cancel := context.WithCancel(context.Background())
3378sl := newBasicScrapeLoop(t, ctx, scraper, app, 10*time.Millisecond)
3379sl.trackTimestampsStaleness = true
3380// Succeed once, several failures, then stop.
3381numScrapes := 0
3382
3383scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
3384numScrapes++
3385
3386switch numScrapes {
3387case 1:
3388w.Write([]byte(fmt.Sprintf("metric_a 42 %d\n", time.Now().UnixNano()/int64(time.Millisecond))))
3389return nil
3390case 5:
3391cancel()
3392}
3393return errors.New("scrape failed")
3394}
3395
3396go func() {
3397sl.run(nil)
3398signal <- struct{}{}
3399}()
3400
3401select {
3402case <-signal:
3403case <-time.After(5 * time.Second):
3404t.Fatalf("Scrape wasn't stopped.")
3405}
3406
3407// 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for
3408// each scrape successful or not.
3409require.Len(t, appender.resultFloats, 27, "Appended samples not as expected:\n%s", appender)
3410require.Equal(t, 42.0, appender.resultFloats[0].f, "Appended first sample not as expected")
3411require.True(t, value.IsStaleNaN(appender.resultFloats[6].f),
3412"Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.resultFloats[6].f))
3413}
3414
3415func TestScrapeLoopCompression(t *testing.T) {
3416simpleStorage := teststorage.New(t)
3417defer simpleStorage.Close()
3418
3419metricsText := makeTestMetrics(10)
3420
3421for _, tc := range []struct {
3422enableCompression bool
3423acceptEncoding string
3424}{
3425{
3426enableCompression: true,
3427acceptEncoding: "gzip",
3428},
3429{
3430enableCompression: false,
3431acceptEncoding: "identity",
3432},
3433} {
3434t.Run(fmt.Sprintf("compression=%v,acceptEncoding=%s", tc.enableCompression, tc.acceptEncoding), func(t *testing.T) {
3435scraped := make(chan bool)
3436
3437ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
3438require.Equal(t, tc.acceptEncoding, r.Header.Get("Accept-Encoding"), "invalid value of the Accept-Encoding header")
3439fmt.Fprint(w, metricsText)
3440close(scraped)
3441}))
3442defer ts.Close()
3443
3444config := &config.ScrapeConfig{
3445JobName: "test",
3446SampleLimit: 100,
3447Scheme: "http",
3448ScrapeInterval: model.Duration(100 * time.Millisecond),
3449ScrapeTimeout: model.Duration(100 * time.Millisecond),
3450EnableCompression: tc.enableCompression,
3451}
3452
3453sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
3454require.NoError(t, err)
3455defer sp.stop()
3456
3457testURL, err := url.Parse(ts.URL)
3458require.NoError(t, err)
3459sp.Sync([]*targetgroup.Group{
3460{
3461Targets: []model.LabelSet{{model.AddressLabel: model.LabelValue(testURL.Host)}},
3462},
3463})
3464require.Len(t, sp.ActiveTargets(), 1)
3465
3466select {
3467case <-time.After(5 * time.Second):
3468t.Fatalf("target was not scraped")
3469case <-scraped:
3470}
3471})
3472}
3473}
3474
3475func TestPickSchema(t *testing.T) {
3476tcs := []struct {
3477factor float64
3478schema int32
3479}{
3480{
3481factor: 65536,
3482schema: -4,
3483},
3484{
3485factor: 256,
3486schema: -3,
3487},
3488{
3489factor: 16,
3490schema: -2,
3491},
3492{
3493factor: 4,
3494schema: -1,
3495},
3496{
3497factor: 2,
3498schema: 0,
3499},
3500{
3501factor: 1.4,
3502schema: 1,
3503},
3504{
3505factor: 1.1,
3506schema: 2,
3507},
3508{
3509factor: 1.09,
3510schema: 3,
3511},
3512{
3513factor: 1.04,
3514schema: 4,
3515},
3516{
3517factor: 1.02,
3518schema: 5,
3519},
3520{
3521factor: 1.01,
3522schema: 6,
3523},
3524{
3525factor: 1.005,
3526schema: 7,
3527},
3528{
3529factor: 1.002,
3530schema: 8,
3531},
3532// The default value of native_histogram_min_bucket_factor
3533{
3534factor: 0,
3535schema: 8,
3536},
3537}
3538
3539for _, tc := range tcs {
3540schema := pickSchema(tc.factor)
3541require.Equal(t, tc.schema, schema)
3542}
3543}
3544
3545func BenchmarkTargetScraperGzip(b *testing.B) {
3546scenarios := []struct {
3547metricsCount int
3548body []byte
3549}{
3550{metricsCount: 1},
3551{metricsCount: 100},
3552{metricsCount: 1000},
3553{metricsCount: 10000},
3554{metricsCount: 100000},
3555}
3556
3557for i := 0; i < len(scenarios); i++ {
3558var buf bytes.Buffer
3559var name string
3560gw := gzip.NewWriter(&buf)
3561for j := 0; j < scenarios[i].metricsCount; j++ {
3562name = fmt.Sprintf("go_memstats_alloc_bytes_total_%d", j)
3563fmt.Fprintf(gw, "# HELP %s Total number of bytes allocated, even if freed.\n", name)
3564fmt.Fprintf(gw, "# TYPE %s counter\n", name)
3565fmt.Fprintf(gw, "%s %d\n", name, i*j)
3566}
3567gw.Close()
3568scenarios[i].body = buf.Bytes()
3569}
3570
3571handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
3572w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
3573w.Header().Set("Content-Encoding", "gzip")
3574for _, scenario := range scenarios {
3575if strconv.Itoa(scenario.metricsCount) == r.URL.Query()["count"][0] {
3576w.Write(scenario.body)
3577return
3578}
3579}
3580w.WriteHeader(http.StatusBadRequest)
3581})
3582
3583server := httptest.NewServer(handler)
3584defer server.Close()
3585
3586serverURL, err := url.Parse(server.URL)
3587if err != nil {
3588panic(err)
3589}
3590
3591client, err := config_util.NewClientFromConfig(config_util.DefaultHTTPClientConfig, "test_job")
3592if err != nil {
3593panic(err)
3594}
3595
3596for _, scenario := range scenarios {
3597b.Run(fmt.Sprintf("metrics=%d", scenario.metricsCount), func(b *testing.B) {
3598ts := &targetScraper{
3599Target: &Target{
3600labels: labels.FromStrings(
3601model.SchemeLabel, serverURL.Scheme,
3602model.AddressLabel, serverURL.Host,
3603),
3604params: url.Values{"count": []string{strconv.Itoa(scenario.metricsCount)}},
3605},
3606client: client,
3607timeout: time.Second,
3608}
3609b.ResetTimer()
3610for i := 0; i < b.N; i++ {
3611_, err = ts.scrape(context.Background())
3612require.NoError(b, err)
3613}
3614})
3615}
3616}
3617
3618// When a scrape contains multiple instances for the same time series we should increment
3619// prometheus_target_scrapes_sample_duplicate_timestamp_total metric.
3620func TestScrapeLoopSeriesAddedDuplicates(t *testing.T) {
3621ctx, sl := simpleTestScrapeLoop(t)
3622
3623slApp := sl.appender(ctx)
3624total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\ntest_metric 2\ntest_metric 3\n"), "", time.Time{})
3625require.NoError(t, err)
3626require.NoError(t, slApp.Commit())
3627require.Equal(t, 3, total)
3628require.Equal(t, 3, added)
3629require.Equal(t, 1, seriesAdded)
3630
3631slApp = sl.appender(ctx)
3632total, added, seriesAdded, err = sl.append(slApp, []byte("test_metric 1\ntest_metric 1\ntest_metric 1\n"), "", time.Time{})
3633require.NoError(t, err)
3634require.NoError(t, slApp.Commit())
3635require.Equal(t, 3, total)
3636require.Equal(t, 3, added)
3637require.Equal(t, 0, seriesAdded)
3638
3639metric := dto.Metric{}
3640err = sl.metrics.targetScrapeSampleDuplicate.Write(&metric)
3641require.NoError(t, err)
3642value := metric.GetCounter().GetValue()
3643require.Equal(t, 4.0, value)
3644}
3645
3646// This tests running a full scrape loop and checking that the scrape option
3647// `native_histogram_min_bucket_factor` is used correctly.
3648func TestNativeHistogramMaxSchemaSet(t *testing.T) {
3649testcases := map[string]struct {
3650minBucketFactor string
3651expectedSchema int32
3652}{
3653"min factor not specified": {
3654minBucketFactor: "",
3655expectedSchema: 3, // Factor 1.09.
3656},
3657"min factor 1": {
3658minBucketFactor: "native_histogram_min_bucket_factor: 1",
3659expectedSchema: 3, // Factor 1.09.
3660},
3661"min factor 2": {
3662minBucketFactor: "native_histogram_min_bucket_factor: 2",
3663expectedSchema: 0, // Factor 2.00.
3664},
3665}
3666for name, tc := range testcases {
3667t.Run(name, func(t *testing.T) {
3668testNativeHistogramMaxSchemaSet(t, tc.minBucketFactor, tc.expectedSchema)
3669})
3670}
3671}
3672
3673func testNativeHistogramMaxSchemaSet(t *testing.T, minBucketFactor string, expectedSchema int32) {
3674// Create a ProtoBuf message to serve as a Prometheus metric.
3675nativeHistogram := prometheus.NewHistogram(
3676prometheus.HistogramOpts{
3677Namespace: "testing",
3678Name: "example_native_histogram",
3679Help: "This is used for testing",
3680NativeHistogramBucketFactor: 1.1,
3681NativeHistogramMaxBucketNumber: 100,
3682},
3683)
3684registry := prometheus.NewRegistry()
3685registry.Register(nativeHistogram)
3686nativeHistogram.Observe(1.0)
3687nativeHistogram.Observe(1.0)
3688nativeHistogram.Observe(1.0)
3689nativeHistogram.Observe(10.0) // in different bucket since > 1*1.1.
3690nativeHistogram.Observe(10.0)
3691
3692gathered, err := registry.Gather()
3693require.NoError(t, err)
3694require.NotEmpty(t, gathered)
3695
3696histogramMetricFamily := gathered[0]
3697buffer := protoMarshalDelimited(t, histogramMetricFamily)
3698
3699// Create a HTTP server to serve /metrics via ProtoBuf
3700metricsServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
3701w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`)
3702w.Write(buffer)
3703}))
3704defer metricsServer.Close()
3705
3706// Create a scrape loop with the HTTP server as the target.
3707configStr := fmt.Sprintf(`
3708global:
3709scrape_interval: 1s
3710scrape_timeout: 1s
3711scrape_configs:
3712- job_name: test
3713%s
3714static_configs:
3715- targets: [%s]
3716`, minBucketFactor, strings.ReplaceAll(metricsServer.URL, "http://", ""))
3717
3718s := teststorage.New(t)
3719defer s.Close()
3720s.DB.EnableNativeHistograms()
3721reg := prometheus.NewRegistry()
3722
3723mng, err := NewManager(&Options{EnableNativeHistogramsIngestion: true}, nil, s, reg)
3724require.NoError(t, err)
3725cfg, err := config.Load(configStr, false, log.NewNopLogger())
3726require.NoError(t, err)
3727mng.ApplyConfig(cfg)
3728tsets := make(chan map[string][]*targetgroup.Group)
3729go func() {
3730err = mng.Run(tsets)
3731require.NoError(t, err)
3732}()
3733defer mng.Stop()
3734
3735// Get the static targets and apply them to the scrape manager.
3736require.Len(t, cfg.ScrapeConfigs, 1)
3737scrapeCfg := cfg.ScrapeConfigs[0]
3738require.Len(t, scrapeCfg.ServiceDiscoveryConfigs, 1)
3739staticDiscovery, ok := scrapeCfg.ServiceDiscoveryConfigs[0].(discovery.StaticConfig)
3740require.True(t, ok)
3741require.Len(t, staticDiscovery, 1)
3742tsets <- map[string][]*targetgroup.Group{"test": staticDiscovery}
3743
3744// Wait for the scrape loop to scrape the target.
3745require.Eventually(t, func() bool {
3746q, err := s.Querier(0, math.MaxInt64)
3747require.NoError(t, err)
3748seriesS := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "__name__", "testing_example_native_histogram"))
3749countSeries := 0
3750for seriesS.Next() {
3751countSeries++
3752}
3753return countSeries > 0
3754}, 15*time.Second, 100*time.Millisecond)
3755
3756// Check that native histogram schema is as expected.
3757q, err := s.Querier(0, math.MaxInt64)
3758require.NoError(t, err)
3759seriesS := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "__name__", "testing_example_native_histogram"))
3760histogramSamples := []*histogram.Histogram{}
3761for seriesS.Next() {
3762series := seriesS.At()
3763it := series.Iterator(nil)
3764for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() {
3765if vt != chunkenc.ValHistogram {
3766// don't care about other samples
3767continue
3768}
3769_, h := it.AtHistogram(nil)
3770histogramSamples = append(histogramSamples, h)
3771}
3772}
3773require.NoError(t, seriesS.Err())
3774require.NotEmpty(t, histogramSamples)
3775for _, h := range histogramSamples {
3776require.Equal(t, expectedSchema, h.Schema)
3777}
3778}
3779