// pangolin_exporter
// 231 lines · 6.7 KB
// Copyright 2022 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
13
14package collector
15
16import (
17"context"
18"errors"
19"fmt"
20"sync"
21"time"
22
23"github.com/alecthomas/kingpin/v2"
24"github.com/go-kit/log"
25"github.com/go-kit/log/level"
26"github.com/prometheus/client_golang/prometheus"
27)
28
29var (
30factories = make(map[string]func(collectorConfig) (Collector, error))
31initiatedCollectorsMtx = sync.Mutex{}
32initiatedCollectors = make(map[string]Collector)
33collectorState = make(map[string]*bool)
34forcedCollectors = map[string]bool{} // collectors which have been explicitly enabled or disabled
35)
36
const (
	// namespace is the first component of the fully-qualified name of the
	// scrape metrics declared in this file.
	namespace = "pg"

	// namespace_pangolin is not referenced in this file.
	// NOTE(review): presumably the metric namespace used by pangolin-specific
	// collectors; the name is kept as is because other files may refer to it.
	namespace_pangolin = "pangolin"
)

// Readable values for the isDefaultEnabled argument of registerCollector.
// NOTE(review): not used in this file; presumably used by the individual
// collectors when they register themselves.
const (
	defaultEnabled  = true
	defaultDisabled = false
)
45
46var (
47scrapeDurationDesc = prometheus.NewDesc(
48prometheus.BuildFQName(namespace, "scrape", "collector_duration_seconds"),
49"pangolin_exporter: Duration of a collector scrape.",
50[]string{"collector"},
51nil,
52)
53scrapeSuccessDesc = prometheus.NewDesc(
54prometheus.BuildFQName(namespace, "scrape", "collector_success"),
55"postgres_exporter: Whether a collector succeeded.",
56[]string{"collector"},
57nil,
58)
59)
60
61type Collector interface {
62Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error
63}
64
65type collectorConfig struct {
66logger log.Logger
67excludeDatabases []string
68}
69
70func registerCollector(name string, isDefaultEnabled bool, createFunc func(collectorConfig) (Collector, error)) {
71var helpDefaultState string
72if isDefaultEnabled {
73helpDefaultState = "enabled"
74} else {
75helpDefaultState = "disabled"
76}
77
78// Create flag for this collector
79flagName := fmt.Sprintf("collector.%s", name)
80flagHelp := fmt.Sprintf("Enable the %s collector (default: %s).", name, helpDefaultState)
81defaultValue := fmt.Sprintf("%v", isDefaultEnabled)
82
83flag := kingpin.Flag(flagName, flagHelp).Default(defaultValue).Action(collectorFlagAction(name)).Bool()
84collectorState[name] = flag
85
86// Register the create function for this collector
87factories[name] = createFunc
88}
89
90// PostgresCollector implements the prometheus.Collector interface.
91type PostgresCollector struct {
92Collectors map[string]Collector
93logger log.Logger
94
95instance *instance
96}
97
98type Option func(*PostgresCollector) error
99
100// NewPostgresCollector creates a new PostgresCollector.
101func NewPostgresCollector(logger log.Logger, excludeDatabases []string, dsn string, filters []string, options ...Option) (*PostgresCollector, error) {
102p := &PostgresCollector{
103logger: logger,
104}
105// Apply options to customize the collector
106for _, o := range options {
107err := o(p)
108if err != nil {
109return nil, err
110}
111}
112
113f := make(map[string]bool)
114for _, filter := range filters {
115enabled, exist := collectorState[filter]
116if !exist {
117return nil, fmt.Errorf("missing collector: %s", filter)
118}
119if !*enabled {
120return nil, fmt.Errorf("disabled collector: %s", filter)
121}
122f[filter] = true
123}
124collectors := make(map[string]Collector)
125initiatedCollectorsMtx.Lock()
126defer initiatedCollectorsMtx.Unlock()
127for key, enabled := range collectorState {
128if !*enabled || (len(f) > 0 && !f[key]) {
129continue
130}
131if collector, ok := initiatedCollectors[key]; ok {
132collectors[key] = collector
133} else {
134collector, err := factories[key](collectorConfig{
135logger: log.With(logger, "collector", key),
136excludeDatabases: excludeDatabases,
137})
138if err != nil {
139return nil, err
140}
141collectors[key] = collector
142initiatedCollectors[key] = collector
143}
144}
145
146p.Collectors = collectors
147
148if dsn == "" {
149return nil, errors.New("empty dsn")
150}
151
152instance, err := newInstance(dsn)
153if err != nil {
154return nil, err
155}
156p.instance = instance
157
158return p, nil
159}
160
161// Describe implements the prometheus.Collector interface.
162func (p PostgresCollector) Describe(ch chan<- *prometheus.Desc) {
163ch <- scrapeDurationDesc
164ch <- scrapeSuccessDesc
165}
166
167// Collect implements the prometheus.Collector interface.
168func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) {
169ctx := context.TODO()
170
171// copy the instance so that concurrent scrapes have independent instances
172inst := p.instance.copy()
173
174// Set up the database connection for the collector.
175err := inst.setup()
176if err != nil {
177level.Error(p.logger).Log("msg", "Error opening connection to database", "err", err)
178return
179}
180defer inst.Close()
181
182wg := sync.WaitGroup{}
183wg.Add(len(p.Collectors))
184for name, c := range p.Collectors {
185go func(name string, c Collector) {
186execute(ctx, name, c, inst, ch, p.logger)
187wg.Done()
188}(name, c)
189}
190wg.Wait()
191}
192
193func execute(ctx context.Context, name string, c Collector, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) {
194begin := time.Now()
195err := c.Update(ctx, instance, ch)
196duration := time.Since(begin)
197var success float64
198
199if err != nil {
200if IsNoDataError(err) {
201level.Debug(logger).Log("msg", "collector returned no data", "name", name, "duration_seconds", duration.Seconds(), "err", err)
202} else {
203level.Error(logger).Log("msg", "collector failed", "name", name, "duration_seconds", duration.Seconds(), "err", err)
204}
205success = 0
206} else {
207level.Debug(logger).Log("msg", "collector succeeded", "name", name, "duration_seconds", duration.Seconds())
208success = 1
209}
210ch <- prometheus.MustNewConstMetric(scrapeDurationDesc, prometheus.GaugeValue, duration.Seconds(), name)
211ch <- prometheus.MustNewConstMetric(scrapeSuccessDesc, prometheus.GaugeValue, success, name)
212}
213
214// collectorFlagAction generates a new action function for the given collector
215// to track whether it has been explicitly enabled or disabled from the command line.
216// A new action function is needed for each collector flag because the ParseContext
217// does not contain information about which flag called the action.
218// See: https://github.com/alecthomas/kingpin/issues/294
219func collectorFlagAction(collector string) func(ctx *kingpin.ParseContext) error {
220return func(ctx *kingpin.ParseContext) error {
221forcedCollectors[collector] = true
222return nil
223}
224}
225
// ErrNoData indicates the collector found no data to collect, but had no other error.
var ErrNoData = errors.New("collector returned no data")

// IsNoDataError reports whether err is ErrNoData or wraps it.
//
// errors.Is is used instead of a plain == comparison so that an ErrNoData
// which a collector has wrapped with additional context (for example via
// fmt.Errorf with %w) is still recognized. A nil error yields false.
func IsNoDataError(err error) bool {
	return errors.Is(err, ErrNoData)
}
232