cubefs

Форк
0
429 строк · 11.2 Кб
1
// Copyright 2015 The Prometheus Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package expfmt
15

16
import (
17
	"fmt"
18
	"io"
19
	"math"
20
	"mime"
21
	"net/http"
22

23
	dto "github.com/prometheus/client_model/go"
24

25
	"github.com/matttproud/golang_protobuf_extensions/pbutil"
26
	"github.com/prometheus/common/model"
27
)
28

29
// Decoder types decode an input stream into metric families.
30
type Decoder interface {
31
	Decode(*dto.MetricFamily) error
32
}
33

34
// DecodeOptions contains options used by the Decoder and in sample extraction.
35
type DecodeOptions struct {
36
	// Timestamp is added to each value from the stream that has no explicit timestamp set.
37
	Timestamp model.Time
38
}
39

40
// ResponseFormat extracts the correct format from a HTTP response header.
41
// If no matching format can be found FormatUnknown is returned.
42
func ResponseFormat(h http.Header) Format {
43
	ct := h.Get(hdrContentType)
44

45
	mediatype, params, err := mime.ParseMediaType(ct)
46
	if err != nil {
47
		return FmtUnknown
48
	}
49

50
	const textType = "text/plain"
51

52
	switch mediatype {
53
	case ProtoType:
54
		if p, ok := params["proto"]; ok && p != ProtoProtocol {
55
			return FmtUnknown
56
		}
57
		if e, ok := params["encoding"]; ok && e != "delimited" {
58
			return FmtUnknown
59
		}
60
		return FmtProtoDelim
61

62
	case textType:
63
		if v, ok := params["version"]; ok && v != TextVersion {
64
			return FmtUnknown
65
		}
66
		return FmtText
67
	}
68

69
	return FmtUnknown
70
}
71

72
// NewDecoder returns a new decoder based on the given input format.
73
// If the input format does not imply otherwise, a text format decoder is returned.
74
func NewDecoder(r io.Reader, format Format) Decoder {
75
	switch format {
76
	case FmtProtoDelim:
77
		return &protoDecoder{r: r}
78
	}
79
	return &textDecoder{r: r}
80
}
81

82
// protoDecoder is the Decoder implementation for protocol buffer input.
type protoDecoder struct {
	// r is the stream the delimited messages are read from.
	r io.Reader
}
86

87
// Decode implements the Decoder interface.
88
func (d *protoDecoder) Decode(v *dto.MetricFamily) error {
89
	_, err := pbutil.ReadDelimited(d.r, v)
90
	if err != nil {
91
		return err
92
	}
93
	if !model.IsValidMetricName(model.LabelValue(v.GetName())) {
94
		return fmt.Errorf("invalid metric name %q", v.GetName())
95
	}
96
	for _, m := range v.GetMetric() {
97
		if m == nil {
98
			continue
99
		}
100
		for _, l := range m.GetLabel() {
101
			if l == nil {
102
				continue
103
			}
104
			if !model.LabelValue(l.GetValue()).IsValid() {
105
				return fmt.Errorf("invalid label value %q", l.GetValue())
106
			}
107
			if !model.LabelName(l.GetName()).IsValid() {
108
				return fmt.Errorf("invalid label name %q", l.GetName())
109
			}
110
		}
111
	}
112
	return nil
113
}
114

115
// textDecoder implements the Decoder interface for the text protocol.
116
type textDecoder struct {
117
	r    io.Reader
118
	p    TextParser
119
	fams []*dto.MetricFamily
120
}
121

122
// Decode implements the Decoder interface.
123
func (d *textDecoder) Decode(v *dto.MetricFamily) error {
124
	// TODO(fabxc): Wrap this as a line reader to make streaming safer.
125
	if len(d.fams) == 0 {
126
		// No cached metric families, read everything and parse metrics.
127
		fams, err := d.p.TextToMetricFamilies(d.r)
128
		if err != nil {
129
			return err
130
		}
131
		if len(fams) == 0 {
132
			return io.EOF
133
		}
134
		d.fams = make([]*dto.MetricFamily, 0, len(fams))
135
		for _, f := range fams {
136
			d.fams = append(d.fams, f)
137
		}
138
	}
139

140
	*v = *d.fams[0]
141
	d.fams = d.fams[1:]
142

143
	return nil
144
}
145

