prometheus
457 строк · 12.2 Кб
1// Copyright 2020 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package storage
15
16import (
17"fmt"
18"math"
19"sort"
20
21"github.com/prometheus/prometheus/model/histogram"
22"github.com/prometheus/prometheus/model/labels"
23"github.com/prometheus/prometheus/tsdb/chunkenc"
24"github.com/prometheus/prometheus/tsdb/chunks"
25)
26
27type SeriesEntry struct {
28Lset labels.Labels
29SampleIteratorFn func(chunkenc.Iterator) chunkenc.Iterator
30}
31
32func (s *SeriesEntry) Labels() labels.Labels { return s.Lset }
33func (s *SeriesEntry) Iterator(it chunkenc.Iterator) chunkenc.Iterator { return s.SampleIteratorFn(it) }
34
35type ChunkSeriesEntry struct {
36Lset labels.Labels
37ChunkIteratorFn func(chunks.Iterator) chunks.Iterator
38}
39
40func (s *ChunkSeriesEntry) Labels() labels.Labels { return s.Lset }
41func (s *ChunkSeriesEntry) Iterator(it chunks.Iterator) chunks.Iterator { return s.ChunkIteratorFn(it) }
42
43// NewListSeries returns series entry with iterator that allows to iterate over provided samples.
44func NewListSeries(lset labels.Labels, s []chunks.Sample) *SeriesEntry {
45samplesS := Samples(samples(s))
46return &SeriesEntry{
47Lset: lset,
48SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator {
49if lsi, ok := it.(*listSeriesIterator); ok {
50lsi.Reset(samplesS)
51return lsi
52}
53return NewListSeriesIterator(samplesS)
54},
55}
56}
57
58// NewListChunkSeriesFromSamples returns a chunk series entry that allows to iterate over provided samples.
59// NOTE: It uses an inefficient chunks encoding implementation, not caring about chunk size.
60// Use only for testing.
61func NewListChunkSeriesFromSamples(lset labels.Labels, samples ...[]chunks.Sample) *ChunkSeriesEntry {
62chksFromSamples := make([]chunks.Meta, 0, len(samples))
63for _, s := range samples {
64cfs, err := chunks.ChunkFromSamples(s)
65if err != nil {
66return &ChunkSeriesEntry{
67Lset: lset,
68ChunkIteratorFn: func(it chunks.Iterator) chunks.Iterator {
69return errChunksIterator{err: err}
70},
71}
72}
73chksFromSamples = append(chksFromSamples, cfs)
74}
75return &ChunkSeriesEntry{
76Lset: lset,
77ChunkIteratorFn: func(it chunks.Iterator) chunks.Iterator {
78lcsi, existing := it.(*listChunkSeriesIterator)
79var chks []chunks.Meta
80if existing {
81chks = lcsi.chks[:0]
82} else {
83chks = make([]chunks.Meta, 0, len(samples))
84}
85chks = append(chks, chksFromSamples...)
86if existing {
87lcsi.Reset(chks...)
88return lcsi
89}
90return NewListChunkSeriesIterator(chks...)
91},
92}
93}
94
95type listSeriesIterator struct {
96samples Samples
97idx int
98}
99
100type samples []chunks.Sample
101
102func (s samples) Get(i int) chunks.Sample { return s[i] }
103func (s samples) Len() int { return len(s) }
104
105// Samples interface allows to work on arrays of types that are compatible with chunks.Sample.
106type Samples interface {
107Get(i int) chunks.Sample
108Len() int
109}
110
111// NewListSeriesIterator returns listSeriesIterator that allows to iterate over provided samples.
112func NewListSeriesIterator(samples Samples) chunkenc.Iterator {
113return &listSeriesIterator{samples: samples, idx: -1}
114}
115
116func (it *listSeriesIterator) Reset(samples Samples) {
117it.samples = samples
118it.idx = -1
119}
120
121func (it *listSeriesIterator) At() (int64, float64) {
122s := it.samples.Get(it.idx)
123return s.T(), s.F()
124}
125
126func (it *listSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) {
127s := it.samples.Get(it.idx)
128return s.T(), s.H()
129}
130
131func (it *listSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
132s := it.samples.Get(it.idx)
133return s.T(), s.FH()
134}
135
136func (it *listSeriesIterator) AtT() int64 {
137s := it.samples.Get(it.idx)
138return s.T()
139}
140
141func (it *listSeriesIterator) Next() chunkenc.ValueType {
142it.idx++
143if it.idx >= it.samples.Len() {
144return chunkenc.ValNone
145}
146return it.samples.Get(it.idx).Type()
147}
148
149func (it *listSeriesIterator) Seek(t int64) chunkenc.ValueType {
150if it.idx == -1 {
151it.idx = 0
152}
153if it.idx >= it.samples.Len() {
154return chunkenc.ValNone
155}
156// No-op check.
157if s := it.samples.Get(it.idx); s.T() >= t {
158return s.Type()
159}
160// Do binary search between current position and end.
161it.idx += sort.Search(it.samples.Len()-it.idx, func(i int) bool {
162s := it.samples.Get(i + it.idx)
163return s.T() >= t
164})
165
166if it.idx >= it.samples.Len() {
167return chunkenc.ValNone
168}
169return it.samples.Get(it.idx).Type()
170}
171
172func (it *listSeriesIterator) Err() error { return nil }
173
174type listChunkSeriesIterator struct {
175chks []chunks.Meta
176idx int
177}
178
179// NewListChunkSeriesIterator returns listChunkSeriesIterator that allows to iterate over provided chunks.
180func NewListChunkSeriesIterator(chks ...chunks.Meta) chunks.Iterator {
181return &listChunkSeriesIterator{chks: chks, idx: -1}
182}
183
184func (it *listChunkSeriesIterator) Reset(chks ...chunks.Meta) {
185it.chks = chks
186it.idx = -1
187}
188
189func (it *listChunkSeriesIterator) At() chunks.Meta {
190return it.chks[it.idx]
191}
192
193func (it *listChunkSeriesIterator) Next() bool {
194it.idx++
195return it.idx < len(it.chks)
196}
197
198func (it *listChunkSeriesIterator) Err() error { return nil }
199
200type chunkSetToSeriesSet struct {
201ChunkSeriesSet
202
203iter chunks.Iterator
204chkIterErr error
205sameSeriesChunks []Series
206}
207
208// NewSeriesSetFromChunkSeriesSet converts ChunkSeriesSet to SeriesSet by decoding chunks one by one.
209func NewSeriesSetFromChunkSeriesSet(chk ChunkSeriesSet) SeriesSet {
210return &chunkSetToSeriesSet{ChunkSeriesSet: chk}
211}
212
213func (c *chunkSetToSeriesSet) Next() bool {
214if c.Err() != nil || !c.ChunkSeriesSet.Next() {
215return false
216}
217
218c.iter = c.ChunkSeriesSet.At().Iterator(c.iter)
219c.sameSeriesChunks = nil
220
221for c.iter.Next() {
222c.sameSeriesChunks = append(
223c.sameSeriesChunks,
224newChunkToSeriesDecoder(c.ChunkSeriesSet.At().Labels(), c.iter.At()),
225)
226}
227
228if c.iter.Err() != nil {
229c.chkIterErr = c.iter.Err()
230return false
231}
232return true
233}
234
235func (c *chunkSetToSeriesSet) At() Series {
236// Series composed of same chunks for the same series.
237return ChainedSeriesMerge(c.sameSeriesChunks...)
238}
239
240func (c *chunkSetToSeriesSet) Err() error {
241if c.chkIterErr != nil {
242return c.chkIterErr
243}
244return c.ChunkSeriesSet.Err()
245}
246
247func newChunkToSeriesDecoder(labels labels.Labels, chk chunks.Meta) Series {
248return &SeriesEntry{
249Lset: labels,
250SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator {
251// TODO(bwplotka): Can we provide any chunkenc buffer?
252return chk.Chunk.Iterator(it)
253},
254}
255}
256
257type seriesSetToChunkSet struct {
258SeriesSet
259}
260
261// NewSeriesSetToChunkSet converts SeriesSet to ChunkSeriesSet by encoding chunks from samples.
262func NewSeriesSetToChunkSet(chk SeriesSet) ChunkSeriesSet {
263return &seriesSetToChunkSet{SeriesSet: chk}
264}
265
266func (c *seriesSetToChunkSet) Next() bool {
267if c.Err() != nil || !c.SeriesSet.Next() {
268return false
269}
270return true
271}
272
273func (c *seriesSetToChunkSet) At() ChunkSeries {
274return NewSeriesToChunkEncoder(c.SeriesSet.At())
275}
276
277func (c *seriesSetToChunkSet) Err() error {
278return c.SeriesSet.Err()
279}
280
281type seriesToChunkEncoder struct {
282Series
283}
284
285const seriesToChunkEncoderSplit = 120
286
287// NewSeriesToChunkEncoder encodes samples to chunks with 120 samples limit.
288func NewSeriesToChunkEncoder(series Series) ChunkSeries {
289return &seriesToChunkEncoder{series}
290}
291
292func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
293var (
294chk, newChk chunkenc.Chunk
295app chunkenc.Appender
296err error
297recoded bool
298)
299mint := int64(math.MaxInt64)
300maxt := int64(math.MinInt64)
301
302var chks []chunks.Meta
303lcsi, existing := it.(*listChunkSeriesIterator)
304if existing {
305chks = lcsi.chks[:0]
306}
307
308i := 0
309seriesIter := s.Series.Iterator(nil)
310lastType := chunkenc.ValNone
311for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() {
312if typ != lastType || i >= seriesToChunkEncoderSplit {
313// Create a new chunk if the sample type changed or too many samples in the current one.
314chks = appendChunk(chks, mint, maxt, chk)
315chk, err = chunkenc.NewEmptyChunk(typ.ChunkEncoding())
316if err != nil {
317return errChunksIterator{err: err}
318}
319app, err = chk.Appender()
320if err != nil {
321return errChunksIterator{err: err}
322}
323mint = int64(math.MaxInt64)
324// maxt is immediately overwritten below which is why setting it here won't make a difference.
325i = 0
326}
327lastType = typ
328
329var (
330t int64
331v float64
332h *histogram.Histogram
333fh *histogram.FloatHistogram
334)
335switch typ {
336case chunkenc.ValFloat:
337t, v = seriesIter.At()
338app.Append(t, v)
339case chunkenc.ValHistogram:
340t, h = seriesIter.AtHistogram(nil)
341newChk, recoded, app, err = app.AppendHistogram(nil, t, h, false)
342if err != nil {
343return errChunksIterator{err: err}
344}
345if newChk != nil {
346if !recoded {
347chks = appendChunk(chks, mint, maxt, chk)
348mint = int64(math.MaxInt64)
349// maxt is immediately overwritten below which is why setting it here won't make a difference.
350i = 0
351}
352chk = newChk
353}
354case chunkenc.ValFloatHistogram:
355t, fh = seriesIter.AtFloatHistogram(nil)
356newChk, recoded, app, err = app.AppendFloatHistogram(nil, t, fh, false)
357if err != nil {
358return errChunksIterator{err: err}
359}
360if newChk != nil {
361if !recoded {
362chks = appendChunk(chks, mint, maxt, chk)
363mint = int64(math.MaxInt64)
364// maxt is immediately overwritten below which is why setting it here won't make a difference.
365i = 0
366}
367chk = newChk
368}
369default:
370return errChunksIterator{err: fmt.Errorf("unknown sample type %s", typ.String())}
371}
372
373maxt = t
374if mint == math.MaxInt64 {
375mint = t
376}
377i++
378}
379if err := seriesIter.Err(); err != nil {
380return errChunksIterator{err: err}
381}
382
383chks = appendChunk(chks, mint, maxt, chk)
384
385if existing {
386lcsi.Reset(chks...)
387return lcsi
388}
389return NewListChunkSeriesIterator(chks...)
390}
391
392func appendChunk(chks []chunks.Meta, mint, maxt int64, chk chunkenc.Chunk) []chunks.Meta {
393if chk != nil {
394chks = append(chks, chunks.Meta{
395MinTime: mint,
396MaxTime: maxt,
397Chunk: chk,
398})
399}
400return chks
401}
402
403type errChunksIterator struct {
404err error
405}
406
407func (e errChunksIterator) At() chunks.Meta { return chunks.Meta{} }
408func (e errChunksIterator) Next() bool { return false }
409func (e errChunksIterator) Err() error { return e.err }
410
411// ExpandSamples iterates over all samples in the iterator, buffering all in slice.
412// Optionally it takes samples constructor, useful when you want to compare sample slices with different
413// sample implementations. if nil, sample type from this package will be used.
414func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample) ([]chunks.Sample, error) {
415if newSampleFn == nil {
416newSampleFn = func(t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample {
417switch {
418case h != nil:
419return hSample{t, h}
420case fh != nil:
421return fhSample{t, fh}
422default:
423return fSample{t, f}
424}
425}
426}
427
428var result []chunks.Sample
429for {
430switch iter.Next() {
431case chunkenc.ValNone:
432return result, iter.Err()
433case chunkenc.ValFloat:
434t, f := iter.At()
435// NaNs can't be compared normally, so substitute for another value.
436if math.IsNaN(f) {
437f = -42
438}
439result = append(result, newSampleFn(t, f, nil, nil))
440case chunkenc.ValHistogram:
441t, h := iter.AtHistogram(nil)
442result = append(result, newSampleFn(t, 0, h, nil))
443case chunkenc.ValFloatHistogram:
444t, fh := iter.AtFloatHistogram(nil)
445result = append(result, newSampleFn(t, 0, nil, fh))
446}
447}
448}
449
450// ExpandChunks iterates over all chunks in the iterator, buffering all in slice.
451func ExpandChunks(iter chunks.Iterator) ([]chunks.Meta, error) {
452var result []chunks.Meta
453for iter.Next() {
454result = append(result, iter.At())
455}
456return result, iter.Err()
457}
458