manager.go
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scrape

import (
	"errors"
	"fmt"
	"hash/fnv"
	"reflect"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus/client_golang/prometheus"
	config_util "github.com/prometheus/common/config"
	"github.com/prometheus/common/model"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/util/osutil"
	"github.com/prometheus/prometheus/util/pool"
)

// NewManager is the Manager constructor.
func NewManager(o *Options, logger log.Logger, app storage.Appendable, registerer prometheus.Registerer) (*Manager, error) {
	if o == nil {
		o = &Options{}
	}
	if logger == nil {
		logger = log.NewNopLogger()
	}

	sm, err := newScrapeMetrics(registerer)
	if err != nil {
		return nil, fmt.Errorf("failed to create scrape manager: %w", err)
	}

	m := &Manager{
		append:        app,
		opts:          o,
		logger:        logger,
		scrapeConfigs: make(map[string]*config.ScrapeConfig),
		scrapePools:   make(map[string]*scrapePool),
		graceShut:     make(chan struct{}),
		triggerReload: make(chan struct{}, 1),
		metrics:       sm,
		buffers:       pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) }),
	}

	m.metrics.setTargetMetadataCacheGatherer(m)

	return m, nil
}

// Options are the configuration parameters to the scrape manager.
type Options struct {
	ExtraMetrics  bool
	NoDefaultPort bool
	// Option used by downstream scraper users like OpenTelemetry Collector
	// to help look up metric metadata. Should be false for Prometheus.
	PassMetadataInContext bool
	// Option to enable the experimental in-memory metadata storage and append
	// metadata to the WAL.
	EnableMetadataStorage bool
	// Option to increase the interval used by the scrape manager to throttle target group updates.
	DiscoveryReloadInterval model.Duration
	// Option to enable the ingestion of the created timestamp as a synthetic zero sample.
	// See: https://github.com/prometheus/proposals/blob/main/proposals/2023-06-13_created-timestamp.md
	EnableCreatedTimestampZeroIngestion bool
	// Option to enable the ingestion of native histograms.
	EnableNativeHistogramsIngestion bool

	// Optional HTTP client options to use when scraping.
	HTTPClientOptions []config_util.HTTPClientOption

	// Private option for testability.
	skipOffsetting bool
}

// Manager maintains a set of scrape pools and manages start/stop cycles
// when receiving new target groups from the discovery manager.
type Manager struct {
	opts      *Options
	logger    log.Logger
	append    storage.Appendable
	graceShut chan struct{}

	offsetSeed    uint64     // Global offset seed, used to spread the scrape workload across an HA setup.
	mtxScrape     sync.Mutex // Guards the fields below.
	scrapeConfigs map[string]*config.ScrapeConfig
	scrapePools   map[string]*scrapePool
	targetSets    map[string][]*targetgroup.Group // Most recently received target groups, per job.
	buffers       *pool.Pool

	triggerReload chan struct{} // Buffered (capacity 1) so that reload requests coalesce.

	metrics *scrapeMetrics
}

// Run receives and saves target set updates and triggers a reload of the scrape loops.
// Reloading happens in the background so that it doesn't block receiving target updates.
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
	go m.reloader()
	for {
		select {
		case ts := <-tsets:
			m.updateTsets(ts)

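			// Non-blocking send: triggerReload holds at most one pending
			// request, so bursts of updates coalesce into a single reload.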
			select {
			case m.triggerReload <- struct{}{}:
			default:
			}

		case <-m.graceShut:
			return nil
		}
	}
}

// UnregisterMetrics unregisters manager metrics.
func (m *Manager) UnregisterMetrics() {
	m.metrics.Unregister()
}

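// reloader runs in the background and, at most once per reload interval,
// drains triggerReload and re-syncs the scrape pools with the most recently
// received target sets.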
func (m *Manager) reloader() {
	reloadIntervalDuration := m.opts.DiscoveryReloadInterval
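	// Enforce a minimum reload interval of five seconds.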
	if reloadIntervalDuration < model.Duration(5*time.Second) {
		reloadIntervalDuration = model.Duration(5 * time.Second)
	}

	ticker := time.NewTicker(time.Duration(reloadIntervalDuration))

	defer ticker.Stop()

	for {
		select {
		case <-m.graceShut:
			return
		case <-ticker.C:
			select {
			case <-m.triggerReload:
				m.reload()
			case <-m.graceShut:
				return
			}
		}
	}
}

func (m *Manager) reload() {
	m.mtxScrape.Lock()
	var wg sync.WaitGroup
	for setName, groups := range m.targetSets {
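		// Lazily create a scrape pool the first time a target set shows up
		// for a job with a known scrape configuration.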
		if _, ok := m.scrapePools[setName]; !ok {
			scrapeConfig, ok := m.scrapeConfigs[setName]
			if !ok {
				level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id: "+setName)
				continue
			}
			m.metrics.targetScrapePools.Inc()
			sp, err := newScrapePool(scrapeConfig, m.append, m.offsetSeed, log.With(m.logger, "scrape_pool", setName), m.buffers, m.opts, m.metrics)
			if err != nil {
				m.metrics.targetScrapePoolsFailed.Inc()
				level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName)
				continue
			}
			m.scrapePools[setName] = sp
		}

		wg.Add(1)
		// Run the syncs in parallel: they can take a while, and under high
		// load a serial sync cannot keep up.
		go func(sp *scrapePool, groups []*targetgroup.Group) {
			sp.Sync(groups)
			wg.Done()
		}(m.scrapePools[setName], groups)
	}
	m.mtxScrape.Unlock()
	wg.Wait()
}

// setOffsetSeed calculates a global offset seed per server, derived from the
// host's FQDN and the external label set.
func (m *Manager) setOffsetSeed(labels labels.Labels) error {
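	// Hash the FQDN together with the label string using FNV-1a, so that
	// replicas in an HA setup (differing in host or external labels)
	// typically derive different seeds.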
	h := fnv.New64a()
	hostname, err := osutil.GetFQDN()
	if err != nil {
		return err
	}
	if _, err := fmt.Fprintf(h, "%s%s", hostname, labels.String()); err != nil {
		return err
	}
	m.offsetSeed = h.Sum64()
	return nil
}

// Stop cancels all running scrape pools and blocks until all have exited.
func (m *Manager) Stop() {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	for _, sp := range m.scrapePools {
		sp.stop()
	}
	close(m.graceShut)
}

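// updateTsets replaces, under the scrape mutex, the target groups that the
// next reload will sync from.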
func (m *Manager) updateTsets(tsets map[string][]*targetgroup.Group) {
	m.mtxScrape.Lock()
	m.targetSets = tsets
	m.mtxScrape.Unlock()
}

// ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg.
func (m *Manager) ApplyConfig(cfg *config.Config) error {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	scfgs, err := cfg.GetScrapeConfigs()
	if err != nil {
		return err
	}

	c := make(map[string]*config.ScrapeConfig)
	for _, scfg := range scfgs {
		c[scfg.JobName] = scfg
	}
	m.scrapeConfigs = c

	if err := m.setOffsetSeed(cfg.GlobalConfig.ExternalLabels); err != nil {
		return err
	}

	// Stop pools whose job was removed and reload pools whose configuration changed.
	var failed bool
	for name, sp := range m.scrapePools {
		switch cfg, ok := m.scrapeConfigs[name]; {
		case !ok:
			sp.stop()
			delete(m.scrapePools, name)
		case !reflect.DeepEqual(sp.config, cfg):
			err := sp.reload(cfg)
			if err != nil {
				level.Error(m.logger).Log("msg", "error reloading scrape pool", "err", err, "scrape_pool", name)
				failed = true
			}
		}
	}

	if failed {
		return errors.New("failed to apply the new configuration")
	}
	return nil
}

// TargetsAll returns active and dropped targets grouped by job_name.
func (m *Manager) TargetsAll() map[string][]*Target {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	targets := make(map[string][]*Target, len(m.scrapePools))
	for tset, sp := range m.scrapePools {
		targets[tset] = append(sp.ActiveTargets(), sp.DroppedTargets()...)
	}
	return targets
}

// ScrapePools returns the list of all scrape pool names.
func (m *Manager) ScrapePools() []string {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	names := make([]string, 0, len(m.scrapePools))
	for name := range m.scrapePools {
		names = append(names, name)
	}
	return names
}

// TargetsActive returns the active targets currently being scraped.
func (m *Manager) TargetsActive() map[string][]*Target {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	targets := make(map[string][]*Target, len(m.scrapePools))
	for tset, sp := range m.scrapePools {
		targets[tset] = sp.ActiveTargets()
	}
	return targets
}

// TargetsDropped returns the targets dropped during relabelling, subject to the KeepDroppedTargets limit.
func (m *Manager) TargetsDropped() map[string][]*Target {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	targets := make(map[string][]*Target, len(m.scrapePools))
	for tset, sp := range m.scrapePools {
		targets[tset] = sp.DroppedTargets()
	}
	return targets
}

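// TargetsDroppedCounts returns the number of dropped targets per job.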
func (m *Manager) TargetsDroppedCounts() map[string]int {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	counts := make(map[string]int, len(m.scrapePools))
	for tset, sp := range m.scrapePools {
		counts[tset] = sp.droppedTargetsCount
	}
	return counts
}
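
For orientation, a minimal sketch of how a caller might wire this manager up, using only the exported API above. It assumes a function inside package scrape; runScraping and stopCh are illustrative names, and cfg, app, and the discovery channel are provided elsewhere (in Prometheus itself, the channel comes from the discovery manager).

// Illustrative sketch, not part of manager.go.
func runScraping(cfg *config.Config, app storage.Appendable,
	tsets <-chan map[string][]*targetgroup.Group, stopCh <-chan struct{}) error {
	m, err := NewManager(&Options{}, log.NewNopLogger(), app, prometheus.NewRegistry())
	if err != nil {
		return err
	}
	if err := m.ApplyConfig(cfg); err != nil {
		return err
	}

	errc := make(chan error, 1)
	go func() { errc <- m.Run(tsets) }() // Run blocks until Stop is called.

	select {
	case <-stopCh:
		m.Stop() // Stops all scrape pools; Run then returns.
		return <-errc
	case err := <-errc:
		return err
	}
}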
