pangolin_exporter

Форк
0
231 строка · 6.7 Кб
1
// Copyright 2022 The Prometheus Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package collector
15

16
import (
17
	"context"
18
	"errors"
19
	"fmt"
20
	"sync"
21
	"time"
22

23
	"github.com/alecthomas/kingpin/v2"
24
	"github.com/go-kit/log"
25
	"github.com/go-kit/log/level"
26
	"github.com/prometheus/client_golang/prometheus"
27
)
28

29
var (
30
	factories              = make(map[string]func(collectorConfig) (Collector, error))
31
	initiatedCollectorsMtx = sync.Mutex{}
32
	initiatedCollectors    = make(map[string]Collector)
33
	collectorState         = make(map[string]*bool)
34
	forcedCollectors       = map[string]bool{} // collectors which have been explicitly enabled or disabled
35
)
36

37
const (
	// namespace is the prefix used for all metrics built in this file.
	namespace = "pg"
	// namespace_pangolin is not referenced in this file; presumably the
	// prefix for Pangolin-specific metrics — confirm against its users.
	namespace_pangolin = "pangolin"

	// defaultEnabled and defaultDisabled are readable names for the two
	// possible default states of a collector (intended for the
	// isDefaultEnabled argument of registerCollector).
	defaultEnabled  = true
	defaultDisabled = false
)
45

46
var (
47
	scrapeDurationDesc = prometheus.NewDesc(
48
		prometheus.BuildFQName(namespace, "scrape", "collector_duration_seconds"),
49
		"pangolin_exporter: Duration of a collector scrape.",
50
		[]string{"collector"},
51
		nil,
52
	)
53
	scrapeSuccessDesc = prometheus.NewDesc(
54
		prometheus.BuildFQName(namespace, "scrape", "collector_success"),
55
		"postgres_exporter: Whether a collector succeeded.",
56
		[]string{"collector"},
57
		nil,
58
	)
59
)
60

61
type Collector interface {
62
	Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error
63
}
64

65
type collectorConfig struct {
66
	logger           log.Logger
67
	excludeDatabases []string
68
}
69

70
// registerCollector makes a collector known to the package: it defines the
// boolean command-line flag "collector.<name>" (defaulting to
// isDefaultEnabled), remembers the flag value in collectorState and stores
// createFunc in factories so the collector can be built later.
func registerCollector(name string, isDefaultEnabled bool, createFunc func(collectorConfig) (Collector, error)) {
	// Word shown in the flag help to describe the default state.
	stateWord := "disabled"
	if isDefaultEnabled {
		stateWord = "enabled"
	}

	// The action records that the flag was given explicitly on the command line.
	enabled := kingpin.
		Flag(
			fmt.Sprintf("collector.%s", name),
			fmt.Sprintf("Enable the %s collector (default: %s).", name, stateWord),
		).
		Default(fmt.Sprintf("%v", isDefaultEnabled)).
		Action(collectorFlagAction(name)).
		Bool()

	collectorState[name] = enabled
	factories[name] = createFunc
}
89

90
// PostgresCollector implements the prometheus.Collector interface.
91
type PostgresCollector struct {
92
	Collectors map[string]Collector
93
	logger     log.Logger
94

95
	instance *instance
96
}
97

98
// Option customizes a PostgresCollector while it is being constructed by
// NewPostgresCollector; a non-nil error aborts construction.
type Option func(*PostgresCollector) error
99

