prometheus
493 lines · 11.1 KB
1// Copyright 2016 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package file
15
16import (
17"context"
18"encoding/json"
19"io"
20"os"
21"path/filepath"
22"sort"
23"sync"
24"testing"
25"time"
26
27"github.com/prometheus/client_golang/prometheus"
28"github.com/prometheus/common/model"
29"github.com/stretchr/testify/require"
30"go.uber.org/goleak"
31
32"github.com/prometheus/prometheus/discovery"
33"github.com/prometheus/prometheus/discovery/targetgroup"
34)
35
// TestMain runs the package's tests through goleak.VerifyTestMain, which
// checks for goroutines still running after the tests complete.
func TestMain(m *testing.M) {
	goleak.VerifyTestMain(m)
}
39
// defaultWait bounds how long the helpers wait for (or the absence of) an
// update from the file SD.
const defaultWait = 1 * time.Second
41
42type testRunner struct {
43*testing.T
44dir string
45ch chan []*targetgroup.Group
46done, stopped chan struct{}
47cancelSD context.CancelFunc
48
49mtx sync.Mutex
50tgs map[string]*targetgroup.Group
51receivedAt time.Time
52}
53
54func newTestRunner(t *testing.T) *testRunner {
55t.Helper()
56
57return &testRunner{
58T: t,
59dir: t.TempDir(),
60ch: make(chan []*targetgroup.Group),
61done: make(chan struct{}),
62stopped: make(chan struct{}),
63tgs: make(map[string]*targetgroup.Group),
64}
65}
66
67// copyFile atomically copies a file to the runner's directory.
68func (t *testRunner) copyFile(src string) string {
69t.Helper()
70return t.copyFileTo(src, filepath.Base(src))
71}
72
73// copyFileTo atomically copies a file with a different name to the runner's directory.
74func (t *testRunner) copyFileTo(src, name string) string {
75t.Helper()
76
77newf, err := os.CreateTemp(t.dir, "")
78require.NoError(t, err)
79
80f, err := os.Open(src)
81require.NoError(t, err)
82
83_, err = io.Copy(newf, f)
84require.NoError(t, err)
85require.NoError(t, f.Close())
86require.NoError(t, newf.Close())
87
88dst := filepath.Join(t.dir, name)
89err = os.Rename(newf.Name(), dst)
90require.NoError(t, err)
91
92return dst
93}
94
95// writeString writes atomically a string to a file.
96func (t *testRunner) writeString(file, data string) {
97t.Helper()
98
99newf, err := os.CreateTemp(t.dir, "")
100require.NoError(t, err)
101
102_, err = newf.WriteString(data)
103require.NoError(t, err)
104require.NoError(t, newf.Close())
105
106err = os.Rename(newf.Name(), file)
107require.NoError(t, err)
108}
109
110// appendString appends a string to a file.
111func (t *testRunner) appendString(file, data string) {
112t.Helper()
113
114f, err := os.OpenFile(file, os.O_WRONLY|os.O_APPEND, 0)
115require.NoError(t, err)
116defer f.Close()
117
118_, err = f.WriteString(data)
119require.NoError(t, err)
120}
121
122// run starts the file SD and the loop receiving target groups updates.
123func (t *testRunner) run(files ...string) {
124go func() {
125defer close(t.stopped)
126for {
127select {
128case <-t.done:
129os.RemoveAll(t.dir)
130return
131case tgs := <-t.ch:
132t.mtx.Lock()
133t.receivedAt = time.Now()
134for _, tg := range tgs {
135t.tgs[tg.Source] = tg
136}
137t.mtx.Unlock()
138}
139}
140}()
141
142for i := range files {
143files[i] = filepath.Join(t.dir, files[i])
144}
145ctx, cancel := context.WithCancel(context.Background())
146t.cancelSD = cancel
147go func() {
148conf := &SDConfig{
149Files: files,
150// Setting a high refresh interval to make sure that the tests only
151// rely on file watches.
152RefreshInterval: model.Duration(1 * time.Hour),
153}
154
155reg := prometheus.NewRegistry()
156refreshMetrics := discovery.NewRefreshMetrics(reg)
157metrics := conf.NewDiscovererMetrics(reg, refreshMetrics)
158require.NoError(t, metrics.Register())
159
160d, err := NewDiscovery(
161conf,
162nil,
163metrics,
164)
165require.NoError(t, err)
166
167d.Run(ctx, t.ch)
168
169metrics.Unregister()
170}()
171}
172
// stop shuts down a runner started with run. It first calls cancelSD to stop
// the discovery. It then closes done, which makes the receiver loop remove
// the runner's directory and exit, and waits on stopped until that loop has
// returned.
func (t *testRunner) stop() {
	t.cancelSD()
	close(t.done)
	<-t.stopped
}
178
179func (t *testRunner) lastReceive() time.Time {
180t.mtx.Lock()
181defer t.mtx.Unlock()
182return t.receivedAt
183}
184
185func (t *testRunner) targets() []*targetgroup.Group {
186t.mtx.Lock()
187defer t.mtx.Unlock()
188var (
189keys []string
190tgs []*targetgroup.Group
191)
192
193for k := range t.tgs {
194keys = append(keys, k)
195}
196sort.Strings(keys)
197for _, k := range keys {
198tgs = append(tgs, t.tgs[k])
199}
200return tgs
201}
202
203func (t *testRunner) requireUpdate(ref time.Time, expected []*targetgroup.Group) {
204t.Helper()
205
206timeout := time.After(defaultWait)
207for {
208select {
209case <-timeout:
210t.Fatalf("Expected update but got none")
211case <-time.After(defaultWait / 10):
212if ref.Equal(t.lastReceive()) {
213// No update received.
214break
215}
216
217// We can receive partial updates so only check the result when the
218// expected number of groups is reached.
219tgs := t.targets()
220if len(tgs) != len(expected) {
221t.Logf("skipping update: expected %d targets, got %d", len(expected), len(tgs))
222break
223}
224t.requireTargetGroups(expected, tgs)
225if ref.After(time.Time{}) {
226t.Logf("update received after %v", t.lastReceive().Sub(ref))
227}
228return
229}
230}
231}
232
233func (t *testRunner) requireTargetGroups(expected, got []*targetgroup.Group) {
234t.Helper()
235b1, err := json.Marshal(expected)
236if err != nil {
237panic(err)
238}
239b2, err := json.Marshal(got)
240if err != nil {
241panic(err)
242}
243
244require.Equal(t, string(b1), string(b2))
245}
246
247// validTg() maps to fixtures/valid.{json,yml}.
248func validTg(file string) []*targetgroup.Group {
249return []*targetgroup.Group{
250{
251Targets: []model.LabelSet{
252{
253model.AddressLabel: model.LabelValue("localhost:9090"),
254},
255{
256model.AddressLabel: model.LabelValue("example.org:443"),
257},
258},
259Labels: model.LabelSet{
260model.LabelName("foo"): model.LabelValue("bar"),
261fileSDFilepathLabel: model.LabelValue(file),
262},
263Source: fileSource(file, 0),
264},
265{
266Targets: []model.LabelSet{
267{
268model.AddressLabel: model.LabelValue("my.domain"),
269},
270},
271Labels: model.LabelSet{
272fileSDFilepathLabel: model.LabelValue(file),
273},
274Source: fileSource(file, 1),
275},
276}
277}
278
279// valid2Tg() maps to fixtures/valid2.{json,yml}.
280func valid2Tg(file string) []*targetgroup.Group {
281return []*targetgroup.Group{
282{
283Targets: []model.LabelSet{
284{
285model.AddressLabel: model.LabelValue("my.domain"),
286},
287},
288Labels: model.LabelSet{
289fileSDFilepathLabel: model.LabelValue(file),
290},
291Source: fileSource(file, 0),
292},
293{
294Targets: []model.LabelSet{
295{
296model.AddressLabel: model.LabelValue("localhost:9090"),
297},
298},
299Labels: model.LabelSet{
300model.LabelName("foo"): model.LabelValue("bar"),
301model.LabelName("fred"): model.LabelValue("baz"),
302fileSDFilepathLabel: model.LabelValue(file),
303},
304Source: fileSource(file, 1),
305},
306{
307Targets: []model.LabelSet{
308{
309model.AddressLabel: model.LabelValue("example.org:443"),
310},
311},
312Labels: model.LabelSet{
313model.LabelName("scheme"): model.LabelValue("https"),
314fileSDFilepathLabel: model.LabelValue(file),
315},
316Source: fileSource(file, 2),
317},
318}
319}
320
321func TestInitialUpdate(t *testing.T) {
322for _, tc := range []string{
323"fixtures/valid.yml",
324"fixtures/valid.json",
325} {
326tc := tc
327t.Run(tc, func(t *testing.T) {
328t.Parallel()
329
330runner := newTestRunner(t)
331sdFile := runner.copyFile(tc)
332
333runner.run("*" + filepath.Ext(tc))
334defer runner.stop()
335
336// Verify that we receive the initial target groups.
337runner.requireUpdate(time.Time{}, validTg(sdFile))
338})
339}
340}
341
342func TestInvalidFile(t *testing.T) {
343for _, tc := range []string{
344"fixtures/invalid_nil.yml",
345"fixtures/invalid_nil.json",
346} {
347tc := tc
348t.Run(tc, func(t *testing.T) {
349t.Parallel()
350
351now := time.Now()
352runner := newTestRunner(t)
353runner.copyFile(tc)
354
355runner.run("*" + filepath.Ext(tc))
356defer runner.stop()
357
358// Verify that we've received nothing.
359time.Sleep(defaultWait)
360require.False(t, runner.lastReceive().After(now), "unexpected targets received: %v", runner.targets())
361})
362}
363}
364
365func TestNoopFileUpdate(t *testing.T) {
366t.Parallel()
367
368runner := newTestRunner(t)
369sdFile := runner.copyFile("fixtures/valid.yml")
370
371runner.run("*.yml")
372defer runner.stop()
373
374// Verify that we receive the initial target groups.
375runner.requireUpdate(time.Time{}, validTg(sdFile))
376
377// Verify that we receive an update with the same target groups.
378ref := runner.lastReceive()
379runner.copyFileTo("fixtures/valid3.yml", "valid.yml")
380runner.requireUpdate(ref, validTg(sdFile))
381}
382
383func TestFileUpdate(t *testing.T) {
384t.Parallel()
385
386runner := newTestRunner(t)
387sdFile := runner.copyFile("fixtures/valid.yml")
388
389runner.run("*.yml")
390defer runner.stop()
391
392// Verify that we receive the initial target groups.
393runner.requireUpdate(time.Time{}, validTg(sdFile))
394
395// Verify that we receive an update with the new target groups.
396ref := runner.lastReceive()
397runner.copyFileTo("fixtures/valid2.yml", "valid.yml")
398runner.requireUpdate(ref, valid2Tg(sdFile))
399}
400
401func TestInvalidFileUpdate(t *testing.T) {
402t.Parallel()
403
404runner := newTestRunner(t)
405sdFile := runner.copyFile("fixtures/valid.yml")
406
407runner.run("*.yml")
408defer runner.stop()
409
410// Verify that we receive the initial target groups.
411runner.requireUpdate(time.Time{}, validTg(sdFile))
412
413ref := runner.lastReceive()
414runner.writeString(sdFile, "]gibberish\n][")
415
416// Verify that we receive nothing or the same targets as before.
417time.Sleep(defaultWait)
418if runner.lastReceive().After(ref) {
419runner.requireTargetGroups(validTg(sdFile), runner.targets())
420}
421}
422
423func TestUpdateFileWithPartialWrites(t *testing.T) {
424t.Parallel()
425
426runner := newTestRunner(t)
427sdFile := runner.copyFile("fixtures/valid.yml")
428
429runner.run("*.yml")
430defer runner.stop()
431
432// Verify that we receive the initial target groups.
433runner.requireUpdate(time.Time{}, validTg(sdFile))
434
435// Do a partial write operation.
436ref := runner.lastReceive()
437runner.writeString(sdFile, "- targets")
438time.Sleep(defaultWait)
439// Verify that we receive nothing or the same target groups as before.
440if runner.lastReceive().After(ref) {
441runner.requireTargetGroups(validTg(sdFile), runner.targets())
442}
443
444// Verify that we receive the update target groups once the file is a valid YAML payload.
445ref = runner.lastReceive()
446runner.appendString(sdFile, `: ["localhost:9091"]`)
447runner.requireUpdate(ref,
448[]*targetgroup.Group{
449{
450Targets: []model.LabelSet{
451{
452model.AddressLabel: model.LabelValue("localhost:9091"),
453},
454},
455Labels: model.LabelSet{
456fileSDFilepathLabel: model.LabelValue(sdFile),
457},
458Source: fileSource(sdFile, 0),
459},
460{
461Source: fileSource(sdFile, 1),
462},
463},
464)
465}
466
467func TestRemoveFile(t *testing.T) {
468t.Parallel()
469
470runner := newTestRunner(t)
471sdFile := runner.copyFile("fixtures/valid.yml")
472
473runner.run("*.yml")
474defer runner.stop()
475
476// Verify that we receive the initial target groups.
477runner.requireUpdate(time.Time{}, validTg(sdFile))
478
479// Verify that we receive the update about the target groups being removed.
480ref := runner.lastReceive()
481require.NoError(t, os.Remove(sdFile))
482runner.requireUpdate(
483ref,
484[]*targetgroup.Group{
485{
486Source: fileSource(sdFile, 0),
487},
488{
489Source: fileSource(sdFile, 1),
490},
491},
492)
493}
494