// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package file

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/fsnotify/fsnotify"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/regexp"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/config"
	"github.com/prometheus/common/model"
	"gopkg.in/yaml.v2"

	"github.com/prometheus/prometheus/discovery"
	"github.com/prometheus/prometheus/discovery/targetgroup"
)

var (
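	// patFileSDName matches the path names accepted for file discovery: at most
	// one '*' wildcard, allowed only in the final path segment, and a .json,
	// .yml or .yaml extension (lower- or upper-case), e.g. "targets/*.yml".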
	patFileSDName = regexp.MustCompile(`^[^*]*(\*[^/]*)?\.(json|yml|yaml|JSON|YML|YAML)$`)

	// DefaultSDConfig is the default file SD configuration.
	DefaultSDConfig = SDConfig{
		RefreshInterval: model.Duration(5 * time.Minute),
	}
)

func init() {
	discovery.RegisterConfig(&SDConfig{})
}

// SDConfig is the configuration for file based discovery.
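// An illustrative snippet of how this is typically set under file_sd_configs in
// a Prometheus scrape configuration (the key names follow the yaml tags below;
// paths and interval are placeholders):
//
//	file_sd_configs:
//	  - files:
//	      - /etc/prometheus/targets/*.yml
//	    refresh_interval: 5m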
55
type SDConfig struct {
56
	Files           []string       `yaml:"files"`
57
	RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`
58
}
59

60
// NewDiscovererMetrics implements discovery.Config.
61
func (*SDConfig) NewDiscovererMetrics(reg prometheus.Registerer, rmi discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics {
62
	return newDiscovererMetrics(reg, rmi)
63
}
64

65
// Name returns the name of the Config.
66
func (*SDConfig) Name() string { return "file" }
67

68
// NewDiscoverer returns a Discoverer for the Config.
69
func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) {
70
	return NewDiscovery(c, opts.Logger, opts.Metrics)
71
}
72

73
// SetDirectory joins any relative file paths with dir.
74
func (c *SDConfig) SetDirectory(dir string) {
75
	for i, file := range c.Files {
76
		c.Files[i] = config.JoinDir(dir, file)
77
	}
78
}
79

80
// UnmarshalYAML implements the yaml.Unmarshaler interface.
81
func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
82
	*c = DefaultSDConfig
83
	type plain SDConfig
84
	err := unmarshal((*plain)(c))
85
	if err != nil {
86
		return err
87
	}
88
	if len(c.Files) == 0 {
89
		return errors.New("file service discovery config must contain at least one path name")
90
	}
91
	for _, name := range c.Files {
92
		if !patFileSDName.MatchString(name) {
93
			return fmt.Errorf("path name %q is not valid for file discovery", name)
94
		}
95
	}
96
	return nil
97
}
98

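// fileSDFilepathLabel is set on every target group read from a file and records
// the path of that file; with model.MetaLabelPrefix it resolves to the
// "__meta_filepath" label.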
const fileSDFilepathLabel = model.MetaLabelPrefix + "filepath"

// TimestampCollector is a Custom Collector for Timestamps of the files.
// TODO(ptodev): Now that each file SD has its own TimestampCollector
// inside discovery/file/metrics.go, we can refactor this collector
// (or get rid of it) as each TimestampCollector instance will only use one discoverer.
type TimestampCollector struct {
	Description *prometheus.Desc
	discoverers map[*Discovery]struct{}
	lock        sync.RWMutex
}

// Describe method sends the description to the channel.
func (t *TimestampCollector) Describe(ch chan<- *prometheus.Desc) {
	ch <- t.Description
}

// Collect creates constant metrics for each file with last modified time of the file.
func (t *TimestampCollector) Collect(ch chan<- prometheus.Metric) {
	// New map to dedup filenames.
	uniqueFiles := make(map[string]float64)
	t.lock.RLock()
	for fileSD := range t.discoverers {
		fileSD.lock.RLock()
		for filename, timestamp := range fileSD.timestamps {
			uniqueFiles[filename] = timestamp
		}
		fileSD.lock.RUnlock()
	}
	t.lock.RUnlock()
	for filename, timestamp := range uniqueFiles {
		ch <- prometheus.MustNewConstMetric(
			t.Description,
			prometheus.GaugeValue,
			timestamp,
			filename,
		)
	}
}

func (t *TimestampCollector) addDiscoverer(disc *Discovery) {
	t.lock.Lock()
	t.discoverers[disc] = struct{}{}
	t.lock.Unlock()
}

func (t *TimestampCollector) removeDiscoverer(disc *Discovery) {
	t.lock.Lock()
	delete(t.discoverers, disc)
	t.lock.Unlock()
}

// NewTimestampCollector creates a TimestampCollector.
func NewTimestampCollector() *TimestampCollector {
	return &TimestampCollector{
		Description: prometheus.NewDesc(
			"prometheus_sd_file_mtime_seconds",
			"Timestamp (mtime) of files read by FileSD. Timestamp is set at read time.",
			[]string{"filename"},
			nil,
		),
		discoverers: make(map[*Discovery]struct{}),
	}
}

// Discovery provides service discovery functionality based
// on files that contain target groups in JSON or YAML format. Refreshing
// happens using file watches and periodic refreshes.
type Discovery struct {
	paths      []string
	watcher    *fsnotify.Watcher
	interval   time.Duration
	timestamps map[string]float64
	lock       sync.RWMutex

	// lastRefresh stores which files were found during the last refresh
	// and how many target groups they contained.
	// This is used to detect deleted target groups.
	lastRefresh map[string]int
	logger      log.Logger

	metrics *fileMetrics
}

// NewDiscovery returns a new file discovery for the given paths.
func NewDiscovery(conf *SDConfig, logger log.Logger, metrics discovery.DiscovererMetrics) (*Discovery, error) {
	fm, ok := metrics.(*fileMetrics)
	if !ok {
		return nil, fmt.Errorf("invalid discovery metrics type")
	}

	if logger == nil {
		logger = log.NewNopLogger()
	}

	disc := &Discovery{
		paths:      conf.Files,
		interval:   time.Duration(conf.RefreshInterval),
		timestamps: make(map[string]float64),
		logger:     logger,
		metrics:    fm,
	}

	fm.init(disc)

	return disc, nil
}

// listFiles returns a list of all files that match the configured patterns.
func (d *Discovery) listFiles() []string {
	var paths []string
	for _, p := range d.paths {
		files, err := filepath.Glob(p)
		if err != nil {
			level.Error(d.logger).Log("msg", "Error expanding glob", "glob", p, "err", err)
			continue
		}
		paths = append(paths, files...)
	}
	return paths
}

// watchFiles sets watches on all full paths or directories that were configured for
// this file discovery.
func (d *Discovery) watchFiles() {
	if d.watcher == nil {
		panic("no watcher configured")
	}
	for _, p := range d.paths {
		if dir, _ := filepath.Split(p); dir != "" {
			p = dir
		} else {
			p = "./"
		}
		if err := d.watcher.Add(p); err != nil {
			level.Error(d.logger).Log("msg", "Error adding file watch", "path", p, "err", err)
		}
	}
}

// Run implements the Discoverer interface.
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		level.Error(d.logger).Log("msg", "Error adding file watcher", "err", err)
		d.metrics.fileWatcherErrorsCount.Inc()
		return
	}
	d.watcher = watcher
	defer d.stop()

	d.refresh(ctx, ch)

	ticker := time.NewTicker(d.interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return

		case event := <-d.watcher.Events:
			// fsnotify sometimes sends a bunch of events without name or operation.
			// It's unclear what they are and why they are sent - filter them out.
			if len(event.Name) == 0 {
				break
			}
			// Everything but a chmod requires rereading.
			if event.Op^fsnotify.Chmod == 0 {
				break
			}
			// Changes to a file can spawn various sequences of events with
			// different combinations of operations. For all practical purposes
			// this is inaccurate.
			// The most reliable solution is to reload everything if anything happens.
			d.refresh(ctx, ch)

		case <-ticker.C:
			// Setting a new watch after an update might fail. Make sure we don't lose
			// those files forever.
			d.refresh(ctx, ch)

		case err := <-d.watcher.Errors:
			if err != nil {
				level.Error(d.logger).Log("msg", "Error watching file", "err", err)
			}
		}
	}
}

func (d *Discovery) writeTimestamp(filename string, timestamp float64) {
	d.lock.Lock()
	d.timestamps[filename] = timestamp
	d.lock.Unlock()
}

func (d *Discovery) deleteTimestamp(filename string) {
	d.lock.Lock()
	delete(d.timestamps, filename)
	d.lock.Unlock()
}

// stop shuts down the file watcher.
func (d *Discovery) stop() {
	level.Debug(d.logger).Log("msg", "Stopping file discovery...", "paths", fmt.Sprintf("%v", d.paths))

	done := make(chan struct{})
	defer close(done)

	d.metrics.fileSDTimeStamp.removeDiscoverer(d)

	// Closing the watcher will deadlock unless all events and errors are drained.
	go func() {
		for {
			select {
			case <-d.watcher.Errors:
			case <-d.watcher.Events:
				// Drain all events and errors.
			case <-done:
				return
			}
		}
	}()
	if err := d.watcher.Close(); err != nil {
		level.Error(d.logger).Log("msg", "Error closing file watcher", "paths", fmt.Sprintf("%v", d.paths), "err", err)
	}

	level.Debug(d.logger).Log("msg", "File discovery stopped")
}

// refresh reads all files matching the discovery's patterns and sends the respective
// updated target groups through the channel.
func (d *Discovery) refresh(ctx context.Context, ch chan<- []*targetgroup.Group) {
	t0 := time.Now()
	defer func() {
		d.metrics.fileSDScanDuration.Observe(time.Since(t0).Seconds())
	}()
	ref := map[string]int{}
	for _, p := range d.listFiles() {
		tgroups, err := d.readFile(p)
		if err != nil {
			d.metrics.fileSDReadErrorsCount.Inc()

			level.Error(d.logger).Log("msg", "Error reading file", "path", p, "err", err)
			// Prevent deletion down below.
			ref[p] = d.lastRefresh[p]
			continue
		}
		select {
		case ch <- tgroups:
		case <-ctx.Done():
			return
		}

		ref[p] = len(tgroups)
	}
	// Send empty updates for sources that disappeared.
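	// For example, if a file previously yielded three groups and now yields one,
	// empty groups are sent for indices 1 and 2 so downstream consumers drop them.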
	for f, n := range d.lastRefresh {
		m, ok := ref[f]
		if !ok || n > m {
			level.Debug(d.logger).Log("msg", "file_sd refresh found file that should be removed", "file", f)
			d.deleteTimestamp(f)
			for i := m; i < n; i++ {
				select {
				case ch <- []*targetgroup.Group{{Source: fileSource(f, i)}}:
				case <-ctx.Done():
					return
				}
			}
		}
	}
	d.lastRefresh = ref

	d.watchFiles()
}

// readFile reads a JSON or YAML list of target groups from the file, depending on its
// file extension. It returns full configuration target groups.
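// An illustrative JSON file of this shape (targets and labels are placeholders):
//
//	[
//	  {
//	    "targets": ["10.0.0.1:9100", "10.0.0.2:9100"],
//	    "labels": {"env": "prod"}
//	  }
//	]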
func (d *Discovery) readFile(filename string) ([]*targetgroup.Group, error) {
	fd, err := os.Open(filename)
	if err != nil {
		return nil, err
	}
	defer fd.Close()

	content, err := io.ReadAll(fd)
	if err != nil {
		return nil, err
	}

	info, err := fd.Stat()
	if err != nil {
		return nil, err
	}

	var targetGroups []*targetgroup.Group

	switch ext := filepath.Ext(filename); strings.ToLower(ext) {
	case ".json":
		if err := json.Unmarshal(content, &targetGroups); err != nil {
			return nil, err
		}
	case ".yml", ".yaml":
		if err := yaml.UnmarshalStrict(content, &targetGroups); err != nil {
			return nil, err
		}
	default:
		panic(fmt.Errorf("discovery.File.readFile: unhandled file extension %q", ext))
	}

	for i, tg := range targetGroups {
		if tg == nil {
			err = errors.New("nil target group item found")
			return nil, err
		}

		tg.Source = fileSource(filename, i)
		if tg.Labels == nil {
			tg.Labels = model.LabelSet{}
		}
		tg.Labels[fileSDFilepathLabel] = model.LabelValue(filename)
	}

	d.writeTimestamp(filename, float64(info.ModTime().Unix()))

	return targetGroups, nil
}

// fileSource returns a source ID for the i-th target group in the file.
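// For example, fileSource("targets.yml", 0) returns "targets.yml:0".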
func fileSource(filename string, i int) string {
	return fmt.Sprintf("%s:%d", filename, i)
}