prometheus
499 строк · 13.4 Кб
1// Copyright 2015 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package consul
15
16import (
17"context"
18"net/http"
19"net/http/httptest"
20"net/url"
21"testing"
22"time"
23
24"github.com/go-kit/log"
25"github.com/prometheus/client_golang/prometheus"
26"github.com/prometheus/common/config"
27"github.com/prometheus/common/model"
28"github.com/stretchr/testify/require"
29"go.uber.org/goleak"
30"gopkg.in/yaml.v2"
31
32"github.com/prometheus/prometheus/discovery"
33"github.com/prometheus/prometheus/discovery/targetgroup"
34)
35
36func TestMain(m *testing.M) {
37goleak.VerifyTestMain(m)
38}
39
40// TODO: Add ability to unregister metrics?
41func NewTestMetrics(t *testing.T, conf discovery.Config, reg prometheus.Registerer) discovery.DiscovererMetrics {
42refreshMetrics := discovery.NewRefreshMetrics(reg)
43require.NoError(t, refreshMetrics.Register())
44
45metrics := conf.NewDiscovererMetrics(prometheus.NewRegistry(), refreshMetrics)
46require.NoError(t, metrics.Register())
47
48return metrics
49}
50
51func TestConfiguredService(t *testing.T) {
52conf := &SDConfig{
53Services: []string{"configuredServiceName"},
54}
55
56metrics := NewTestMetrics(t, conf, prometheus.NewRegistry())
57
58consulDiscovery, err := NewDiscovery(conf, nil, metrics)
59require.NoError(t, err, "when initializing discovery")
60require.True(t, consulDiscovery.shouldWatch("configuredServiceName", []string{""}),
61"Expected service %s to be watched", "configuredServiceName")
62require.False(t, consulDiscovery.shouldWatch("nonConfiguredServiceName", []string{""}),
63"Expected service %s to not be watched", "nonConfiguredServiceName")
64}
65
66func TestConfiguredServiceWithTag(t *testing.T) {
67conf := &SDConfig{
68Services: []string{"configuredServiceName"},
69ServiceTags: []string{"http"},
70}
71
72metrics := NewTestMetrics(t, conf, prometheus.NewRegistry())
73
74consulDiscovery, err := NewDiscovery(conf, nil, metrics)
75require.NoError(t, err, "when initializing discovery")
76require.False(t, consulDiscovery.shouldWatch("configuredServiceName", []string{""}),
77"Expected service %s to not be watched without tag", "configuredServiceName")
78
79require.True(t, consulDiscovery.shouldWatch("configuredServiceName", []string{"http"}),
80"Expected service %s to be watched with tag %s", "configuredServiceName", "http")
81
82require.False(t, consulDiscovery.shouldWatch("nonConfiguredServiceName", []string{""}),
83"Expected service %s to not be watched without tag", "nonConfiguredServiceName")
84
85require.False(t, consulDiscovery.shouldWatch("nonConfiguredServiceName", []string{"http"}),
86"Expected service %s to not be watched with tag %s", "nonConfiguredServiceName", "http")
87}
88
89func TestConfiguredServiceWithTags(t *testing.T) {
90type testcase struct {
91// What we've configured to watch.
92conf *SDConfig
93// The service we're checking if we should watch or not.
94serviceName string
95serviceTags []string
96shouldWatch bool
97}
98
99cases := []testcase{
100{
101conf: &SDConfig{
102Services: []string{"configuredServiceName"},
103ServiceTags: []string{"http", "v1"},
104},
105serviceName: "configuredServiceName",
106serviceTags: []string{""},
107shouldWatch: false,
108},
109{
110conf: &SDConfig{
111Services: []string{"configuredServiceName"},
112ServiceTags: []string{"http", "v1"},
113},
114serviceName: "configuredServiceName",
115serviceTags: []string{"http", "v1"},
116shouldWatch: true,
117},
118{
119conf: &SDConfig{
120Services: []string{"configuredServiceName"},
121ServiceTags: []string{"http", "v1"},
122},
123serviceName: "nonConfiguredServiceName",
124serviceTags: []string{""},
125shouldWatch: false,
126},
127{
128conf: &SDConfig{
129Services: []string{"configuredServiceName"},
130ServiceTags: []string{"http", "v1"},
131},
132serviceName: "nonConfiguredServiceName",
133serviceTags: []string{"http, v1"},
134shouldWatch: false,
135},
136{
137conf: &SDConfig{
138Services: []string{"configuredServiceName"},
139ServiceTags: []string{"http", "v1"},
140},
141serviceName: "configuredServiceName",
142serviceTags: []string{"http", "v1", "foo"},
143shouldWatch: true,
144},
145{
146conf: &SDConfig{
147Services: []string{"configuredServiceName"},
148ServiceTags: []string{"http", "v1", "foo"},
149},
150serviceName: "configuredServiceName",
151serviceTags: []string{"http", "v1", "foo"},
152shouldWatch: true,
153},
154{
155conf: &SDConfig{
156Services: []string{"configuredServiceName"},
157ServiceTags: []string{"http", "v1"},
158},
159serviceName: "configuredServiceName",
160serviceTags: []string{"http", "v1", "v1"},
161shouldWatch: true,
162},
163}
164
165for _, tc := range cases {
166metrics := NewTestMetrics(t, tc.conf, prometheus.NewRegistry())
167
168consulDiscovery, err := NewDiscovery(tc.conf, nil, metrics)
169require.NoError(t, err, "when initializing discovery")
170ret := consulDiscovery.shouldWatch(tc.serviceName, tc.serviceTags)
171require.Equal(t, tc.shouldWatch, ret, "Watched service and tags: %s %+v, input was %s %+v",
172tc.conf.Services, tc.conf.ServiceTags, tc.serviceName, tc.serviceTags)
173}
174}
175
176func TestNonConfiguredService(t *testing.T) {
177conf := &SDConfig{}
178
179metrics := NewTestMetrics(t, conf, prometheus.NewRegistry())
180
181consulDiscovery, err := NewDiscovery(conf, nil, metrics)
182require.NoError(t, err, "when initializing discovery")
183require.True(t, consulDiscovery.shouldWatch("nonConfiguredServiceName", []string{""}), "Expected service %s to be watched", "nonConfiguredServiceName")
184}
185
186const (
187AgentAnswer = `{"Config": {"Datacenter": "test-dc"}}`
188ServiceTestAnswer = `
189[{
190"Node": {
191"ID": "b78c2e48-5ef3-1814-31b8-0d880f50471e",
192"Node": "node1",
193"Address": "1.1.1.1",
194"Datacenter": "test-dc",
195"TaggedAddresses": {
196"lan": "192.168.10.10",
197"wan": "10.0.10.10"
198},
199"Meta": {"rack_name": "2304"},
200"CreateIndex": 1,
201"ModifyIndex": 1
202},
203"Service": {
204"ID": "test",
205"Service": "test",
206"Tags": ["tag1"],
207"Address": "",
208"Meta": {"version":"1.0.0","environment":"staging"},
209"Port": 3341,
210"Weights": {
211"Passing": 1,
212"Warning": 1
213},
214"EnableTagOverride": false,
215"ProxyDestination": "",
216"Proxy": {},
217"Connect": {},
218"CreateIndex": 1,
219"ModifyIndex": 1
220},
221"Checks": [{
222"Node": "node1",
223"CheckID": "serfHealth",
224"Name": "Serf Health Status",
225"Status": "passing"
226}]
227}]`
228
229ServicesTestAnswer = `{"test": ["tag1"], "other": ["tag2"]}`
230)
231
232func newServer(t *testing.T) (*httptest.Server, *SDConfig) {
233// github.com/hashicorp/consul/testutil/ would be nice but it needs a local consul binary.
234stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
235response := ""
236switch r.URL.String() {
237case "/v1/agent/self":
238response = AgentAnswer
239case "/v1/health/service/test?node-meta=rack_name%3A2304&stale=&tag=tag1&wait=120000ms":
240response = ServiceTestAnswer
241case "/v1/health/service/test?wait=120000ms":
242response = ServiceTestAnswer
243case "/v1/health/service/other?wait=120000ms":
244response = `[]`
245case "/v1/catalog/services?node-meta=rack_name%3A2304&stale=&wait=120000ms":
246response = ServicesTestAnswer
247case "/v1/catalog/services?wait=120000ms":
248response = ServicesTestAnswer
249case "/v1/catalog/services?index=1&node-meta=rack_name%3A2304&stale=&wait=120000ms":
250time.Sleep(5 * time.Second)
251response = ServicesTestAnswer
252case "/v1/catalog/services?index=1&wait=120000ms":
253time.Sleep(5 * time.Second)
254response = ServicesTestAnswer
255default:
256t.Errorf("Unhandled consul call: %s", r.URL)
257}
258w.Header().Add("X-Consul-Index", "1")
259w.Write([]byte(response))
260}))
261stuburl, err := url.Parse(stub.URL)
262require.NoError(t, err)
263
264config := &SDConfig{
265Server: stuburl.Host,
266Token: "fake-token",
267RefreshInterval: model.Duration(1 * time.Second),
268}
269return stub, config
270}
271
272func newDiscovery(t *testing.T, config *SDConfig) *Discovery {
273logger := log.NewNopLogger()
274
275metrics := NewTestMetrics(t, config, prometheus.NewRegistry())
276
277d, err := NewDiscovery(config, logger, metrics)
278require.NoError(t, err)
279return d
280}
281
282func checkOneTarget(t *testing.T, tg []*targetgroup.Group) {
283require.Len(t, tg, 1)
284target := tg[0]
285require.Equal(t, "test-dc", string(target.Labels["__meta_consul_dc"]))
286require.Equal(t, target.Source, string(target.Labels["__meta_consul_service"]))
287if target.Source == "test" {
288// test service should have one node.
289require.NotEmpty(t, target.Targets, "Test service should have one node")
290}
291}
292
293// Watch all the services in the catalog.
294func TestAllServices(t *testing.T) {
295stub, config := newServer(t)
296defer stub.Close()
297
298d := newDiscovery(t, config)
299
300ctx, cancel := context.WithCancel(context.Background())
301ch := make(chan []*targetgroup.Group)
302go func() {
303d.Run(ctx, ch)
304close(ch)
305}()
306checkOneTarget(t, <-ch)
307checkOneTarget(t, <-ch)
308cancel()
309<-ch
310}
311
312// targetgroup with no targets is emitted if no services were discovered.
313func TestNoTargets(t *testing.T) {
314stub, config := newServer(t)
315defer stub.Close()
316config.ServiceTags = []string{"missing"}
317
318d := newDiscovery(t, config)
319
320ctx, cancel := context.WithCancel(context.Background())
321ch := make(chan []*targetgroup.Group)
322go func() {
323d.Run(ctx, ch)
324close(ch)
325}()
326
327targets := (<-ch)[0].Targets
328require.Empty(t, targets)
329cancel()
330<-ch
331}
332
333// Watch only the test service.
334func TestOneService(t *testing.T) {
335stub, config := newServer(t)
336defer stub.Close()
337
338config.Services = []string{"test"}
339d := newDiscovery(t, config)
340
341ctx, cancel := context.WithCancel(context.Background())
342ch := make(chan []*targetgroup.Group)
343go d.Run(ctx, ch)
344checkOneTarget(t, <-ch)
345cancel()
346}
347
348// Watch the test service with a specific tag and node-meta.
349func TestAllOptions(t *testing.T) {
350stub, config := newServer(t)
351defer stub.Close()
352
353config.Services = []string{"test"}
354config.NodeMeta = map[string]string{"rack_name": "2304"}
355config.ServiceTags = []string{"tag1"}
356config.AllowStale = true
357config.Token = "fake-token"
358
359d := newDiscovery(t, config)
360
361ctx, cancel := context.WithCancel(context.Background())
362ch := make(chan []*targetgroup.Group)
363go func() {
364d.Run(ctx, ch)
365close(ch)
366}()
367checkOneTarget(t, <-ch)
368cancel()
369<-ch
370}
371
372func TestGetDatacenterShouldReturnError(t *testing.T) {
373for _, tc := range []struct {
374handler func(http.ResponseWriter, *http.Request)
375errMessage string
376}{
377{
378// Define a handler that will return status 500.
379handler: func(w http.ResponseWriter, r *http.Request) {
380w.WriteHeader(http.StatusInternalServerError)
381},
382errMessage: "Unexpected response code: 500 ()",
383},
384{
385// Define a handler that will return incorrect response.
386handler: func(w http.ResponseWriter, r *http.Request) {
387w.Write([]byte(`{"Config": {"Not-Datacenter": "test-dc"}}`))
388},
389errMessage: "invalid value '<nil>' for Config.Datacenter",
390},
391} {
392stub := httptest.NewServer(http.HandlerFunc(tc.handler))
393stuburl, err := url.Parse(stub.URL)
394require.NoError(t, err)
395
396config := &SDConfig{
397Server: stuburl.Host,
398Token: "fake-token",
399RefreshInterval: model.Duration(1 * time.Second),
400}
401defer stub.Close()
402d := newDiscovery(t, config)
403
404// Should be empty if not initialized.
405require.Equal(t, "", d.clientDatacenter)
406
407err = d.getDatacenter()
408
409// An error should be returned.
410require.Equal(t, tc.errMessage, err.Error())
411// Should still be empty.
412require.Equal(t, "", d.clientDatacenter)
413}
414}
415
416func TestUnmarshalConfig(t *testing.T) {
417unmarshal := func(d []byte) func(interface{}) error {
418return func(o interface{}) error {
419return yaml.Unmarshal(d, o)
420}
421}
422
423goodConfig := DefaultSDConfig
424goodConfig.Username = "123"
425goodConfig.Password = "1234"
426goodConfig.HTTPClientConfig = config.HTTPClientConfig{
427BasicAuth: &config.BasicAuth{
428Username: "123",
429Password: "1234",
430},
431FollowRedirects: true,
432EnableHTTP2: true,
433}
434
435cases := []struct {
436name string
437config string
438expected SDConfig
439errMessage string
440}{
441{
442name: "good",
443config: `
444server: localhost:8500
445username: 123
446password: 1234
447`,
448expected: goodConfig,
449},
450{
451name: "username and password and basic auth configured",
452config: `
453server: localhost:8500
454username: 123
455password: 1234
456basic_auth:
457username: 12345
458password: 123456
459`,
460errMessage: "at most one of consul SD configuration username and password and basic auth can be configured",
461},
462{
463name: "token and authorization configured",
464config: `
465server: localhost:8500
466token: 1234567
467authorization:
468credentials: 12345678
469`,
470errMessage: "at most one of consul SD token, authorization, or oauth2 can be configured",
471},
472{
473name: "token and oauth2 configured",
474config: `
475server: localhost:8500
476token: 1234567
477oauth2:
478client_id: 10
479client_secret: 11
480token_url: http://example.com
481`,
482errMessage: "at most one of consul SD token, authorization, or oauth2 can be configured",
483},
484}
485
486for _, test := range cases {
487t.Run(test.name, func(t *testing.T) {
488var config SDConfig
489err := config.UnmarshalYAML(unmarshal([]byte(test.config)))
490if err != nil {
491require.EqualError(t, err, test.errMessage)
492return
493}
494require.Empty(t, test.errMessage, "Expected error.")
495
496require.Equal(t, test.expected, config)
497})
498}
499}
500