100
// NewPostgresCollector creates a new PostgresCollector.
101
func NewPostgresCollector(logger log.Logger, excludeDatabases []string, dsn string, filters []string, options ...Option) (*PostgresCollector, error) {
102
	p := &PostgresCollector{
103
		logger: logger,
104
	}
105
	// Apply options to customize the collector
106
	for _, o := range options {
107
		err := o(p)
108
		if err != nil {
109
			return nil, err
110
		}
111
	}
112

113
	f := make(map[string]bool)
114
	for _, filter := range filters {
115
		enabled, exist := collectorState[filter]
116
		if !exist {
117
			return nil, fmt.Errorf("missing collector: %s", filter)
118
		}
119
		if !*enabled {
120
			return nil, fmt.Errorf("disabled collector: %s", filter)
121
		}
122
		f[filter] = true
123
	}
124
	collectors := make(map[string]Collector)
125
	initiatedCollectorsMtx.Lock()
126
	defer initiatedCollectorsMtx.Unlock()
127
	for key, enabled := range collectorState {
128
		if !*enabled || (len(f) > 0 && !f[key]) {
129
			continue
130
		}
131
		if collector, ok := initiatedCollectors[key]; ok {
132
			collectors[key] = collector
133
		} else {
134
			collector, err := factories[key](collectorConfig{
135
				logger:           log.With(logger, "collector", key),
136
				excludeDatabases: excludeDatabases,
137
			})
138
			if err != nil {
139
				return nil, err
140
			}
141
			collectors[key] = collector
142
			initiatedCollectors[key] = collector
143
		}
144
	}
145

146
	p.Collectors = collectors
147

148
	if dsn == "" {
149
		return nil, errors.New("empty dsn")
150
	}
151

152
	instance, err := newInstance(dsn)
153
	if err != nil {
154
		return nil, err
155
	}
156
	p.instance = instance
157

158
	return p, nil
159
}
160

161
// Describe implements the prometheus.Collector interface.
162
func (p PostgresCollector) Describe(ch chan<- *prometheus.Desc) {
163
	ch <- scrapeDurationDesc
164
	ch <- scrapeSuccessDesc
165
}
166

167
// Collect implements the prometheus.Collector interface.
168
func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) {
169
	ctx := context.TODO()
170

171
	// copy the instance so that concurrent scrapes have independent instances
172
	inst := p.instance.copy()
173

174
	// Set up the database connection for the collector.
175
	err := inst.setup()
176
	if err != nil {
177
		level.Error(p.logger).Log("msg", "Error opening connection to database", "err", err)
178
		return
179
	}
180
	defer inst.Close()
181

182
	wg := sync.WaitGroup{}
183
	wg.Add(len(p.Collectors))
184
	for name, c := range p.Collectors {
185
		go func(name string, c Collector) {
186
			execute(ctx, name, c, inst, ch, p.logger)
187
			wg.Done()
188
		}(name, c)
189
	}
190
	wg.Wait()
191
}
192

193
// execute runs a single collector, logs the outcome and then emits the
// scrape duration and success metrics for it, labelled with name. A "no
// data" error is logged at debug level and any other error at error level;
// both report success as 0.
func execute(ctx context.Context, name string, c Collector, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) {
	start := time.Now()
	updateErr := c.Update(ctx, instance, ch)
	elapsed := time.Since(start)
	seconds := elapsed.Seconds()

	success := 0.0
	switch {
	case updateErr == nil:
		level.Debug(logger).Log("msg", "collector succeeded", "name", name, "duration_seconds", seconds)
		success = 1
	case IsNoDataError(updateErr):
		level.Debug(logger).Log("msg", "collector returned no data", "name", name, "duration_seconds", seconds, "err", updateErr)
	default:
		level.Error(logger).Log("msg", "collector failed", "name", name, "duration_seconds", seconds, "err", updateErr)
	}

	ch <- prometheus.MustNewConstMetric(scrapeDurationDesc, prometheus.GaugeValue, seconds, name)
	ch <- prometheus.MustNewConstMetric(scrapeSuccessDesc, prometheus.GaugeValue, success, name)
}
213

214
// collectorFlagAction generates a new action function for the given collector
215
// to track whether it has been explicitly enabled or disabled from the command line.
216
// A new action function is needed for each collector flag because the ParseContext
217
// does not contain information about which flag called the action.
218
// See: https://github.com/alecthomas/kingpin/issues/294
219
func collectorFlagAction(collector string) func(ctx *kingpin.ParseContext) error {
220
	return func(ctx *kingpin.ParseContext) error {
221
		forcedCollectors[collector] = true
222
		return nil
223
	}
224
}
225

226
// ErrNoData indicates the collector found no data to collect, but had no other error.
var ErrNoData = errors.New("collector returned no data")

// IsNoDataError reports whether err is ErrNoData or wraps it. It uses
// errors.Is rather than ==, so a collector that adds context with %w is still
// recognised as a "no data" result. It returns false for a nil error.
func IsNoDataError(err error) bool {
	return errors.Is(err, ErrNoData)
}
232

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.