cubefs
429 строк · 11.2 Кб
1// Copyright 2015 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package expfmt
15
16import (
17"fmt"
18"io"
19"math"
20"mime"
21"net/http"
22
23dto "github.com/prometheus/client_model/go"
24
25"github.com/matttproud/golang_protobuf_extensions/pbutil"
26"github.com/prometheus/common/model"
27)
28
29// Decoder types decode an input stream into metric families.
30type Decoder interface {
31Decode(*dto.MetricFamily) error
32}
33
34// DecodeOptions contains options used by the Decoder and in sample extraction.
35type DecodeOptions struct {
36// Timestamp is added to each value from the stream that has no explicit timestamp set.
37Timestamp model.Time
38}
39
40// ResponseFormat extracts the correct format from a HTTP response header.
41// If no matching format can be found FormatUnknown is returned.
42func ResponseFormat(h http.Header) Format {
43ct := h.Get(hdrContentType)
44
45mediatype, params, err := mime.ParseMediaType(ct)
46if err != nil {
47return FmtUnknown
48}
49
50const textType = "text/plain"
51
52switch mediatype {
53case ProtoType:
54if p, ok := params["proto"]; ok && p != ProtoProtocol {
55return FmtUnknown
56}
57if e, ok := params["encoding"]; ok && e != "delimited" {
58return FmtUnknown
59}
60return FmtProtoDelim
61
62case textType:
63if v, ok := params["version"]; ok && v != TextVersion {
64return FmtUnknown
65}
66return FmtText
67}
68
69return FmtUnknown
70}
71
72// NewDecoder returns a new decoder based on the given input format.
73// If the input format does not imply otherwise, a text format decoder is returned.
74func NewDecoder(r io.Reader, format Format) Decoder {
75switch format {
76case FmtProtoDelim:
77return &protoDecoder{r: r}
78}
79return &textDecoder{r: r}
80}
81
82// protoDecoder implements the Decoder interface for protocol buffers.
83type protoDecoder struct {
84r io.Reader
85}
86
87// Decode implements the Decoder interface.
88func (d *protoDecoder) Decode(v *dto.MetricFamily) error {
89_, err := pbutil.ReadDelimited(d.r, v)
90if err != nil {
91return err
92}
93if !model.IsValidMetricName(model.LabelValue(v.GetName())) {
94return fmt.Errorf("invalid metric name %q", v.GetName())
95}
96for _, m := range v.GetMetric() {
97if m == nil {
98continue
99}
100for _, l := range m.GetLabel() {
101if l == nil {
102continue
103}
104if !model.LabelValue(l.GetValue()).IsValid() {
105return fmt.Errorf("invalid label value %q", l.GetValue())
106}
107if !model.LabelName(l.GetName()).IsValid() {
108return fmt.Errorf("invalid label name %q", l.GetName())
109}
110}
111}
112return nil
113}
114
115// textDecoder implements the Decoder interface for the text protocol.
116type textDecoder struct {
117r io.Reader
118p TextParser
119fams []*dto.MetricFamily
120}
121
122// Decode implements the Decoder interface.
123func (d *textDecoder) Decode(v *dto.MetricFamily) error {
124// TODO(fabxc): Wrap this as a line reader to make streaming safer.
125if len(d.fams) == 0 {
126// No cached metric families, read everything and parse metrics.
127fams, err := d.p.TextToMetricFamilies(d.r)
128if err != nil {
129return err
130}
131if len(fams) == 0 {
132return io.EOF
133}
134d.fams = make([]*dto.MetricFamily, 0, len(fams))
135for _, f := range fams {
136d.fams = append(d.fams, f)
137}
138}
139
140*v = *d.fams[0]
141d.fams = d.fams[1:]
142
143return nil
144}
145
146// SampleDecoder wraps a Decoder to extract samples from the metric families
147// decoded by the wrapped Decoder.
148type SampleDecoder struct {
149Dec Decoder
150Opts *DecodeOptions
151
152f dto.MetricFamily
153}
154
155// Decode calls the Decode method of the wrapped Decoder and then extracts the
156// samples from the decoded MetricFamily into the provided model.Vector.
157func (sd *SampleDecoder) Decode(s *model.Vector) error {
158err := sd.Dec.Decode(&sd.f)
159if err != nil {
160return err
161}
162*s, err = extractSamples(&sd.f, sd.Opts)
163return err
164}
165
166// ExtractSamples builds a slice of samples from the provided metric
167// families. If an error occurs during sample extraction, it continues to
168// extract from the remaining metric families. The returned error is the last
169// error that has occurred.
170func ExtractSamples(o *DecodeOptions, fams ...*dto.MetricFamily) (model.Vector, error) {
171var (
172all model.Vector
173lastErr error
174)
175for _, f := range fams {
176some, err := extractSamples(f, o)
177if err != nil {
178lastErr = err
179continue
180}
181all = append(all, some...)
182}
183return all, lastErr
184}
185
186func extractSamples(f *dto.MetricFamily, o *DecodeOptions) (model.Vector, error) {
187switch f.GetType() {
188case dto.MetricType_COUNTER:
189return extractCounter(o, f), nil
190case dto.MetricType_GAUGE:
191return extractGauge(o, f), nil
192case dto.MetricType_SUMMARY:
193return extractSummary(o, f), nil
194case dto.MetricType_UNTYPED:
195return extractUntyped(o, f), nil
196case dto.MetricType_HISTOGRAM:
197return extractHistogram(o, f), nil
198}
199return nil, fmt.Errorf("expfmt.extractSamples: unknown metric family type %v", f.GetType())
200}
201
202func extractCounter(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
203samples := make(model.Vector, 0, len(f.Metric))
204
205for _, m := range f.Metric {
206if m.Counter == nil {
207continue
208}
209
210lset := make(model.LabelSet, len(m.Label)+1)
211for _, p := range m.Label {
212lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
213}
214lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
215
216smpl := &model.Sample{
217Metric: model.Metric(lset),
218Value: model.SampleValue(m.Counter.GetValue()),
219}
220
221if m.TimestampMs != nil {
222smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
223} else {
224smpl.Timestamp = o.Timestamp
225}
226
227samples = append(samples, smpl)
228}
229
230return samples
231}
232
233func extractGauge(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
234samples := make(model.Vector, 0, len(f.Metric))
235
236for _, m := range f.Metric {
237if m.Gauge == nil {
238continue
239}
240
241lset := make(model.LabelSet, len(m.Label)+1)
242for _, p := range m.Label {
243lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
244}
245lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
246
247smpl := &model.Sample{
248Metric: model.Metric(lset),
249Value: model.SampleValue(m.Gauge.GetValue()),
250}
251
252if m.TimestampMs != nil {
253smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
254} else {
255smpl.Timestamp = o.Timestamp
256}
257
258samples = append(samples, smpl)
259}
260
261return samples
262}
263
264func extractUntyped(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
265samples := make(model.Vector, 0, len(f.Metric))
266
267for _, m := range f.Metric {
268if m.Untyped == nil {
269continue
270}
271
272lset := make(model.LabelSet, len(m.Label)+1)
273for _, p := range m.Label {
274lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
275}
276lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
277
278smpl := &model.Sample{
279Metric: model.Metric(lset),
280Value: model.SampleValue(m.Untyped.GetValue()),
281}
282
283if m.TimestampMs != nil {
284smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
285} else {
286smpl.Timestamp = o.Timestamp
287}
288
289samples = append(samples, smpl)
290}
291
292return samples
293}
294
295func extractSummary(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
296samples := make(model.Vector, 0, len(f.Metric))
297
298for _, m := range f.Metric {
299if m.Summary == nil {
300continue
301}
302
303timestamp := o.Timestamp
304if m.TimestampMs != nil {
305timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
306}
307
308for _, q := range m.Summary.Quantile {
309lset := make(model.LabelSet, len(m.Label)+2)
310for _, p := range m.Label {
311lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
312}
313// BUG(matt): Update other names to "quantile".
314lset[model.LabelName(model.QuantileLabel)] = model.LabelValue(fmt.Sprint(q.GetQuantile()))
315lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
316
317samples = append(samples, &model.Sample{
318Metric: model.Metric(lset),
319Value: model.SampleValue(q.GetValue()),
320Timestamp: timestamp,
321})
322}
323
324lset := make(model.LabelSet, len(m.Label)+1)
325for _, p := range m.Label {
326lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
327}
328lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum")
329
330samples = append(samples, &model.Sample{
331Metric: model.Metric(lset),
332Value: model.SampleValue(m.Summary.GetSampleSum()),
333Timestamp: timestamp,
334})
335
336lset = make(model.LabelSet, len(m.Label)+1)
337for _, p := range m.Label {
338lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
339}
340lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count")
341
342samples = append(samples, &model.Sample{
343Metric: model.Metric(lset),
344Value: model.SampleValue(m.Summary.GetSampleCount()),
345Timestamp: timestamp,
346})
347}
348
349return samples
350}
351
352func extractHistogram(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
353samples := make(model.Vector, 0, len(f.Metric))
354
355for _, m := range f.Metric {
356if m.Histogram == nil {
357continue
358}
359
360timestamp := o.Timestamp
361if m.TimestampMs != nil {
362timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
363}
364
365infSeen := false
366
367for _, q := range m.Histogram.Bucket {
368lset := make(model.LabelSet, len(m.Label)+2)
369for _, p := range m.Label {
370lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
371}
372lset[model.LabelName(model.BucketLabel)] = model.LabelValue(fmt.Sprint(q.GetUpperBound()))
373lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket")
374
375if math.IsInf(q.GetUpperBound(), +1) {
376infSeen = true
377}
378
379samples = append(samples, &model.Sample{
380Metric: model.Metric(lset),
381Value: model.SampleValue(q.GetCumulativeCount()),
382Timestamp: timestamp,
383})
384}
385
386lset := make(model.LabelSet, len(m.Label)+1)
387for _, p := range m.Label {
388lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
389}
390lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum")
391
392samples = append(samples, &model.Sample{
393Metric: model.Metric(lset),
394Value: model.SampleValue(m.Histogram.GetSampleSum()),
395Timestamp: timestamp,
396})
397
398lset = make(model.LabelSet, len(m.Label)+1)
399for _, p := range m.Label {
400lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
401}
402lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count")
403
404count := &model.Sample{
405Metric: model.Metric(lset),
406Value: model.SampleValue(m.Histogram.GetSampleCount()),
407Timestamp: timestamp,
408}
409samples = append(samples, count)
410
411if !infSeen {
412// Append an infinity bucket sample.
413lset := make(model.LabelSet, len(m.Label)+2)
414for _, p := range m.Label {
415lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
416}
417lset[model.LabelName(model.BucketLabel)] = model.LabelValue("+Inf")
418lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket")
419
420samples = append(samples, &model.Sample{
421Metric: model.Metric(lset),
422Value: count.Value,
423Timestamp: timestamp,
424})
425}
426}
427
428return samples
429}
430