// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scrape

import (
	"errors"
	"fmt"
	"hash/fnv"
	"reflect"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus/client_golang/prometheus"
	config_util "github.com/prometheus/common/config"
	"github.com/prometheus/common/model"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/util/osutil"
	"github.com/prometheus/prometheus/util/pool"
)
// NewManager is the Manager constructor.
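//
// A typical wiring, sketched here under the assumption that a discovery
// manager supplies the target-group channel (names are illustrative):
//
//	m, err := NewManager(&Options{}, logger, app, prometheus.DefaultRegisterer)
//	if err != nil {
//		// handle the error
//	}
//	if err := m.ApplyConfig(cfg); err != nil {
//		// handle the error
//	}
//	go m.Run(discoveryManager.SyncCh())
//	defer m.Stop()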
func NewManager(o *Options, logger log.Logger, app storage.Appendable, registerer prometheus.Registerer) (*Manager, error) {
	if o == nil {
		o = &Options{}
	}
	if logger == nil {
		logger = log.NewNopLogger()
	}

	sm, err := newScrapeMetrics(registerer)
	if err != nil {
		return nil, fmt.Errorf("failed to create scrape manager due to error: %w", err)
	}

	m := &Manager{
		append:        app,
		opts:          o,
		logger:        logger,
		scrapeConfigs: make(map[string]*config.ScrapeConfig),
		scrapePools:   make(map[string]*scrapePool),
		graceShut:     make(chan struct{}),
		triggerReload: make(chan struct{}, 1),
		metrics:       sm,
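		// Bucketed []byte pool shared by the scrape loops; bucket sizes grow by
		// a factor of 3, from 1e3 up to 100e6 bytes.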
		buffers:       pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) }),
	}

	m.metrics.setTargetMetadataCacheGatherer(m)

	return m, nil
}

// Options are the configuration parameters to the scrape manager.
type Options struct {
	ExtraMetrics  bool
	NoDefaultPort bool
	// Option used by downstream scraper users like OpenTelemetry Collector
	// to help look up metric metadata. Should be false for Prometheus.
	PassMetadataInContext bool
	// Option to enable the experimental in-memory metadata storage and append
	// metadata to the WAL.
	EnableMetadataStorage bool
	// Option to increase the interval used by the scrape manager to throttle target group updates.
	DiscoveryReloadInterval model.Duration
	// Option to enable the ingestion of the created timestamp as a synthetic zero sample.
	// See: https://github.com/prometheus/proposals/blob/main/proposals/2023-06-13_created-timestamp.md
	EnableCreatedTimestampZeroIngestion bool
	// Option to enable the ingestion of native histograms.
	EnableNativeHistogramsIngestion bool

	// Optional HTTP client options to use when scraping.
	HTTPClientOptions []config_util.HTTPClientOption

	// private option for testability.
	skipOffsetting bool
}

// Manager maintains a set of scrape pools and manages start/stop cycles
// when receiving new target groups from the discovery manager.
type Manager struct {
	opts      *Options
	logger    log.Logger
	append    storage.Appendable
	graceShut chan struct{}

	offsetSeed    uint64     // Global offset seed used to spread the scrape workload across an HA setup.
	mtxScrape     sync.Mutex // Guards the fields below.
	scrapeConfigs map[string]*config.ScrapeConfig
	scrapePools   map[string]*scrapePool
	targetSets    map[string][]*targetgroup.Group
	buffers       *pool.Pool

	triggerReload chan struct{}

	metrics *scrapeMetrics
}

// Run receives and saves target set updates and triggers the reloading of
// the scrape loops. Reloading happens in the background so that it doesn't
// block receiving target updates.
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
	go m.reloader()
	for {
		select {
		case ts := <-tsets:
			m.updateTsets(ts)

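			// Non-blocking send: if a reload is already pending, this update
			// is coalesced into it.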
			select {
			case m.triggerReload <- struct{}{}:
			default:
			}

		case <-m.graceShut:
			return nil
		}
	}
}

// UnregisterMetrics unregisters manager metrics.
func (m *Manager) UnregisterMetrics() {
	m.metrics.Unregister()
}

func (m *Manager) reloader() {
	reloadIntervalDuration := m.opts.DiscoveryReloadInterval
	if reloadIntervalDuration < model.Duration(5*time.Second) {
		reloadIntervalDuration = model.Duration(5 * time.Second)
	}
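	// The ticker throttles reloads; a reload only runs when a target-set
	// update has actually arrived on triggerReload.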
	ticker := time.NewTicker(time.Duration(reloadIntervalDuration))

	defer ticker.Stop()

	for {
		select {
		case <-m.graceShut:
			return
		case <-ticker.C:
			select {
			case <-m.triggerReload:
				m.reload()
			case <-m.graceShut:
				return
			}
		}
	}
}

func (m *Manager) reload() {
	m.mtxScrape.Lock()
	var wg sync.WaitGroup
	for setName, groups := range m.targetSets {
		if _, ok := m.scrapePools[setName]; !ok {
			scrapeConfig, ok := m.scrapeConfigs[setName]
			if !ok {
				level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
				continue
			}
			m.metrics.targetScrapePools.Inc()
			sp, err := newScrapePool(scrapeConfig, m.append, m.offsetSeed, log.With(m.logger, "scrape_pool", setName), m.buffers, m.opts, m.metrics)
			if err != nil {
				m.metrics.targetScrapePoolsFailed.Inc()
				level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName)
				continue
			}
			m.scrapePools[setName] = sp
		}

		wg.Add(1)
		// Run the sync in parallel as these take a while and at high load can't catch up.
		go func(sp *scrapePool, groups []*targetgroup.Group) {
			sp.Sync(groups)
			wg.Done()
		}(m.scrapePools[setName], groups)
	}
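	// Unlock before waiting so that incoming target-set updates and other
	// Manager calls are not blocked while the syncs finish.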
	m.mtxScrape.Unlock()
	wg.Wait()
}

// setOffsetSeed calculates a global offsetSeed per server, derived from the
// host's FQDN and the external label set.
func (m *Manager) setOffsetSeed(labels labels.Labels) error {
	h := fnv.New64a()
	hostname, err := osutil.GetFQDN()
	if err != nil {
		return err
	}
	if _, err := fmt.Fprintf(h, "%s%s", hostname, labels.String()); err != nil {
		return err
	}
	m.offsetSeed = h.Sum64()
	return nil
}

// Stop cancels all running scrape pools and blocks until all have exited.
func (m *Manager) Stop() {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	for _, sp := range m.scrapePools {
		sp.stop()
	}
	close(m.graceShut)
}

func (m *Manager) updateTsets(tsets map[string][]*targetgroup.Group) {
	m.mtxScrape.Lock()
	m.targetSets = tsets
	m.mtxScrape.Unlock()
}

// ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg.
func (m *Manager) ApplyConfig(cfg *config.Config) error {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	scfgs, err := cfg.GetScrapeConfigs()
	if err != nil {
		return err
	}

	c := make(map[string]*config.ScrapeConfig)
	for _, scfg := range scfgs {
		c[scfg.JobName] = scfg
	}
	m.scrapeConfigs = c

	if err := m.setOffsetSeed(cfg.GlobalConfig.ExternalLabels); err != nil {
		return err
	}

	// Stop scrape pools whose job was removed, and reload those whose
	// configuration changed; unchanged pools keep running as-is.
	var failed bool
	for name, sp := range m.scrapePools {
		switch cfg, ok := m.scrapeConfigs[name]; {
		case !ok:
			sp.stop()
			delete(m.scrapePools, name)
		case !reflect.DeepEqual(sp.config, cfg):
			err := sp.reload(cfg)
			if err != nil {
				level.Error(m.logger).Log("msg", "error reloading scrape pool", "err", err, "scrape_pool", name)
				failed = true
			}
		}
	}

	if failed {
		return errors.New("failed to apply the new configuration")
	}
	return nil
}

// TargetsAll returns active and dropped targets grouped by job_name.
func (m *Manager) TargetsAll() map[string][]*Target {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	targets := make(map[string][]*Target, len(m.scrapePools))
	for tset, sp := range m.scrapePools {
		targets[tset] = append(sp.ActiveTargets(), sp.DroppedTargets()...)
	}
	return targets
}

// ScrapePools returns the list of all scrape pool names.
func (m *Manager) ScrapePools() []string {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	names := make([]string, 0, len(m.scrapePools))
	for name := range m.scrapePools {
		names = append(names, name)
	}
	return names
}

// TargetsActive returns the active targets currently being scraped.
func (m *Manager) TargetsActive() map[string][]*Target {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	targets := make(map[string][]*Target, len(m.scrapePools))
	for tset, sp := range m.scrapePools {
		targets[tset] = sp.ActiveTargets()
	}
	return targets
}

// TargetsDropped returns the dropped targets during relabelling, subject to KeepDroppedTargets limit.
func (m *Manager) TargetsDropped() map[string][]*Target {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	targets := make(map[string][]*Target, len(m.scrapePools))
	for tset, sp := range m.scrapePools {
		targets[tset] = sp.DroppedTargets()
	}
	return targets
}
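// TargetsDroppedCounts returns the number of dropped targets per job_name.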
func (m *Manager) TargetsDroppedCounts() map[string]int {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	counts := make(map[string]int, len(m.scrapePools))
	for tset, sp := range m.scrapePools {
		counts[tset] = sp.droppedTargetsCount
	}
	return counts
}