146
// SampleDecoder wraps a Decoder to extract samples from the metric families
147
// decoded by the wrapped Decoder.
148
type SampleDecoder struct {
149
	Dec  Decoder
150
	Opts *DecodeOptions
151

152
	f dto.MetricFamily
153
}
154

155
// Decode calls the Decode method of the wrapped Decoder and then extracts the
156
// samples from the decoded MetricFamily into the provided model.Vector.
157
func (sd *SampleDecoder) Decode(s *model.Vector) error {
158
	err := sd.Dec.Decode(&sd.f)
159
	if err != nil {
160
		return err
161
	}
162
	*s, err = extractSamples(&sd.f, sd.Opts)
163
	return err
164
}
165

166
// ExtractSamples builds a slice of samples from the provided metric
167
// families. If an error occurs during sample extraction, it continues to
168
// extract from the remaining metric families. The returned error is the last
169
// error that has occurred.
170
func ExtractSamples(o *DecodeOptions, fams ...*dto.MetricFamily) (model.Vector, error) {
171
	var (
172
		all     model.Vector
173
		lastErr error
174
	)
175
	for _, f := range fams {
176
		some, err := extractSamples(f, o)
177
		if err != nil {
178
			lastErr = err
179
			continue
180
		}
181
		all = append(all, some...)
182
	}
183
	return all, lastErr
184
}
185

186
func extractSamples(f *dto.MetricFamily, o *DecodeOptions) (model.Vector, error) {
187
	switch f.GetType() {
188
	case dto.MetricType_COUNTER:
189
		return extractCounter(o, f), nil
190
	case dto.MetricType_GAUGE:
191
		return extractGauge(o, f), nil
192
	case dto.MetricType_SUMMARY:
193
		return extractSummary(o, f), nil
194
	case dto.MetricType_UNTYPED:
195
		return extractUntyped(o, f), nil
196
	case dto.MetricType_HISTOGRAM:
197
		return extractHistogram(o, f), nil
198
	}
199
	return nil, fmt.Errorf("expfmt.extractSamples: unknown metric family type %v", f.GetType())
200
}
201

202
func extractCounter(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
203
	samples := make(model.Vector, 0, len(f.Metric))
204

205
	for _, m := range f.Metric {
206
		if m.Counter == nil {
207
			continue
208
		}
209

210
		lset := make(model.LabelSet, len(m.Label)+1)
211
		for _, p := range m.Label {
212
			lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
213
		}
214
		lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
215

216
		smpl := &model.Sample{
217
			Metric: model.Metric(lset),
218
			Value:  model.SampleValue(m.Counter.GetValue()),
219
		}
220

221
		if m.TimestampMs != nil {
222
			smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
223
		} else {
224
			smpl.Timestamp = o.Timestamp
225
		}
226

227
		samples = append(samples, smpl)
228
	}
229

230
	return samples
231
}
232

233
func extractGauge(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
234
	samples := make(model.Vector, 0, len(f.Metric))
235

236
	for _, m := range f.Metric {
237
		if m.Gauge == nil {
238
			continue
239
		}
240

241
		lset := make(model.LabelSet, len(m.Label)+1)
242
		for _, p := range m.Label {
243
			lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
244
		}
245
		lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
246

247
		smpl := &model.Sample{
248
			Metric: model.Metric(lset),
249
			Value:  model.SampleValue(m.Gauge.GetValue()),
250
		}
251

252
		if m.TimestampMs != nil {
253
			smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
254
		} else {
255
			smpl.Timestamp = o.Timestamp
256
		}
257

258
		samples = append(samples, smpl)
259
	}
260

261
	return samples
262
}
263

264
func extractUntyped(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
265
	samples := make(model.Vector, 0, len(f.Metric))
266

267
	for _, m := range f.Metric {
268
		if m.Untyped == nil {
269
			continue
270
		}
271

272
		lset := make(model.LabelSet, len(m.Label)+1)
273
		for _, p := range m.Label {
274
			lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
275
		}
276
		lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
277

278
		smpl := &model.Sample{
279
			Metric: model.Metric(lset),
280
			Value:  model.SampleValue(m.Untyped.GetValue()),
281
		}
282

283
		if m.TimestampMs != nil {
284
			smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
285
		} else {
286
			smpl.Timestamp = o.Timestamp
287
		}
288

289
		samples = append(samples, smpl)
290
	}
291

292
	return samples
293
}
294

