// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package legacymanager

import (
	"context"
	"fmt"
	"reflect"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/prometheus/prometheus/discovery"
	"github.com/prometheus/prometheus/discovery/targetgroup"
)

type poolKey struct {
	setName  string
	provider string
}

// provider holds a Discoverer instance, its configuration and its subscribers.
type provider struct {
	name   string
	d      discovery.Discoverer
	subs   []string
	config interface{}
}

// NewManager is the Discovery Manager constructor.
func NewManager(ctx context.Context, logger log.Logger, registerer prometheus.Registerer, sdMetrics map[string]discovery.DiscovererMetrics, options ...func(*Manager)) *Manager {
	if logger == nil {
		logger = log.NewNopLogger()
	}
	mgr := &Manager{
		logger:         logger,
		syncCh:         make(chan map[string][]*targetgroup.Group),
		targets:        make(map[poolKey]map[string]*targetgroup.Group),
		discoverCancel: []context.CancelFunc{},
		ctx:            ctx,
		updatert:       5 * time.Second,
		triggerSend:    make(chan struct{}, 1),
		registerer:     registerer,
		sdMetrics:      sdMetrics,
	}
	for _, option := range options {
		option(mgr)
	}

	// Register the metrics.
	// We have to do this after setting all options, so that the name of the Manager is set.
	if metrics, err := discovery.NewManagerMetrics(registerer, mgr.name); err == nil {
		mgr.metrics = metrics
	} else {
		level.Error(logger).Log("msg", "Failed to create discovery manager metrics", "manager", mgr.name, "err", err)
		return nil
	}

	return mgr
}
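
// newManagerSketch is an illustrative sketch, not part of the upstream file:
// it shows one way a caller might wire up a Manager. The sdMetrics map is
// assumed to have been built elsewhere with the discovery package's metrics
// helpers, and the "scrape" name is a placeholder.
func newManagerSketch(ctx context.Context, reg prometheus.Registerer, sdMetrics map[string]discovery.DiscovererMetrics) *Manager {
	// Options such as Name are applied before metrics registration, so the
	// manager's metrics carry the chosen name.
	mgr := NewManager(ctx, log.NewNopLogger(), reg, sdMetrics, Name("scrape"))
	if mgr == nil {
		return nil // NewManager returns nil when metric registration fails.
	}
	// Run blocks until ctx is canceled, so it gets its own goroutine.
	go func() { _ = mgr.Run() }()
	return mgr
}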

// Name sets the name of the manager.
func Name(n string) func(*Manager) {
	return func(m *Manager) {
		m.mtx.Lock()
		defer m.mtx.Unlock()
		m.name = n
	}
}

// Manager maintains a set of discovery providers and sends each update to a map channel.
// Targets are grouped by the target set name.
type Manager struct {
	logger         log.Logger
	name           string
	mtx            sync.RWMutex
	ctx            context.Context
	discoverCancel []context.CancelFunc

	// Some Discoverers (e.g. k8s) send only the updates for a given target group,
	// so we use map[tg.Source]*targetgroup.Group to know which group to update.
	targets map[poolKey]map[string]*targetgroup.Group
	// providers keeps track of SD providers.
	providers []*provider
	// The sync channel sends the updates as a map where the key is the job value from the scrape config.
	syncCh chan map[string][]*targetgroup.Group

	// How long to wait before sending updates to the channel. The variable
	// should only be modified in unit tests.
	updatert time.Duration

	// The triggerSend channel signals to the manager that new updates have been received from providers.
	triggerSend chan struct{}

	// A registerer for all service discovery metrics.
	registerer prometheus.Registerer

	metrics   *discovery.Metrics
	sdMetrics map[string]discovery.DiscovererMetrics
}

// Run starts the background processing.
func (m *Manager) Run() error {
	go m.sender()
	<-m.ctx.Done()
	m.cancelDiscoverers()
	return m.ctx.Err()
}

// SyncCh returns a read-only channel used by all the clients to receive target updates.
func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group {
	return m.syncCh
}
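
// consumeUpdatesSketch is an illustrative sketch, not part of the upstream
// file: a client such as the scrape manager typically drains SyncCh in a
// loop. The apply callback is a hypothetical stand-in for real target-set
// handling.
func consumeUpdatesSketch(ctx context.Context, m *Manager, apply func(map[string][]*targetgroup.Group)) {
	for {
		select {
		case <-ctx.Done():
			return
		case tsets := <-m.SyncCh():
			// Each update is a full snapshot keyed by target set (job) name.
			apply(tsets)
		}
	}
}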

// ApplyConfig removes all running discovery providers and starts new ones using the provided config.
func (m *Manager) ApplyConfig(cfg map[string]discovery.Configs) error {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	for pk := range m.targets {
		if _, ok := cfg[pk.setName]; !ok {
			m.metrics.DiscoveredTargets.DeleteLabelValues(m.name, pk.setName)
		}
	}
	m.cancelDiscoverers()
	m.targets = make(map[poolKey]map[string]*targetgroup.Group)
	m.providers = nil
	m.discoverCancel = nil

	failedCount := 0
	for name, scfg := range cfg {
		failedCount += m.registerProviders(scfg, name)
		m.metrics.DiscoveredTargets.WithLabelValues(name).Set(0)
	}
	m.metrics.FailedConfigs.Set(float64(failedCount))

	for _, prov := range m.providers {
		m.startProvider(m.ctx, prov)
	}

	return nil
}
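
// applyStaticConfigSketch is an illustrative sketch, not part of the
// upstream file: ApplyConfig takes a map from target set (job) name to its
// discovery configurations. Here a single "example" job is backed by one
// static target group; the name and Source values are placeholders.
func applyStaticConfigSketch(m *Manager) error {
	cfg := map[string]discovery.Configs{
		"example": {
			discovery.StaticConfig{
				&targetgroup.Group{Source: "example/0"},
			},
		},
	}
	// Providers for any previously applied config are canceled and replaced.
	return m.ApplyConfig(cfg)
}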

// StartCustomProvider is used for sdtool. Only use this if you know what you're doing.
func (m *Manager) StartCustomProvider(ctx context.Context, name string, worker discovery.Discoverer) {
	p := &provider{
		name: name,
		d:    worker,
		subs: []string{name},
	}
	m.providers = append(m.providers, p)
	m.startProvider(ctx, p)
}

func (m *Manager) startProvider(ctx context.Context, p *provider) {
	level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
	ctx, cancel := context.WithCancel(ctx)
	updates := make(chan []*targetgroup.Group)

	m.discoverCancel = append(m.discoverCancel, cancel)

	go p.d.Run(ctx, updates)
	go m.updater(ctx, p, updates)
}

func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
	for {
		select {
		case <-ctx.Done():
			return
		case tgs, ok := <-updates:
			m.metrics.ReceivedUpdates.Inc()
			if !ok {
				level.Debug(m.logger).Log("msg", "Discoverer channel closed", "provider", p.name)
				return
			}

			for _, s := range p.subs {
				m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)
			}

			select {
			case m.triggerSend <- struct{}{}:
			default:
			}
		}
	}
}

func (m *Manager) sender() {
	ticker := time.NewTicker(m.updatert)
	defer ticker.Stop()

	for {
		select {
		case <-m.ctx.Done():
			return
		case <-ticker.C: // Some discoverers send updates too often, so we throttle them with the ticker.
			select {
			case <-m.triggerSend:
				m.metrics.SentUpdates.Inc()
				select {
				case m.syncCh <- m.allGroups():
				default:
					m.metrics.DelayedUpdates.Inc()
					level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle")
					select {
					case m.triggerSend <- struct{}{}:
					default:
					}
				}
			default:
			}
		}
	}
}

func (m *Manager) cancelDiscoverers() {
	for _, c := range m.discoverCancel {
		c()
	}
}

func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	if _, ok := m.targets[poolKey]; !ok {
		m.targets[poolKey] = make(map[string]*targetgroup.Group)
	}
	for _, tg := range tgs {
		if tg != nil { // Some Discoverers send nil target groups, so we need to check for them to avoid panics.
			m.targets[poolKey][tg.Source] = tg
		}
	}
}

func (m *Manager) allGroups() map[string][]*targetgroup.Group {
	m.mtx.RLock()
	defer m.mtx.RUnlock()

	tSets := map[string][]*targetgroup.Group{}
	n := map[string]int{}
	for pkey, tsets := range m.targets {
		for _, tg := range tsets {
			// Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
			// to signal that it needs to stop all scrape loops for this target set.
			tSets[pkey.setName] = append(tSets[pkey.setName], tg)
			n[pkey.setName] += len(tg.Targets)
		}
	}
	for setName, v := range n {
		m.metrics.DiscoveredTargets.WithLabelValues(setName).Set(float64(v))
	}
	return tSets
}

// registerProviders returns the number of failed SD configs.
func (m *Manager) registerProviders(cfgs discovery.Configs, setName string) int {
	var (
		failed int
		added  bool
	)
	add := func(cfg discovery.Config) {
		for _, p := range m.providers {
			if reflect.DeepEqual(cfg, p.config) {
				p.subs = append(p.subs, setName)
				added = true
				return
			}
		}
		typ := cfg.Name()
		d, err := cfg.NewDiscoverer(discovery.DiscovererOptions{
			Logger:  log.With(m.logger, "discovery", typ, "config", setName),
			Metrics: m.sdMetrics[typ],
		})
		if err != nil {
			level.Error(m.logger).Log("msg", "Cannot create service discovery", "err", err, "type", typ, "config", setName)
			failed++
			return
		}
		m.providers = append(m.providers, &provider{
			name:   fmt.Sprintf("%s/%d", typ, len(m.providers)),
			d:      d,
			config: cfg,
			subs:   []string{setName},
		})
		added = true
	}
	for _, cfg := range cfgs {
		add(cfg)
	}
	if !added {
		// Add an empty target group to force the refresh of the corresponding
		// scrape pool and to notify the receiver that this target set has no
		// current targets.
		// It can happen because the combined set of SD configurations is empty
		// or because we fail to instantiate all the SD configurations.
		add(discovery.StaticConfig{{}})
	}
	return failed
}

// StaticProvider holds a list of target groups that never change.
type StaticProvider struct {
	TargetGroups []*targetgroup.Group
}

// Run implements the Worker interface.
func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
	// We still have to consider that the consumer exits right away, in which case
	// the context will be canceled.
	select {
	case ch <- sd.TargetGroups:
	case <-ctx.Done():
	}
	close(ch)
}
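
// startStaticProviderSketch is an illustrative sketch, not part of the
// upstream file: a StaticProvider can be handed to StartCustomProvider,
// which is how tools such as sdtool inject a fixed set of target groups.
// The "static" name and Source value are placeholders.
func startStaticProviderSketch(ctx context.Context, m *Manager) {
	sd := &StaticProvider{
		TargetGroups: []*targetgroup.Group{{Source: "static/0"}},
	}
	m.StartCustomProvider(ctx, "static", sd)
}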