prometheus
332 lines · 9.1 KB
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

14package legacymanager
15
16import (
17"context"
18"fmt"
19"reflect"
20"sync"
21"time"
22
23"github.com/go-kit/log"
24"github.com/go-kit/log/level"
25"github.com/prometheus/client_golang/prometheus"
26
27"github.com/prometheus/prometheus/discovery"
28"github.com/prometheus/prometheus/discovery/targetgroup"
29)
30
// poolKey uniquely identifies a provider's target pool: the target set
// (scrape job) name combined with the provider name ("type/index").
type poolKey struct {
	setName  string
	provider string
}
35
// provider holds a Discoverer instance, its configuration and its subscribers.
type provider struct {
	name   string               // unique provider name, "type/index" (see registerProviders)
	d      discovery.Discoverer // the discoverer implementation that produces target groups
	subs   []string             // target set names subscribed to this provider's updates
	config interface{}          // the SD config, compared with reflect.DeepEqual to dedup providers
}
43
44// NewManager is the Discovery Manager constructor.
45func NewManager(ctx context.Context, logger log.Logger, registerer prometheus.Registerer, sdMetrics map[string]discovery.DiscovererMetrics, options ...func(*Manager)) *Manager {
46if logger == nil {
47logger = log.NewNopLogger()
48}
49mgr := &Manager{
50logger: logger,
51syncCh: make(chan map[string][]*targetgroup.Group),
52targets: make(map[poolKey]map[string]*targetgroup.Group),
53discoverCancel: []context.CancelFunc{},
54ctx: ctx,
55updatert: 5 * time.Second,
56triggerSend: make(chan struct{}, 1),
57registerer: registerer,
58sdMetrics: sdMetrics,
59}
60for _, option := range options {
61option(mgr)
62}
63
64// Register the metrics.
65// We have to do this after setting all options, so that the name of the Manager is set.
66if metrics, err := discovery.NewManagerMetrics(registerer, mgr.name); err == nil {
67mgr.metrics = metrics
68} else {
69level.Error(logger).Log("msg", "Failed to create discovery manager metrics", "manager", mgr.name, "err", err)
70return nil
71}
72
73return mgr
74}
75
76// Name sets the name of the manager.
77func Name(n string) func(*Manager) {
78return func(m *Manager) {
79m.mtx.Lock()
80defer m.mtx.Unlock()
81m.name = n
82}
83}
84
// Manager maintains a set of discovery providers and sends each update to a map channel.
// Targets are grouped by the target set name.
type Manager struct {
	logger         log.Logger
	name           string
	mtx            sync.RWMutex
	ctx            context.Context
	discoverCancel []context.CancelFunc

	// Some Discoverers(eg. k8s) send only the updates for a given target group
	// so we use map[tg.Source]*targetgroup.Group to know which group to update.
	targets map[poolKey]map[string]*targetgroup.Group
	// providers keeps track of SD providers.
	providers []*provider
	// The sync channel sends the updates as a map where the key is the job value from the scrape config.
	syncCh chan map[string][]*targetgroup.Group

	// How long to wait before sending updates to the channel. The variable
	// should only be modified in unit tests.
	updatert time.Duration

	// The triggerSend channel signals to the manager that new updates have been received from providers.
	triggerSend chan struct{}

	// A registerer for all service discovery metrics.
	registerer prometheus.Registerer

	// metrics holds the manager-level discovery metrics, created in NewManager.
	metrics *discovery.Metrics
	// sdMetrics holds per-discoverer metrics keyed by SD mechanism name,
	// passed through to each discoverer when it is instantiated.
	sdMetrics map[string]discovery.DiscovererMetrics
}
115
116// Run starts the background processing.
117func (m *Manager) Run() error {
118go m.sender()
119<-m.ctx.Done()
120m.cancelDiscoverers()
121return m.ctx.Err()
122}
123
// SyncCh returns a read only channel used by all the clients to receive target updates.
// Sends on this channel are throttled by sender; see updatert.
func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group {
	return m.syncCh
}
128
129// ApplyConfig removes all running discovery providers and starts new ones using the provided config.
130func (m *Manager) ApplyConfig(cfg map[string]discovery.Configs) error {
131m.mtx.Lock()
132defer m.mtx.Unlock()
133
134for pk := range m.targets {
135if _, ok := cfg[pk.setName]; !ok {
136m.metrics.DiscoveredTargets.DeleteLabelValues(m.name, pk.setName)
137}
138}
139m.cancelDiscoverers()
140m.targets = make(map[poolKey]map[string]*targetgroup.Group)
141m.providers = nil
142m.discoverCancel = nil
143
144failedCount := 0
145for name, scfg := range cfg {
146failedCount += m.registerProviders(scfg, name)
147m.metrics.DiscoveredTargets.WithLabelValues(name).Set(0)
148}
149m.metrics.FailedConfigs.Set(float64(failedCount))
150
151for _, prov := range m.providers {
152m.startProvider(m.ctx, prov)
153}
154
155return nil
156}
157
158// StartCustomProvider is used for sdtool. Only use this if you know what you're doing.
159func (m *Manager) StartCustomProvider(ctx context.Context, name string, worker discovery.Discoverer) {
160p := &provider{
161name: name,
162d: worker,
163subs: []string{name},
164}
165m.providers = append(m.providers, p)
166m.startProvider(ctx, p)
167}
168
169func (m *Manager) startProvider(ctx context.Context, p *provider) {
170level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
171ctx, cancel := context.WithCancel(ctx)
172updates := make(chan []*targetgroup.Group)
173
174m.discoverCancel = append(m.discoverCancel, cancel)
175
176go p.d.Run(ctx, updates)
177go m.updater(ctx, p, updates)
178}
179
180func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
181for {
182select {
183case <-ctx.Done():
184return
185case tgs, ok := <-updates:
186m.metrics.ReceivedUpdates.Inc()
187if !ok {
188level.Debug(m.logger).Log("msg", "Discoverer channel closed", "provider", p.name)
189return
190}
191
192for _, s := range p.subs {
193m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)
194}
195
196select {
197case m.triggerSend <- struct{}{}:
198default:
199}
200}
201}
202}
203
// sender forwards the accumulated target groups on syncCh at most once per
// updatert tick, coalescing bursts of provider updates into a single send.
func (m *Manager) sender() {
	ticker := time.NewTicker(m.updatert)
	defer ticker.Stop()

	for {
		select {
		case <-m.ctx.Done():
			return
		case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker.
			select {
			case <-m.triggerSend:
				m.metrics.SentUpdates.Inc()
				// Non-blocking send: never stall the sender loop on a slow receiver.
				select {
				case m.syncCh <- m.allGroups():
				default:
					m.metrics.DelayedUpdates.Inc()
					level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle")
					// Re-arm the trigger so the next tick retries the send.
					select {
					case m.triggerSend <- struct{}{}:
					default:
					}
				}
			default:
				// No pending updates; wait for the next tick.
			}
		}
	}
}
231
232func (m *Manager) cancelDiscoverers() {
233for _, c := range m.discoverCancel {
234c()
235}
236}
237
238func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) {
239m.mtx.Lock()
240defer m.mtx.Unlock()
241
242if _, ok := m.targets[poolKey]; !ok {
243m.targets[poolKey] = make(map[string]*targetgroup.Group)
244}
245for _, tg := range tgs {
246if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics.
247m.targets[poolKey][tg.Source] = tg
248}
249}
250}
251
252func (m *Manager) allGroups() map[string][]*targetgroup.Group {
253m.mtx.RLock()
254defer m.mtx.RUnlock()
255
256tSets := map[string][]*targetgroup.Group{}
257n := map[string]int{}
258for pkey, tsets := range m.targets {
259for _, tg := range tsets {
260// Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
261// to signal that it needs to stop all scrape loops for this target set.
262tSets[pkey.setName] = append(tSets[pkey.setName], tg)
263n[pkey.setName] += len(tg.Targets)
264}
265}
266for setName, v := range n {
267m.metrics.DiscoveredTargets.WithLabelValues(setName).Set(float64(v))
268}
269return tSets
270}
271
272// registerProviders returns a number of failed SD config.
273func (m *Manager) registerProviders(cfgs discovery.Configs, setName string) int {
274var (
275failed int
276added bool
277)
278add := func(cfg discovery.Config) {
279for _, p := range m.providers {
280if reflect.DeepEqual(cfg, p.config) {
281p.subs = append(p.subs, setName)
282added = true
283return
284}
285}
286typ := cfg.Name()
287d, err := cfg.NewDiscoverer(discovery.DiscovererOptions{
288Logger: log.With(m.logger, "discovery", typ, "config", setName),
289Metrics: m.sdMetrics[typ],
290})
291if err != nil {
292level.Error(m.logger).Log("msg", "Cannot create service discovery", "err", err, "type", typ, "config", setName)
293failed++
294return
295}
296m.providers = append(m.providers, &provider{
297name: fmt.Sprintf("%s/%d", typ, len(m.providers)),
298d: d,
299config: cfg,
300subs: []string{setName},
301})
302added = true
303}
304for _, cfg := range cfgs {
305add(cfg)
306}
307if !added {
308// Add an empty target group to force the refresh of the corresponding
309// scrape pool and to notify the receiver that this target set has no
310// current targets.
311// It can happen because the combined set of SD configurations is empty
312// or because we fail to instantiate all the SD configurations.
313add(discovery.StaticConfig{{}})
314}
315return failed
316}
317
// StaticProvider holds a list of target groups that never change.
type StaticProvider struct {
	// TargetGroups is sent exactly once on the updates channel by Run.
	TargetGroups []*targetgroup.Group
}
322
323// Run implements the Worker interface.
324func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
325// We still have to consider that the consumer exits right away in which case
326// the context will be canceled.
327select {
328case ch <- sd.TargetGroups:
329case <-ctx.Done():
330}
331close(ch)
332}
333