prometheus

Форк
0
/
file_test.go 
493 строки · 11.1 Кб
1
// Copyright 2016 The Prometheus Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
//     http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package file
15

16
import (
17
	"context"
18
	"encoding/json"
19
	"io"
20
	"os"
21
	"path/filepath"
22
	"sort"
23
	"sync"
24
	"testing"
25
	"time"
26

27
	"github.com/prometheus/client_golang/prometheus"
28
	"github.com/prometheus/common/model"
29
	"github.com/stretchr/testify/require"
30
	"go.uber.org/goleak"
31

32
	"github.com/prometheus/prometheus/discovery"
33
	"github.com/prometheus/prometheus/discovery/targetgroup"
34
)
35

36
func TestMain(m *testing.M) {
37
	goleak.VerifyTestMain(m)
38
}
39

40
// defaultWait bounds how long the tests wait for a target group update, and
// how long they sleep when checking that no update arrives.
const defaultWait = time.Second
41

42
type testRunner struct {
43
	*testing.T
44
	dir           string
45
	ch            chan []*targetgroup.Group
46
	done, stopped chan struct{}
47
	cancelSD      context.CancelFunc
48

49
	mtx        sync.Mutex
50
	tgs        map[string]*targetgroup.Group
51
	receivedAt time.Time
52
}
53

54
func newTestRunner(t *testing.T) *testRunner {
55
	t.Helper()
56

57
	return &testRunner{
58
		T:       t,
59
		dir:     t.TempDir(),
60
		ch:      make(chan []*targetgroup.Group),
61
		done:    make(chan struct{}),
62
		stopped: make(chan struct{}),
63
		tgs:     make(map[string]*targetgroup.Group),
64
	}
65
}
66

67
// copyFile atomically copies a file to the runner's directory.
68
func (t *testRunner) copyFile(src string) string {
69
	t.Helper()
70
	return t.copyFileTo(src, filepath.Base(src))
71
}
72

73
// copyFileTo atomically copies a file with a different name to the runner's directory.
74
func (t *testRunner) copyFileTo(src, name string) string {
75
	t.Helper()
76

77
	newf, err := os.CreateTemp(t.dir, "")
78
	require.NoError(t, err)
79

80
	f, err := os.Open(src)
81
	require.NoError(t, err)
82

83
	_, err = io.Copy(newf, f)
84
	require.NoError(t, err)
85
	require.NoError(t, f.Close())
86
	require.NoError(t, newf.Close())
87

88
	dst := filepath.Join(t.dir, name)
89
	err = os.Rename(newf.Name(), dst)
90
	require.NoError(t, err)
91

92
	return dst
93
}
94

95
// writeString writes atomically a string to a file.
96
func (t *testRunner) writeString(file, data string) {
97
	t.Helper()
98

99
	newf, err := os.CreateTemp(t.dir, "")
100
	require.NoError(t, err)
101

102
	_, err = newf.WriteString(data)
103
	require.NoError(t, err)
104
	require.NoError(t, newf.Close())
105

106
	err = os.Rename(newf.Name(), file)
107
	require.NoError(t, err)
108
}
109

110
// appendString appends a string to a file.
111
func (t *testRunner) appendString(file, data string) {
112
	t.Helper()
113

114
	f, err := os.OpenFile(file, os.O_WRONLY|os.O_APPEND, 0)
115
	require.NoError(t, err)
116
	defer f.Close()
117

118
	_, err = f.WriteString(data)
119
	require.NoError(t, err)
120
}
121

