prometheus
430 строк · 11.4 Кб
1// Copyright 2015 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package file
15
16import (
17"context"
18"encoding/json"
19"errors"
20"fmt"
21"io"
22"os"
23"path/filepath"
24"strings"
25"sync"
26"time"
27
28"github.com/fsnotify/fsnotify"
29"github.com/go-kit/log"
30"github.com/go-kit/log/level"
31"github.com/grafana/regexp"
32"github.com/prometheus/client_golang/prometheus"
33"github.com/prometheus/common/config"
34"github.com/prometheus/common/model"
35"gopkg.in/yaml.v2"
36
37"github.com/prometheus/prometheus/discovery"
38"github.com/prometheus/prometheus/discovery/targetgroup"
39)
40
var (
	// patFileSDName restricts configured paths to JSON/YAML files, optionally
	// with a single '*' glob in the final path component (no '/' after the '*').
	patFileSDName = regexp.MustCompile(`^[^*]*(\*[^/]*)?\.(json|yml|yaml|JSON|YML|YAML)$`)

	// DefaultSDConfig is the default file SD configuration.
	// Only the refresh interval has a default; paths must be set explicitly.
	DefaultSDConfig = SDConfig{
		RefreshInterval: model.Duration(5 * time.Minute),
	}
)
49
// init registers this mechanism with the discovery framework so that
// "file" sections in scrape configs are routed to SDConfig.
func init() {
	discovery.RegisterConfig(&SDConfig{})
}
53
// SDConfig is the configuration for file based discovery.
type SDConfig struct {
	// Files is the list of file patterns to read target groups from.
	// Each entry must match patFileSDName (JSON/YAML, optional glob).
	Files []string `yaml:"files"`
	// RefreshInterval is the fallback period for full re-reads, used in
	// addition to file-system watches.
	RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`
}
59
// NewDiscovererMetrics implements discovery.Config.
// It creates the metrics shared by all Discovery instances built from this config.
func (*SDConfig) NewDiscovererMetrics(reg prometheus.Registerer, rmi discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics {
	return newDiscovererMetrics(reg, rmi)
}
64
// Name returns the name of the Config, used as the discovery mechanism key.
func (*SDConfig) Name() string { return "file" }
67
// NewDiscoverer returns a Discoverer for the Config.
func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) {
	return NewDiscovery(c, opts.Logger, opts.Metrics)
}
72
73// SetDirectory joins any relative file paths with dir.
74func (c *SDConfig) SetDirectory(dir string) {
75for i, file := range c.Files {
76c.Files[i] = config.JoinDir(dir, file)
77}
78}
79
80// UnmarshalYAML implements the yaml.Unmarshaler interface.
81func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
82*c = DefaultSDConfig
83type plain SDConfig
84err := unmarshal((*plain)(c))
85if err != nil {
86return err
87}
88if len(c.Files) == 0 {
89return errors.New("file service discovery config must contain at least one path name")
90}
91for _, name := range c.Files {
92if !patFileSDName.MatchString(name) {
93return fmt.Errorf("path name %q is not valid for file discovery", name)
94}
95}
96return nil
97}
98
// fileSDFilepathLabel is the meta label attached to every discovered target,
// carrying the path of the file the target group was read from.
const fileSDFilepathLabel = model.MetaLabelPrefix + "filepath"
100
// TimestampCollector is a Custom Collector for Timestamps of the files.
// TODO(ptodev): Now that each file SD has its own TimestampCollector
// inside discovery/file/metrics.go, we can refactor this collector
// (or get rid of it) as each TimestampCollector instance will only use one discoverer.
type TimestampCollector struct {
	// Description is the metric descriptor used for every emitted sample.
	Description *prometheus.Desc
	// discoverers is the set of live Discovery instances whose file
	// timestamps are exported; used as a set, values carry no data.
	discoverers map[*Discovery]struct{}
	// lock guards discoverers against concurrent add/remove/Collect.
	lock sync.RWMutex
}
110
// Describe method sends the description to the channel.
func (t *TimestampCollector) Describe(ch chan<- *prometheus.Desc) {
	ch <- t.Description
}
115
116// Collect creates constant metrics for each file with last modified time of the file.
117func (t *TimestampCollector) Collect(ch chan<- prometheus.Metric) {
118// New map to dedup filenames.
119uniqueFiles := make(map[string]float64)
120t.lock.RLock()
121for fileSD := range t.discoverers {
122fileSD.lock.RLock()
123for filename, timestamp := range fileSD.timestamps {
124uniqueFiles[filename] = timestamp
125}
126fileSD.lock.RUnlock()
127}
128t.lock.RUnlock()
129for filename, timestamp := range uniqueFiles {
130ch <- prometheus.MustNewConstMetric(
131t.Description,
132prometheus.GaugeValue,
133timestamp,
134filename,
135)
136}
137}
138
139func (t *TimestampCollector) addDiscoverer(disc *Discovery) {
140t.lock.Lock()
141t.discoverers[disc] = struct{}{}
142t.lock.Unlock()
143}
144
145func (t *TimestampCollector) removeDiscoverer(disc *Discovery) {
146t.lock.Lock()
147delete(t.discoverers, disc)
148t.lock.Unlock()
149}
150
151// NewTimestampCollector creates a TimestampCollector.
152func NewTimestampCollector() *TimestampCollector {
153return &TimestampCollector{
154Description: prometheus.NewDesc(
155"prometheus_sd_file_mtime_seconds",
156"Timestamp (mtime) of files read by FileSD. Timestamp is set at read time.",
157[]string{"filename"},
158nil,
159),
160discoverers: make(map[*Discovery]struct{}),
161}
162}
163
// Discovery provides service discovery functionality based
// on files that contain target groups in JSON or YAML format. Refreshing
// happens using file watches and periodic refreshes.
type Discovery struct {
	// paths are the configured file patterns (globs allowed).
	paths []string
	// watcher is created in Run; nil until then (watchFiles panics on nil).
	watcher *fsnotify.Watcher
	// interval is the periodic fallback refresh period.
	interval time.Duration
	// timestamps maps filename -> mtime (Unix seconds) at last read.
	timestamps map[string]float64
	// lock guards timestamps (written by refresh, read by the collector).
	lock sync.RWMutex

	// lastRefresh stores which files were found during the last refresh
	// and how many target groups they contained.
	// This is used to detect deleted target groups.
	lastRefresh map[string]int
	logger      log.Logger

	metrics *fileMetrics
}
182
183// NewDiscovery returns a new file discovery for the given paths.
184func NewDiscovery(conf *SDConfig, logger log.Logger, metrics discovery.DiscovererMetrics) (*Discovery, error) {
185fm, ok := metrics.(*fileMetrics)
186if !ok {
187return nil, fmt.Errorf("invalid discovery metrics type")
188}
189
190if logger == nil {
191logger = log.NewNopLogger()
192}
193
194disc := &Discovery{
195paths: conf.Files,
196interval: time.Duration(conf.RefreshInterval),
197timestamps: make(map[string]float64),
198logger: logger,
199metrics: fm,
200}
201
202fm.init(disc)
203
204return disc, nil
205}
206
207// listFiles returns a list of all files that match the configured patterns.
208func (d *Discovery) listFiles() []string {
209var paths []string
210for _, p := range d.paths {
211files, err := filepath.Glob(p)
212if err != nil {
213level.Error(d.logger).Log("msg", "Error expanding glob", "glob", p, "err", err)
214continue
215}
216paths = append(paths, files...)
217}
218return paths
219}
220
221// watchFiles sets watches on all full paths or directories that were configured for
222// this file discovery.
223func (d *Discovery) watchFiles() {
224if d.watcher == nil {
225panic("no watcher configured")
226}
227for _, p := range d.paths {
228if dir, _ := filepath.Split(p); dir != "" {
229p = dir
230} else {
231p = "./"
232}
233if err := d.watcher.Add(p); err != nil {
234level.Error(d.logger).Log("msg", "Error adding file watch", "path", p, "err", err)
235}
236}
237}
238
// Run implements the Discoverer interface. It creates the fsnotify watcher,
// performs an initial refresh, then refreshes again on every relevant
// file-system event and on every tick of the configured interval, until
// ctx is cancelled. Target group updates are sent on ch.
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		level.Error(d.logger).Log("msg", "Error adding file watcher", "err", err)
		d.metrics.fileWatcherErrorsCount.Inc()
		return
	}
	d.watcher = watcher
	defer d.stop()

	// Initial load; also installs the directory watches via refresh -> watchFiles.
	d.refresh(ctx, ch)

	ticker := time.NewTicker(d.interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return

		case event := <-d.watcher.Events:
			// fsnotify sometimes sends a bunch of events without name or operation.
			// It's unclear what they are and why they are sent - filter them out.
			if len(event.Name) == 0 {
				break
			}
			// Everything but a chmod requires rereading.
			// Op^Chmod == 0 holds exactly when the op is Chmod alone.
			if event.Op^fsnotify.Chmod == 0 {
				break
			}
			// Changes to a file can spawn various sequences of events with
			// different combinations of operations. For all practical purposes
			// this is inaccurate.
			// The most reliable solution is to reload everything if anything happens.
			d.refresh(ctx, ch)

		case <-ticker.C:
			// Setting a new watch after an update might fail. Make sure we don't lose
			// those files forever.
			d.refresh(ctx, ch)

		case err := <-d.watcher.Errors:
			if err != nil {
				level.Error(d.logger).Log("msg", "Error watching file", "err", err)
			}
		}
	}
}
288
289func (d *Discovery) writeTimestamp(filename string, timestamp float64) {
290d.lock.Lock()
291d.timestamps[filename] = timestamp
292d.lock.Unlock()
293}
294
295func (d *Discovery) deleteTimestamp(filename string) {
296d.lock.Lock()
297delete(d.timestamps, filename)
298d.lock.Unlock()
299}
300
// stop shuts down the file watcher. It unregisters this discoverer from the
// shared timestamp collector, then closes the watcher while a helper
// goroutine drains pending events/errors so Close cannot deadlock.
func (d *Discovery) stop() {
	level.Debug(d.logger).Log("msg", "Stopping file discovery...", "paths", fmt.Sprintf("%v", d.paths))

	// done signals the drain goroutine to exit once Close has returned.
	done := make(chan struct{})
	defer close(done)

	d.metrics.fileSDTimeStamp.removeDiscoverer(d)

	// Closing the watcher will deadlock unless all events and errors are drained.
	go func() {
		for {
			select {
			case <-d.watcher.Errors:
			case <-d.watcher.Events:
				// Drain all events and errors.
			case <-done:
				return
			}
		}
	}()
	if err := d.watcher.Close(); err != nil {
		level.Error(d.logger).Log("msg", "Error closing file watcher", "paths", fmt.Sprintf("%v", d.paths), "err", err)
	}

	level.Debug(d.logger).Log("msg", "File discovery stopped")
}
328
// refresh reads all files matching the discovery's patterns and sends the respective
// updated target groups through the channel. It compares the result with the
// previous refresh to emit empty groups for sources that vanished, and
// re-installs directory watches at the end.
func (d *Discovery) refresh(ctx context.Context, ch chan<- []*targetgroup.Group) {
	t0 := time.Now()
	defer func() {
		d.metrics.fileSDScanDuration.Observe(time.Since(t0).Seconds())
	}()
	// ref maps filename -> number of target groups read in this pass.
	ref := map[string]int{}
	for _, p := range d.listFiles() {
		tgroups, err := d.readFile(p)
		if err != nil {
			d.metrics.fileSDReadErrorsCount.Inc()

			level.Error(d.logger).Log("msg", "Error reading file", "path", p, "err", err)
			// Prevent deletion down below.
			ref[p] = d.lastRefresh[p]
			continue
		}
		select {
		case ch <- tgroups:
		case <-ctx.Done():
			return
		}

		ref[p] = len(tgroups)
	}
	// Send empty updates for sources that disappeared.
	for f, n := range d.lastRefresh {
		m, ok := ref[f]
		if !ok || n > m {
			level.Debug(d.logger).Log("msg", "file_sd refresh found file that should be removed", "file", f)
			d.deleteTimestamp(f)
			// Groups with indices m..n-1 existed before but not anymore;
			// an empty group per stale source tells consumers to drop it.
			for i := m; i < n; i++ {
				select {
				case ch <- []*targetgroup.Group{{Source: fileSource(f, i)}}:
				case <-ctx.Done():
					return
				}
			}
		}
	}
	d.lastRefresh = ref

	d.watchFiles()
}
374
375// readFile reads a JSON or YAML list of targets groups from the file, depending on its
376// file extension. It returns full configuration target groups.
377func (d *Discovery) readFile(filename string) ([]*targetgroup.Group, error) {
378fd, err := os.Open(filename)
379if err != nil {
380return nil, err
381}
382defer fd.Close()
383
384content, err := io.ReadAll(fd)
385if err != nil {
386return nil, err
387}
388
389info, err := fd.Stat()
390if err != nil {
391return nil, err
392}
393
394var targetGroups []*targetgroup.Group
395
396switch ext := filepath.Ext(filename); strings.ToLower(ext) {
397case ".json":
398if err := json.Unmarshal(content, &targetGroups); err != nil {
399return nil, err
400}
401case ".yml", ".yaml":
402if err := yaml.UnmarshalStrict(content, &targetGroups); err != nil {
403return nil, err
404}
405default:
406panic(fmt.Errorf("discovery.File.readFile: unhandled file extension %q", ext))
407}
408
409for i, tg := range targetGroups {
410if tg == nil {
411err = errors.New("nil target group item found")
412return nil, err
413}
414
415tg.Source = fileSource(filename, i)
416if tg.Labels == nil {
417tg.Labels = model.LabelSet{}
418}
419tg.Labels[fileSDFilepathLabel] = model.LabelValue(filename)
420}
421
422d.writeTimestamp(filename, float64(info.ModTime().Unix()))
423
424return targetGroups, nil
425}
426
// fileSource returns a source ID for the i-th target group in the file.
// The ID is "<filename>:<index>".
func fileSource(filename string, i int) string {
	return filename + ":" + fmt.Sprint(i)
}
431