pangolin_exporter
568 lines · 20.2 KB
// Copyright 2022 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

14package collector
15
16import (
17"context"
18"database/sql"
19"strconv"
20
21"github.com/blang/semver/v4"
22"github.com/go-kit/log"
23"github.com/go-kit/log/level"
24"github.com/prometheus/client_golang/prometheus"
25)
26
27const statDatabaseSubsystem = "pangolin_database"
28
29func init() {
30registerCollector(statDatabaseSubsystem, defaultEnabled, NewPGdatabaseCollector)
31}
32
33type PGdatabaseCollector struct {
34log log.Logger
35}
36
37func NewPGdatabaseCollector(config collectorConfig) (Collector, error) {
38return &PGdatabaseCollector{log: config.logger}, nil
39}
40
41var (
42// По просьбе НТ, для совместимости оставил имена pg_stat_database_xact_rollback, pg_stat_database_xact_commit
43pg_commit = prometheus.NewDesc(
44prometheus.BuildFQName(
45"pg",
46"stat_database",
47"xact_commit",
48),
49"Total number of transactions had been committed.",
50[]string{"datname"},
51prometheus.Labels{},
52)
53pg_rollback = prometheus.NewDesc(
54prometheus.BuildFQName(
55"pg",
56"stat_database",
57"xact_rollback",
58),
59"Total number of transactions had been rolled back.",
60[]string{"datname"},
61prometheus.Labels{},
62)
63
64commits = prometheus.NewDesc(
65prometheus.BuildFQName(
66"",
67statDatabaseSubsystem,
68"xact_commits_total",
69),
70"Total number of transactions had been committed.",
71[]string{"database"},
72prometheus.Labels{},
73)
74rollbacks = prometheus.NewDesc(
75prometheus.BuildFQName(
76"",
77statDatabaseSubsystem,
78"xact_rollbacks_total",
79),
80"Total number of transactions had been rolled back.",
81[]string{"database"},
82prometheus.Labels{},
83)
84blocks = prometheus.NewDesc(
85prometheus.BuildFQName(
86"",
87statDatabaseSubsystem,
88"blocks_total",
89),
90"Total number of disk blocks had been accessed by each type of access.",
91[]string{"database", "access"},
92prometheus.Labels{},
93)
94tuplesReturned = prometheus.NewDesc(
95prometheus.BuildFQName(
96"",
97statDatabaseSubsystem,
98"tuples_returned_total",
99),
100"Total number of rows returned per each database.",
101[]string{"database"},
102prometheus.Labels{},
103)
104tuplesFetched = prometheus.NewDesc(
105prometheus.BuildFQName(
106"",
107statDatabaseSubsystem,
108"tuples_fetched_total",
109),
110"Total number of rows fetched per each database.",
111[]string{"database"},
112prometheus.Labels{},
113)
114tuplesInserted = prometheus.NewDesc(
115prometheus.BuildFQName(
116"",
117statDatabaseSubsystem,
118"tuples_inserted_total",
119),
120"Total number of rows inserted per each database.",
121[]string{"database"},
122prometheus.Labels{},
123)
124tuplesUpdated = prometheus.NewDesc(
125prometheus.BuildFQName(
126"",
127statDatabaseSubsystem,
128"tuples_updated_total",
129),
130"Total number of rows updated per each database.",
131[]string{"database"},
132prometheus.Labels{},
133)
134tuplesDeleted = prometheus.NewDesc(
135prometheus.BuildFQName(
136"",
137statDatabaseSubsystem,
138"tuples_deleted_total",
139),
140"Total number of rows deleted per each database.",
141[]string{"database"},
142prometheus.Labels{},
143)
144tempbytes_ = prometheus.NewDesc(
145prometheus.BuildFQName(
146"",
147statDatabaseSubsystem,
148"temp_bytes_total",
149),
150"Total number of temporary files created by queries.",
151[]string{"database"},
152prometheus.Labels{},
153)
154tempfiles_ = prometheus.NewDesc(
155prometheus.BuildFQName(
156"",
157statDatabaseSubsystem,
158"temp_files_total",
159),
160"Total number of temporary files created by queries.",
161[]string{"database"},
162prometheus.Labels{},
163)
164conflicts_ = prometheus.NewDesc(
165prometheus.BuildFQName(
166"",
167statDatabaseSubsystem,
168"conflicts_total",
169),
170"Total number of recovery conflicts occurred.",
171[]string{"database"},
172prometheus.Labels{},
173)
174deadlocks_ = prometheus.NewDesc(
175prometheus.BuildFQName(
176"",
177statDatabaseSubsystem,
178"deadlocks_total",
179),
180"Total number of deadlocks occurred.",
181[]string{"database"},
182prometheus.Labels{},
183)
184csumfails_ = prometheus.NewDesc(
185prometheus.BuildFQName(
186"",
187statDatabaseSubsystem,
188"checksum_failures_total",
189),
190"Total number of checksum failures occurred.",
191[]string{"database"},
192prometheus.Labels{},
193)
194csumlastfailunixts_ = prometheus.NewDesc(
195prometheus.BuildFQName(
196"",
197statDatabaseSubsystem,
198"last_checksum_failure_seconds",
199),
200"Time of the last checksum failure occurred, in unixtime.",
201[]string{"database"},
202prometheus.Labels{},
203)
204blockstime = prometheus.NewDesc(
205prometheus.BuildFQName(
206"",
207statDatabaseSubsystem,
208"blk_time_seconds_total",
209),
210"Total time spent accessing data blocks by backends in this database in each access type, in seconds.",
211[]string{"database", "type"},
212prometheus.Labels{},
213)
214sessionalltime = prometheus.NewDesc(
215prometheus.BuildFQName(
216"",
217statDatabaseSubsystem,
218"session_time_seconds_all_total",
219),
220"Total time spent by database sessions in this database in all states, in seconds.",
221[]string{"database"},
222prometheus.Labels{},
223)
224sessiontime_ = prometheus.NewDesc(
225prometheus.BuildFQName(
226"",
227statDatabaseSubsystem,
228"session_time_seconds_total",
229),
230"Total time spent by database sessions in this database in each state, in seconds.",
231[]string{"database", "state"},
232prometheus.Labels{},
233)
234sessionsall = prometheus.NewDesc(
235prometheus.BuildFQName(
236"",
237statDatabaseSubsystem,
238"sessions_all_total",
239),
240"Total number of sessions established to this database.",
241[]string{"database"},
242prometheus.Labels{},
243)
244sessions_ = prometheus.NewDesc(
245prometheus.BuildFQName(
246"",
247statDatabaseSubsystem,
248"sessions_total",
249),
250"Total number of sessions established to this database and closed by each reason.",
251[]string{"database", "reason"},
252prometheus.Labels{},
253)
254sizes = prometheus.NewDesc(
255prometheus.BuildFQName(
256"",
257statDatabaseSubsystem,
258"size_bytes",
259),
260"Total size of the database, in bytes.",
261[]string{"database"},
262prometheus.Labels{},
263)
264statsage_ = prometheus.NewDesc(
265prometheus.BuildFQName(
266"",
267statDatabaseSubsystem,
268"stats_age_seconds_total",
269),
270"The age of the databases activity statistics, in seconds.",
271[]string{"database"},
272prometheus.Labels{},
273)
274xidlimit_ = prometheus.NewDesc(
275prometheus.BuildFQName(
276"",
277statDatabaseSubsystem,
278"left_before_wraparound",
279),
280"The number of transactions left before force shutdown due to XID wraparound.",
281[]string{"xid_from"},
282prometheus.Labels{},
283)
284
285// databasesQuery11 = "SELECT " +
286// "coalesce(datname, 'global') AS database, " +
287// "xact_commit, xact_rollback, blks_read, blks_hit, tup_returned, tup_fetched, tup_inserted, tup_updated, tup_deleted, " +
288// "conflicts, temp_files, temp_bytes, deadlocks, blk_read_time, blk_write_time, pg_database_size(datname) as size_bytes, " +
289// "coalesce(extract('epoch' from age(now(), stats_reset)), 0) as stats_age_seconds " +
290// "FROM pg_stat_database WHERE datname IN (SELECT datname FROM pg_database WHERE datallowconn AND NOT datistemplate) " +
291// "OR datname IS NULL"
292
293databasesQuery13 = "SELECT " +
294"coalesce(datname, 'global') AS database, " +
295"xact_commit, xact_rollback, blks_read, blks_hit, tup_returned, tup_fetched, tup_inserted, tup_updated, tup_deleted, " +
296"conflicts, temp_files, temp_bytes, deadlocks, checksum_failures, coalesce(extract(epoch from checksum_last_failure), 0) AS last_checksum_failure_unixtime, " +
297"blk_read_time, blk_write_time, pg_database_size(datname) as size_bytes, " +
298"coalesce(extract('epoch' from age(now(), stats_reset)), 0) as stats_age_seconds " +
299"FROM pg_stat_database WHERE datname IN (SELECT datname FROM pg_database WHERE datallowconn AND NOT datistemplate) " +
300"OR datname IS NULL"
301
302databasesQueryLatest = "SELECT " +
303"coalesce(datname, 'global') AS database, " +
304"xact_commit, xact_rollback, blks_read, blks_hit, tup_returned, tup_fetched, tup_inserted, tup_updated, tup_deleted, " +
305"conflicts, temp_files, temp_bytes, deadlocks, checksum_failures, coalesce(extract(epoch from checksum_last_failure), 0) AS last_checksum_failure_unixtime, " +
306"blk_read_time, blk_write_time, " +
307"session_time, active_time, idle_in_transaction_time, sessions, sessions_abandoned, sessions_fatal, sessions_killed, " +
308"pg_database_size(datname) as size_bytes, " +
309"coalesce(extract('epoch' from age(now(), stats_reset)), 0) as stats_age_seconds " +
310"FROM pg_stat_database WHERE datname IN (SELECT datname FROM pg_database WHERE datallowconn AND NOT datistemplate) " +
311"OR datname IS NULL"
312
313xidLimitQuery = `SELECT 'database' AS src, 2147483647 - greatest(max(age(datfrozenxid)), max(age(coalesce(nullif(datminmxid, 1), datfrozenxid)))) AS to_limit FROM pg_database
314UNION SELECT 'prepared_xacts' AS src, 2147483647 - coalesce(max(age(transaction)), 0) AS to_limit FROM pg_prepared_xacts
315UNION SELECT 'replication_slots' AS src, 2147483647 - greatest(coalesce(min(age(xmin)), 0), coalesce(min(age(catalog_xmin)), 0)) AS to_limit
316FROM pg_replication_slots;`
317)
318
319func (c *PGdatabaseCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
320db := instance.getDB()
321after13 := instance.version.Compare(semver.MustParse("13.0.0"))
322after14 := instance.version.Compare(semver.MustParse("14.0.0"))
323if after13 < 0 {
324level.Warn(c.log).Log("msg", "pangolin_database collector is not available on PostgreSQL < 13.0.0, skipping")
325return nil
326}
327type xid_str struct {
328src sql.NullString
329to_limit sql.NullString
330}
331type xidLimitStats struct {
332database float64 // based on pg_database.datfrozenxid and datminmxid
333prepared float64 // based on pg_prepared_xacts.transaction
334replSlot float64 // based on pg_replication_slots.xmin and catalog_xmin
335}
336rows_xid, err_xid := db.QueryContext(ctx, xidLimitQuery)
337if err_xid != nil {
338return err_xid
339}
340defer rows_xid.Close()
341
342var xid xid_str
343var xids = make([]xid_str, 0)
344var xidLimit xidLimitStats
345for rows_xid.Next() {
346err_xid := rows_xid.Scan(
347&xid.src, &xid.to_limit,
348)
349if err_xid != nil {
350return err_xid
351}
352xids = append(xids, xid)
353}
354
355for _, row := range xids {
356// Get data value and convert it to float64 used by Prometheus.
357value, err := strconv.ParseFloat(row.to_limit.String, 64)
358if err != nil {
359level.Warn(c.log).Log("invalid input, parse '%s' failed: %s; skip", row.to_limit.String, err)
360continue
361}
362switch row.src.String {
363case "database":
364xidLimit.database = value
365case "prepared_xacts":
366xidLimit.prepared = value
367case "replication_slots":
368xidLimit.replSlot = value
369}
370}
371
372ch <- prometheus.MustNewConstMetric(xidlimit_, prometheus.CounterValue, float64(xidLimit.database), "pg_database")
373ch <- prometheus.MustNewConstMetric(xidlimit_, prometheus.CounterValue, float64(xidLimit.prepared), "pg_prepared_xacts")
374ch <- prometheus.MustNewConstMetric(xidlimit_, prometheus.CounterValue, float64(xidLimit.replSlot), "pg_replication_slots")
375
376query := databasesQuery13
377if after14 >= 0 {
378query = databasesQueryLatest
379}
380
381rows, err := db.QueryContext(ctx, query)
382if err != nil {
383return err
384}
385defer rows.Close()
386
387var database sql.NullString
388var xactcommit, xactrollback, blksread, blkshit, tupreturned, tupfetched, tupinserted sql.NullFloat64
389var tupupdated, tupdeleted, conflicts, tempfiles, tempbytes, deadlocks, csumfails, csumlastfailunixts sql.NullFloat64
390var blkreadtime, blkwritetime sql.NullFloat64
391var sessiontime, activetime, idletxtime, sessions, sessabandoned, sessfatal, sesskilled sql.NullFloat64
392var sizebytes, statsage sql.NullFloat64
393
394for rows.Next() {
395if after13 >= 0 && after14 < 0 {
396err := rows.Scan(
397&database,
398&xactcommit, &xactrollback, &blksread, &blkshit, &tupreturned, &tupfetched, &tupinserted,
399&tupupdated, &tupdeleted, &conflicts, &tempfiles, &tempbytes, &deadlocks, &csumfails, &csumlastfailunixts,
400&blkreadtime, &blkwritetime,
401&sizebytes, &statsage,
402)
403if err != nil {
404return err
405}
406}
407if after14 >= 0 {
408err := rows.Scan(
409&database,
410&xactcommit, &xactrollback, &blksread, &blkshit, &tupreturned, &tupfetched, &tupinserted,
411&tupupdated, &tupdeleted, &conflicts, &tempfiles, &tempbytes, &deadlocks, &csumfails, &csumlastfailunixts,
412&blkreadtime, &blkwritetime,
413&sessiontime, &activetime, &idletxtime, &sessions, &sessabandoned, &sessfatal, &sesskilled,
414&sizebytes, &statsage,
415)
416if err != nil {
417return err
418}
419if !sessiontime.Valid {
420level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no sessiontime")
421continue
422}
423if !activetime.Valid {
424level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no activetime")
425continue
426}
427if !idletxtime.Valid {
428level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no idletxtime")
429continue
430}
431if !sessions.Valid {
432level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no sessions")
433continue
434}
435if !sessabandoned.Valid {
436level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no sessabandoned")
437continue
438}
439if !sessfatal.Valid {
440level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no sessfatal")
441continue
442}
443if !sesskilled.Valid {
444level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no sesskilled")
445continue
446}
447ch <- prometheus.MustNewConstMetric(sessionalltime, prometheus.CounterValue, float64(sessiontime.Float64), database.String)
448ch <- prometheus.MustNewConstMetric(sessiontime_, prometheus.CounterValue, float64(activetime.Float64), database.String, "active")
449ch <- prometheus.MustNewConstMetric(sessiontime_, prometheus.CounterValue, float64(idletxtime.Float64), database.String, "idle_in_transaction")
450ch <- prometheus.MustNewConstMetric(sessiontime_, prometheus.CounterValue, float64(sessiontime.Float64)-(float64(activetime.Float64)+float64(idletxtime.Float64)), database.String, "idle")
451ch <- prometheus.MustNewConstMetric(sessionsall, prometheus.CounterValue, float64(sessions.Float64), database.String)
452ch <- prometheus.MustNewConstMetric(sessions_, prometheus.CounterValue, float64(sessabandoned.Float64), database.String, "abandoned")
453ch <- prometheus.MustNewConstMetric(sessions_, prometheus.CounterValue, float64(sessfatal.Float64), database.String, "fatal")
454ch <- prometheus.MustNewConstMetric(sessions_, prometheus.CounterValue, float64(sesskilled.Float64), database.String, "killed")
455ch <- prometheus.MustNewConstMetric(sessions_, prometheus.CounterValue, float64(sessions.Float64)-(float64(sessabandoned.Float64)+float64(sessfatal.Float64)+float64(sesskilled.Float64)), database.String, "normal")
456}
457if !database.Valid {
458level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no database")
459continue
460}
461if !xactcommit.Valid {
462level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no xactcommit")
463continue
464}
465if !xactrollback.Valid {
466level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no xactrollback")
467continue
468}
469if !blksread.Valid {
470level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no blksread")
471continue
472}
473if !blkshit.Valid {
474level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no blkshit")
475continue
476}
477if !tupreturned.Valid {
478level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tupreturned")
479continue
480}
481if !tupfetched.Valid {
482level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tupfetched")
483continue
484}
485if !tupinserted.Valid {
486level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tupinserted")
487continue
488}
489if !tupupdated.Valid {
490level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tupupdated")
491continue
492}
493if !tupdeleted.Valid {
494level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tupdeleted")
495continue
496}
497if !conflicts.Valid {
498level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no conflicts")
499continue
500}
501if !tempfiles.Valid {
502level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tempfiles")
503continue
504}
505if !tempbytes.Valid {
506level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tempbytes")
507continue
508}
509if !deadlocks.Valid {
510level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no deadlocks")
511continue
512}
513if !csumfails.Valid {
514level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no csumfails")
515continue
516}
517if !csumlastfailunixts.Valid {
518level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no csumlastfailunixts")
519continue
520}
521if !blkreadtime.Valid {
522level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no blkreadtime")
523continue
524}
525if !blkwritetime.Valid {
526level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no blkwritetime")
527continue
528}
529if !sizebytes.Valid {
530level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no sizebytes")
531continue
532}
533if !statsage.Valid {
534level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no statsage")
535continue
536}
537// По просьбе НТ, для совместимости оставил имена pg_stat_database_xact_rollback, pg_stat_database_xact_commit
538ch <- prometheus.MustNewConstMetric(pg_commit, prometheus.CounterValue, float64(xactcommit.Float64), database.String)
539ch <- prometheus.MustNewConstMetric(pg_rollback, prometheus.CounterValue, float64(xactrollback.Float64), database.String)
540
541ch <- prometheus.MustNewConstMetric(commits, prometheus.CounterValue, float64(xactcommit.Float64), database.String)
542ch <- prometheus.MustNewConstMetric(rollbacks, prometheus.CounterValue, float64(xactrollback.Float64), database.String)
543ch <- prometheus.MustNewConstMetric(blocks, prometheus.CounterValue, float64(blksread.Float64), database.String, "read")
544ch <- prometheus.MustNewConstMetric(blocks, prometheus.CounterValue, float64(blkshit.Float64), database.String, "hit")
545
546ch <- prometheus.MustNewConstMetric(tuplesReturned, prometheus.CounterValue, float64(tupreturned.Float64), database.String)
547ch <- prometheus.MustNewConstMetric(tuplesFetched, prometheus.CounterValue, float64(tupfetched.Float64), database.String)
548ch <- prometheus.MustNewConstMetric(tuplesInserted, prometheus.CounterValue, float64(tupinserted.Float64), database.String)
549ch <- prometheus.MustNewConstMetric(tuplesUpdated, prometheus.CounterValue, float64(tupupdated.Float64), database.String)
550ch <- prometheus.MustNewConstMetric(tuplesDeleted, prometheus.CounterValue, float64(tupdeleted.Float64), database.String)
551
552ch <- prometheus.MustNewConstMetric(tempbytes_, prometheus.CounterValue, float64(tempbytes.Float64), database.String)
553ch <- prometheus.MustNewConstMetric(tempfiles_, prometheus.CounterValue, float64(tempfiles.Float64), database.String)
554ch <- prometheus.MustNewConstMetric(conflicts_, prometheus.CounterValue, float64(conflicts.Float64), database.String)
555ch <- prometheus.MustNewConstMetric(deadlocks_, prometheus.CounterValue, float64(deadlocks.Float64), database.String)
556
557ch <- prometheus.MustNewConstMetric(blockstime, prometheus.CounterValue, float64(blkreadtime.Float64), database.String, "read")
558ch <- prometheus.MustNewConstMetric(blockstime, prometheus.CounterValue, float64(blkwritetime.Float64), database.String, "write")
559ch <- prometheus.MustNewConstMetric(sizes, prometheus.CounterValue, float64(sizebytes.Float64), database.String)
560ch <- prometheus.MustNewConstMetric(statsage_, prometheus.CounterValue, float64(statsage.Float64), database.String)
561
562ch <- prometheus.MustNewConstMetric(csumfails_, prometheus.CounterValue, float64(csumfails.Float64), database.String)
563ch <- prometheus.MustNewConstMetric(csumlastfailunixts_, prometheus.CounterValue, float64(csumlastfailunixts.Float64), database.String)
564
565}
566
567return nil
568}
569