122
// run starts the file SD and the loop receiving target groups updates.
123
func (t *testRunner) run(files ...string) {
124
	go func() {
125
		defer close(t.stopped)
126
		for {
127
			select {
128
			case <-t.done:
129
				os.RemoveAll(t.dir)
130
				return
131
			case tgs := <-t.ch:
132
				t.mtx.Lock()
133
				t.receivedAt = time.Now()
134
				for _, tg := range tgs {
135
					t.tgs[tg.Source] = tg
136
				}
137
				t.mtx.Unlock()
138
			}
139
		}
140
	}()
141

142
	for i := range files {
143
		files[i] = filepath.Join(t.dir, files[i])
144
	}
145
	ctx, cancel := context.WithCancel(context.Background())
146
	t.cancelSD = cancel
147
	go func() {
148
		conf := &SDConfig{
149
			Files: files,
150
			// Setting a high refresh interval to make sure that the tests only
151
			// rely on file watches.
152
			RefreshInterval: model.Duration(1 * time.Hour),
153
		}
154

155
		reg := prometheus.NewRegistry()
156
		refreshMetrics := discovery.NewRefreshMetrics(reg)
157
		metrics := conf.NewDiscovererMetrics(reg, refreshMetrics)
158
		require.NoError(t, metrics.Register())
159

160
		d, err := NewDiscovery(
161
			conf,
162
			nil,
163
			metrics,
164
		)
165
		require.NoError(t, err)
166

167
		d.Run(ctx, t.ch)
168

169
		metrics.Unregister()
170
	}()
171
}
172

173
// stop cancels the discoverer's context, tells the receiving loop to exit and
// blocks until that loop has returned. It must only be called after run.
func (t *testRunner) stop() {
	t.cancelSD()
	close(t.done)
	<-t.stopped
}
178

179
// lastReceive returns the time at which the receiving loop last got an
// update, or the zero time if nothing has been received yet.
func (t *testRunner) lastReceive() time.Time {
	t.mtx.Lock()
	at := t.receivedAt
	t.mtx.Unlock()

	return at
}
184

185
func (t *testRunner) targets() []*targetgroup.Group {
186
	t.mtx.Lock()
187
	defer t.mtx.Unlock()
188
	var (
189
		keys []string
190
		tgs  []*targetgroup.Group
191
	)
192

193
	for k := range t.tgs {
194
		keys = append(keys, k)
195
	}
196
	sort.Strings(keys)
197
	for _, k := range keys {
198
		tgs = append(tgs, t.tgs[k])
199
	}
200
	return tgs
201
}
202

203
func (t *testRunner) requireUpdate(ref time.Time, expected []*targetgroup.Group) {
204
	t.Helper()
205

206
	timeout := time.After(defaultWait)
207
	for {
208
		select {
209
		case <-timeout:
210
			t.Fatalf("Expected update but got none")
211
		case <-time.After(defaultWait / 10):
212
			if ref.Equal(t.lastReceive()) {
213
				// No update received.
214
				break
215
			}
216

217
			// We can receive partial updates so only check the result when the
218
			// expected number of groups is reached.
219
			tgs := t.targets()
220
			if len(tgs) != len(expected) {
221
				t.Logf("skipping update: expected %d targets, got %d", len(expected), len(tgs))
222
				break
223
			}
224
			t.requireTargetGroups(expected, tgs)
225
			if ref.After(time.Time{}) {
226
				t.Logf("update received after %v", t.lastReceive().Sub(ref))
227
			}
228
			return
229
		}
230
	}
231
}
232

233
// requireTargetGroups fails the test unless expected and got have the same
// JSON encoding. A marshalling error fails the test instead of panicking, so
// one bad comparison cannot abort the whole test binary.
func (t *testRunner) requireTargetGroups(expected, got []*targetgroup.Group) {
	t.Helper()

	want, err := json.Marshal(expected)
	require.NoError(t, err)

	have, err := json.Marshal(got)
	require.NoError(t, err)

	// Compare as strings so that a mismatch is shown as readable JSON.
	require.Equal(t, string(want), string(have))
}
246