295
func extractSummary(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
296
	samples := make(model.Vector, 0, len(f.Metric))
297

298
	for _, m := range f.Metric {
299
		if m.Summary == nil {
300
			continue
301
		}
302

303
		timestamp := o.Timestamp
304
		if m.TimestampMs != nil {
305
			timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
306
		}
307

308
		for _, q := range m.Summary.Quantile {
309
			lset := make(model.LabelSet, len(m.Label)+2)
310
			for _, p := range m.Label {
311
				lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
312
			}
313
			// BUG(matt): Update other names to "quantile".
314
			lset[model.LabelName(model.QuantileLabel)] = model.LabelValue(fmt.Sprint(q.GetQuantile()))
315
			lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
316

317
			samples = append(samples, &model.Sample{
318
				Metric:    model.Metric(lset),
319
				Value:     model.SampleValue(q.GetValue()),
320
				Timestamp: timestamp,
321
			})
322
		}
323

324
		lset := make(model.LabelSet, len(m.Label)+1)
325
		for _, p := range m.Label {
326
			lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
327
		}
328
		lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum")
329

330
		samples = append(samples, &model.Sample{
331
			Metric:    model.Metric(lset),
332
			Value:     model.SampleValue(m.Summary.GetSampleSum()),
333
			Timestamp: timestamp,
334
		})
335

336
		lset = make(model.LabelSet, len(m.Label)+1)
337
		for _, p := range m.Label {
338
			lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
339
		}
340
		lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count")
341

342
		samples = append(samples, &model.Sample{
343
			Metric:    model.Metric(lset),
344
			Value:     model.SampleValue(m.Summary.GetSampleCount()),
345
			Timestamp: timestamp,
346
		})
347
	}
348

349
	return samples
350
}
351

352
func extractHistogram(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
353
	samples := make(model.Vector, 0, len(f.Metric))
354

355
	for _, m := range f.Metric {
356
		if m.Histogram == nil {
357
			continue
358
		}
359

360
		timestamp := o.Timestamp
361
		if m.TimestampMs != nil {
362
			timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
363
		}
364

365
		infSeen := false
366

367
		for _, q := range m.Histogram.Bucket {
368
			lset := make(model.LabelSet, len(m.Label)+2)
369
			for _, p := range m.Label {
370
				lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
371
			}
372
			lset[model.LabelName(model.BucketLabel)] = model.LabelValue(fmt.Sprint(q.GetUpperBound()))
373
			lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket")
374

375
			if math.IsInf(q.GetUpperBound(), +1) {
376
				infSeen = true
377
			}
378

379
			samples = append(samples, &model.Sample{
380
				Metric:    model.Metric(lset),
381
				Value:     model.SampleValue(q.GetCumulativeCount()),
382
				Timestamp: timestamp,
383
			})
384
		}
385

386
		lset := make(model.LabelSet, len(m.Label)+1)
387
		for _, p := range m.Label {
388
			lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
389
		}
390
		lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum")
391

392
		samples = append(samples, &model.Sample{
393
			Metric:    model.Metric(lset),
394
			Value:     model.SampleValue(m.Histogram.GetSampleSum()),
395
			Timestamp: timestamp,
396
		})
397

398
		lset = make(model.LabelSet, len(m.Label)+1)
399
		for _, p := range m.Label {
400
			lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
401
		}
402
		lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count")
403

404
		count := &model.Sample{
405
			Metric:    model.Metric(lset),
406
			Value:     model.SampleValue(m.Histogram.GetSampleCount()),
407
			Timestamp: timestamp,
408
		}
409
		samples = append(samples, count)
410

411
		if !infSeen {
412
			// Append an infinity bucket sample.
413
			lset := make(model.LabelSet, len(m.Label)+2)
414
			for _, p := range m.Label {
415
				lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
416
			}
417
			lset[model.LabelName(model.BucketLabel)] = model.LabelValue("+Inf")
418
			lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket")
419

420
			samples = append(samples, &model.Sample{
421
				Metric:    model.Metric(lset),
422
				Value:     count.Value,
423
				Timestamp: timestamp,
424
			})
425
		}
426
	}
427

428
	return samples
429
}
430

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.