// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scrape

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
	"os"
	"strconv"
	"sync"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/gogo/protobuf/proto"
	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
	"github.com/prometheus/common/model"
	"github.com/stretchr/testify/require"
	"google.golang.org/protobuf/types/known/timestamppb"
	"gopkg.in/yaml.v2"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery"
	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/relabel"
	"github.com/prometheus/prometheus/util/runutil"
	"github.com/prometheus/prometheus/util/testutil"
)

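// TestPopulateLabels verifies that PopulateLabels merges discovered target
// labels with the scrape config defaults (scheme, metrics path, job name,
// interval, timeout), applies relabelling and port defaulting, and returns
// the expected errors for invalid inputs.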
func TestPopulateLabels(t *testing.T) {
	cases := []struct {
		in            labels.Labels
		cfg           *config.ScrapeConfig
		noDefaultPort bool
		res           labels.Labels
		resOrig       labels.Labels
		err           string
	}{
		// Regular population of scrape config options.
		{
			in: labels.FromMap(map[string]string{
				model.AddressLabel: "1.2.3.4:1000",
				"custom":           "value",
			}),
			cfg: &config.ScrapeConfig{
				Scheme:         "https",
				MetricsPath:    "/metrics",
				JobName:        "job",
				ScrapeInterval: model.Duration(time.Second),
				ScrapeTimeout:  model.Duration(time.Second),
			},
			res: labels.FromMap(map[string]string{
				model.AddressLabel:        "1.2.3.4:1000",
				model.InstanceLabel:       "1.2.3.4:1000",
				model.SchemeLabel:         "https",
				model.MetricsPathLabel:    "/metrics",
				model.JobLabel:            "job",
				model.ScrapeIntervalLabel: "1s",
				model.ScrapeTimeoutLabel:  "1s",
				"custom":                  "value",
			}),
			resOrig: labels.FromMap(map[string]string{
				model.AddressLabel:        "1.2.3.4:1000",
				model.SchemeLabel:         "https",
				model.MetricsPathLabel:    "/metrics",
				model.JobLabel:            "job",
				"custom":                  "value",
				model.ScrapeIntervalLabel: "1s",
				model.ScrapeTimeoutLabel:  "1s",
			}),
		},
		// Pre-define/overwrite scrape config labels.
		// Leave out the port and expect it to be defaulted according to the scheme.
		{
			in: labels.FromMap(map[string]string{
				model.AddressLabel:        "1.2.3.4",
				model.SchemeLabel:         "http",
				model.MetricsPathLabel:    "/custom",
				model.JobLabel:            "custom-job",
				model.ScrapeIntervalLabel: "2s",
				model.ScrapeTimeoutLabel:  "2s",
			}),
			cfg: &config.ScrapeConfig{
				Scheme:         "https",
				MetricsPath:    "/metrics",
				JobName:        "job",
				ScrapeInterval: model.Duration(time.Second),
				ScrapeTimeout:  model.Duration(time.Second),
			},
			res: labels.FromMap(map[string]string{
				model.AddressLabel:        "1.2.3.4:80",
				model.InstanceLabel:       "1.2.3.4:80",
				model.SchemeLabel:         "http",
				model.MetricsPathLabel:    "/custom",
				model.JobLabel:            "custom-job",
				model.ScrapeIntervalLabel: "2s",
				model.ScrapeTimeoutLabel:  "2s",
			}),
			resOrig: labels.FromMap(map[string]string{
				model.AddressLabel:        "1.2.3.4",
				model.SchemeLabel:         "http",
				model.MetricsPathLabel:    "/custom",
				model.JobLabel:            "custom-job",
				model.ScrapeIntervalLabel: "2s",
				model.ScrapeTimeoutLabel:  "2s",
			}),
		},
		// Provide instance label. HTTPS port default for IPv6.
		{
			in: labels.FromMap(map[string]string{
				model.AddressLabel:  "[::1]",
				model.InstanceLabel: "custom-instance",
			}),
			cfg: &config.ScrapeConfig{
				Scheme:         "https",
				MetricsPath:    "/metrics",
				JobName:        "job",
				ScrapeInterval: model.Duration(time.Second),
				ScrapeTimeout:  model.Duration(time.Second),
			},
			res: labels.FromMap(map[string]string{
				model.AddressLabel:        "[::1]:443",
				model.InstanceLabel:       "custom-instance",
				model.SchemeLabel:         "https",
				model.MetricsPathLabel:    "/metrics",
				model.JobLabel:            "job",
				model.ScrapeIntervalLabel: "1s",
				model.ScrapeTimeoutLabel:  "1s",
			}),
			resOrig: labels.FromMap(map[string]string{
				model.AddressLabel:        "[::1]",
				model.InstanceLabel:       "custom-instance",
				model.SchemeLabel:         "https",
				model.MetricsPathLabel:    "/metrics",
				model.JobLabel:            "job",
				model.ScrapeIntervalLabel: "1s",
				model.ScrapeTimeoutLabel:  "1s",
			}),
		},
		// Address label missing.
		{
			in: labels.FromStrings("custom", "value"),
			cfg: &config.ScrapeConfig{
				Scheme:         "https",
				MetricsPath:    "/metrics",
				JobName:        "job",
				ScrapeInterval: model.Duration(time.Second),
				ScrapeTimeout:  model.Duration(time.Second),
			},
			res:     labels.EmptyLabels(),
			resOrig: labels.EmptyLabels(),
			err:     "no address",
		},
		// Address label missing, but added in relabelling.
		{
			in: labels.FromStrings("custom", "host:1234"),
			cfg: &config.ScrapeConfig{
				Scheme:         "https",
				MetricsPath:    "/metrics",
				JobName:        "job",
				ScrapeInterval: model.Duration(time.Second),
				ScrapeTimeout:  model.Duration(time.Second),
				RelabelConfigs: []*relabel.Config{
					{
						Action:       relabel.Replace,
						Regex:        relabel.MustNewRegexp("(.*)"),
						SourceLabels: model.LabelNames{"custom"},
						Replacement:  "${1}",
						TargetLabel:  string(model.AddressLabel),
					},
				},
			},
			res: labels.FromMap(map[string]string{
				model.AddressLabel:        "host:1234",
				model.InstanceLabel:       "host:1234",
				model.SchemeLabel:         "https",
				model.MetricsPathLabel:    "/metrics",
				model.JobLabel:            "job",
				model.ScrapeIntervalLabel: "1s",
				model.ScrapeTimeoutLabel:  "1s",
				"custom":                  "host:1234",
			}),
			resOrig: labels.FromMap(map[string]string{
				model.SchemeLabel:         "https",
				model.MetricsPathLabel:    "/metrics",
				model.JobLabel:            "job",
				model.ScrapeIntervalLabel: "1s",
				model.ScrapeTimeoutLabel:  "1s",
				"custom":                  "host:1234",
			}),
		},
		// Address label missing, but added in relabelling.
		{
			in: labels.FromStrings("custom", "host:1234"),
			cfg: &config.ScrapeConfig{
				Scheme:         "https",
				MetricsPath:    "/metrics",
				JobName:        "job",
				ScrapeInterval: model.Duration(time.Second),
				ScrapeTimeout:  model.Duration(time.Second),
				RelabelConfigs: []*relabel.Config{
					{
						Action:       relabel.Replace,
						Regex:        relabel.MustNewRegexp("(.*)"),
						SourceLabels: model.LabelNames{"custom"},
						Replacement:  "${1}",
						TargetLabel:  string(model.AddressLabel),
					},
				},
			},
			res: labels.FromMap(map[string]string{
				model.AddressLabel:        "host:1234",
				model.InstanceLabel:       "host:1234",
				model.SchemeLabel:         "https",
				model.MetricsPathLabel:    "/metrics",
				model.JobLabel:            "job",
				model.ScrapeIntervalLabel: "1s",
				model.ScrapeTimeoutLabel:  "1s",
				"custom":                  "host:1234",
			}),
			resOrig: labels.FromMap(map[string]string{
				model.SchemeLabel:         "https",
				model.MetricsPathLabel:    "/metrics",
				model.JobLabel:            "job",
				model.ScrapeIntervalLabel: "1s",
				model.ScrapeTimeoutLabel:  "1s",
				"custom":                  "host:1234",
			}),
		},
		// Invalid UTF-8 in label.
		{
			in: labels.FromMap(map[string]string{
				model.AddressLabel: "1.2.3.4:1000",
				"custom":           "\xbd",
			}),
			cfg: &config.ScrapeConfig{
				Scheme:         "https",
				MetricsPath:    "/metrics",
				JobName:        "job",
				ScrapeInterval: model.Duration(time.Second),
				ScrapeTimeout:  model.Duration(time.Second),
			},
			res:     labels.EmptyLabels(),
			resOrig: labels.EmptyLabels(),
			err:     "invalid label value for \"custom\": \"\\xbd\"",
		},
		// Invalid duration in interval label.
		{
			in: labels.FromMap(map[string]string{
				model.AddressLabel:        "1.2.3.4:1000",
				model.ScrapeIntervalLabel: "2notseconds",
			}),
			cfg: &config.ScrapeConfig{
				Scheme:         "https",
				MetricsPath:    "/metrics",
				JobName:        "job",
				ScrapeInterval: model.Duration(time.Second),
				ScrapeTimeout:  model.Duration(time.Second),
			},
			res:     labels.EmptyLabels(),
			resOrig: labels.EmptyLabels(),
			err:     "error parsing scrape interval: unknown unit \"notseconds\" in duration \"2notseconds\"",
		},
		// Invalid duration in timeout label.
		{
			in: labels.FromMap(map[string]string{
				model.AddressLabel:       "1.2.3.4:1000",
				model.ScrapeTimeoutLabel: "2notseconds",
			}),
			cfg: &config.ScrapeConfig{
				Scheme:         "https",
				MetricsPath:    "/metrics",
				JobName:        "job",
				ScrapeInterval: model.Duration(time.Second),
				ScrapeTimeout:  model.Duration(time.Second),
			},
			res:     labels.EmptyLabels(),
			resOrig: labels.EmptyLabels(),
			err:     "error parsing scrape timeout: unknown unit \"notseconds\" in duration \"2notseconds\"",
		},
		// 0 duration in interval label.
		{
			in: labels.FromMap(map[string]string{
				model.AddressLabel:        "1.2.3.4:1000",
				model.ScrapeIntervalLabel: "0s",
			}),
			cfg: &config.ScrapeConfig{
				Scheme:         "https",
				MetricsPath:    "/metrics",
				JobName:        "job",
				ScrapeInterval: model.Duration(time.Second),
				ScrapeTimeout:  model.Duration(time.Second),
			},
			res:     labels.EmptyLabels(),
			resOrig: labels.EmptyLabels(),
			err:     "scrape interval cannot be 0",
		},
		// 0 duration in timeout label.
		{
			in: labels.FromMap(map[string]string{
				model.AddressLabel:       "1.2.3.4:1000",
				model.ScrapeTimeoutLabel: "0s",
			}),
			cfg: &config.ScrapeConfig{
				Scheme:         "https",
				MetricsPath:    "/metrics",
				JobName:        "job",
				ScrapeInterval: model.Duration(time.Second),
				ScrapeTimeout:  model.Duration(time.Second),
			},
			res:     labels.EmptyLabels(),
			resOrig: labels.EmptyLabels(),
			err:     "scrape timeout cannot be 0",
		},
		// Timeout greater than interval.
		{
			in: labels.FromMap(map[string]string{
				model.AddressLabel:        "1.2.3.4:1000",
				model.ScrapeIntervalLabel: "1s",
				model.ScrapeTimeoutLabel:  "2s",
			}),
			cfg: &config.ScrapeConfig{
				Scheme:         "https",
				MetricsPath:    "/metrics",
				JobName:        "job",
				ScrapeInterval: model.Duration(time.Second),
				ScrapeTimeout:  model.Duration(time.Second),
			},
			res:     labels.EmptyLabels(),
			resOrig: labels.EmptyLabels(),
			err:     "scrape timeout cannot be greater than scrape interval (\"2s\" > \"1s\")",
		},
		// Don't attach default port.
		{
			in: labels.FromMap(map[string]string{
				model.AddressLabel: "1.2.3.4",
			}),
			cfg: &config.ScrapeConfig{
				Scheme:         "https",
				MetricsPath:    "/metrics",
				JobName:        "job",
				ScrapeInterval: model.Duration(time.Second),
				ScrapeTimeout:  model.Duration(time.Second),
			},
			noDefaultPort: true,
			res: labels.FromMap(map[string]string{
				model.AddressLabel:        "1.2.3.4",
				model.InstanceLabel:       "1.2.3.4",
				model.SchemeLabel:         "https",
				model.MetricsPathLabel:    "/metrics",
				model.JobLabel:            "job",
				model.ScrapeIntervalLabel: "1s",
				model.ScrapeTimeoutLabel:  "1s",
			}),
			resOrig: labels.FromMap(map[string]string{
				model.AddressLabel:        "1.2.3.4",
				model.SchemeLabel:         "https",
				model.MetricsPathLabel:    "/metrics",
				model.JobLabel:            "job",
				model.ScrapeIntervalLabel: "1s",
				model.ScrapeTimeoutLabel:  "1s",
			}),
		},
		// Remove default port (http).
		{
			in: labels.FromMap(map[string]string{
				model.AddressLabel: "1.2.3.4:80",
			}),
			cfg: &config.ScrapeConfig{
				Scheme:         "http",
				MetricsPath:    "/metrics",
				JobName:        "job",
				ScrapeInterval: model.Duration(time.Second),
				ScrapeTimeout:  model.Duration(time.Second),
			},
			noDefaultPort: true,
			res: labels.FromMap(map[string]string{
				model.AddressLabel:        "1.2.3.4",
				model.InstanceLabel:       "1.2.3.4:80",
				model.SchemeLabel:         "http",
				model.MetricsPathLabel:    "/metrics",
				model.JobLabel:            "job",
				model.ScrapeIntervalLabel: "1s",
				model.ScrapeTimeoutLabel:  "1s",
			}),
			resOrig: labels.FromMap(map[string]string{
				model.AddressLabel:        "1.2.3.4:80",
				model.SchemeLabel:         "http",
				model.MetricsPathLabel:    "/metrics",
				model.JobLabel:            "job",
				model.ScrapeIntervalLabel: "1s",
				model.ScrapeTimeoutLabel:  "1s",
			}),
		},
		// Remove default port (https).
		{
			in: labels.FromMap(map[string]string{
				model.AddressLabel: "1.2.3.4:443",
			}),
			cfg: &config.ScrapeConfig{
				Scheme:         "https",
				MetricsPath:    "/metrics",
				JobName:        "job",
				ScrapeInterval: model.Duration(time.Second),
				ScrapeTimeout:  model.Duration(time.Second),
			},
			noDefaultPort: true,
			res: labels.FromMap(map[string]string{
				model.AddressLabel:        "1.2.3.4",
				model.InstanceLabel:       "1.2.3.4:443",
				model.SchemeLabel:         "https",
				model.MetricsPathLabel:    "/metrics",
				model.JobLabel:            "job",
				model.ScrapeIntervalLabel: "1s",
				model.ScrapeTimeoutLabel:  "1s",
			}),
			resOrig: labels.FromMap(map[string]string{
				model.AddressLabel:        "1.2.3.4:443",
				model.SchemeLabel:         "https",
				model.MetricsPathLabel:    "/metrics",
				model.JobLabel:            "job",
				model.ScrapeIntervalLabel: "1s",
				model.ScrapeTimeoutLabel:  "1s",
			}),
		},
	}
	for _, c := range cases {
		in := c.in.Copy()

		res, orig, err := PopulateLabels(labels.NewBuilder(c.in), c.cfg, c.noDefaultPort)
		if c.err != "" {
			require.EqualError(t, err, c.err)
		} else {
			require.NoError(t, err)
		}
		require.Equal(t, c.in, in)
		testutil.RequireEqual(t, c.res, res)
		testutil.RequireEqual(t, c.resOrig, orig)
	}
}

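// loadConfiguration parses the given YAML text into a config.Config, failing
// the test on any parse error.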
func loadConfiguration(t testing.TB, c string) *config.Config {
	t.Helper()

	cfg := &config.Config{}
	err := yaml.UnmarshalStrict([]byte(c), cfg)
	require.NoError(t, err, "Unable to load YAML config.")

	return cfg
}

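// noopLoop returns a stub scrape loop whose start and stop functions do
// nothing.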
func noopLoop() loop {
	return &testLoop{
		startFunc: func(interval, timeout time.Duration, errc chan<- error) {},
		stopFunc:  func() {},
	}
}

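// TestManagerApplyConfig verifies that applying a scrape configuration
// reloads the scrape loops only when the configuration is valid and has
// actually changed.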
func TestManagerApplyConfig(t *testing.T) {
	// Valid initial configuration.
	cfgText1 := `
scrape_configs:
 - job_name: job1
   static_configs:
   - targets: ["foo:9090"]
`
	// Invalid configuration.
	cfgText2 := `
scrape_configs:
 - job_name: job1
   scheme: https
   static_configs:
   - targets: ["foo:9090"]
   tls_config:
     ca_file: /not/existing/ca/file
`
	// Valid configuration.
	cfgText3 := `
scrape_configs:
 - job_name: job1
   scheme: https
   static_configs:
   - targets: ["foo:9090"]
`
	var (
		cfg1 = loadConfiguration(t, cfgText1)
		cfg2 = loadConfiguration(t, cfgText2)
		cfg3 = loadConfiguration(t, cfgText3)

		ch = make(chan struct{}, 1)

		testRegistry = prometheus.NewRegistry()
	)

	opts := Options{}
	scrapeManager, err := NewManager(&opts, nil, nil, testRegistry)
	require.NoError(t, err)
	newLoop := func(scrapeLoopOptions) loop {
		ch <- struct{}{}
		return noopLoop()
	}
	sp := &scrapePool{
		appendable: &nopAppendable{},
		activeTargets: map[uint64]*Target{
			1: {},
		},
		loops: map[uint64]loop{
			1: noopLoop(),
		},
		newLoop:     newLoop,
		logger:      nil,
		config:      cfg1.ScrapeConfigs[0],
		client:      http.DefaultClient,
		metrics:     scrapeManager.metrics,
		symbolTable: labels.NewSymbolTable(),
	}
	scrapeManager.scrapePools = map[string]*scrapePool{
		"job1": sp,
	}

	// Apply the initial configuration.
	err = scrapeManager.ApplyConfig(cfg1)
	require.NoError(t, err, "Unable to apply configuration.")
	select {
	case <-ch:
		require.FailNow(t, "Reload happened.")
	default:
	}

	// Apply a configuration for which the reload fails.
	err = scrapeManager.ApplyConfig(cfg2)
	require.Error(t, err, "Expecting error but got none.")
	select {
	case <-ch:
		require.FailNow(t, "Reload happened.")
	default:
	}

	// Apply a configuration for which the reload succeeds.
	err = scrapeManager.ApplyConfig(cfg3)
	require.NoError(t, err, "Unable to apply configuration.")
	select {
	case <-ch:
	default:
		require.FailNow(t, "Reload didn't happen.")
	}

	// Re-applying the same configuration shouldn't trigger a reload.
	err = scrapeManager.ApplyConfig(cfg3)
	require.NoError(t, err, "Unable to apply configuration.")
	select {
	case <-ch:
		require.FailNow(t, "Reload happened.")
	default:
	}
}

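// TestManagerTargetsUpdates verifies that target group updates sent on the
// manager's channel are recorded and trigger a reload of the scrape loops.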
func TestManagerTargetsUpdates(t *testing.T) {
	opts := Options{}
	testRegistry := prometheus.NewRegistry()
	m, err := NewManager(&opts, nil, nil, testRegistry)
	require.NoError(t, err)

	ts := make(chan map[string][]*targetgroup.Group)
	go m.Run(ts)
	defer m.Stop()

	tgSent := make(map[string][]*targetgroup.Group)
	for x := 0; x < 10; x++ {
		tgSent[strconv.Itoa(x)] = []*targetgroup.Group{
			{
				Source: strconv.Itoa(x),
			},
		}

		select {
		case ts <- tgSent:
		case <-time.After(10 * time.Millisecond):
			require.Fail(t, "Scrape manager's channel remained blocked after the set threshold.")
		}
	}

	m.mtxScrape.Lock()
	tsetActual := m.targetSets
	m.mtxScrape.Unlock()

	// Make sure all updates have been received.
	require.Equal(t, tgSent, tsetActual)

	select {
	case <-m.triggerReload:
	default:
		require.Fail(t, "No scrape loops reload was triggered after targets update.")
	}
}

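// TestSetOffsetSeed verifies that the scrape offset seed is derived from the
// external labels, so different label sets yield different seeds.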
func TestSetOffsetSeed(t *testing.T) {
	getConfig := func(prometheus string) *config.Config {
		cfgText := `
global:
 external_labels:
   prometheus: '` + prometheus + `'
`

		cfg := &config.Config{}
		err := yaml.UnmarshalStrict([]byte(cfgText), cfg)
		require.NoError(t, err, "Unable to load YAML config cfgYaml.")

		return cfg
	}

	opts := Options{}
	testRegistry := prometheus.NewRegistry()
	scrapeManager, err := NewManager(&opts, nil, nil, testRegistry)
	require.NoError(t, err)

	// Load the first config.
	cfg1 := getConfig("ha1")
	err = scrapeManager.setOffsetSeed(cfg1.GlobalConfig.ExternalLabels)
	require.NoError(t, err)
	offsetSeed1 := scrapeManager.offsetSeed

	require.NotZero(t, offsetSeed1, "Offset seed has to be a hash of uint64.")

	// Load the second config.
	cfg2 := getConfig("ha2")
	require.NoError(t, scrapeManager.setOffsetSeed(cfg2.GlobalConfig.ExternalLabels))
	offsetSeed2 := scrapeManager.offsetSeed

	require.NotEqual(t, offsetSeed1, offsetSeed2, "Offset seed should not be the same on different set of external labels.")
}

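// TestManagerScrapePools verifies that ScrapePools reports the job names of
// the currently configured scrape pools, including after a reload.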
func TestManagerScrapePools(t *testing.T) {
	cfgText1 := `
scrape_configs:
- job_name: job1
  static_configs:
  - targets: ["foo:9090"]
- job_name: job2
  static_configs:
  - targets: ["foo:9091", "foo:9092"]
`
	cfgText2 := `
scrape_configs:
- job_name: job1
  static_configs:
  - targets: ["foo:9090", "foo:9094"]
- job_name: job3
  static_configs:
  - targets: ["foo:9093"]
`
	var (
		cfg1         = loadConfiguration(t, cfgText1)
		cfg2         = loadConfiguration(t, cfgText2)
		testRegistry = prometheus.NewRegistry()
	)

	reload := func(scrapeManager *Manager, cfg *config.Config) {
		newLoop := func(scrapeLoopOptions) loop {
			return noopLoop()
		}
		scrapeManager.scrapePools = map[string]*scrapePool{}
		for _, sc := range cfg.ScrapeConfigs {
			_, cancel := context.WithCancel(context.Background())
			defer cancel()
			sp := &scrapePool{
				appendable:    &nopAppendable{},
				activeTargets: map[uint64]*Target{},
				loops: map[uint64]loop{
					1: noopLoop(),
				},
				newLoop: newLoop,
				logger:  nil,
				config:  sc,
				client:  http.DefaultClient,
				cancel:  cancel,
			}
			for _, c := range sc.ServiceDiscoveryConfigs {
				staticConfig := c.(discovery.StaticConfig)
				for _, group := range staticConfig {
					for i := range group.Targets {
						sp.activeTargets[uint64(i)] = &Target{}
					}
				}
			}
			scrapeManager.scrapePools[sc.JobName] = sp
		}
	}

	opts := Options{}
	scrapeManager, err := NewManager(&opts, nil, nil, testRegistry)
	require.NoError(t, err)

	reload(scrapeManager, cfg1)
	require.ElementsMatch(t, []string{"job1", "job2"}, scrapeManager.ScrapePools())

	reload(scrapeManager, cfg2)
	require.ElementsMatch(t, []string{"job1", "job3"}, scrapeManager.ScrapePools())
}

// TestManagerCTZeroIngestion tests the scrape manager's created-timestamp
// (CT) zero-sample ingestion cases.
func TestManagerCTZeroIngestion(t *testing.T) {
	const mName = "expected_counter"

	for _, tc := range []struct {
		name                  string
		counterSample         *dto.Counter
		enableCTZeroIngestion bool

		expectedValues []float64
	}{
		{
			name: "disabled with CT on counter",
			counterSample: &dto.Counter{
				Value: proto.Float64(1.0),
				// Timestamp does not matter as long as it exists in this test.
				CreatedTimestamp: timestamppb.Now(),
			},
			expectedValues: []float64{1.0},
		},
		{
			name: "enabled with CT on counter",
			counterSample: &dto.Counter{
				Value: proto.Float64(1.0),
				// Timestamp does not matter as long as it exists in this test.
				CreatedTimestamp: timestamppb.Now(),
			},
			enableCTZeroIngestion: true,
			expectedValues:        []float64{0.0, 1.0},
		},
		{
			name: "enabled without CT on counter",
			counterSample: &dto.Counter{
				Value: proto.Float64(1.0),
			},
			enableCTZeroIngestion: true,
			expectedValues:        []float64{1.0},
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			app := &collectResultAppender{}
			scrapeManager, err := NewManager(
				&Options{
					EnableCreatedTimestampZeroIngestion: tc.enableCTZeroIngestion,
					skipOffsetting:                      true,
				},
				log.NewLogfmtLogger(os.Stderr),
				&collectResultAppendable{app},
				prometheus.NewRegistry(),
			)
			require.NoError(t, err)

			require.NoError(t, scrapeManager.ApplyConfig(&config.Config{
				GlobalConfig: config.GlobalConfig{
					// Disable regular scrapes.
					ScrapeInterval: model.Duration(9999 * time.Minute),
					ScrapeTimeout:  model.Duration(5 * time.Second),
					// Ensure the proto format is chosen. We need proto as it's the only
					// protocol with CT parsing support.
					ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto},
				},
				ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
			}))

			once := sync.Once{}
			// Start a fake HTTP target that allows only one successful scrape.
			server := httptest.NewServer(
				http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
					fail := true
					once.Do(func() {
						fail = false
						w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`)

						ctrType := dto.MetricType_COUNTER
						w.Write(protoMarshalDelimited(t, &dto.MetricFamily{
							Name:   proto.String(mName),
							Type:   &ctrType,
							Metric: []*dto.Metric{{Counter: tc.counterSample}},
						}))
					})

					if fail {
						w.WriteHeader(http.StatusInternalServerError)
					}
				}),
			)
			defer server.Close()

			serverURL, err := url.Parse(server.URL)
			require.NoError(t, err)

			// Add the fake target directly into tsets + reload. Normally users would
			// use Manager.Run and wait for the minimum 5s refresh interval.
			scrapeManager.updateTsets(map[string][]*targetgroup.Group{
				"test": {{
					Targets: []model.LabelSet{{
						model.SchemeLabel:  model.LabelValue(serverURL.Scheme),
						model.AddressLabel: model.LabelValue(serverURL.Host),
					}},
				}},
			})
			scrapeManager.reload()

			// Wait for one scrape.
			ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
			defer cancel()
			require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
				if countFloatSamples(app, mName) != len(tc.expectedValues) {
					return fmt.Errorf("expected %v samples", tc.expectedValues)
				}
				return nil
			}), "after 1 minute")
			scrapeManager.Stop()

			require.Equal(t, tc.expectedValues, getResultFloats(app, mName))
		})
	}
}

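// countFloatSamples counts the float samples appended for the given metric
// name.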
func countFloatSamples(a *collectResultAppender, expectedMetricName string) (count int) {
	a.mtx.Lock()
	defer a.mtx.Unlock()

	for _, f := range a.resultFloats {
		if f.metric.Get(model.MetricNameLabel) == expectedMetricName {
			count++
		}
	}
	return count
}

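// getResultFloats returns the float sample values appended for the given
// metric name, in append order.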
func getResultFloats(app *collectResultAppender, expectedMetricName string) (result []float64) {
	app.mtx.Lock()
	defer app.mtx.Unlock()

	for _, f := range app.resultFloats {
		if f.metric.Get(model.MetricNameLabel) == expectedMetricName {
			result = append(result, f.f)
		}
	}
	return result
}

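// TestUnregisterMetrics verifies that a manager's metrics can be removed from
// the registry so that a second manager can register against it.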
func TestUnregisterMetrics(t *testing.T) {
	reg := prometheus.NewRegistry()
	// Check that all metrics can be unregistered, allowing a second manager to be created.
	for i := 0; i < 2; i++ {
		opts := Options{}
		manager, err := NewManager(&opts, nil, nil, reg)
		require.NotNil(t, manager)
		require.NoError(t, err)
		// Unregister all metrics.
		manager.UnregisterMetrics()
	}
}