247
// validTg() maps to fixtures/valid.{json,yml}.
248
func validTg(file string) []*targetgroup.Group {
249
	return []*targetgroup.Group{
250
		{
251
			Targets: []model.LabelSet{
252
				{
253
					model.AddressLabel: model.LabelValue("localhost:9090"),
254
				},
255
				{
256
					model.AddressLabel: model.LabelValue("example.org:443"),
257
				},
258
			},
259
			Labels: model.LabelSet{
260
				model.LabelName("foo"): model.LabelValue("bar"),
261
				fileSDFilepathLabel:    model.LabelValue(file),
262
			},
263
			Source: fileSource(file, 0),
264
		},
265
		{
266
			Targets: []model.LabelSet{
267
				{
268
					model.AddressLabel: model.LabelValue("my.domain"),
269
				},
270
			},
271
			Labels: model.LabelSet{
272
				fileSDFilepathLabel: model.LabelValue(file),
273
			},
274
			Source: fileSource(file, 1),
275
		},
276
	}
277
}
278

279
// valid2Tg() maps to fixtures/valid2.{json,yml}.
280
func valid2Tg(file string) []*targetgroup.Group {
281
	return []*targetgroup.Group{
282
		{
283
			Targets: []model.LabelSet{
284
				{
285
					model.AddressLabel: model.LabelValue("my.domain"),
286
				},
287
			},
288
			Labels: model.LabelSet{
289
				fileSDFilepathLabel: model.LabelValue(file),
290
			},
291
			Source: fileSource(file, 0),
292
		},
293
		{
294
			Targets: []model.LabelSet{
295
				{
296
					model.AddressLabel: model.LabelValue("localhost:9090"),
297
				},
298
			},
299
			Labels: model.LabelSet{
300
				model.LabelName("foo"):  model.LabelValue("bar"),
301
				model.LabelName("fred"): model.LabelValue("baz"),
302
				fileSDFilepathLabel:     model.LabelValue(file),
303
			},
304
			Source: fileSource(file, 1),
305
		},
306
		{
307
			Targets: []model.LabelSet{
308
				{
309
					model.AddressLabel: model.LabelValue("example.org:443"),
310
				},
311
			},
312
			Labels: model.LabelSet{
313
				model.LabelName("scheme"): model.LabelValue("https"),
314
				fileSDFilepathLabel:       model.LabelValue(file),
315
			},
316
			Source: fileSource(file, 2),
317
		},
318
	}
319
}
320

321
func TestInitialUpdate(t *testing.T) {
322
	for _, tc := range []string{
323
		"fixtures/valid.yml",
324
		"fixtures/valid.json",
325
	} {
326
		tc := tc
327
		t.Run(tc, func(t *testing.T) {
328
			t.Parallel()
329

330
			runner := newTestRunner(t)
331
			sdFile := runner.copyFile(tc)
332

333
			runner.run("*" + filepath.Ext(tc))
334
			defer runner.stop()
335

336
			// Verify that we receive the initial target groups.
337
			runner.requireUpdate(time.Time{}, validTg(sdFile))
338
		})
339
	}
340
}
341

342
func TestInvalidFile(t *testing.T) {
343
	for _, tc := range []string{
344
		"fixtures/invalid_nil.yml",
345
		"fixtures/invalid_nil.json",
346
	} {
347
		tc := tc
348
		t.Run(tc, func(t *testing.T) {
349
			t.Parallel()
350

351
			now := time.Now()
352
			runner := newTestRunner(t)
353
			runner.copyFile(tc)
354

355
			runner.run("*" + filepath.Ext(tc))
356
			defer runner.stop()
357

358
			// Verify that we've received nothing.
359
			time.Sleep(defaultWait)
360
			require.False(t, runner.lastReceive().After(now), "unexpected targets received: %v", runner.targets())
361
		})
362
	}
363
}
364

365
func TestNoopFileUpdate(t *testing.T) {
366
	t.Parallel()
367

368
	runner := newTestRunner(t)
369
	sdFile := runner.copyFile("fixtures/valid.yml")
370

371
	runner.run("*.yml")
372
	defer runner.stop()
373

374
	// Verify that we receive the initial target groups.
375
	runner.requireUpdate(time.Time{}, validTg(sdFile))
376

377
	// Verify that we receive an update with the same target groups.
378
	ref := runner.lastReceive()
379
	runner.copyFileTo("fixtures/valid3.yml", "valid.yml")
380
	runner.requireUpdate(ref, validTg(sdFile))
381
}
382

