// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scrape

import (
	"bytes"
	"compress/gzip"
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"math"
	"net/http"
	"net/http/httptest"
	"net/url"
	"strconv"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/gogo/protobuf/proto"
	"github.com/google/go-cmp/cmp"
	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
	config_util "github.com/prometheus/common/config"
	"github.com/prometheus/common/model"
	"github.com/stretchr/testify/require"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery"
	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/model/exemplar"
	"github.com/prometheus/prometheus/model/histogram"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/relabel"
	"github.com/prometheus/prometheus/model/textparse"
	"github.com/prometheus/prometheus/model/timestamp"
	"github.com/prometheus/prometheus/model/value"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/util/pool"
	"github.com/prometheus/prometheus/util/teststorage"
	"github.com/prometheus/prometheus/util/testutil"
)

func TestMain(m *testing.M) {
	testutil.TolerantVerifyLeak(m)
}

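// newTestScrapeMetrics returns scrapeMetrics backed by a fresh registry, so
// each test observes only the metric values it produced itself.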
func newTestScrapeMetrics(t testing.TB) *scrapeMetrics {
	reg := prometheus.NewRegistry()
	metrics, err := newScrapeMetrics(reg)
	require.NoError(t, err)
	return metrics
}

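// TestNewScrapePool verifies that a freshly constructed pool wires up the
// appendable, the scrape config, and the loop constructor it was given.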
func TestNewScrapePool(t *testing.T) {
	var (
		app   = &nopAppendable{}
		cfg   = &config.ScrapeConfig{}
		sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
	)

	a, ok := sp.appendable.(*nopAppendable)
	require.True(t, ok, "Failure to append.")
	require.Equal(t, app, a, "Wrong sample appender.")
	require.Equal(t, cfg, sp.config, "Wrong scrape config.")
	require.NotNil(t, sp.newLoop, "newLoop function not initialized.")
}

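// TestDroppedTargetsList checks that targets dropped by relabeling are
// deduplicated across repeated Syncs and that KeepDroppedTargets caps how many
// are retained while droppedTargetsCount stays accurate.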
func TestDroppedTargetsList(t *testing.T) {
	var (
		app = &nopAppendable{}
		cfg = &config.ScrapeConfig{
			JobName:        "dropMe",
			ScrapeInterval: model.Duration(1),
			RelabelConfigs: []*relabel.Config{
				{
					Action:       relabel.Drop,
					Regex:        relabel.MustNewRegexp("dropMe"),
					SourceLabels: model.LabelNames{"job"},
				},
			},
		}
		tgs = []*targetgroup.Group{
			{
				Targets: []model.LabelSet{
					{model.AddressLabel: "127.0.0.1:9090"},
					{model.AddressLabel: "127.0.0.1:9091"},
				},
			},
		}
		sp, _                  = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
		expectedLabelSetString = "{__address__=\"127.0.0.1:9090\", __scrape_interval__=\"0s\", __scrape_timeout__=\"0s\", job=\"dropMe\"}"
		expectedLength         = 2
	)
	sp.Sync(tgs)
	sp.Sync(tgs)
	require.Len(t, sp.droppedTargets, expectedLength)
	require.Equal(t, expectedLength, sp.droppedTargetsCount)
	require.Equal(t, expectedLabelSetString, sp.droppedTargets[0].DiscoveredLabels().String())

	// Check that count is still correct when we don't retain all dropped targets.
	sp.config.KeepDroppedTargets = 1
	sp.Sync(tgs)
	require.Len(t, sp.droppedTargets, 1)
	require.Equal(t, expectedLength, sp.droppedTargetsCount)
}

// TestDiscoveredLabelsUpdate checks that DiscoveredLabels are updated
// even when new labels don't affect the target `hash`.
func TestDiscoveredLabelsUpdate(t *testing.T) {
	sp := &scrapePool{
		metrics: newTestScrapeMetrics(t),
	}

	// These are used when syncing, so they are needed to avoid a panic.
	sp.config = &config.ScrapeConfig{
		ScrapeInterval: model.Duration(1),
		ScrapeTimeout:  model.Duration(1),
	}
	sp.activeTargets = make(map[uint64]*Target)
	t1 := &Target{
		discoveredLabels: labels.FromStrings("label", "name"),
	}
	sp.activeTargets[t1.hash()] = t1

	t2 := &Target{
		discoveredLabels: labels.FromStrings("labelNew", "nameNew"),
	}
	sp.sync([]*Target{t2})

	require.Equal(t, t2.DiscoveredLabels(), sp.activeTargets[t1.hash()].DiscoveredLabels())
}

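// testLoop is a stub loop implementation that lets tests hook into start/stop
// and inject forced errors.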
type testLoop struct {
	startFunc    func(interval, timeout time.Duration, errc chan<- error)
	stopFunc     func()
	forcedErr    error
	forcedErrMtx sync.Mutex
	runOnce      bool
	interval     time.Duration
	timeout      time.Duration
}

func (l *testLoop) run(errc chan<- error) {
	if l.runOnce {
		panic("loop must be started only once")
	}
	l.runOnce = true
	l.startFunc(l.interval, l.timeout, errc)
}

func (l *testLoop) disableEndOfRunStalenessMarkers() {
}

func (l *testLoop) setForcedError(err error) {
	l.forcedErrMtx.Lock()
	defer l.forcedErrMtx.Unlock()
	l.forcedErr = err
}

func (l *testLoop) getForcedError() error {
	l.forcedErrMtx.Lock()
	defer l.forcedErrMtx.Unlock()
	return l.forcedErr
}

func (l *testLoop) stop() {
	l.stopFunc()
}

func (l *testLoop) getCache() *scrapeCache {
	return nil
}

func TestScrapePoolStop(t *testing.T) {
	sp := &scrapePool{
		activeTargets: map[uint64]*Target{},
		loops:         map[uint64]loop{},
		cancel:        func() {},
		client:        http.DefaultClient,
		metrics:       newTestScrapeMetrics(t),
	}
	var mtx sync.Mutex
	stopped := map[uint64]bool{}
	numTargets := 20

	// Stopping the scrape pool must call stop() on all scrape loops and clean
	// up both the loops and their targets. It must wait until each loop's stop
	// function has returned before returning itself.

	for i := 0; i < numTargets; i++ {
		t := &Target{
			labels: labels.FromStrings(model.AddressLabel, fmt.Sprintf("example.com:%d", i)),
		}
		l := &testLoop{}
		d := time.Duration((i+1)*20) * time.Millisecond
		l.stopFunc = func() {
			time.Sleep(d)

			mtx.Lock()
			stopped[t.hash()] = true
			mtx.Unlock()
		}

		sp.activeTargets[t.hash()] = t
		sp.loops[t.hash()] = l
	}

	done := make(chan struct{})
	stopTime := time.Now()

	go func() {
		sp.stop()
		close(done)
	}()

	select {
	case <-time.After(5 * time.Second):
		require.Fail(t, "scrapeLoop.stop() did not return as expected")
	case <-done:
		// This should have taken at least as long as the last target slept.
		require.GreaterOrEqual(t, time.Since(stopTime), time.Duration(numTargets*20)*time.Millisecond, "scrapeLoop.stop() exited before all targets stopped")
	}

	mtx.Lock()
	require.Len(t, stopped, numTargets, "Unexpected number of stopped loops")
	mtx.Unlock()

	require.Empty(t, sp.activeTargets, "Targets were not cleared on stopping: %d left", len(sp.activeTargets))
	require.Empty(t, sp.loops, "Loops were not cleared on stopping: %d left", len(sp.loops))
}

func TestScrapePoolReload(t *testing.T) {
	var mtx sync.Mutex
	numTargets := 20

	stopped := map[uint64]bool{}

	reloadCfg := &config.ScrapeConfig{
		ScrapeInterval: model.Duration(3 * time.Second),
		ScrapeTimeout:  model.Duration(2 * time.Second),
	}
	// On starting to run, new loops created on reload check whether their preceding
	// equivalents have been stopped.
	newLoop := func(opts scrapeLoopOptions) loop {
		l := &testLoop{interval: time.Duration(reloadCfg.ScrapeInterval), timeout: time.Duration(reloadCfg.ScrapeTimeout)}
		l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
			require.Equal(t, 3*time.Second, interval, "Unexpected scrape interval")
			require.Equal(t, 2*time.Second, timeout, "Unexpected scrape timeout")

			mtx.Lock()
			targetScraper := opts.scraper.(*targetScraper)
			require.True(t, stopped[targetScraper.hash()], "Scrape loop for %v not stopped yet", targetScraper)
			mtx.Unlock()
		}
		return l
	}

	sp := &scrapePool{
		appendable:    &nopAppendable{},
		activeTargets: map[uint64]*Target{},
		loops:         map[uint64]loop{},
		newLoop:       newLoop,
		logger:        nil,
		client:        http.DefaultClient,
		metrics:       newTestScrapeMetrics(t),
		symbolTable:   labels.NewSymbolTable(),
	}

	// Reloading a scrape pool with a new scrape configuration must stop all scrape
	// loops and start new ones. A new loop must not be started before the preceding
	// one has terminated.

	for i := 0; i < numTargets; i++ {
		labels := labels.FromStrings(model.AddressLabel, fmt.Sprintf("example.com:%d", i))
		t := &Target{
			labels:           labels,
			discoveredLabels: labels,
		}
		l := &testLoop{}
		d := time.Duration((i+1)*20) * time.Millisecond
		l.stopFunc = func() {
			time.Sleep(d)

			mtx.Lock()
			stopped[t.hash()] = true
			mtx.Unlock()
		}

		sp.activeTargets[t.hash()] = t
		sp.loops[t.hash()] = l
	}
	done := make(chan struct{})

	beforeTargets := map[uint64]*Target{}
	for h, t := range sp.activeTargets {
		beforeTargets[h] = t
	}

	reloadTime := time.Now()

	go func() {
		sp.reload(reloadCfg)
		close(done)
	}()

	select {
	case <-time.After(5 * time.Second):
		require.FailNow(t, "scrapeLoop.reload() did not return as expected")
	case <-done:
		// This should have taken at least as long as the last target slept.
		require.GreaterOrEqual(t, time.Since(reloadTime), time.Duration(numTargets*20)*time.Millisecond, "scrapeLoop.stop() exited before all targets stopped")
	}

	mtx.Lock()
	require.Len(t, stopped, numTargets, "Unexpected number of stopped loops")
	mtx.Unlock()

	require.Equal(t, sp.activeTargets, beforeTargets, "Reloading affected target states unexpectedly")
	require.Len(t, sp.loops, numTargets, "Unexpected number of stopped loops after reload")
}

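// TestScrapePoolReloadPreserveRelabeledIntervalTimeout checks that per-target
// interval and timeout labels set via relabeling survive a config reload.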
func TestScrapePoolReloadPreserveRelabeledIntervalTimeout(t *testing.T) {
	reloadCfg := &config.ScrapeConfig{
		ScrapeInterval: model.Duration(3 * time.Second),
		ScrapeTimeout:  model.Duration(2 * time.Second),
	}
	newLoop := func(opts scrapeLoopOptions) loop {
		l := &testLoop{interval: opts.interval, timeout: opts.timeout}
		l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
			require.Equal(t, 5*time.Second, interval, "Unexpected scrape interval")
			require.Equal(t, 3*time.Second, timeout, "Unexpected scrape timeout")
		}
		return l
	}
	sp := &scrapePool{
		appendable: &nopAppendable{},
		activeTargets: map[uint64]*Target{
			1: {
				labels: labels.FromStrings(model.ScrapeIntervalLabel, "5s", model.ScrapeTimeoutLabel, "3s"),
			},
		},
		loops: map[uint64]loop{
			1: noopLoop(),
		},
		newLoop:     newLoop,
		logger:      nil,
		client:      http.DefaultClient,
		metrics:     newTestScrapeMetrics(t),
		symbolTable: labels.NewSymbolTable(),
	}

	err := sp.reload(reloadCfg)
	if err != nil {
		t.Fatalf("unable to reload configuration: %s", err)
	}
}

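// TestScrapePoolTargetLimit exercises target_limit enforcement across reloads
// and Syncs: loops keep running but carry a forced error while the number of
// targets exceeds the limit.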
func TestScrapePoolTargetLimit(t *testing.T) {
	var wg sync.WaitGroup
	// On starting to run, new loops created on reload check whether their preceding
	// equivalents have been stopped.
	newLoop := func(opts scrapeLoopOptions) loop {
		wg.Add(1)
		l := &testLoop{
			startFunc: func(interval, timeout time.Duration, errc chan<- error) {
				wg.Done()
			},
			stopFunc: func() {},
		}
		return l
	}
	sp := &scrapePool{
		appendable:    &nopAppendable{},
		activeTargets: map[uint64]*Target{},
		loops:         map[uint64]loop{},
		newLoop:       newLoop,
		logger:        log.NewNopLogger(),
		client:        http.DefaultClient,
		metrics:       newTestScrapeMetrics(t),
		symbolTable:   labels.NewSymbolTable(),
	}

	tgs := []*targetgroup.Group{}
	for i := 0; i < 50; i++ {
		tgs = append(tgs,
			&targetgroup.Group{
				Targets: []model.LabelSet{
					{model.AddressLabel: model.LabelValue(fmt.Sprintf("127.0.0.1:%d", 9090+i))},
				},
			},
		)
	}

	var limit uint
	reloadWithLimit := func(l uint) {
		limit = l
		require.NoError(t, sp.reload(&config.ScrapeConfig{
			ScrapeInterval: model.Duration(3 * time.Second),
			ScrapeTimeout:  model.Duration(2 * time.Second),
			TargetLimit:    l,
		}))
	}

	var targets int
	loadTargets := func(n int) {
		targets = n
		sp.Sync(tgs[:n])
	}

	validateIsRunning := func() {
		wg.Wait()
		for _, l := range sp.loops {
			require.True(t, l.(*testLoop).runOnce, "loop should be running")
		}
	}

	validateErrorMessage := func(shouldErr bool) {
		for _, l := range sp.loops {
			lerr := l.(*testLoop).getForcedError()
			if shouldErr {
				require.Error(t, lerr, "error was expected for %d targets with a limit of %d", targets, limit)
				require.Equal(t, fmt.Sprintf("target_limit exceeded (number of targets: %d, limit: %d)", targets, limit), lerr.Error())
			} else {
				require.NoError(t, lerr)
			}
		}
	}

	reloadWithLimit(0)
	loadTargets(50)
	validateIsRunning()

	// Simulate an initial config with a limit.
	sp.config.TargetLimit = 30
	limit = 30
	loadTargets(50)
	validateIsRunning()
	validateErrorMessage(true)

	reloadWithLimit(50)
	validateIsRunning()
	validateErrorMessage(false)

	reloadWithLimit(40)
	validateIsRunning()
	validateErrorMessage(true)

	loadTargets(30)
	validateIsRunning()
	validateErrorMessage(false)

	loadTargets(40)
	validateIsRunning()
	validateErrorMessage(false)

	loadTargets(41)
	validateIsRunning()
	validateErrorMessage(true)

	reloadWithLimit(0)
	validateIsRunning()
	validateErrorMessage(false)

	reloadWithLimit(51)
	validateIsRunning()
	validateErrorMessage(false)

	tgs = append(tgs,
		&targetgroup.Group{
			Targets: []model.LabelSet{
				{model.AddressLabel: model.LabelValue("127.0.0.1:1090")},
			},
		},
		&targetgroup.Group{
			Targets: []model.LabelSet{
				{model.AddressLabel: model.LabelValue("127.0.0.1:1090")},
			},
		},
	)

	sp.Sync(tgs)
	validateIsRunning()
	validateErrorMessage(false)
}

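// TestScrapePoolAppender verifies the appender wrapping order: a
// maxSchemaAppender wraps a bucketLimitAppender wraps a limitAppender wraps a
// timeLimitAppender around the base appender, with each layer present only
// when the corresponding limit is configured.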
func TestScrapePoolAppender(t *testing.T) {
	cfg := &config.ScrapeConfig{}
	app := &nopAppendable{}
	sp, _ := newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))

	loop := sp.newLoop(scrapeLoopOptions{
		target: &Target{},
	})
	appl, ok := loop.(*scrapeLoop)
	require.True(t, ok, "Expected scrapeLoop but got %T", loop)

	wrapped := appender(appl.appender(context.Background()), 0, 0, nativeHistogramMaxSchema)

	tl, ok := wrapped.(*timeLimitAppender)
	require.True(t, ok, "Expected timeLimitAppender but got %T", wrapped)

	_, ok = tl.Appender.(nopAppender)
	require.True(t, ok, "Expected base appender but got %T", tl.Appender)

	sampleLimit := 100
	loop = sp.newLoop(scrapeLoopOptions{
		target:      &Target{},
		sampleLimit: sampleLimit,
	})
	appl, ok = loop.(*scrapeLoop)
	require.True(t, ok, "Expected scrapeLoop but got %T", loop)

	wrapped = appender(appl.appender(context.Background()), sampleLimit, 0, nativeHistogramMaxSchema)

	sl, ok := wrapped.(*limitAppender)
	require.True(t, ok, "Expected limitAppender but got %T", wrapped)

	tl, ok = sl.Appender.(*timeLimitAppender)
	require.True(t, ok, "Expected timeLimitAppender but got %T", sl.Appender)

	_, ok = tl.Appender.(nopAppender)
	require.True(t, ok, "Expected base appender but got %T", tl.Appender)

	wrapped = appender(appl.appender(context.Background()), sampleLimit, 100, nativeHistogramMaxSchema)

	bl, ok := wrapped.(*bucketLimitAppender)
	require.True(t, ok, "Expected bucketLimitAppender but got %T", wrapped)

	sl, ok = bl.Appender.(*limitAppender)
	require.True(t, ok, "Expected limitAppender but got %T", bl.Appender)

	tl, ok = sl.Appender.(*timeLimitAppender)
	require.True(t, ok, "Expected timeLimitAppender but got %T", sl.Appender)

	_, ok = tl.Appender.(nopAppender)
	require.True(t, ok, "Expected base appender but got %T", tl.Appender)

	wrapped = appender(appl.appender(context.Background()), sampleLimit, 100, 0)

	ml, ok := wrapped.(*maxSchemaAppender)
	require.True(t, ok, "Expected maxSchemaAppender but got %T", wrapped)

	bl, ok = ml.Appender.(*bucketLimitAppender)
	require.True(t, ok, "Expected bucketLimitAppender but got %T", ml.Appender)

	sl, ok = bl.Appender.(*limitAppender)
	require.True(t, ok, "Expected limitAppender but got %T", bl.Appender)

	tl, ok = sl.Appender.(*timeLimitAppender)
	require.True(t, ok, "Expected timeLimitAppender but got %T", sl.Appender)

	_, ok = tl.Appender.(nopAppender)
	require.True(t, ok, "Expected base appender but got %T", tl.Appender)
}

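// TestScrapePoolRaces repeatedly reloads a pool that is busy syncing targets,
// giving the race detector a chance to catch unsynchronized access.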
func TestScrapePoolRaces(t *testing.T) {
	interval, _ := model.ParseDuration("1s")
	timeout, _ := model.ParseDuration("500ms")
	newConfig := func() *config.ScrapeConfig {
		return &config.ScrapeConfig{ScrapeInterval: interval, ScrapeTimeout: timeout}
	}
	sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
	tgts := []*targetgroup.Group{
		{
			Targets: []model.LabelSet{
				{model.AddressLabel: "127.0.0.1:9090"},
				{model.AddressLabel: "127.0.0.2:9090"},
				{model.AddressLabel: "127.0.0.3:9090"},
				{model.AddressLabel: "127.0.0.4:9090"},
				{model.AddressLabel: "127.0.0.5:9090"},
				{model.AddressLabel: "127.0.0.6:9090"},
				{model.AddressLabel: "127.0.0.7:9090"},
				{model.AddressLabel: "127.0.0.8:9090"},
			},
		},
	}

	sp.Sync(tgts)
	active := sp.ActiveTargets()
	dropped := sp.DroppedTargets()
	expectedActive, expectedDropped := len(tgts[0].Targets), 0

	require.Len(t, active, expectedActive, "Invalid number of active targets")
	require.Len(t, dropped, expectedDropped, "Invalid number of dropped targets")

	for i := 0; i < 20; i++ {
		time.Sleep(10 * time.Millisecond)
		sp.reload(newConfig())
	}
	sp.stop()
}

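// TestScrapePoolScrapeLoopsStarted checks that duplicate targets collapse into
// a single running scrape loop after Sync.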
func TestScrapePoolScrapeLoopsStarted(t *testing.T) {
	var wg sync.WaitGroup
	newLoop := func(opts scrapeLoopOptions) loop {
		wg.Add(1)
		l := &testLoop{
			startFunc: func(interval, timeout time.Duration, errc chan<- error) {
				wg.Done()
			},
			stopFunc: func() {},
		}
		return l
	}
	sp := &scrapePool{
		appendable:    &nopAppendable{},
		activeTargets: map[uint64]*Target{},
		loops:         map[uint64]loop{},
		newLoop:       newLoop,
		logger:        nil,
		client:        http.DefaultClient,
		metrics:       newTestScrapeMetrics(t),
		symbolTable:   labels.NewSymbolTable(),
	}

	tgs := []*targetgroup.Group{
		{
			Targets: []model.LabelSet{
				{model.AddressLabel: model.LabelValue("127.0.0.1:9090")},
			},
		},
		{
			Targets: []model.LabelSet{
				{model.AddressLabel: model.LabelValue("127.0.0.1:9090")},
			},
		},
	}

	require.NoError(t, sp.reload(&config.ScrapeConfig{
		ScrapeInterval: model.Duration(3 * time.Second),
		ScrapeTimeout:  model.Duration(2 * time.Second),
	}))
	sp.Sync(tgs)

	require.Len(t, sp.loops, 1)

	wg.Wait()
	for _, l := range sp.loops {
		require.True(t, l.(*testLoop).runOnce, "loop should be running")
	}
}

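// newBasicScrapeLoop constructs a scrapeLoop with test defaults; only the
// scraper, the appender factory, and the interval vary per test.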
func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app func(ctx context.Context) storage.Appender, interval time.Duration) *scrapeLoop {
	return newScrapeLoop(ctx,
		scraper,
		nil, nil,
		nopMutator,
		nopMutator,
		app,
		nil,
		labels.NewSymbolTable(),
		0,
		true,
		false,
		true,
		0, 0, nativeHistogramMaxSchema,
		nil,
		interval,
		time.Hour,
		false,
		false,
		false,
		false,
		false,
		nil,
		false,
		newTestScrapeMetrics(t),
		false,
	)
}

func TestScrapeLoopStopBeforeRun(t *testing.T) {
	scraper := &testScraper{}
	sl := newBasicScrapeLoop(t, context.Background(), scraper, nil, 1)

	// The scrape pool synchronizes on stopping scrape loops. However, new scrape
	// loops are started asynchronously. Thus it's possible that a loop is stopped
	// again before having started properly.
	// Stopping not-yet-started loops must block until the run method was called and exited.
	// The run method must exit immediately.

	stopDone := make(chan struct{})
	go func() {
		sl.stop()
		close(stopDone)
	}()

	select {
	case <-stopDone:
		require.FailNow(t, "Stopping terminated before run exited successfully.")
	case <-time.After(500 * time.Millisecond):
	}

	// Running the scrape loop must exit before calling the scraper even once.
	scraper.scrapeFunc = func(context.Context, io.Writer) error {
		require.FailNow(t, "Scraper was called for terminated scrape loop.")
		return nil
	}

	runDone := make(chan struct{})
	go func() {
		sl.run(nil)
		close(runDone)
	}()

	select {
	case <-runDone:
	case <-time.After(1 * time.Second):
		require.FailNow(t, "Running terminated scrape loop did not exit.")
	}

	select {
	case <-stopDone:
	case <-time.After(1 * time.Second):
		require.FailNow(t, "Stopping did not terminate after running exited.")
	}
}

func nopMutator(l labels.Labels) labels.Labels { return l }

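// TestScrapeLoopStop stops a loop mid-run and verifies that every scrape
// appended a full set of samples and that the final samples are stale markers.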
func TestScrapeLoopStop(t *testing.T) {
	var (
		signal   = make(chan struct{}, 1)
		appender = &collectResultAppender{}
		scraper  = &testScraper{}
		app      = func(ctx context.Context) storage.Appender { return appender }
	)

	sl := newBasicScrapeLoop(t, context.Background(), scraper, app, 10*time.Millisecond)

	// Terminate the loop after 2 scrapes.
	numScrapes := 0

	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
		numScrapes++
		if numScrapes == 2 {
			go sl.stop()
			<-sl.ctx.Done()
		}
		w.Write([]byte("metric_a 42\n"))
		return ctx.Err()
	}

	go func() {
		sl.run(nil)
		signal <- struct{}{}
	}()

	select {
	case <-signal:
	case <-time.After(5 * time.Second):
		require.FailNow(t, "Scrape wasn't stopped.")
	}

	// We expect 1 actual sample for each scrape plus 5 report samples.
	// At least 2 scrapes were made, plus the final stale markers.
	require.GreaterOrEqual(t, len(appender.resultFloats), 6*3, "Expected at least 3 scrapes with 6 samples each.")
	require.Zero(t, len(appender.resultFloats)%6, "There is a scrape with missing samples.")
	// All samples in a scrape must have the same timestamp.
	var ts int64
	for i, s := range appender.resultFloats {
		switch {
		case i%6 == 0:
			ts = s.t
		case s.t != ts:
			t.Fatalf("Unexpected multiple timestamps within single scrape")
		}
	}
	// All samples from the last scrape must be stale markers.
	for _, s := range appender.resultFloats[len(appender.resultFloats)-5:] {
		require.True(t, value.IsStaleNaN(s.f), "Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(s.f))
	}
}

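// TestScrapeLoopRun covers two cancellation paths: during the initial offset
// wait, and via the scrape timeout propagated to the scraper's context.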
func TestScrapeLoopRun(t *testing.T) {
	var (
		signal = make(chan struct{}, 1)
		errc   = make(chan error)

		scraper       = &testScraper{}
		app           = func(ctx context.Context) storage.Appender { return &nopAppender{} }
		scrapeMetrics = newTestScrapeMetrics(t)
	)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newScrapeLoop(ctx,
		scraper,
		nil, nil,
		nopMutator,
		nopMutator,
		app,
		nil,
		nil,
		0,
		true,
		false,
		true,
		0, 0, nativeHistogramMaxSchema,
		nil,
		time.Second,
		time.Hour,
		false,
		false,
		false,
		false,
		false,
		nil,
		false,
		scrapeMetrics,
		false,
	)

	// The loop must terminate during the initial offset if the context
	// is canceled.
	scraper.offsetDur = time.Hour

	go func() {
		sl.run(errc)
		signal <- struct{}{}
	}()

	// Wait to make sure we are actually waiting on the offset.
	time.Sleep(1 * time.Second)

	cancel()
	select {
	case <-signal:
	case <-time.After(5 * time.Second):
		require.FailNow(t, "Cancellation during initial offset failed.")
	case err := <-errc:
		require.FailNow(t, "Unexpected error: %s", err)
	}

	// The provided timeout must cause cancellation of the context passed down to the
	// scraper. The scraper has to respect the context.
	scraper.offsetDur = 0

	block := make(chan struct{})
	scraper.scrapeFunc = func(ctx context.Context, _ io.Writer) error {
		select {
		case <-block:
		case <-ctx.Done():
			return ctx.Err()
		}
		return nil
	}

	ctx, cancel = context.WithCancel(context.Background())
	sl = newBasicScrapeLoop(t, ctx, scraper, app, time.Second)
	sl.timeout = 100 * time.Millisecond

	go func() {
		sl.run(errc)
		signal <- struct{}{}
	}()

	select {
	case err := <-errc:
		require.ErrorIs(t, err, context.DeadlineExceeded)
	case <-time.After(3 * time.Second):
		require.FailNow(t, "Expected timeout error but got none.")
	}

	// We already caught the timeout error and are certainly in the loop.
	// Let the scrapes return immediately to cause no further timeout errors,
	// and check whether canceling the parent context terminates the loop.
	close(block)
	cancel()

	select {
	case <-signal:
		// Loop terminated as expected.
	case err := <-errc:
		require.FailNow(t, "Unexpected error: %s", err)
	case <-time.After(3 * time.Second):
		require.FailNow(t, "Loop did not terminate on context cancellation")
	}
}

func TestScrapeLoopForcedErr(t *testing.T) {
	var (
		signal = make(chan struct{}, 1)
		errc   = make(chan error)

		scraper = &testScraper{}
		app     = func(ctx context.Context) storage.Appender { return &nopAppender{} }
	)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newBasicScrapeLoop(t, ctx, scraper, app, time.Second)

	forcedErr := fmt.Errorf("forced err")
	sl.setForcedError(forcedErr)

	scraper.scrapeFunc = func(context.Context, io.Writer) error {
		require.FailNow(t, "Should not be scraped.")
		return nil
	}

	go func() {
		sl.run(errc)
		signal <- struct{}{}
	}()

	select {
	case err := <-errc:
		require.ErrorIs(t, err, forcedErr)
	case <-time.After(3 * time.Second):
		require.FailNow(t, "Expected forced error but got none.")
	}
	cancel()

	select {
	case <-signal:
	case <-time.After(5 * time.Second):
		require.FailNow(t, "Scrape not stopped.")
	}
}

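// TestScrapeLoopMetadata checks that TYPE, HELP, and UNIT comments in an
// OpenMetrics exposition populate the scrape cache's metadata.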
func TestScrapeLoopMetadata(t *testing.T) {
	var (
		signal        = make(chan struct{})
		scraper       = &testScraper{}
		scrapeMetrics = newTestScrapeMetrics(t)
		cache         = newScrapeCache(scrapeMetrics)
	)
	defer close(signal)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newScrapeLoop(ctx,
		scraper,
		nil, nil,
		nopMutator,
		nopMutator,
		func(ctx context.Context) storage.Appender { return nopAppender{} },
		cache,
		labels.NewSymbolTable(),
		0,
		true,
		false,
		true,
		0, 0, nativeHistogramMaxSchema,
		nil,
		0,
		0,
		false,
		false,
		false,
		false,
		false,
		nil,
		false,
		scrapeMetrics,
		false,
	)
	defer cancel()

	slApp := sl.appender(ctx)
	total, _, _, err := sl.append(slApp, []byte(`# TYPE test_metric counter
# HELP test_metric some help text
# UNIT test_metric metric
test_metric 1
# TYPE test_metric_no_help gauge
# HELP test_metric_no_type other help text
# EOF`), "application/openmetrics-text", time.Now())
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())
	require.Equal(t, 1, total)

	md, ok := cache.GetMetadata("test_metric")
	require.True(t, ok, "expected metadata to be present")
	require.Equal(t, model.MetricTypeCounter, md.Type, "unexpected metric type")
	require.Equal(t, "some help text", md.Help)
	require.Equal(t, "metric", md.Unit)

	md, ok = cache.GetMetadata("test_metric_no_help")
	require.True(t, ok, "expected metadata to be present")
	require.Equal(t, model.MetricTypeGauge, md.Type, "unexpected metric type")
	require.Equal(t, "", md.Help)
	require.Equal(t, "", md.Unit)

	md, ok = cache.GetMetadata("test_metric_no_type")
	require.True(t, ok, "expected metadata to be present")
	require.Equal(t, model.MetricTypeUnknown, md.Type, "unexpected metric type")
	require.Equal(t, "other help text", md.Help)
	require.Equal(t, "", md.Unit)
}

func simpleTestScrapeLoop(t testing.TB) (context.Context, *scrapeLoop) {
	// Need a full storage for correct Add/AddFast semantics.
	s := teststorage.New(t)
	t.Cleanup(func() { s.Close() })

	ctx, cancel := context.WithCancel(context.Background())
	sl := newBasicScrapeLoop(t, ctx, &testScraper{}, s.Appender, 0)
	t.Cleanup(func() { cancel() })

	return ctx, sl
}

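// TestScrapeLoopSeriesAdded checks that seriesAdded counts only series that
// are new to the cache, not series that were re-appended.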
func TestScrapeLoopSeriesAdded(t *testing.T) {
	ctx, sl := simpleTestScrapeLoop(t)

	slApp := sl.appender(ctx)
	total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{})
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())
	require.Equal(t, 1, total)
	require.Equal(t, 1, added)
	require.Equal(t, 1, seriesAdded)

	slApp = sl.appender(ctx)
	total, added, seriesAdded, err = sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{})
	require.NoError(t, slApp.Commit())
	require.NoError(t, err)
	require.Equal(t, 1, total)
	require.Equal(t, 1, added)
	require.Equal(t, 0, seriesAdded)
}

func TestScrapeLoopFailWithInvalidLabelsAfterRelabel(t *testing.T) {
	s := teststorage.New(t)
	defer s.Close()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	target := &Target{
		labels: labels.FromStrings("pod_label_invalid_012", "test"),
	}
	relabelConfig := []*relabel.Config{{
		Action:      relabel.LabelMap,
		Regex:       relabel.MustNewRegexp("pod_label_invalid_(.+)"),
		Separator:   ";",
		Replacement: "$1",
	}}
	sl := newBasicScrapeLoop(t, ctx, &testScraper{}, s.Appender, 0)
	sl.sampleMutator = func(l labels.Labels) labels.Labels {
		return mutateSampleLabels(l, target, true, relabelConfig)
	}

	slApp := sl.appender(ctx)
	total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{})
	require.ErrorContains(t, err, "invalid metric name or label names")
	require.NoError(t, slApp.Rollback())
	require.Equal(t, 1, total)
	require.Equal(t, 0, added)
	require.Equal(t, 0, seriesAdded)
}

func makeTestMetrics(n int) []byte {
	// Construct a metrics string to parse
	sb := bytes.Buffer{}
	for i := 0; i < n; i++ {
		fmt.Fprintf(&sb, "# TYPE metric_a gauge\n")
		fmt.Fprintf(&sb, "# HELP metric_a help text\n")
		fmt.Fprintf(&sb, "metric_a{foo=\"%d\",bar=\"%d\"} 1\n", i, i*100)
	}
	fmt.Fprintf(&sb, "# EOF\n")
	return sb.Bytes()
}

func BenchmarkScrapeLoopAppend(b *testing.B) {
	ctx, sl := simpleTestScrapeLoop(b)

	slApp := sl.appender(ctx)
	metrics := makeTestMetrics(100)
	ts := time.Time{}

	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		ts = ts.Add(time.Second)
		_, _, _, _ = sl.append(slApp, metrics, "", ts)
	}
}

func BenchmarkScrapeLoopAppendOM(b *testing.B) {
	ctx, sl := simpleTestScrapeLoop(b)

	slApp := sl.appender(ctx)
	metrics := makeTestMetrics(100)
	ts := time.Time{}

	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		ts = ts.Add(time.Second)
		_, _, _, _ = sl.append(slApp, metrics, "application/openmetrics-text", ts)
	}
}

func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
	appender := &collectResultAppender{}
	var (
		signal  = make(chan struct{}, 1)
		scraper = &testScraper{}
		app     = func(ctx context.Context) storage.Appender { return appender }
	)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newBasicScrapeLoop(t, ctx, scraper, app, 10*time.Millisecond)
	// Succeed once, several failures, then stop.
	numScrapes := 0

	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
		numScrapes++

		switch numScrapes {
		case 1:
			w.Write([]byte("metric_a 42\n"))
			return nil
		case 5:
			cancel()
		}
		return errors.New("scrape failed")
	}

	go func() {
		sl.run(nil)
		signal <- struct{}{}
	}()

	select {
	case <-signal:
	case <-time.After(5 * time.Second):
		require.FailNow(t, "Scrape wasn't stopped.")
	}

	// 1 successfully scraped sample, 1 stale marker after the first failure, and
	// 5 report samples for each scrape, successful or not.
	require.Len(t, appender.resultFloats, 27, "Appended samples not as expected:\n%s", appender)
	require.Equal(t, 42.0, appender.resultFloats[0].f, "Appended first sample not as expected")
	require.True(t, value.IsStaleNaN(appender.resultFloats[6].f),
		"Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.resultFloats[6].f))
}

func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
	appender := &collectResultAppender{}
	var (
		signal     = make(chan struct{}, 1)
		scraper    = &testScraper{}
		app        = func(ctx context.Context) storage.Appender { return appender }
		numScrapes = 0
	)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newBasicScrapeLoop(t, ctx, scraper, app, 10*time.Millisecond)

	// Succeed once, several failures, then stop.
	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
		numScrapes++
		switch numScrapes {
		case 1:
			w.Write([]byte("metric_a 42\n"))
			return nil
		case 2:
			w.Write([]byte("7&-\n"))
			return nil
		case 3:
			cancel()
		}
		return errors.New("scrape failed")
	}

	go func() {
		sl.run(nil)
		signal <- struct{}{}
	}()

	select {
	case <-signal:
	case <-time.After(5 * time.Second):
		require.FailNow(t, "Scrape wasn't stopped.")
	}

	// 1 successfully scraped sample, 1 stale marker after the first failure, and
	// 5 report samples for each scrape, successful or not.
	require.Len(t, appender.resultFloats, 17, "Appended samples not as expected:\n%s", appender)
	require.Equal(t, 42.0, appender.resultFloats[0].f, "Appended first sample not as expected")
	require.True(t, value.IsStaleNaN(appender.resultFloats[6].f),
		"Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.resultFloats[6].f))
}

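// TestScrapeLoopCache checks that cache entries persist while their series
// keep being exposed and are evicted once the series disappear.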
func TestScrapeLoopCache(t *testing.T) {
	s := teststorage.New(t)
	defer s.Close()

	appender := &collectResultAppender{}
	var (
		signal  = make(chan struct{}, 1)
		scraper = &testScraper{}
		app     = func(ctx context.Context) storage.Appender { appender.next = s.Appender(ctx); return appender }
	)

	ctx, cancel := context.WithCancel(context.Background())
	// Decreasing the scrape interval could make the test fail, as multiple scrapes might be initiated at identical millisecond timestamps.
	// See https://github.com/prometheus/prometheus/issues/12727.
	sl := newBasicScrapeLoop(t, ctx, scraper, app, 100*time.Millisecond)

	numScrapes := 0

	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
		switch numScrapes {
		case 1, 2:
			_, ok := sl.cache.series["metric_a"]
			require.True(t, ok, "metric_a missing from cache after scrape %d", numScrapes)
			_, ok = sl.cache.series["metric_b"]
			require.True(t, ok, "metric_b missing from cache after scrape %d", numScrapes)
		case 3:
			_, ok := sl.cache.series["metric_a"]
			require.True(t, ok, "metric_a missing from cache after scrape %d", numScrapes)
			_, ok = sl.cache.series["metric_b"]
			require.False(t, ok, "metric_b present in cache after scrape %d", numScrapes)
		}

		numScrapes++
		switch numScrapes {
		case 1:
			w.Write([]byte("metric_a 42\nmetric_b 43\n"))
			return nil
		case 3:
			w.Write([]byte("metric_a 44\n"))
			return nil
		case 4:
			cancel()
		}
		return fmt.Errorf("scrape failed")
	}

	go func() {
		sl.run(nil)
		signal <- struct{}{}
	}()

	select {
	case <-signal:
	case <-time.After(5 * time.Second):
		require.FailNow(t, "Scrape wasn't stopped.")
	}

	// Samples from the successful scrapes, stale markers once series disappear
	// or a scrape fails, and 5 report samples for each scrape, successful or not.
	require.Len(t, appender.resultFloats, 26, "Appended samples not as expected:\n%s", appender)
}

func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) {
	s := teststorage.New(t)
	defer s.Close()

	sapp := s.Appender(context.Background())

	appender := &collectResultAppender{next: sapp}
	var (
		signal  = make(chan struct{}, 1)
		scraper = &testScraper{}
		app     = func(ctx context.Context) storage.Appender { return appender }
	)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newBasicScrapeLoop(t, ctx, scraper, app, 10*time.Millisecond)

	numScrapes := 0

	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
		numScrapes++
		if numScrapes < 5 {
			s := ""
			for i := 0; i < 500; i++ {
				s = fmt.Sprintf("%smetric_%d_%d 42\n", s, i, numScrapes)
			}
			w.Write([]byte(s + "&"))
		} else {
			cancel()
		}
		return nil
	}

	go func() {
		sl.run(nil)
		signal <- struct{}{}
	}()

	select {
	case <-signal:
	case <-time.After(5 * time.Second):
		require.FailNow(t, "Scrape wasn't stopped.")
	}

	require.LessOrEqual(t, len(sl.cache.series), 2000, "More than 2000 series cached.")
}

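// TestScrapeLoopAppend covers honor_labels conflict handling, removal of
// empty labels, and NaN normalization during append.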
func TestScrapeLoopAppend(t *testing.T) {
	tests := []struct {
		title           string
		honorLabels     bool
		scrapeLabels    string
		discoveryLabels []string
		expLset         labels.Labels
		expValue        float64
	}{
		{
			// When "honor_labels" is not set,
			// a label name collision is handled by adding a prefix.
			title:           "Label name collision",
			honorLabels:     false,
			scrapeLabels:    `metric{n="1"} 0`,
			discoveryLabels: []string{"n", "2"},
			expLset:         labels.FromStrings("__name__", "metric", "exported_n", "1", "n", "2"),
			expValue:        0,
		}, {
			// When "honor_labels" is not set,
			// exported labels from discovery don't get overwritten.
			title:           "Label name collision",
			honorLabels:     false,
			scrapeLabels:    `metric 0`,
			discoveryLabels: []string{"n", "2", "exported_n", "2"},
			expLset:         labels.FromStrings("__name__", "metric", "n", "2", "exported_n", "2"),
			expValue:        0,
		}, {
			// Labels with no value need to be removed as these should not be ingested.
			title:           "Delete Empty labels",
			honorLabels:     false,
			scrapeLabels:    `metric{n=""} 0`,
			discoveryLabels: nil,
			expLset:         labels.FromStrings("__name__", "metric"),
			expValue:        0,
		}, {
			// Honor Labels should ignore labels with the same name.
			title:           "Honor Labels",
			honorLabels:     true,
			scrapeLabels:    `metric{n1="1", n2="2"} 0`,
			discoveryLabels: []string{"n1", "0"},
			expLset:         labels.FromStrings("__name__", "metric", "n1", "1", "n2", "2"),
			expValue:        0,
		}, {
			title:           "Stale - NaN",
			honorLabels:     false,
			scrapeLabels:    `metric NaN`,
			discoveryLabels: nil,
			expLset:         labels.FromStrings("__name__", "metric"),
			expValue:        math.Float64frombits(value.NormalNaN),
		},
	}

	for _, test := range tests {
		app := &collectResultAppender{}

		discoveryLabels := &Target{
			labels: labels.FromStrings(test.discoveryLabels...),
		}

		sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)
		sl.sampleMutator = func(l labels.Labels) labels.Labels {
			return mutateSampleLabels(l, discoveryLabels, test.honorLabels, nil)
		}
		sl.reportSampleMutator = func(l labels.Labels) labels.Labels {
			return mutateReportSampleLabels(l, discoveryLabels)
		}

		now := time.Now()

		slApp := sl.appender(context.Background())
		_, _, _, err := sl.append(slApp, []byte(test.scrapeLabels), "", now)
		require.NoError(t, err)
		require.NoError(t, slApp.Commit())

		expected := []floatSample{
			{
				metric: test.expLset,
				t:      timestamp.FromTime(now),
				f:      test.expValue,
			},
		}

		t.Logf("Test:%s", test.title)
		requireEqual(t, expected, app.resultFloats)
	}
}

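// requireEqual is require.Equal with cmp options for comparing float samples
// and the unexported fields of histogramSample.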
func requireEqual(t *testing.T, expected, actual interface{}, msgAndArgs ...interface{}) {
	testutil.RequireEqualWithOptions(t, expected, actual,
		[]cmp.Option{cmp.Comparer(equalFloatSamples), cmp.AllowUnexported(histogramSample{})},
		msgAndArgs...)
}

func TestScrapeLoopAppendForConflictingPrefixedLabels(t *testing.T) {
	testcases := map[string]struct {
		targetLabels  []string
		exposedLabels string
		expected      []string
	}{
		"One target label collides with existing label": {
			targetLabels:  []string{"foo", "2"},
			exposedLabels: `metric{foo="1"} 0`,
			expected:      []string{"__name__", "metric", "exported_foo", "1", "foo", "2"},
		},

		"One target label collides with existing label, plus target label already with prefix 'exported'": {
			targetLabels:  []string{"foo", "2", "exported_foo", "3"},
			exposedLabels: `metric{foo="1"} 0`,
			expected:      []string{"__name__", "metric", "exported_exported_foo", "1", "exported_foo", "3", "foo", "2"},
		},
		"One target label collides with existing label, plus existing label already with prefix 'exported'": {
			targetLabels:  []string{"foo", "3"},
			exposedLabels: `metric{foo="1", exported_foo="2"} 0`,
			expected:      []string{"__name__", "metric", "exported_exported_foo", "1", "exported_foo", "2", "foo", "3"},
		},
		"One target label collides with existing label, both already with prefix 'exported'": {
			targetLabels:  []string{"exported_foo", "2"},
			exposedLabels: `metric{exported_foo="1"} 0`,
			expected:      []string{"__name__", "metric", "exported_exported_foo", "1", "exported_foo", "2"},
		},
		"Two target labels collide with existing labels, both with and without prefix 'exported'": {
			targetLabels:  []string{"foo", "3", "exported_foo", "4"},
			exposedLabels: `metric{foo="1", exported_foo="2"} 0`,
			expected: []string{
				"__name__", "metric", "exported_exported_foo", "1", "exported_exported_exported_foo",
				"2", "exported_foo", "4", "foo", "3",
			},
		},
		"Extreme example": {
			targetLabels:  []string{"foo", "0", "exported_exported_foo", "1", "exported_exported_exported_foo", "2"},
			exposedLabels: `metric{foo="3", exported_foo="4", exported_exported_exported_foo="5"} 0`,
			expected: []string{
				"__name__", "metric",
				"exported_exported_exported_exported_exported_foo", "5",
				"exported_exported_exported_exported_foo", "3",
				"exported_exported_exported_foo", "2",
				"exported_exported_foo", "1",
				"exported_foo", "4",
				"foo", "0",
			},
		},
	}

	for name, tc := range testcases {
		t.Run(name, func(t *testing.T) {
			app := &collectResultAppender{}
			sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)
			sl.sampleMutator = func(l labels.Labels) labels.Labels {
				return mutateSampleLabels(l, &Target{labels: labels.FromStrings(tc.targetLabels...)}, false, nil)
			}
			slApp := sl.appender(context.Background())
			_, _, _, err := sl.append(slApp, []byte(tc.exposedLabels), "", time.Date(2000, 1, 1, 1, 0, 0, 0, time.UTC))
			require.NoError(t, err)

			require.NoError(t, slApp.Commit())

			requireEqual(t, []floatSample{
				{
					metric: labels.FromStrings(tc.expected...),
					t:      timestamp.FromTime(time.Date(2000, 1, 1, 1, 0, 0, 0, time.UTC)),
					f:      0,
				},
			}, app.resultFloats)
		})
	}
}

func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
	// collectResultAppender's AddFast always returns ErrNotFound if we don't give it a next.
	app := &collectResultAppender{}
	sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)

	fakeRef := storage.SeriesRef(1)
	expValue := float64(1)
	metric := []byte(`metric{n="1"} 1`)
	p, warning := textparse.New(metric, "", false, labels.NewSymbolTable())
	require.NoError(t, warning)

	var lset labels.Labels
	p.Next()
	p.Metric(&lset)
	hash := lset.Hash()

	// Create a fake entry in the cache.
	sl.cache.addRef(metric, fakeRef, lset, hash)
	now := time.Now()

	slApp := sl.appender(context.Background())
	_, _, _, err := sl.append(slApp, metric, "", now)
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	expected := []floatSample{
		{
			metric: lset,
			t:      timestamp.FromTime(now),
			f:      expValue,
		},
	}

	require.Equal(t, expected, app.resultFloats)
}

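// TestScrapeLoopAppendSampleLimit checks that exceeding sample_limit fails the
// append with errSampleLimit, increments the limit metric exactly once per
// scrape, and still reports accurate total/added counts.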
func TestScrapeLoopAppendSampleLimit(t *testing.T) {
	resApp := &collectResultAppender{}
	app := &limitAppender{Appender: resApp, limit: 1}

	sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)
	sl.sampleMutator = func(l labels.Labels) labels.Labels {
		if l.Has("deleteme") {
			return labels.EmptyLabels()
		}
		return l
	}
	sl.sampleLimit = app.limit

	// Get the value of the Counter before performing the append.
	beforeMetric := dto.Metric{}
	err := sl.metrics.targetScrapeSampleLimit.Write(&beforeMetric)
	require.NoError(t, err)

	beforeMetricValue := beforeMetric.GetCounter().GetValue()

	now := time.Now()
	slApp := sl.appender(context.Background())
	total, added, seriesAdded, err := sl.append(app, []byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "", now)
	require.ErrorIs(t, err, errSampleLimit)
	require.NoError(t, slApp.Rollback())
	require.Equal(t, 3, total)
	require.Equal(t, 3, added)
	require.Equal(t, 1, seriesAdded)

	// Check that the Counter has been incremented a single time for the scrape,
	// not multiple times for each sample.
	metric := dto.Metric{}
	err = sl.metrics.targetScrapeSampleLimit.Write(&metric)
	require.NoError(t, err)

	value := metric.GetCounter().GetValue()
	change := value - beforeMetricValue
	require.Equal(t, 1.0, change, "Unexpected change of sample limit metric: %f", change)

	// And verify that we got the samples that fit under the limit.
	want := []floatSample{
		{
			metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
			t:      timestamp.FromTime(now),
			f:      1,
		},
	}
	requireEqual(t, want, resApp.rolledbackFloats, "Appended samples not as expected:\n%s", appender)

	now = time.Now()
	slApp = sl.appender(context.Background())
	total, added, seriesAdded, err = sl.append(slApp, []byte("metric_a 1\nmetric_b 1\nmetric_c{deleteme=\"yes\"} 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h{deleteme=\"yes\"} 1\nmetric_i{deleteme=\"yes\"} 1\n"), "", now)
	require.ErrorIs(t, err, errSampleLimit)
	require.NoError(t, slApp.Rollback())
	require.Equal(t, 9, total)
	require.Equal(t, 6, added)
	require.Equal(t, 0, seriesAdded)
}

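// TestScrapeLoop_HistogramBucketLimit grows a native histogram until it
// exceeds the configured bucket limit, expecting errBucketLimit and a single
// increment of the bucket-limit metric.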
func TestScrapeLoop_HistogramBucketLimit(t *testing.T) {
1573
	resApp := &collectResultAppender{}
1574
	app := &bucketLimitAppender{Appender: resApp, limit: 2}
1575

1576
	sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)
1577
	sl.enableNativeHistogramIngestion = true
1578
	sl.sampleMutator = func(l labels.Labels) labels.Labels {
1579
		if l.Has("deleteme") {
1580
			return labels.EmptyLabels()
1581
		}
1582
		return l
1583
	}
1584
	sl.sampleLimit = app.limit
1585

1586
	metric := dto.Metric{}
1587
	err := sl.metrics.targetScrapeNativeHistogramBucketLimit.Write(&metric)
1588
	require.NoError(t, err)
1589
	beforeMetricValue := metric.GetCounter().GetValue()
1590

1591
	nativeHistogram := prometheus.NewHistogramVec(
1592
		prometheus.HistogramOpts{
1593
			Namespace:                      "testing",
1594
			Name:                           "example_native_histogram",
1595
			Help:                           "This is used for testing",
1596
			ConstLabels:                    map[string]string{"some": "value"},
1597
			NativeHistogramBucketFactor:    1.1, // 10% increase from bucket to bucket
1598
			NativeHistogramMaxBucketNumber: 100, // intentionally higher than the limit we'll use in the scraper
1599
		},
1600
		[]string{"size"},
1601
	)
1602
	registry := prometheus.NewRegistry()
1603
	registry.Register(nativeHistogram)
1604
	nativeHistogram.WithLabelValues("S").Observe(1.0)
1605
	nativeHistogram.WithLabelValues("M").Observe(1.0)
1606
	nativeHistogram.WithLabelValues("L").Observe(1.0)
1607
	nativeHistogram.WithLabelValues("M").Observe(10.0)
1608
	nativeHistogram.WithLabelValues("L").Observe(10.0) // in different bucket since > 1*1.1
1609

1610
	gathered, err := registry.Gather()
1611
	require.NoError(t, err)
1612
	require.NotEmpty(t, gathered)
1613

1614
	histogramMetricFamily := gathered[0]
1615
	msg, err := MetricFamilyToProtobuf(histogramMetricFamily)
1616
	require.NoError(t, err)
1617

1618
	now := time.Now()
1619
	total, added, seriesAdded, err := sl.append(app, msg, "application/vnd.google.protobuf", now)
1620
	require.NoError(t, err)
1621
	require.Equal(t, 3, total)
1622
	require.Equal(t, 3, added)
1623
	require.Equal(t, 3, seriesAdded)
1624

1625
	err = sl.metrics.targetScrapeNativeHistogramBucketLimit.Write(&metric)
1626
	require.NoError(t, err)
1627
	metricValue := metric.GetCounter().GetValue()
1628
	require.Equal(t, beforeMetricValue, metricValue)
1629
	beforeMetricValue = metricValue
1630

1631
	nativeHistogram.WithLabelValues("L").Observe(100.0) // in different bucket since > 10*1.1
1632

1633
	gathered, err = registry.Gather()
1634
	require.NoError(t, err)
1635
	require.NotEmpty(t, gathered)
1636

1637
	histogramMetricFamily = gathered[0]
1638
	msg, err = MetricFamilyToProtobuf(histogramMetricFamily)
1639
	require.NoError(t, err)
1640

1641
	now = time.Now()
1642
	total, added, seriesAdded, err = sl.append(app, msg, "application/vnd.google.protobuf", now)
1643
	require.NoError(t, err)
1644
	require.Equal(t, 3, total)
1645
	require.Equal(t, 3, added)
1646
	require.Equal(t, 3, seriesAdded)
1647

1648
	err = sl.metrics.targetScrapeNativeHistogramBucketLimit.Write(&metric)
1649
	require.NoError(t, err)
1650
	metricValue = metric.GetCounter().GetValue()
1651
	require.Equal(t, beforeMetricValue, metricValue)
1652
	beforeMetricValue = metricValue
1653

1654
	nativeHistogram.WithLabelValues("L").Observe(100000.0) // in different bucket since > 10*1.1
1655

1656
	gathered, err = registry.Gather()
1657
	require.NoError(t, err)
1658
	require.NotEmpty(t, gathered)
1659

1660
	histogramMetricFamily = gathered[0]
1661
	msg, err = MetricFamilyToProtobuf(histogramMetricFamily)
1662
	require.NoError(t, err)
1663

1664
	now = time.Now()
1665
	total, added, seriesAdded, err = sl.append(app, msg, "application/vnd.google.protobuf", now)
1666
	if !errors.Is(err, errBucketLimit) {
1667
		t.Fatalf("Did not see expected histogram bucket limit error: %s", err)
1668
	}
1669
	require.NoError(t, app.Rollback())
1670
	require.Equal(t, 3, total)
1671
	require.Equal(t, 3, added)
1672
	require.Equal(t, 0, seriesAdded)
1673

1674
	err = sl.metrics.targetScrapeNativeHistogramBucketLimit.Write(&metric)
1675
	require.NoError(t, err)
1676
	metricValue = metric.GetCounter().GetValue()
1677
	require.Equal(t, beforeMetricValue+1, metricValue)
1678
}

func TestScrapeLoop_ChangingMetricString(t *testing.T) {
	// This is a regression test for the scrape loop cache not properly maintaining
	// IDs when the string representation of a metric changes across a scrape. Thus
	// we use a real storage appender here.
	s := teststorage.New(t)
	defer s.Close()

	capp := &collectResultAppender{}
	sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return capp }, 0)

	now := time.Now()
	slApp := sl.appender(context.Background())
	_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1`), "", now)
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	slApp = sl.appender(context.Background())
	_, _, _, err = sl.append(slApp, []byte(`metric_a{b="1",a="1"} 2`), "", now.Add(time.Minute))
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	want := []floatSample{
		{
			metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
			t:      timestamp.FromTime(now),
			f:      1,
		},
		{
			metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
			t:      timestamp.FromTime(now.Add(time.Minute)),
			f:      2,
		},
	}
	require.Equal(t, want, capp.resultFloats, "Appended samples not as expected:\n%s", capp)
}

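// TestScrapeLoopAppendStaleness checks that when a series disappears between
// scrapes, a stale marker (StaleNaN) is appended at the time of the empty
// scrape.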
func TestScrapeLoopAppendStaleness(t *testing.T) {
	app := &collectResultAppender{}

	sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)

	now := time.Now()
	slApp := sl.appender(context.Background())
	_, _, _, err := sl.append(slApp, []byte("metric_a 1\n"), "", now)
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	slApp = sl.appender(context.Background())
	_, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second))
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	want := []floatSample{
		{
			metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
			t:      timestamp.FromTime(now),
			f:      1,
		},
		{
			metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
			t:      timestamp.FromTime(now.Add(time.Second)),
			f:      math.Float64frombits(value.StaleNaN),
		},
	}
	requireEqual(t, want, app.resultFloats, "Appended samples not as expected:\n%s", app)
}

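// TestScrapeLoopAppendNoStalenessIfTimestamp checks that a sample with an
// explicit timestamp gets no stale marker when it disappears, as long as
// timestamp staleness tracking is disabled (the default).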
func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
	app := &collectResultAppender{}
	sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)
	now := time.Now()
	slApp := sl.appender(context.Background())
	_, _, _, err := sl.append(slApp, []byte("metric_a 1 1000\n"), "", now)
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	slApp = sl.appender(context.Background())
	_, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second))
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	want := []floatSample{
		{
			metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
			t:      1000,
			f:      1,
		},
	}
	require.Equal(t, want, app.resultFloats, "Appended samples not as expected:\n%s", app)
}

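// TestScrapeLoopAppendStalenessIfTrackTimestampStaleness checks that with
// trackTimestampsStaleness enabled, even explicitly timestamped series get a
// stale marker once they disappear from the scrape.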
func TestScrapeLoopAppendStalenessIfTrackTimestampStaleness(t *testing.T) {
	app := &collectResultAppender{}
	sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)
	sl.trackTimestampsStaleness = true

	now := time.Now()
	slApp := sl.appender(context.Background())
	_, _, _, err := sl.append(slApp, []byte("metric_a 1 1000\n"), "", now)
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	slApp = sl.appender(context.Background())
	_, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second))
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	want := []floatSample{
		{
			metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
			t:      1000,
			f:      1,
		},
		{
			metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
			t:      timestamp.FromTime(now.Add(time.Second)),
			f:      math.Float64frombits(value.StaleNaN),
		},
	}
	requireEqual(t, want, app.resultFloats, "Appended samples not as expected:\n%s", app)
}

func TestScrapeLoopAppendExemplar(t *testing.T) {
	tests := []struct {
		title                           string
		scrapeClassicHistograms         bool
		enableNativeHistogramsIngestion bool
		scrapeText                      string
		contentType                     string
		discoveryLabels                 []string
		floats                          []floatSample
		histograms                      []histogramSample
		exemplars                       []exemplar.Exemplar
	}{
		{
			title:           "Metric without exemplars",
			scrapeText:      "metric_total{n=\"1\"} 0\n# EOF",
			contentType:     "application/openmetrics-text",
			discoveryLabels: []string{"n", "2"},
			floats: []floatSample{{
				metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"),
				f:      0,
			}},
		},
		{
			title:           "Metric with exemplars",
			scrapeText:      "metric_total{n=\"1\"} 0 # {a=\"abc\"} 1.0\n# EOF",
			contentType:     "application/openmetrics-text",
			discoveryLabels: []string{"n", "2"},
			floats: []floatSample{{
				metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"),
				f:      0,
			}},
			exemplars: []exemplar.Exemplar{
				{Labels: labels.FromStrings("a", "abc"), Value: 1},
			},
		},
		{
			title:           "Metric with exemplars and TS",
			scrapeText:      "metric_total{n=\"1\"} 0 # {a=\"abc\"} 1.0 10000\n# EOF",
			contentType:     "application/openmetrics-text",
			discoveryLabels: []string{"n", "2"},
			floats: []floatSample{{
				metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"),
				f:      0,
			}},
			exemplars: []exemplar.Exemplar{
				{Labels: labels.FromStrings("a", "abc"), Value: 1, Ts: 10000000, HasTs: true},
			},
		},
		{
			title: "Two metrics and exemplars",
			scrapeText: `metric_total{n="1"} 1 # {t="1"} 1.0 10000
metric_total{n="2"} 2 # {t="2"} 2.0 20000
# EOF`,
			contentType: "application/openmetrics-text",
			floats: []floatSample{{
				metric: labels.FromStrings("__name__", "metric_total", "n", "1"),
				f:      1,
			}, {
				metric: labels.FromStrings("__name__", "metric_total", "n", "2"),
				f:      2,
			}},
			exemplars: []exemplar.Exemplar{
				{Labels: labels.FromStrings("t", "1"), Value: 1, Ts: 10000000, HasTs: true},
				{Labels: labels.FromStrings("t", "2"), Value: 2, Ts: 20000000, HasTs: true},
			},
		},
		{
			title: "Native histogram with three exemplars",

			enableNativeHistogramsIngestion: true,
			scrapeText: `name: "test_histogram"
help: "Test histogram with many buckets removed to keep it manageable in size."
type: HISTOGRAM
metric: <
  histogram: <
    sample_count: 175
    sample_sum: 0.0008280461746287094
    bucket: <
      cumulative_count: 2
      upper_bound: -0.0004899999999999998
    >
    bucket: <
      cumulative_count: 4
      upper_bound: -0.0003899999999999998
      exemplar: <
        label: <
          name: "dummyID"
          value: "59727"
        >
        value: -0.00039
        timestamp: <
          seconds: 1625851155
          nanos: 146848499
        >
      >
    >
    bucket: <
      cumulative_count: 16
      upper_bound: -0.0002899999999999998
      exemplar: <
        label: <
          name: "dummyID"
          value: "5617"
        >
        value: -0.00029
      >
    >
    bucket: <
      cumulative_count: 32
      upper_bound: -0.0001899999999999998
      exemplar: <
        label: <
          name: "dummyID"
          value: "58215"
        >
        value: -0.00019
        timestamp: <
          seconds: 1625851055
          nanos: 146848599
        >
      >
    >
    schema: 3
    zero_threshold: 2.938735877055719e-39
    zero_count: 2
    negative_span: <
      offset: -162
      length: 1
    >
    negative_span: <
      offset: 23
      length: 4
    >
    negative_delta: 1
    negative_delta: 3
    negative_delta: -2
    negative_delta: -1
    negative_delta: 1
    positive_span: <
      offset: -161
      length: 1
    >
    positive_span: <
      offset: 8
      length: 3
    >
    positive_delta: 1
    positive_delta: 2
    positive_delta: -1
    positive_delta: -1
  >
  timestamp_ms: 1234568
>

`,
			contentType: "application/vnd.google.protobuf",
			histograms: []histogramSample{{
				t: 1234568,
				h: &histogram.Histogram{
					Count:         175,
					ZeroCount:     2,
					Sum:           0.0008280461746287094,
					ZeroThreshold: 2.938735877055719e-39,
					Schema:        3,
					PositiveSpans: []histogram.Span{
						{Offset: -161, Length: 1},
						{Offset: 8, Length: 3},
					},
					NegativeSpans: []histogram.Span{
						{Offset: -162, Length: 1},
						{Offset: 23, Length: 4},
					},
					PositiveBuckets: []int64{1, 2, -1, -1},
					NegativeBuckets: []int64{1, 3, -2, -1, 1},
				},
			}},
			exemplars: []exemplar.Exemplar{
				// Native histogram exemplars are arranged by timestamp, and those with missing timestamps are dropped.
				{Labels: labels.FromStrings("dummyID", "58215"), Value: -0.00019, Ts: 1625851055146, HasTs: true},
				{Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, Ts: 1625851155146, HasTs: true},
			},
		},
		{
			title: "Native histogram with three exemplars scraped as classic histogram",

			enableNativeHistogramsIngestion: true,
			scrapeText: `name: "test_histogram"
help: "Test histogram with many buckets removed to keep it manageable in size."
type: HISTOGRAM
metric: <
  histogram: <
    sample_count: 175
    sample_sum: 0.0008280461746287094
    bucket: <
      cumulative_count: 2
      upper_bound: -0.0004899999999999998
    >
    bucket: <
      cumulative_count: 4
      upper_bound: -0.0003899999999999998
      exemplar: <
        label: <
          name: "dummyID"
          value: "59727"
        >
        value: -0.00039
        timestamp: <
          seconds: 1625851155
          nanos: 146848499
        >
      >
    >
    bucket: <
      cumulative_count: 16
      upper_bound: -0.0002899999999999998
      exemplar: <
        label: <
          name: "dummyID"
          value: "5617"
        >
        value: -0.00029
      >
    >
    bucket: <
      cumulative_count: 32
      upper_bound: -0.0001899999999999998
      exemplar: <
        label: <
          name: "dummyID"
          value: "58215"
        >
        value: -0.00019
        timestamp: <
          seconds: 1625851055
          nanos: 146848599
        >
      >
    >
    schema: 3
    zero_threshold: 2.938735877055719e-39
    zero_count: 2
    negative_span: <
      offset: -162
      length: 1
    >
    negative_span: <
      offset: 23
      length: 4
    >
    negative_delta: 1
    negative_delta: 3
    negative_delta: -2
    negative_delta: -1
    negative_delta: 1
    positive_span: <
      offset: -161
      length: 1
    >
    positive_span: <
      offset: 8
      length: 3
    >
    positive_delta: 1
    positive_delta: 2
    positive_delta: -1
    positive_delta: -1
  >
  timestamp_ms: 1234568
>

`,
			scrapeClassicHistograms: true,
			contentType:             "application/vnd.google.protobuf",
			floats: []floatSample{
				{metric: labels.FromStrings("__name__", "test_histogram_count"), t: 1234568, f: 175},
				{metric: labels.FromStrings("__name__", "test_histogram_sum"), t: 1234568, f: 0.0008280461746287094},
				{metric: labels.FromStrings("__name__", "test_histogram_bucket", "le", "-0.0004899999999999998"), t: 1234568, f: 2},
				{metric: labels.FromStrings("__name__", "test_histogram_bucket", "le", "-0.0003899999999999998"), t: 1234568, f: 4},
				{metric: labels.FromStrings("__name__", "test_histogram_bucket", "le", "-0.0002899999999999998"), t: 1234568, f: 16},
				{metric: labels.FromStrings("__name__", "test_histogram_bucket", "le", "-0.0001899999999999998"), t: 1234568, f: 32},
				{metric: labels.FromStrings("__name__", "test_histogram_bucket", "le", "+Inf"), t: 1234568, f: 175},
			},
			histograms: []histogramSample{{
				t: 1234568,
				h: &histogram.Histogram{
					Count:         175,
					ZeroCount:     2,
					Sum:           0.0008280461746287094,
					ZeroThreshold: 2.938735877055719e-39,
					Schema:        3,
					PositiveSpans: []histogram.Span{
						{Offset: -161, Length: 1},
						{Offset: 8, Length: 3},
					},
					NegativeSpans: []histogram.Span{
						{Offset: -162, Length: 1},
						{Offset: 23, Length: 4},
					},
					PositiveBuckets: []int64{1, 2, -1, -1},
					NegativeBuckets: []int64{1, 3, -2, -1, 1},
				},
			}},
			exemplars: []exemplar.Exemplar{
				// For the native histogram, exemplars are ordered by timestamp.
				// Exemplars with missing timestamps are dropped for native histograms.
				{Labels: labels.FromStrings("dummyID", "58215"), Value: -0.00019, Ts: 1625851055146, HasTs: true},
				{Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, Ts: 1625851155146, HasTs: true},
				// For the classic histogram, exemplars are kept in order of appearance.
				// Exemplars with missing timestamps are supported for classic histograms.
				{Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, Ts: 1625851155146, HasTs: true},
				{Labels: labels.FromStrings("dummyID", "5617"), Value: -0.00029, Ts: 1234568, HasTs: false},
				{Labels: labels.FromStrings("dummyID", "58215"), Value: -0.00019, Ts: 1625851055146, HasTs: true},
			},
		},
	}

	for _, test := range tests {
		t.Run(test.title, func(t *testing.T) {
			app := &collectResultAppender{}

			discoveryLabels := &Target{
				labels: labels.FromStrings(test.discoveryLabels...),
			}

			sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)
			sl.enableNativeHistogramIngestion = test.enableNativeHistogramsIngestion
			sl.sampleMutator = func(l labels.Labels) labels.Labels {
				return mutateSampleLabels(l, discoveryLabels, false, nil)
			}
			sl.reportSampleMutator = func(l labels.Labels) labels.Labels {
				return mutateReportSampleLabels(l, discoveryLabels)
			}
			sl.scrapeClassicHistograms = test.scrapeClassicHistograms

			now := time.Now()

			for i := range test.floats {
				if test.floats[i].t != 0 {
					continue
				}
				test.floats[i].t = timestamp.FromTime(now)
			}

			// We need to set the timestamp for expected exemplars that do not have a timestamp.
			for i := range test.exemplars {
				if test.exemplars[i].Ts == 0 {
					test.exemplars[i].Ts = timestamp.FromTime(now)
				}
			}

			buf := &bytes.Buffer{}
			if test.contentType == "application/vnd.google.protobuf" {
				// In case of protobuf, we have to create the binary representation.
				pb := &dto.MetricFamily{}
				// From text to proto message.
				require.NoError(t, proto.UnmarshalText(test.scrapeText, pb))
				// From proto message to binary protobuf.
				protoBuf, err := proto.Marshal(pb)
				require.NoError(t, err)

				// Write first length, then binary protobuf.
				varintBuf := binary.AppendUvarint(nil, uint64(len(protoBuf)))
				buf.Write(varintBuf)
				buf.Write(protoBuf)
			} else {
				buf.WriteString(test.scrapeText)
			}

			_, _, _, err := sl.append(app, buf.Bytes(), test.contentType, now)
			require.NoError(t, err)
			require.NoError(t, app.Commit())
			requireEqual(t, test.floats, app.resultFloats)
			requireEqual(t, test.histograms, app.resultHistograms)
			requireEqual(t, test.exemplars, app.resultExemplars)
		})
	}
}

func TestScrapeLoopAppendExemplarSeries(t *testing.T) {
	scrapeText := []string{`metric_total{n="1"} 1 # {t="1"} 1.0 10000
# EOF`, `metric_total{n="1"} 2 # {t="2"} 2.0 20000
# EOF`}
	samples := []floatSample{{
		metric: labels.FromStrings("__name__", "metric_total", "n", "1"),
		f:      1,
	}, {
		metric: labels.FromStrings("__name__", "metric_total", "n", "1"),
		f:      2,
	}}
	exemplars := []exemplar.Exemplar{
		{Labels: labels.FromStrings("t", "1"), Value: 1, Ts: 10000000, HasTs: true},
		{Labels: labels.FromStrings("t", "2"), Value: 2, Ts: 20000000, HasTs: true},
	}
	discoveryLabels := &Target{
		labels: labels.FromStrings(),
	}

	app := &collectResultAppender{}

	sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)
	sl.sampleMutator = func(l labels.Labels) labels.Labels {
		return mutateSampleLabels(l, discoveryLabels, false, nil)
	}
	sl.reportSampleMutator = func(l labels.Labels) labels.Labels {
		return mutateReportSampleLabels(l, discoveryLabels)
	}

	now := time.Now()

	for i := range samples {
		ts := now.Add(time.Second * time.Duration(i))
		samples[i].t = timestamp.FromTime(ts)
	}

	// We need to set the timestamp for expected exemplars that do not have a timestamp.
	for i := range exemplars {
		if exemplars[i].Ts == 0 {
			ts := now.Add(time.Second * time.Duration(i))
			exemplars[i].Ts = timestamp.FromTime(ts)
		}
	}

	for i, st := range scrapeText {
		_, _, _, err := sl.append(app, []byte(st), "application/openmetrics-text", timestamp.Time(samples[i].t))
		require.NoError(t, err)
		require.NoError(t, app.Commit())
	}

	requireEqual(t, samples, app.resultFloats)
	requireEqual(t, exemplars, app.resultExemplars)
}

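// TestScrapeLoopRunReportsTargetDownOnScrapeError checks that a failed scrape
// reports the target as down, i.e. 'up' is 0.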
func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) {
	var (
		scraper  = &testScraper{}
		appender = &collectResultAppender{}
		app      = func(ctx context.Context) storage.Appender { return appender }
	)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newBasicScrapeLoop(t, ctx, scraper, app, 10*time.Millisecond)

	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
		cancel()
		return errors.New("scrape failed")
	}

	sl.run(nil)
	require.Equal(t, 0.0, appender.resultFloats[0].f, "bad 'up' value")
}

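// TestScrapeLoopRunReportsTargetDownOnInvalidUTF8 checks that a scrape payload
// containing an invalid UTF-8 label value also reports the target as down.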
func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) {
	var (
		scraper  = &testScraper{}
		appender = &collectResultAppender{}
		app      = func(ctx context.Context) storage.Appender { return appender }
	)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newBasicScrapeLoop(t, ctx, scraper, app, 10*time.Millisecond)

	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
		cancel()
		w.Write([]byte("a{l=\"\xff\"} 1\n"))
		return nil
	}

	sl.run(nil)
	require.Equal(t, 0.0, appender.resultFloats[0].f, "bad 'up' value")
}

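// errorAppender behaves like collectResultAppender, except that Append fails
// with a specific storage error depending on the metric name.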
type errorAppender struct {
	collectResultAppender
}

func (app *errorAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
	switch lset.Get(model.MetricNameLabel) {
	case "out_of_order":
		return 0, storage.ErrOutOfOrderSample
	case "amend":
		return 0, storage.ErrDuplicateSampleForTimestamp
	case "out_of_bounds":
		return 0, storage.ErrOutOfBounds
	default:
		return app.collectResultAppender.Append(ref, lset, t, v)
	}
}

func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T) {
	app := &errorAppender{}
	sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)

	now := time.Unix(1, 0)
	slApp := sl.appender(context.Background())
	total, added, seriesAdded, err := sl.append(slApp, []byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), "", now)
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	want := []floatSample{
		{
			metric: labels.FromStrings(model.MetricNameLabel, "normal"),
			t:      timestamp.FromTime(now),
			f:      1,
		},
	}
	requireEqual(t, want, app.resultFloats, "Appended samples not as expected:\n%s", app)
	require.Equal(t, 4, total)
	require.Equal(t, 4, added)
	require.Equal(t, 1, seriesAdded)
}

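// TestScrapeLoopOutOfBoundsTimeError checks that a sample with a timestamp
// beyond the appender's maxTime is counted as added but not as a new series,
// while the scrape as a whole still succeeds.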
func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
	app := &collectResultAppender{}
	sl := newBasicScrapeLoop(t, context.Background(), nil,
		func(ctx context.Context) storage.Appender {
			return &timeLimitAppender{
				Appender: app,
				maxTime:  timestamp.FromTime(time.Now().Add(10 * time.Minute)),
			}
		},
		0,
	)

	now := time.Now().Add(20 * time.Minute)
	slApp := sl.appender(context.Background())
	total, added, seriesAdded, err := sl.append(slApp, []byte("normal 1\n"), "", now)
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())
	require.Equal(t, 1, total)
	require.Equal(t, 1, added)
	require.Equal(t, 0, seriesAdded)
}

func TestTargetScraperScrapeOK(t *testing.T) {
	const (
		configTimeout   = 1500 * time.Millisecond
		expectedTimeout = "1.5"
	)

	var protobufParsing bool

	server := httptest.NewServer(
		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			if protobufParsing {
				accept := r.Header.Get("Accept")
				require.True(t, strings.HasPrefix(accept, "application/vnd.google.protobuf;"),
					"Expected Accept header to prefer application/vnd.google.protobuf.")
			}

			timeout := r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds")
			require.Equal(t, expectedTimeout, timeout, "Expected scrape timeout header.")

			w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
			w.Write([]byte("metric_a 1\nmetric_b 2\n"))
		}),
	)
	defer server.Close()

	serverURL, err := url.Parse(server.URL)
	if err != nil {
		panic(err)
	}

	runTest := func(acceptHeader string) {
		ts := &targetScraper{
			Target: &Target{
				labels: labels.FromStrings(
					model.SchemeLabel, serverURL.Scheme,
					model.AddressLabel, serverURL.Host,
				),
			},
			client:       http.DefaultClient,
			timeout:      configTimeout,
			acceptHeader: acceptHeader,
		}
		var buf bytes.Buffer

		resp, err := ts.scrape(context.Background())
		require.NoError(t, err)
		contentType, err := ts.readResponse(context.Background(), resp, &buf)
		require.NoError(t, err)
		require.Equal(t, "text/plain; version=0.0.4", contentType)
		require.Equal(t, "metric_a 1\nmetric_b 2\n", buf.String())
	}

	runTest(acceptHeader(config.DefaultScrapeProtocols))
	protobufParsing = true
	runTest(acceptHeader(config.DefaultProtoFirstScrapeProtocols))
}

func TestTargetScrapeScrapeCancel(t *testing.T) {
	block := make(chan struct{})

	server := httptest.NewServer(
		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			<-block
		}),
	)
	defer server.Close()

	serverURL, err := url.Parse(server.URL)
	if err != nil {
		panic(err)
	}

	ts := &targetScraper{
		Target: &Target{
			labels: labels.FromStrings(
				model.SchemeLabel, serverURL.Scheme,
				model.AddressLabel, serverURL.Host,
			),
		},
		client:       http.DefaultClient,
		acceptHeader: acceptHeader(config.DefaultGlobalConfig.ScrapeProtocols),
	}
	ctx, cancel := context.WithCancel(context.Background())

	errc := make(chan error, 1)

	go func() {
		time.Sleep(1 * time.Second)
		cancel()
	}()

	go func() {
		_, err := ts.scrape(ctx)
		switch {
		case err == nil:
			errc <- errors.New("Expected error but got nil")
		case !errors.Is(ctx.Err(), context.Canceled):
			errc <- fmt.Errorf("Expected context cancellation error but got: %w", ctx.Err())
		default:
			close(errc)
		}
	}()

	select {
	case <-time.After(5 * time.Second):
		require.FailNow(t, "Scrape function did not return in time.")
	case err := <-errc:
		require.NoError(t, err)
	}
	// If this channel were closed in a defer above, the test server
	// wouldn't terminate and the test would never complete.
	close(block)
}

func TestTargetScrapeScrapeNotFound(t *testing.T) {
	server := httptest.NewServer(
		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusNotFound)
		}),
	)
	defer server.Close()

	serverURL, err := url.Parse(server.URL)
	if err != nil {
		panic(err)
	}

	ts := &targetScraper{
		Target: &Target{
			labels: labels.FromStrings(
				model.SchemeLabel, serverURL.Scheme,
				model.AddressLabel, serverURL.Host,
			),
		},
		client:       http.DefaultClient,
		acceptHeader: acceptHeader(config.DefaultGlobalConfig.ScrapeProtocols),
	}

	resp, err := ts.scrape(context.Background())
	require.NoError(t, err)
	_, err = ts.readResponse(context.Background(), resp, io.Discard)
	require.Contains(t, err.Error(), "404", "Expected \"404 NotFound\" error but got: %s", err)
}

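// TestTargetScraperBodySizeLimit checks that response bodies exceeding the
// configured limit are rejected with errBodySizeLimit, for both plain and
// gzip-compressed responses, and that removing the limit lets the full body
// through.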
func TestTargetScraperBodySizeLimit(t *testing.T) {
	const (
		bodySizeLimit = 15
		responseBody  = "metric_a 1\nmetric_b 2\n"
	)
	var gzipResponse bool
	server := httptest.NewServer(
		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
			if gzipResponse {
				w.Header().Set("Content-Encoding", "gzip")
				gw := gzip.NewWriter(w)
				defer gw.Close()
				gw.Write([]byte(responseBody))
				return
			}
			w.Write([]byte(responseBody))
		}),
	)
	defer server.Close()

	serverURL, err := url.Parse(server.URL)
	if err != nil {
		panic(err)
	}

	ts := &targetScraper{
		Target: &Target{
			labels: labels.FromStrings(
				model.SchemeLabel, serverURL.Scheme,
				model.AddressLabel, serverURL.Host,
			),
		},
		client:        http.DefaultClient,
		bodySizeLimit: bodySizeLimit,
		acceptHeader:  acceptHeader(config.DefaultGlobalConfig.ScrapeProtocols),
		metrics:       newTestScrapeMetrics(t),
	}
	var buf bytes.Buffer

	// Target response uncompressed body, scrape with body size limit.
	resp, err := ts.scrape(context.Background())
	require.NoError(t, err)
	_, err = ts.readResponse(context.Background(), resp, &buf)
	require.ErrorIs(t, err, errBodySizeLimit)
	require.Equal(t, bodySizeLimit, buf.Len())
	// Target response gzip compressed body, scrape with body size limit.
	gzipResponse = true
	buf.Reset()
	resp, err = ts.scrape(context.Background())
	require.NoError(t, err)
	_, err = ts.readResponse(context.Background(), resp, &buf)
	require.ErrorIs(t, err, errBodySizeLimit)
	require.Equal(t, bodySizeLimit, buf.Len())
	// Target response uncompressed body, scrape without body size limit.
	gzipResponse = false
	buf.Reset()
	ts.bodySizeLimit = 0
	resp, err = ts.scrape(context.Background())
	require.NoError(t, err)
	_, err = ts.readResponse(context.Background(), resp, &buf)
	require.NoError(t, err)
	require.Len(t, responseBody, buf.Len())
	// Target response gzip compressed body, scrape without body size limit.
	gzipResponse = true
	buf.Reset()
	resp, err = ts.scrape(context.Background())
	require.NoError(t, err)
	_, err = ts.readResponse(context.Background(), resp, &buf)
	require.NoError(t, err)
	require.Len(t, responseBody, buf.Len())
}

// testScraper implements the scraper interface and allows setting values
// returned by its methods. It also allows setting a custom scrape function.
type testScraper struct {
	offsetDur time.Duration

	lastStart    time.Time
	lastDuration time.Duration
	lastError    error

	scrapeErr  error
	scrapeFunc func(context.Context, io.Writer) error
}

func (ts *testScraper) offset(time.Duration, uint64) time.Duration {
	return ts.offsetDur
}

func (ts *testScraper) Report(start time.Time, duration time.Duration, err error) {
	ts.lastStart = start
	ts.lastDuration = duration
	ts.lastError = err
}

func (ts *testScraper) scrape(ctx context.Context) (*http.Response, error) {
	return nil, ts.scrapeErr
}

func (ts *testScraper) readResponse(ctx context.Context, resp *http.Response, w io.Writer) (string, error) {
	if ts.scrapeFunc != nil {
		return "", ts.scrapeFunc(ctx, w)
	}
	return "", ts.scrapeErr
}

func TestScrapeLoop_RespectTimestamps(t *testing.T) {
	s := teststorage.New(t)
	defer s.Close()

	app := s.Appender(context.Background())
	capp := &collectResultAppender{next: app}
	sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return capp }, 0)

	now := time.Now()
	slApp := sl.appender(context.Background())
	_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now)
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	want := []floatSample{
		{
			metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
			t:      0,
			f:      1,
		},
	}
	require.Equal(t, want, capp.resultFloats, "Appended samples not as expected:\n%s", capp)
}

func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
	s := teststorage.New(t)
	defer s.Close()

	app := s.Appender(context.Background())

	capp := &collectResultAppender{next: app}

	sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return capp }, 0)
	sl.honorTimestamps = false

	now := time.Now()
	slApp := sl.appender(context.Background())
	_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now)
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	want := []floatSample{
		{
			metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
			t:      timestamp.FromTime(now),
			f:      1,
		},
	}
	require.Equal(t, want, capp.resultFloats, "Appended samples not as expected:\n%s", capp)
}

func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
	s := teststorage.New(t)
	defer s.Close()

	ctx, cancel := context.WithCancel(context.Background())
	sl := newBasicScrapeLoop(t, ctx, &testScraper{}, s.Appender, 0)
	defer cancel()

	// We add a good and a bad metric to check that both are discarded.
	slApp := sl.appender(ctx)
	_, _, _, err := sl.append(slApp, []byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "", time.Time{})
	require.Error(t, err)
	require.NoError(t, slApp.Rollback())
	// We need to cycle staleness cache maps after a manual rollback. Otherwise they will have old entries in them,
	// which would cause ErrDuplicateSampleForTimestamp errors on the next append.
	sl.cache.iterDone(true)

	q, err := s.Querier(time.Time{}.UnixNano(), 0)
	require.NoError(t, err)
	series := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
	require.False(t, series.Next(), "series found in tsdb")
	require.NoError(t, series.Err())

	// We add a good metric to check that it is recorded.
	slApp = sl.appender(ctx)
	_, _, _, err = sl.append(slApp, []byte("test_metric{le=\"500\"} 1\n"), "", time.Time{})
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	q, err = s.Querier(time.Time{}.UnixNano(), 0)
	require.NoError(t, err)
	series = q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "le", "500"))
	require.True(t, series.Next(), "series not found in tsdb")
	require.NoError(t, series.Err())
	require.False(t, series.Next(), "more than one series found in tsdb")
}

func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
	s := teststorage.New(t)
	defer s.Close()

	app := s.Appender(context.Background())

	ctx, cancel := context.WithCancel(context.Background())
	sl := newBasicScrapeLoop(t, context.Background(), &testScraper{}, func(ctx context.Context) storage.Appender { return app }, 0)
	sl.sampleMutator = func(l labels.Labels) labels.Labels {
		if l.Has("drop") {
			return labels.FromStrings("no", "name") // This label set will trigger an error.
		}
		return l
	}
	defer cancel()

	slApp := sl.appender(context.Background())
	_, _, _, err := sl.append(slApp, []byte("nok 1\nnok2{drop=\"drop\"} 1\n"), "", time.Time{})
	require.Error(t, err)
	require.NoError(t, slApp.Rollback())
	require.Equal(t, errNameLabelMandatory, err)

	q, err := s.Querier(time.Time{}.UnixNano(), 0)
	require.NoError(t, err)
	series := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
	require.False(t, series.Next(), "series found in tsdb")
	require.NoError(t, series.Err())
}

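// TestReusableConfig checks which pairs of scrape config variants allow the
// scrape cache to be reused. For example, variants 0 and 2 differ only in
// ScrapeTimeout, which doesn't affect parsing, so their caches are
// interchangeable. The index pairs below refer to the variants slice.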
func TestReusableConfig(t *testing.T) {
	variants := []*config.ScrapeConfig{
		{
			JobName:       "prometheus",
			ScrapeTimeout: model.Duration(15 * time.Second),
		},
		{
			JobName:       "httpd",
			ScrapeTimeout: model.Duration(15 * time.Second),
		},
		{
			JobName:       "prometheus",
			ScrapeTimeout: model.Duration(5 * time.Second),
		},
		{
			JobName:     "prometheus",
			MetricsPath: "/metrics",
		},
		{
			JobName:     "prometheus",
			MetricsPath: "/metrics2",
		},
		{
			JobName:       "prometheus",
			ScrapeTimeout: model.Duration(5 * time.Second),
			MetricsPath:   "/metrics2",
		},
		{
			JobName:        "prometheus",
			ScrapeInterval: model.Duration(5 * time.Second),
			MetricsPath:    "/metrics2",
		},
		{
			JobName:        "prometheus",
			ScrapeInterval: model.Duration(5 * time.Second),
			SampleLimit:    1000,
			MetricsPath:    "/metrics2",
		},
	}

	match := [][]int{
		{0, 2},
		{4, 5},
		{4, 6},
		{4, 7},
		{5, 6},
		{5, 7},
		{6, 7},
	}
	noMatch := [][]int{
		{1, 2},
		{0, 4},
		{3, 4},
	}

	for i, m := range match {
		require.True(t, reusableCache(variants[m[0]], variants[m[1]]), "match test %d", i)
		require.True(t, reusableCache(variants[m[1]], variants[m[0]]), "match test %d", i)
		require.True(t, reusableCache(variants[m[1]], variants[m[1]]), "match test %d", i)
		require.True(t, reusableCache(variants[m[0]], variants[m[0]]), "match test %d", i)
	}
	for i, m := range noMatch {
		require.False(t, reusableCache(variants[m[0]], variants[m[1]]), "not match test %d", i)
		require.False(t, reusableCache(variants[m[1]], variants[m[0]]), "not match test %d", i)
	}
}

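// TestReuseScrapeCache checks that reloading a scrape pool keeps the per-loop
// scrape cache for config changes that don't affect the scraped data, and
// discards it otherwise.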
func TestReuseScrapeCache(t *testing.T) {
	var (
		app = &nopAppendable{}
		cfg = &config.ScrapeConfig{
			JobName:        "Prometheus",
			ScrapeTimeout:  model.Duration(5 * time.Second),
			ScrapeInterval: model.Duration(5 * time.Second),
			MetricsPath:    "/metrics",
		}
		sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
		t1    = &Target{
			discoveredLabels: labels.FromStrings("labelNew", "nameNew", "labelNew1", "nameNew1", "labelNew2", "nameNew2"),
		}
		proxyURL, _ = url.Parse("http://localhost:2128")
	)
	defer sp.stop()
	sp.sync([]*Target{t1})

	steps := []struct {
		keep      bool
		newConfig *config.ScrapeConfig
	}{
		{
			keep: true,
			newConfig: &config.ScrapeConfig{
				JobName:        "Prometheus",
				ScrapeInterval: model.Duration(5 * time.Second),
				ScrapeTimeout:  model.Duration(5 * time.Second),
				MetricsPath:    "/metrics",
			},
		},
		{
			keep: false,
			newConfig: &config.ScrapeConfig{
				JobName:        "Prometheus",
				ScrapeInterval: model.Duration(5 * time.Second),
				ScrapeTimeout:  model.Duration(15 * time.Second),
				MetricsPath:    "/metrics2",
			},
		},
		{
			keep: true,
			newConfig: &config.ScrapeConfig{
				JobName:        "Prometheus",
				SampleLimit:    400,
				ScrapeInterval: model.Duration(5 * time.Second),
				ScrapeTimeout:  model.Duration(15 * time.Second),
				MetricsPath:    "/metrics2",
			},
		},
		{
			keep: false,
			newConfig: &config.ScrapeConfig{
				JobName:         "Prometheus",
				HonorTimestamps: true,
				SampleLimit:     400,
				ScrapeInterval:  model.Duration(5 * time.Second),
				ScrapeTimeout:   model.Duration(15 * time.Second),
				MetricsPath:     "/metrics2",
			},
		},
		{
			keep: true,
			newConfig: &config.ScrapeConfig{
				JobName:         "Prometheus",
				HonorTimestamps: true,
				SampleLimit:     400,
				HTTPClientConfig: config_util.HTTPClientConfig{
					ProxyConfig: config_util.ProxyConfig{ProxyURL: config_util.URL{URL: proxyURL}},
				},
				ScrapeInterval: model.Duration(5 * time.Second),
				ScrapeTimeout:  model.Duration(15 * time.Second),
				MetricsPath:    "/metrics2",
			},
		},
		{
			keep: false,
			newConfig: &config.ScrapeConfig{
				JobName:         "Prometheus",
				HonorTimestamps: true,
				HonorLabels:     true,
				SampleLimit:     400,
				ScrapeInterval:  model.Duration(5 * time.Second),
				ScrapeTimeout:   model.Duration(15 * time.Second),
				MetricsPath:     "/metrics2",
			},
		},
		{
			keep: false,
			newConfig: &config.ScrapeConfig{
				JobName:        "Prometheus",
				ScrapeInterval: model.Duration(5 * time.Second),
				ScrapeTimeout:  model.Duration(15 * time.Second),
				MetricsPath:    "/metrics",
				LabelLimit:     1,
			},
		},
		{
			keep: false,
			newConfig: &config.ScrapeConfig{
				JobName:        "Prometheus",
				ScrapeInterval: model.Duration(5 * time.Second),
				ScrapeTimeout:  model.Duration(15 * time.Second),
				MetricsPath:    "/metrics",
				LabelLimit:     15,
			},
		},
		{
			keep: false,
			newConfig: &config.ScrapeConfig{
				JobName:              "Prometheus",
				ScrapeInterval:       model.Duration(5 * time.Second),
				ScrapeTimeout:        model.Duration(15 * time.Second),
				MetricsPath:          "/metrics",
				LabelLimit:           15,
				LabelNameLengthLimit: 5,
			},
		},
		{
			keep: false,
			newConfig: &config.ScrapeConfig{
				JobName:               "Prometheus",
				ScrapeInterval:        model.Duration(5 * time.Second),
				ScrapeTimeout:         model.Duration(15 * time.Second),
				MetricsPath:           "/metrics",
				LabelLimit:            15,
				LabelNameLengthLimit:  5,
				LabelValueLengthLimit: 7,
			},
		},
	}

	cacheAddr := func(sp *scrapePool) map[uint64]string {
		r := make(map[uint64]string)
		for fp, l := range sp.loops {
			r[fp] = fmt.Sprintf("%p", l.getCache())
		}
		return r
	}

	for i, s := range steps {
		initCacheAddr := cacheAddr(sp)
		sp.reload(s.newConfig)
		for fp, newCacheAddr := range cacheAddr(sp) {
			if s.keep {
				require.Equal(t, initCacheAddr[fp], newCacheAddr, "step %d: old cache and new cache are not the same", i)
			} else {
				require.NotEqual(t, initCacheAddr[fp], newCacheAddr, "step %d: old cache and new cache are the same", i)
			}
		}
		initCacheAddr = cacheAddr(sp)
		sp.reload(s.newConfig)
		for fp, newCacheAddr := range cacheAddr(sp) {
			require.Equal(t, initCacheAddr[fp], newCacheAddr, "step %d: reloading the exact config invalidates the cache", i)
		}
	}
}

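// TestScrapeAddFast checks that appending with a stale cached series ref
// doesn't fail: the append falls back to the slow path instead.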
func TestScrapeAddFast(t *testing.T) {
	s := teststorage.New(t)
	defer s.Close()

	ctx, cancel := context.WithCancel(context.Background())
	sl := newBasicScrapeLoop(t, ctx, &testScraper{}, s.Appender, 0)
	defer cancel()

	slApp := sl.appender(ctx)
	_, _, _, err := sl.append(slApp, []byte("up 1\n"), "", time.Time{})
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	// Poison the cache. There is just one entry, and one series in the
	// storage. Changing the ref will create a 'not found' error.
	for _, v := range sl.getCache().series {
		v.ref++
	}

	slApp = sl.appender(ctx)
	_, _, _, err = sl.append(slApp, []byte("up 1\n"), "", time.Time{}.Add(time.Second))
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())
}

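// TestReuseCacheRace repeatedly reloads a scrape pool for several seconds to
// give the race detector a chance to catch concurrent access to reused caches.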
func TestReuseCacheRace(t *testing.T) {
	var (
		app = &nopAppendable{}
		cfg = &config.ScrapeConfig{
			JobName:        "Prometheus",
			ScrapeTimeout:  model.Duration(5 * time.Second),
			ScrapeInterval: model.Duration(5 * time.Second),
			MetricsPath:    "/metrics",
		}
		buffers = pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
		sp, _   = newScrapePool(cfg, app, 0, nil, buffers, &Options{}, newTestScrapeMetrics(t))
		t1      = &Target{
			discoveredLabels: labels.FromStrings("labelNew", "nameNew"),
		}
	)
	defer sp.stop()
	sp.sync([]*Target{t1})

	start := time.Now()
	for i := uint(1); i > 0; i++ {
		if time.Since(start) > 5*time.Second {
			break
		}
		sp.reload(&config.ScrapeConfig{
			JobName:        "Prometheus",
			ScrapeTimeout:  model.Duration(1 * time.Millisecond),
			ScrapeInterval: model.Duration(1 * time.Millisecond),
			MetricsPath:    "/metrics",
			SampleLimit:    i,
		})
	}
}

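// TestCheckAddError checks that checkAddError counts out-of-order samples.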
func TestCheckAddError(t *testing.T) {
	var appErrs appendErrors
	sl := scrapeLoop{l: log.NewNopLogger(), metrics: newTestScrapeMetrics(t)}
	sl.checkAddError(nil, storage.ErrOutOfOrderSample, nil, nil, &appErrs)
	require.Equal(t, 1, appErrs.numOutOfOrder)
}

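// TestScrapeReportSingleAppender checks that scraped samples and report
// samples go through a single appender, so the total sample count is always a
// multiple of 9 here: 4 scraped samples plus 5 report samples per scrape.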
func TestScrapeReportSingleAppender(t *testing.T) {
	s := teststorage.New(t)
	defer s.Close()

	var (
		signal  = make(chan struct{}, 1)
		scraper = &testScraper{}
	)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newBasicScrapeLoop(t, ctx, scraper, s.Appender, 10*time.Millisecond)

	numScrapes := 0

	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
		numScrapes++
		if numScrapes%4 == 0 {
			return fmt.Errorf("scrape failed")
		}
		w.Write([]byte("metric_a 44\nmetric_b 44\nmetric_c 44\nmetric_d 44\n"))
		return nil
	}

	go func() {
		sl.run(nil)
		signal <- struct{}{}
	}()

	start := time.Now()
	for time.Since(start) < 3*time.Second {
		q, err := s.Querier(time.Time{}.UnixNano(), time.Now().UnixNano())
		require.NoError(t, err)
		series := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".+"))

		c := 0
		for series.Next() {
			i := series.At().Iterator(nil)
			for i.Next() != chunkenc.ValNone {
				c++
			}
		}

		require.Equal(t, 0, c%9, "Appended samples not as expected: %d", c)
		q.Close()
	}
	cancel()

	select {
	case <-signal:
	case <-time.After(5 * time.Second):
		require.FailNow(t, "Scrape wasn't stopped.")
	}
}

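// TestScrapeReportLimit checks that report samples are not subject to the
// configured sample limit: with SampleLimit 5, 4 scraped samples, and 5 report
// samples per scrape, the target still reports 'up' as 1.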
func TestScrapeReportLimit(t *testing.T) {
	s := teststorage.New(t)
	defer s.Close()

	cfg := &config.ScrapeConfig{
		JobName:        "test",
		SampleLimit:    5,
		Scheme:         "http",
		ScrapeInterval: model.Duration(100 * time.Millisecond),
		ScrapeTimeout:  model.Duration(100 * time.Millisecond),
	}

	var (
		scrapes      int
		scrapedTwice = make(chan bool)
	)

	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "metric_a 44\nmetric_b 44\nmetric_c 44\nmetric_d 44\n")
		scrapes++
		if scrapes == 2 {
			close(scrapedTwice)
		}
	}))
	defer ts.Close()

	sp, err := newScrapePool(cfg, s, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
	require.NoError(t, err)
	defer sp.stop()

	testURL, err := url.Parse(ts.URL)
	require.NoError(t, err)
	sp.Sync([]*targetgroup.Group{
		{
			Targets: []model.LabelSet{{model.AddressLabel: model.LabelValue(testURL.Host)}},
		},
	})

	select {
	case <-time.After(5 * time.Second):
		t.Fatalf("target was not scraped twice")
	case <-scrapedTwice:
		// Once the target has been scraped twice, the report samples from the
		// first scrape have been inserted into the database.
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	q, err := s.Querier(time.Time{}.UnixNano(), time.Now().UnixNano())
	require.NoError(t, err)
	defer q.Close()
	series := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", "up"))

	var found bool
	for series.Next() {
		i := series.At().Iterator(nil)
		for i.Next() == chunkenc.ValFloat {
			_, v := i.At()
			require.Equal(t, 1.0, v)
			found = true
		}
	}

	require.True(t, found)
}

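// TestScrapeLoopLabelLimit checks that scrapes exceeding any of the configured
// label limits (label count, name length, value length) fail, including cases
// where the limit is only exceeded after attaching discovery labels.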
func TestScrapeLoopLabelLimit(t *testing.T) {
	tests := []struct {
		title           string
		scrapeLabels    string
		discoveryLabels []string
		labelLimits     labelLimits
		expectErr       bool
	}{
		{
			title:           "Valid number of labels",
			scrapeLabels:    `metric{l1="1", l2="2"} 0`,
			discoveryLabels: nil,
			labelLimits:     labelLimits{labelLimit: 5},
			expectErr:       false,
		}, {
			title:           "Too many labels",
			scrapeLabels:    `metric{l1="1", l2="2", l3="3", l4="4", l5="5", l6="6"} 0`,
			discoveryLabels: nil,
			labelLimits:     labelLimits{labelLimit: 5},
			expectErr:       true,
		}, {
			title:           "Too many labels including discovery labels",
			scrapeLabels:    `metric{l1="1", l2="2", l3="3", l4="4"} 0`,
			discoveryLabels: []string{"l5", "5", "l6", "6"},
			labelLimits:     labelLimits{labelLimit: 5},
			expectErr:       true,
		}, {
			title:           "Valid labels name length",
			scrapeLabels:    `metric{l1="1", l2="2"} 0`,
			discoveryLabels: nil,
			labelLimits:     labelLimits{labelNameLengthLimit: 10},
			expectErr:       false,
		}, {
			title:           "Label name too long",
			scrapeLabels:    `metric{label_name_too_long="0"} 0`,
			discoveryLabels: nil,
			labelLimits:     labelLimits{labelNameLengthLimit: 10},
			expectErr:       true,
		}, {
			title:           "Discovery label name too long",
			scrapeLabels:    `metric{l1="1", l2="2"} 0`,
			discoveryLabels: []string{"label_name_too_long", "0"},
			labelLimits:     labelLimits{labelNameLengthLimit: 10},
			expectErr:       true,
		}, {
			title:           "Valid labels value length",
			scrapeLabels:    `metric{l1="1", l2="2"} 0`,
			discoveryLabels: nil,
			labelLimits:     labelLimits{labelValueLengthLimit: 10},
			expectErr:       false,
		}, {
			title:           "Label value too long",
			scrapeLabels:    `metric{l1="label_value_too_long"} 0`,
			discoveryLabels: nil,
			labelLimits:     labelLimits{labelValueLengthLimit: 10},
			expectErr:       true,
		}, {
			title:           "Discovery label value too long",
			scrapeLabels:    `metric{l1="1", l2="2"} 0`,
			discoveryLabels: []string{"l1", "label_value_too_long"},
			labelLimits:     labelLimits{labelValueLengthLimit: 10},
			expectErr:       true,
		},
	}

	for _, test := range tests {
		app := &collectResultAppender{}

		discoveryLabels := &Target{
			labels: labels.FromStrings(test.discoveryLabels...),
		}

		sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)
		sl.sampleMutator = func(l labels.Labels) labels.Labels {
			return mutateSampleLabels(l, discoveryLabels, false, nil)
		}
		sl.reportSampleMutator = func(l labels.Labels) labels.Labels {
			return mutateReportSampleLabels(l, discoveryLabels)
		}
		sl.labelLimits = &test.labelLimits

		slApp := sl.appender(context.Background())
		_, _, _, err := sl.append(slApp, []byte(test.scrapeLabels), "", time.Now())

		t.Logf("Test:%s", test.title)
		if test.expectErr {
			require.Error(t, err)
		} else {
			require.NoError(t, err)
			require.NoError(t, slApp.Commit())
		}
	}
}

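// TestTargetScrapeIntervalAndTimeoutRelabel checks that a target's scrape
// interval and timeout can be overridden by relabeling the special
// __scrape_interval__ and __scrape_timeout__ labels.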
func TestTargetScrapeIntervalAndTimeoutRelabel(t *testing.T) {
	interval, _ := model.ParseDuration("2s")
	timeout, _ := model.ParseDuration("500ms")
	config := &config.ScrapeConfig{
		ScrapeInterval: interval,
		ScrapeTimeout:  timeout,
		RelabelConfigs: []*relabel.Config{
			{
				SourceLabels: model.LabelNames{model.ScrapeIntervalLabel},
				Regex:        relabel.MustNewRegexp("2s"),
				Replacement:  "3s",
				TargetLabel:  model.ScrapeIntervalLabel,
				Action:       relabel.Replace,
			},
			{
				SourceLabels: model.LabelNames{model.ScrapeTimeoutLabel},
				Regex:        relabel.MustNewRegexp("500ms"),
				Replacement:  "750ms",
				TargetLabel:  model.ScrapeTimeoutLabel,
				Action:       relabel.Replace,
			},
		},
	}
	sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
	tgts := []*targetgroup.Group{
		{
			Targets: []model.LabelSet{{model.AddressLabel: "127.0.0.1:9090"}},
		},
	}

	sp.Sync(tgts)
	defer sp.stop()

	require.Equal(t, "3s", sp.ActiveTargets()[0].labels.Get(model.ScrapeIntervalLabel))
	require.Equal(t, "750ms", sp.ActiveTargets()[0].labels.Get(model.ScrapeTimeoutLabel))
}

// Testing whether we can remove trailing .0 from histogram 'le' and summary 'quantile' labels.
func TestLeQuantileReLabel(t *testing.T) {
	simpleStorage := teststorage.New(t)
	defer simpleStorage.Close()

	config := &config.ScrapeConfig{
		JobName: "test",
		MetricRelabelConfigs: []*relabel.Config{
			{
				SourceLabels: model.LabelNames{"le", "__name__"},
				Regex:        relabel.MustNewRegexp("(\\d+)\\.0+;.*_bucket"),
				Replacement:  relabel.DefaultRelabelConfig.Replacement,
				Separator:    relabel.DefaultRelabelConfig.Separator,
				TargetLabel:  "le",
				Action:       relabel.Replace,
			},
			{
				SourceLabels: model.LabelNames{"quantile"},
				Regex:        relabel.MustNewRegexp("(\\d+)\\.0+"),
				Replacement:  relabel.DefaultRelabelConfig.Replacement,
				Separator:    relabel.DefaultRelabelConfig.Separator,
				TargetLabel:  "quantile",
				Action:       relabel.Replace,
			},
		},
		SampleLimit:    100,
		Scheme:         "http",
		ScrapeInterval: model.Duration(100 * time.Millisecond),
		ScrapeTimeout:  model.Duration(100 * time.Millisecond),
	}

	metricsText := `
# HELP test_histogram This is a histogram with default buckets
# TYPE test_histogram histogram
test_histogram_bucket{address="0.0.0.0",port="5001",le="0.005"} 0
test_histogram_bucket{address="0.0.0.0",port="5001",le="0.01"} 0
test_histogram_bucket{address="0.0.0.0",port="5001",le="0.025"} 0
test_histogram_bucket{address="0.0.0.0",port="5001",le="0.05"} 0
test_histogram_bucket{address="0.0.0.0",port="5001",le="0.1"} 0
test_histogram_bucket{address="0.0.0.0",port="5001",le="0.25"} 0
test_histogram_bucket{address="0.0.0.0",port="5001",le="0.5"} 0
test_histogram_bucket{address="0.0.0.0",port="5001",le="1.0"} 0
test_histogram_bucket{address="0.0.0.0",port="5001",le="2.5"} 0
test_histogram_bucket{address="0.0.0.0",port="5001",le="5.0"} 0
test_histogram_bucket{address="0.0.0.0",port="5001",le="10.0"} 0
test_histogram_bucket{address="0.0.0.0",port="5001",le="+Inf"} 0
test_histogram_sum{address="0.0.0.0",port="5001"} 0
test_histogram_count{address="0.0.0.0",port="5001"} 0
# HELP test_summary Number of inflight requests sampled at a regular interval. Quantile buckets keep track of inflight requests over the last 60s.
# TYPE test_summary summary
test_summary{quantile="0.5"} 0
test_summary{quantile="0.9"} 0
test_summary{quantile="0.95"} 0
test_summary{quantile="0.99"} 0
test_summary{quantile="1.0"} 1
test_summary_sum 1
test_summary_count 199
`

	// The expected "le" values do not have the trailing ".0".
	expectedLeValues := []string{"0.005", "0.01", "0.025", "0.05", "0.1", "0.25", "0.5", "1", "2.5", "5", "10", "+Inf"}

	// The expected "quantile" values do not have the trailing ".0".
	expectedQuantileValues := []string{"0.5", "0.9", "0.95", "0.99", "1"}

	scrapeCount := 0
	scraped := make(chan bool)

	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, metricsText)
		scrapeCount++
		if scrapeCount > 2 {
			close(scraped)
		}
	}))
	defer ts.Close()

	sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
	require.NoError(t, err)
	defer sp.stop()

	testURL, err := url.Parse(ts.URL)
	require.NoError(t, err)
	sp.Sync([]*targetgroup.Group{
		{
			Targets: []model.LabelSet{{model.AddressLabel: model.LabelValue(testURL.Host)}},
		},
	})
	require.Len(t, sp.ActiveTargets(), 1)

	select {
	case <-time.After(5 * time.Second):
		t.Fatalf("target was not scraped")
	case <-scraped:
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	q, err := simpleStorage.Querier(time.Time{}.UnixNano(), time.Now().UnixNano())
	require.NoError(t, err)
	defer q.Close()

	checkValues := func(labelName string, expectedValues []string, series storage.SeriesSet) {
		foundLeValues := map[string]bool{}

		for series.Next() {
			s := series.At()
			v := s.Labels().Get(labelName)
			require.NotContains(t, foundLeValues, v, "duplicate label value found")
			foundLeValues[v] = true
		}

		require.Equal(t, len(expectedValues), len(foundLeValues), "number of label values not as expected")
		for _, v := range expectedValues {
			require.Contains(t, foundLeValues, v, "label value not found")
		}
	}

	series := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", "test_histogram_bucket"))
	checkValues("le", expectedLeValues, series)

	series = q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", "test_summary"))
	checkValues("quantile", expectedQuantileValues, series)
}

3369
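// TestScrapeLoopRunCreatesStaleMarkersOnFailedScrapeForTimestampedMetrics
// verifies that, with trackTimestampsStaleness enabled, a sample ingested with
// an explicit timestamp still gets a stale marker once the target starts
// failing its scrapes.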
func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrapeForTimestampedMetrics(t *testing.T) {
	appender := &collectResultAppender{}
	var (
		signal  = make(chan struct{}, 1)
		scraper = &testScraper{}
		app     = func(ctx context.Context) storage.Appender { return appender }
	)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newBasicScrapeLoop(t, ctx, scraper, app, 10*time.Millisecond)
	sl.trackTimestampsStaleness = true
	// Succeed once, several failures, then stop.
	numScrapes := 0

	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
		numScrapes++

		switch numScrapes {
		case 1:
			w.Write([]byte(fmt.Sprintf("metric_a 42 %d\n", time.Now().UnixNano()/int64(time.Millisecond))))
			return nil
		case 5:
			cancel()
		}
		return errors.New("scrape failed")
	}

	go func() {
		sl.run(nil)
		signal <- struct{}{}
	}()

	select {
	case <-signal:
	case <-time.After(5 * time.Second):
		t.Fatalf("Scrape wasn't stopped.")
	}

	// 1 successfully scraped sample, 1 stale marker after the first failure, and
	// 5 report samples for each of the 5 scrapes, successful or not: 1+1+5*5 = 27.
	require.Len(t, appender.resultFloats, 27, "Appended samples not as expected:\n%s", appender)
	require.Equal(t, 42.0, appender.resultFloats[0].f, "Appended first sample not as expected")
	require.True(t, value.IsStaleNaN(appender.resultFloats[6].f),
		"Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.resultFloats[6].f))
}

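// TestScrapeLoopCompression verifies that the scraper sends
// "Accept-Encoding: gzip" when compression is enabled and
// "Accept-Encoding: identity" when it is disabled.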
func TestScrapeLoopCompression(t *testing.T) {
	simpleStorage := teststorage.New(t)
	defer simpleStorage.Close()

	metricsText := makeTestMetrics(10)

	for _, tc := range []struct {
		enableCompression bool
		acceptEncoding    string
	}{
		{
			enableCompression: true,
			acceptEncoding:    "gzip",
		},
		{
			enableCompression: false,
			acceptEncoding:    "identity",
		},
	} {
		t.Run(fmt.Sprintf("compression=%v,acceptEncoding=%s", tc.enableCompression, tc.acceptEncoding), func(t *testing.T) {
			scraped := make(chan bool)

			// The target keeps being scraped on its interval, so make sure
			// the channel is closed only once.
			var once sync.Once
			ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				require.Equal(t, tc.acceptEncoding, r.Header.Get("Accept-Encoding"), "invalid value of the Accept-Encoding header")
				fmt.Fprint(w, metricsText)
				once.Do(func() { close(scraped) })
			}))
			defer ts.Close()

			config := &config.ScrapeConfig{
				JobName:           "test",
				SampleLimit:       100,
				Scheme:            "http",
				ScrapeInterval:    model.Duration(100 * time.Millisecond),
				ScrapeTimeout:     model.Duration(100 * time.Millisecond),
				EnableCompression: tc.enableCompression,
			}

			sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
			require.NoError(t, err)
			defer sp.stop()

			testURL, err := url.Parse(ts.URL)
			require.NoError(t, err)
			sp.Sync([]*targetgroup.Group{
				{
					Targets: []model.LabelSet{{model.AddressLabel: model.LabelValue(testURL.Host)}},
				},
			})
			require.Len(t, sp.ActiveTargets(), 1)

			select {
			case <-time.After(5 * time.Second):
				t.Fatalf("target was not scraped")
			case <-scraped:
			}
		})
	}
}

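// TestPickSchema checks the mapping from a minimum bucket growth factor to a
// native histogram schema. Schema s corresponds to a bucket growth factor of
// 2^(2^-s), so schema 0 means a factor of 2, schema 3 a factor of about 1.09,
// and larger factors map to smaller (coarser) schemas.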
func TestPickSchema(t *testing.T) {
	tcs := []struct {
		factor float64
		schema int32
	}{
		{
			factor: 65536,
			schema: -4,
		},
		{
			factor: 256,
			schema: -3,
		},
		{
			factor: 16,
			schema: -2,
		},
		{
			factor: 4,
			schema: -1,
		},
		{
			factor: 2,
			schema: 0,
		},
		{
			factor: 1.4,
			schema: 1,
		},
		{
			factor: 1.1,
			schema: 2,
		},
		{
			factor: 1.09,
			schema: 3,
		},
		{
			factor: 1.04,
			schema: 4,
		},
		{
			factor: 1.02,
			schema: 5,
		},
		{
			factor: 1.01,
			schema: 6,
		},
		{
			factor: 1.005,
			schema: 7,
		},
		{
			factor: 1.002,
			schema: 8,
		},
		// The default value of native_histogram_min_bucket_factor.
		{
			factor: 0,
			schema: 8,
		},
	}

	for _, tc := range tcs {
		schema := pickSchema(tc.factor)
		require.Equal(t, tc.schema, schema)
	}
}

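// BenchmarkTargetScraperGzip measures a full scrape of a gzip-compressed
// text-format payload across a range of payload sizes.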
func BenchmarkTargetScraperGzip(b *testing.B) {
	scenarios := []struct {
		metricsCount int
		body         []byte
	}{
		{metricsCount: 1},
		{metricsCount: 100},
		{metricsCount: 1000},
		{metricsCount: 10000},
		{metricsCount: 100000},
	}

	// Pre-build a gzipped text exposition payload for each scenario.
	for i := 0; i < len(scenarios); i++ {
		var buf bytes.Buffer
		var name string
		gw := gzip.NewWriter(&buf)
		for j := 0; j < scenarios[i].metricsCount; j++ {
			name = fmt.Sprintf("go_memstats_alloc_bytes_total_%d", j)
			fmt.Fprintf(gw, "# HELP %s Total number of bytes allocated, even if freed.\n", name)
			fmt.Fprintf(gw, "# TYPE %s counter\n", name)
			fmt.Fprintf(gw, "%s %d\n", name, i*j)
		}
		gw.Close()
		scenarios[i].body = buf.Bytes()
	}

	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
		w.Header().Set("Content-Encoding", "gzip")
		for _, scenario := range scenarios {
			if strconv.Itoa(scenario.metricsCount) == r.URL.Query().Get("count") {
				w.Write(scenario.body)
				return
			}
		}
		w.WriteHeader(http.StatusBadRequest)
	})

	server := httptest.NewServer(handler)
	defer server.Close()

	serverURL, err := url.Parse(server.URL)
	require.NoError(b, err)

	client, err := config_util.NewClientFromConfig(config_util.DefaultHTTPClientConfig, "test_job")
	require.NoError(b, err)

	for _, scenario := range scenarios {
		b.Run(fmt.Sprintf("metrics=%d", scenario.metricsCount), func(b *testing.B) {
			ts := &targetScraper{
				Target: &Target{
					labels: labels.FromStrings(
						model.SchemeLabel, serverURL.Scheme,
						model.AddressLabel, serverURL.Host,
					),
					params: url.Values{"count": []string{strconv.Itoa(scenario.metricsCount)}},
				},
				client:  client,
				timeout: time.Second,
			}
			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				_, err = ts.scrape(context.Background())
				require.NoError(b, err)
			}
		})
	}
}

// TestScrapeLoopSeriesAddedDuplicates verifies that when a scrape contains
// multiple instances of the same time series, the
// prometheus_target_scrapes_sample_duplicate_timestamp_total metric is incremented.
func TestScrapeLoopSeriesAddedDuplicates(t *testing.T) {
	ctx, sl := simpleTestScrapeLoop(t)

	slApp := sl.appender(ctx)
	total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\ntest_metric 2\ntest_metric 3\n"), "", time.Time{})
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())
	require.Equal(t, 3, total)
	require.Equal(t, 3, added)
	require.Equal(t, 1, seriesAdded)

	slApp = sl.appender(ctx)
	total, added, seriesAdded, err = sl.append(slApp, []byte("test_metric 1\ntest_metric 1\ntest_metric 1\n"), "", time.Time{})
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())
	require.Equal(t, 3, total)
	require.Equal(t, 3, added)
	require.Equal(t, 0, seriesAdded)

	metric := dto.Metric{}
	err = sl.metrics.targetScrapeSampleDuplicate.Write(&metric)
	require.NoError(t, err)
	value := metric.GetCounter().GetValue()
	// Each append above contributes two duplicates of its first sample: 2 + 2 = 4.
	require.Equal(t, 4.0, value)
}

// TestNativeHistogramMaxSchemaSet runs a full scrape loop and checks that the
// scrape option `native_histogram_min_bucket_factor` is applied correctly.
func TestNativeHistogramMaxSchemaSet(t *testing.T) {
	testcases := map[string]struct {
		minBucketFactor string
		expectedSchema  int32
	}{
		"min factor not specified": {
			minBucketFactor: "",
			expectedSchema:  3, // Factor 1.09.
		},
		"min factor 1": {
			minBucketFactor: "native_histogram_min_bucket_factor: 1",
			expectedSchema:  3, // Factor 1.09.
		},
		"min factor 2": {
			minBucketFactor: "native_histogram_min_bucket_factor: 2",
			expectedSchema:  0, // Factor 2.00.
		},
	}
	for name, tc := range testcases {
		t.Run(name, func(t *testing.T) {
			testNativeHistogramMaxSchemaSet(t, tc.minBucketFactor, tc.expectedSchema)
		})
	}
}

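// testNativeHistogramMaxSchemaSet serves a native histogram via the protobuf
// exposition format, runs the scrape manager against it with the given
// native_histogram_min_bucket_factor setting, and asserts that all ingested
// histogram samples carry the expected schema.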
func testNativeHistogramMaxSchemaSet(t *testing.T, minBucketFactor string, expectedSchema int32) {
	// Create a ProtoBuf message to serve as a Prometheus metric.
	nativeHistogram := prometheus.NewHistogram(
		prometheus.HistogramOpts{
			Namespace:                      "testing",
			Name:                           "example_native_histogram",
			Help:                           "This is used for testing",
			NativeHistogramBucketFactor:    1.1,
			NativeHistogramMaxBucketNumber: 100,
		},
	)
	registry := prometheus.NewRegistry()
	require.NoError(t, registry.Register(nativeHistogram))
	nativeHistogram.Observe(1.0)
	nativeHistogram.Observe(1.0)
	nativeHistogram.Observe(1.0)
	nativeHistogram.Observe(10.0) // In a different bucket, since 10 > 1*1.1.
	nativeHistogram.Observe(10.0)

	gathered, err := registry.Gather()
	require.NoError(t, err)
	require.NotEmpty(t, gathered)

	histogramMetricFamily := gathered[0]
	buffer := protoMarshalDelimited(t, histogramMetricFamily)

	// Create an HTTP server to serve /metrics via ProtoBuf.
	metricsServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`)
		w.Write(buffer)
	}))
	defer metricsServer.Close()

	// Create a scrape loop with the HTTP server as the target.
	configStr := fmt.Sprintf(`
global:
  scrape_interval: 1s
  scrape_timeout: 1s
scrape_configs:
  - job_name: test
    %s
    static_configs:
      - targets: [%s]
`, minBucketFactor, strings.ReplaceAll(metricsServer.URL, "http://", ""))

	s := teststorage.New(t)
	defer s.Close()
	s.DB.EnableNativeHistograms()
	reg := prometheus.NewRegistry()

	mng, err := NewManager(&Options{EnableNativeHistogramsIngestion: true}, nil, s, reg)
	require.NoError(t, err)
	cfg, err := config.Load(configStr, false, log.NewNopLogger())
	require.NoError(t, err)
	mng.ApplyConfig(cfg)
	tsets := make(chan map[string][]*targetgroup.Group)
	go func() {
		// Use a local err to avoid racing with the enclosing function's err.
		err := mng.Run(tsets)
		require.NoError(t, err)
	}()
	defer mng.Stop()

	// Get the static targets and apply them to the scrape manager.
	require.Len(t, cfg.ScrapeConfigs, 1)
	scrapeCfg := cfg.ScrapeConfigs[0]
	require.Len(t, scrapeCfg.ServiceDiscoveryConfigs, 1)
	staticDiscovery, ok := scrapeCfg.ServiceDiscoveryConfigs[0].(discovery.StaticConfig)
	require.True(t, ok)
	require.Len(t, staticDiscovery, 1)
	tsets <- map[string][]*targetgroup.Group{"test": staticDiscovery}

	// Wait for the scrape loop to scrape the target.
	require.Eventually(t, func() bool {
		q, err := s.Querier(0, math.MaxInt64)
		require.NoError(t, err)
		seriesS := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "__name__", "testing_example_native_histogram"))
		countSeries := 0
		for seriesS.Next() {
			countSeries++
		}
		return countSeries > 0
	}, 15*time.Second, 100*time.Millisecond)

	// Check that the native histogram schema is as expected.
	q, err := s.Querier(0, math.MaxInt64)
	require.NoError(t, err)
	seriesS := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "__name__", "testing_example_native_histogram"))
	histogramSamples := []*histogram.Histogram{}
	for seriesS.Next() {
		series := seriesS.At()
		it := series.Iterator(nil)
		for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() {
			if vt != chunkenc.ValHistogram {
				// Skip non-histogram samples.
				continue
			}
			_, h := it.AtHistogram(nil)
			histogramSamples = append(histogramSamples, h)
		}
	}
	require.NoError(t, seriesS.Err())
	require.NotEmpty(t, histogramSamples)
	for _, h := range histogramSamples {
		require.Equal(t, expectedSchema, h.Schema)
	}
}