383
func TestFileUpdate(t *testing.T) {
384
	t.Parallel()
385

386
	runner := newTestRunner(t)
387
	sdFile := runner.copyFile("fixtures/valid.yml")
388

389
	runner.run("*.yml")
390
	defer runner.stop()
391

392
	// Verify that we receive the initial target groups.
393
	runner.requireUpdate(time.Time{}, validTg(sdFile))
394

395
	// Verify that we receive an update with the new target groups.
396
	ref := runner.lastReceive()
397
	runner.copyFileTo("fixtures/valid2.yml", "valid.yml")
398
	runner.requireUpdate(ref, valid2Tg(sdFile))
399
}
400

401
func TestInvalidFileUpdate(t *testing.T) {
402
	t.Parallel()
403

404
	runner := newTestRunner(t)
405
	sdFile := runner.copyFile("fixtures/valid.yml")
406

407
	runner.run("*.yml")
408
	defer runner.stop()
409

410
	// Verify that we receive the initial target groups.
411
	runner.requireUpdate(time.Time{}, validTg(sdFile))
412

413
	ref := runner.lastReceive()
414
	runner.writeString(sdFile, "]gibberish\n][")
415

416
	// Verify that we receive nothing or the same targets as before.
417
	time.Sleep(defaultWait)
418
	if runner.lastReceive().After(ref) {
419
		runner.requireTargetGroups(validTg(sdFile), runner.targets())
420
	}
421
}
422

423
func TestUpdateFileWithPartialWrites(t *testing.T) {
424
	t.Parallel()
425

426
	runner := newTestRunner(t)
427
	sdFile := runner.copyFile("fixtures/valid.yml")
428

429
	runner.run("*.yml")
430
	defer runner.stop()
431

432
	// Verify that we receive the initial target groups.
433
	runner.requireUpdate(time.Time{}, validTg(sdFile))
434

435
	// Do a partial write operation.
436
	ref := runner.lastReceive()
437
	runner.writeString(sdFile, "- targets")
438
	time.Sleep(defaultWait)
439
	// Verify that we receive nothing or the same target groups as before.
440
	if runner.lastReceive().After(ref) {
441
		runner.requireTargetGroups(validTg(sdFile), runner.targets())
442
	}
443

444
	// Verify that we receive the update target groups once the file is a valid YAML payload.
445
	ref = runner.lastReceive()
446
	runner.appendString(sdFile, `: ["localhost:9091"]`)
447
	runner.requireUpdate(ref,
448
		[]*targetgroup.Group{
449
			{
450
				Targets: []model.LabelSet{
451
					{
452
						model.AddressLabel: model.LabelValue("localhost:9091"),
453
					},
454
				},
455
				Labels: model.LabelSet{
456
					fileSDFilepathLabel: model.LabelValue(sdFile),
457
				},
458
				Source: fileSource(sdFile, 0),
459
			},
460
			{
461
				Source: fileSource(sdFile, 1),
462
			},
463
		},
464
	)
465
}
466

467
func TestRemoveFile(t *testing.T) {
468
	t.Parallel()
469

470
	runner := newTestRunner(t)
471
	sdFile := runner.copyFile("fixtures/valid.yml")
472

473
	runner.run("*.yml")
474
	defer runner.stop()
475

476
	// Verify that we receive the initial target groups.
477
	runner.requireUpdate(time.Time{}, validTg(sdFile))
478

479
	// Verify that we receive the update about the target groups being removed.
480
	ref := runner.lastReceive()
481
	require.NoError(t, os.Remove(sdFile))
482
	runner.requireUpdate(
483
		ref,
484
		[]*targetgroup.Group{
485
			{
486
				Source: fileSource(sdFile, 0),
487
			},
488
			{
489
				Source: fileSource(sdFile, 1),
490
			},
491
		},
492
	)
493
}
494

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.