prometheus-net

Форк
0
/
Histogram.cs 
403 строки · 16.4 Кб
1
using System.Numerics;
2
using System.Runtime.CompilerServices;
3

4
#if NET7_0_OR_GREATER
5
using System.Runtime.Intrinsics;
6
using System.Runtime.Intrinsics.X86;
7
#endif
8

9
namespace Prometheus;
10

11
/// <remarks>
12
/// The histogram is thread-safe but not atomic - the sum of values and total count of events
13
/// may not add up perfectly with bucket contents if new observations are made during a collection.
14
/// </remarks>
15
public sealed class Histogram : Collector<Histogram.Child>, IHistogram
16
{
17
    // Upper bounds used when the caller does not provide any buckets of their own.
    private static readonly double[] DefaultBuckets = [.005, .01, .025, .05, .075, .1, .25, .5, .75, 1, 2.5, 5, 7.5, 10];

    // Upper bounds of this histogram's buckets, validated by the constructor to be increasing.
    private readonly double[] _buckets;

#if NET7_0_OR_GREATER
    // Aligned AVX loads need a 32-byte aligned address in pinned memory. This buffer carries
    // a few spare items that are skipped over so that the first item we use is aligned.
    private readonly double[] _bucketsAlignmentBuffer;

    // Number of items to skip from the start of the alignment buffer.
    private readonly int _bucketsAlignmentBufferOffset;

    private const int AvxAlignBytes = 32;
#endif

    // One "le" label per bucket. They belong to the buckets, so they are built once here
    // instead of being allocated again for every child.
    private readonly CanonicalLabel[] _leLabels;

    private static readonly byte[] LeLabelName = "le"u8.ToArray();
35

36
    /// <summary>
    /// Creates a histogram: validates the bucket upper bounds, guarantees a final +Inf bucket,
    /// precomputes the "le" label of every bucket and (when AVX is supported) prepares an
    /// aligned, pinned copy of the bounds for vectorized lookups.
    /// </summary>
    /// <param name="name">Passed through to the base collector.</param>
    /// <param name="help">Passed through to the base collector.</param>
    /// <param name="instanceLabelNames">Passed through to the base collector. Must not contain "le".</param>
    /// <param name="staticLabels">Passed through to the base collector.</param>
    /// <param name="suppressInitialValue">Passed through to the base collector.</param>
    /// <param name="buckets">
    /// Upper bounds of the buckets in strictly increasing order, or null to use the default buckets.
    /// A +Inf bucket is appended when the last bound is not already +Inf.
    /// </param>
    /// <param name="exemplarBehavior">Passed through to the base collector.</param>
    /// <exception cref="ArgumentException">
    /// "le" is used as a label name, no buckets were given, or the bounds are not strictly increasing
    /// (which includes any bound being NaN).
    /// </exception>
    internal Histogram(string name, string help, StringSequence instanceLabelNames, LabelSequence staticLabels, bool suppressInitialValue, double[]? buckets, ExemplarBehavior exemplarBehavior)
        : base(name, help, instanceLabelNames, staticLabels, suppressInitialValue, exemplarBehavior)
    {
        if (instanceLabelNames.Contains("le"))
        {
            throw new ArgumentException("'le' is a reserved label name");
        }

        _buckets = buckets ?? DefaultBuckets;

        if (_buckets.Length == 0)
        {
            throw new ArgumentException("Histogram must have at least one bucket");
        }

        if (!double.IsPositiveInfinity(_buckets[_buckets.Length - 1]))
        {
            _buckets = [.. _buckets, double.PositiveInfinity];
        }

        for (int i = 1; i < _buckets.Length; i++)
        {
            // Deliberately a negated ">" rather than "<=": every comparison with NaN is false,
            // so "<=" would let a NaN bound slip through validation.
            if (!(_buckets[i] > _buckets[i - 1]))
            {
                throw new ArgumentException("Bucket values must be increasing");
            }
        }

        _leLabels = new CanonicalLabel[_buckets.Length];
        for (var i = 0; i < _buckets.Length; i++)
        {
            _leLabels[i] = TextSerializer.EncodeValueAsCanonicalLabel(LeLabelName, _buckets[i]);
        }

#if NET7_0_OR_GREATER
        if (Avx.IsSupported)
        {
            // Over-allocate by one alignment unit worth of doubles so that an aligned start position always fits.
            _bucketsAlignmentBuffer = GC.AllocateUninitializedArray<double>(_buckets.Length + (AvxAlignBytes / sizeof(double)), pinned: true);

            unsafe
            {
                var pointer = (nuint)Unsafe.AsPointer(ref _bucketsAlignmentBuffer[0]);
                var pointerTooFarByBytes = pointer % AvxAlignBytes;

                // The outer modulo maps "already aligned" to 0 instead of a full AvxAlignBytes.
                var bytesUntilNextAlignedPosition = (AvxAlignBytes - pointerTooFarByBytes) % AvxAlignBytes;

                if (bytesUntilNextAlignedPosition % sizeof(double) != 0)
                    throw new Exception("Unreachable code reached - all double[] allocations are expected to be at least 8-aligned.");

                _bucketsAlignmentBufferOffset = (int)(bytesUntilNextAlignedPosition / sizeof(double));
            }

            // The bounds are stored starting at the first aligned item of the buffer.
            Array.Copy(_buckets, 0, _bucketsAlignmentBuffer, _bucketsAlignmentBufferOffset, _buckets.Length);
        }
        else
        {
            _bucketsAlignmentBuffer = [];
        }
#endif
    }
95

96
    /// <summary>
    /// Creates a new <see cref="Child"/> that has this histogram as its parent, passing the remaining arguments through unchanged.
    /// </summary>
    private protected override Child NewChild(LabelSequence instanceLabels, LabelSequence flattenedLabels, bool publish, ExemplarBehavior exemplarBehavior)
        => new(this, instanceLabels, flattenedLabels, publish, exemplarBehavior);
100

101
    public sealed class Child : ChildBase, IHistogram
102
    {
103
        /// <summary>
        /// Creates a child with one counter and one exemplar slot for every bucket of <paramref name="parent"/>.
        /// </summary>
        internal Child(Histogram parent, LabelSequence instanceLabels, LabelSequence flattenedLabels, bool publish, ExemplarBehavior exemplarBehavior)
            : base(parent, instanceLabels, flattenedLabels, publish, exemplarBehavior)
        {
            Parent = parent;

            var bucketTotal = parent._buckets.Length;

            _bucketCounts = new ThreadSafeLong[bucketTotal];
            _exemplars = new ObservedExemplar[bucketTotal];

            // Every exemplar slot starts out holding the empty exemplar.
            for (var slot = 0; slot < bucketTotal; slot++)
            {
                _exemplars[slot] = ObservedExemplar.Empty;
            }
        }
116

117
        // Hides the inherited member so that the parent is available as a Histogram.
        internal new readonly Histogram Parent;

        // Running total of observed values (each observation contributes value * count).
        private ThreadSafeDouble _sum = new ThreadSafeDouble(0.0D);

        // Per-bucket (non-cumulative) observation counts, indexed like the parent's buckets.
        private readonly ThreadSafeLong[] _bucketCounts;

        // Suffixes handed to the serializer for the sum, count and bucket data points.
        private static readonly byte[] SumSuffix = "sum"u8.ToArray();
        private static readonly byte[] CountSuffix = "count"u8.ToArray();
        private static readonly byte[] BucketSuffix = "bucket"u8.ToArray();

        // One exemplar slot per bucket, indexed like the parent's buckets.
        private readonly ObservedExemplar[] _exemplars;
125

126
#if NET
        [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
#endif
        // Writes this child's data points to the serializer in a fixed order:
        //   1. the sum,
        //   2. the count,
        //   3. one cumulative point per bucket, in order of increasing upper bound.
        private protected override async ValueTask CollectAndSerializeImplAsync(IMetricsSerializer serializer,
            CancellationToken cancel)
        {
            await serializer.WriteMetricPointAsync(
                Parent.NameBytes,
                FlattenedLabelsBytes,
                CanonicalLabel.Empty,
                _sum.Value,
                ObservedExemplar.Empty,
                SumSuffix,
                cancel);

            await serializer.WriteMetricPointAsync(
                Parent.NameBytes,
                FlattenedLabelsBytes,
                CanonicalLabel.Empty,
                Count,
                ObservedExemplar.Empty,
                CountSuffix,
                cancel);

            // Buckets are stored as individual counts but written out as a running total.
            long runningTotal = 0;

            for (var bucket = 0; bucket < _bucketCounts.Length; bucket++)
            {
                var borrowed = BorrowExemplar(ref _exemplars[bucket]);

                runningTotal += _bucketCounts[bucket].Value;

                await serializer.WriteMetricPointAsync(
                    Parent.NameBytes,
                    FlattenedLabelsBytes,
                    Parent._leLabels[bucket],
                    runningTotal,
                    borrowed,
                    BucketSuffix,
                    cancel);

                // NOTE(review): if the write above throws, the borrowed exemplar is never handed back - confirm this is acceptable.
                ReturnBorrowedExemplar(ref _exemplars[bucket], borrowed);
            }
        }
171

172
        /// <summary>
        /// Current value of the running sum of observations.
        /// </summary>
        public double Sum => _sum.Value;

        /// <summary>
        /// Total number of observations, obtained by adding up the count of every bucket.
        /// </summary>
        public long Count
        {
            get
            {
                var acrossAllBuckets = 0L;

                for (var i = 0; i < _bucketCounts.Length; i++)
                {
                    acrossAllBuckets += _bucketCounts[i].Value;
                }

                return acrossAllBuckets;
            }
        }
186

187
        /// <summary>
        /// Records one observation of <paramref name="val"/> together with the given exemplar (which may be null).
        /// </summary>
        public void Observe(double val, Exemplar? exemplarLabels)
        {
            ObserveInternal(val, 1, exemplarLabels);
        }

        /// <summary>
        /// Records one observation of <paramref name="val"/>.
        /// </summary>
        public void Observe(double val)
        {
            Observe(val, 1);
        }

        /// <summary>
        /// Records <paramref name="count"/> observations of <paramref name="val"/> without a caller-provided exemplar.
        /// </summary>
        public void Observe(double val, long count)
        {
            ObserveInternal(val, count, null);
        }
192

193
        /// <summary>
        /// Shared implementation of all Observe overloads: adds <paramref name="count"/> to the bucket that
        /// <paramref name="val"/> falls into, records the exemplar (if there is a non-empty one),
        /// adds val * count to the sum and publishes the metric.
        /// </summary>
        private void ObserveInternal(double val, long count, Exemplar? exemplar)
        {
            // NaN observations are dropped without touching any state.
            if (double.IsNaN(val))
                return;

            // No exemplar from the caller - fall back to the default one for this value.
            exemplar ??= GetDefaultExemplar(val);

            var target = GetBucketIndex(val);
            _bucketCounts[target].Add(count);

            if (exemplar?.Length > 0)
            {
                RecordExemplar(exemplar, ref _exemplars[target], val);
            }

            _sum.Add(val * count);
            Publish();
        }
213

214
        /// <summary>
        /// Returns the index of the first bucket whose upper bound is greater than or equal to <paramref name="val"/>.
        /// Uses the AVX implementation when available, otherwise a plain linear scan.
        /// </summary>
        private int GetBucketIndex(double val)
        {
#if NET7_0_OR_GREATER
            if (Avx.IsSupported)
            {
                return GetBucketIndexAvx(val);
            }
#endif

            var upperBounds = Parent._buckets;
            var candidate = 0;

            while (candidate < upperBounds.Length)
            {
                if (val <= upperBounds[candidate])
                {
                    return candidate;
                }

                candidate++;
            }

            // The last bound is always +Inf and NaN is filtered out by the caller, so some bucket must have matched.
            throw new Exception("Unreachable code reached.");
        }
229

230
#if NET7_0_OR_GREATER
        /// <summary>
        /// AVX allows us to perform 4 comparisons at the same time when finding the right bucket to increment.
        /// The total speedup is not 4x due to various overheads but it's still 10-30% (more for wider histograms).
        /// </summary>
        /// <param name="val">The observed value. Expected not to be NaN (the caller filters NaN out).</param>
        /// <returns>Index of the first bucket whose upper bound is greater than or equal to <paramref name="val"/>.</returns>
        private unsafe int GetBucketIndexAvx(double val)
        {
            var bucketCount = Parent._buckets.Length;

            // AVX operates on vectors of N buckets, so if the total is not divisible by N we need to check some of them manually.
            var remaining = bucketCount % Vector256<double>.Count;
            var vectorizedCount = bucketCount - remaining;

            // Loop-invariant: the same broadcast of the value is compared against every block of bounds.
            var valVector = Vector256.Create(val);

            for (int i = 0; i < vectorizedCount; i += Vector256<double>.Count)
            {
                // The buckets are permanently pinned, no need to re-pin them here.
                // The offset skips the leading padding so that every block starts on an aligned address.
                var boundPointer = (double*)Unsafe.AsPointer(ref Parent._bucketsAlignmentBuffer[Parent._bucketsAlignmentBufferOffset + i]);
                var boundVector = Avx.LoadAlignedVector256(boundPointer);

                var mask = Avx.CompareLessThanOrEqual(valVector, boundVector);

                // Condenses the mask vector into an integer with one bit per vector element (lowest bit = first element).
                var moveMask = Avx.MoveMask(mask);

                // The lowest set bit is the first bound in this block that is >= val.
                var indexInBlock = BitOperations.TrailingZeroCount(moveMask);

                if (indexInBlock == sizeof(int) * 8)
                    continue; // All bits are zero, so we did not find a match.

                return i + indexInBlock;
            }

            // Scalar tail for the buckets that did not fill a whole vector.
            for (int i = vectorizedCount; i < bucketCount; i++)
            {
                if (val <= Parent._buckets[i])
                    return i;
            }

            throw new Exception("Unreachable code reached.");
        }
#endif
270
    }
271

272
    internal override MetricType Type => MetricType.Histogram;

    // The members below all forward to the unlabelled child of this histogram.

    /// <summary>Sum reported by the unlabelled child.</summary>
    public double Sum => Unlabelled.Sum;

    /// <summary>Count reported by the unlabelled child.</summary>
    public long Count => Unlabelled.Count;

    /// <summary>Records one observation of <paramref name="val"/> on the unlabelled child.</summary>
    public void Observe(double val)
    {
        Unlabelled.Observe(val, 1);
    }

    /// <summary>Records <paramref name="count"/> observations of <paramref name="val"/> on the unlabelled child.</summary>
    public void Observe(double val, long count)
    {
        Unlabelled.Observe(val, count);
    }

    /// <summary>Records one observation of <paramref name="val"/> with the given exemplar on the unlabelled child.</summary>
    public void Observe(double val, Exemplar? exemplar)
    {
        Unlabelled.Observe(val, exemplar);
    }

    /// <summary>Publishes the unlabelled child.</summary>
    public void Publish()
    {
        Unlabelled.Publish();
    }

    /// <summary>Unpublishes the unlabelled child.</summary>
    public void Unpublish()
    {
        Unlabelled.Unpublish();
    }
281

282
    // From https://github.com/prometheus/client_golang/blob/master/prometheus/histogram.go
    /// <summary>
    ///  Creates '<paramref name="count"/>' buckets, where the lowest bucket has an
    ///  upper bound of '<paramref name="start"/>' and each following bucket's upper bound is '<paramref name="factor"/>'
    ///  times the previous bucket's upper bound.
    ///
    ///  The function throws if '<paramref name="count"/>' is 0 or negative, if '<paramref name="start"/>' is 0 or negative,
    ///  or if '<paramref name="factor"/>' is less than or equal 1.
    /// </summary>
    /// <param name="start">The upper bound of the lowest bucket. Must be positive.</param>
    /// <param name="factor">The factor to increase the upper bound of subsequent buckets. Must be greater than 1.</param>
    /// <param name="count">The number of buckets to create. Must be positive.</param>
    /// <returns>An array of '<paramref name="count"/>' upper bounds in increasing order.</returns>
    /// <exception cref="ArgumentException">One of the arguments is outside its documented range.</exception>
    /// <exception cref="OverflowException">A requested bucket bound does not fit into the decimal type used for the intermediate math.</exception>
    public static double[] ExponentialBuckets(double start, double factor, int count)
    {
        if (count <= 0) throw new ArgumentException($"{nameof(ExponentialBuckets)} needs a positive {nameof(count)}");
        if (start <= 0) throw new ArgumentException($"{nameof(ExponentialBuckets)} needs a positive {nameof(start)}");
        if (factor <= 1) throw new ArgumentException($"{nameof(ExponentialBuckets)} needs a {nameof(factor)} greater than 1");

        // The math we do can make it incur some tiny avoidable error due to floating point gremlins.
        // We use decimal for the path to preserve as much accuracy as we can, before finally converting to double.
        // It will not fix 100% of the cases where we end up with 0.0000000000000000000000000000001 offset but it helps a lot.

        var next = (decimal)start;
        var buckets = new double[count];

        for (var i = 0; i < buckets.Length; i++)
        {
            buckets[i] = (double)next;

            // Only step forward while another bucket still needs a value. Stepping past the last
            // bucket yields a number nobody uses, yet it could overflow decimal and fail the whole call.
            if (i < buckets.Length - 1)
            {
                next *= (decimal)factor;
            }
        }

        return buckets;
    }
315

316
    // From https://github.com/prometheus/client_golang/blob/master/prometheus/histogram.go
    /// <summary>
    ///  Creates '<paramref name="count"/>' buckets, where the lowest bucket has an
    ///  upper bound of '<paramref name="start"/>' and each following bucket's upper bound is the upper bound of the
    ///  previous bucket, incremented by '<paramref name="width"/>'
    ///
    ///  The function throws if '<paramref name="count"/>' is 0 or negative.
    /// </summary>
    /// <param name="start">The upper bound of the lowest bucket.</param>
    /// <param name="width">The width of each bucket (distance between lower and upper bound).</param>
    /// <param name="count">The number of buckets to create. Must be positive.</param>
    /// <returns>An array of '<paramref name="count"/>' upper bounds, each '<paramref name="width"/>' apart.</returns>
    /// <exception cref="ArgumentException">'<paramref name="count"/>' is 0 or negative.</exception>
    /// <exception cref="OverflowException">A requested bucket bound does not fit into the decimal type used for the intermediate math.</exception>
    public static double[] LinearBuckets(double start, double width, int count)
    {
        if (count <= 0) throw new ArgumentException($"{nameof(LinearBuckets)} needs a positive {nameof(count)}");

        // The math we do can make it incur some tiny avoidable error due to floating point gremlins.
        // We use decimal for the path to preserve as much accuracy as we can, before finally converting to double.
        // It will not fix 100% of the cases where we end up with 0.0000000000000000000000000000001 offset but it helps a lot.

        var next = (decimal)start;
        var buckets = new double[count];

        for (var i = 0; i < buckets.Length; i++)
        {
            buckets[i] = (double)next;

            // Only step forward while another bucket still needs a value. Stepping past the last
            // bucket yields a number nobody uses, yet it could overflow decimal and fail the whole call.
            if (i < buckets.Length - 1)
            {
                next += (decimal)width;
            }
        }

        return buckets;
    }
346

347
    /// <summary>
    /// Divides each power of 10 into N divisions.
    /// </summary>
    /// <param name="startPower">The starting range includes 10 raised to this power.</param>
    /// <param name="endPower">The ranges end with 10 raised to this power (this no longer starts a new range).</param>
    /// <param name="divisions">How many divisions to divide each range into.</param>
    /// <returns>The bucket upper bounds in strictly increasing order.</returns>
    /// <exception cref="ArgumentException"><paramref name="startPower"/> is not less than <paramref name="endPower"/>.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="divisions"/> is 0 or negative.</exception>
    /// <remarks>
    /// For example, with startPower=-1, endPower=2, divisions=4 we would get:
    /// 10^-1 == 0.1 which defines our starting range, giving buckets: 0.25, 0.5, 0.75, 1.0
    /// 10^0 == 1 which is the next range, giving buckets: 2.5, 5, 7.5, 10
    /// 10^1 == 10 which is the next range, giving buckets: 25, 50, 75, 100
    /// 10^2 == 100 which is the end and the top level of the preceding range.
    /// Giving total buckets: 0.25, 0.5, 0.75, 1.0, 2.5, 5, 7.5, 10, 25, 50, 75, 100
    /// </remarks>
    public static double[] PowersOfTenDividedBuckets(int startPower, int endPower, int divisions)
    {
        if (startPower >= endPower)
            throw new ArgumentException($"{nameof(startPower)} must be less than {nameof(endPower)}.", nameof(startPower));

        // Note the argument order: ArgumentOutOfRangeException takes (paramName, message),
        // which is the reverse of ArgumentException's (message, paramName).
        if (divisions <= 0)
            throw new ArgumentOutOfRangeException(nameof(divisions), $"{nameof(divisions)} must be a positive integer.");

        var buckets = new List<double>();

        for (var powerOfTen = startPower; powerOfTen < endPower; powerOfTen++)
        {
            // This gives us the upper bound (the start of the next range).
            var max = (decimal)Math.Pow(10, powerOfTen + 1);

            // Size of one division within this range; the same for every division of the range.
            var step = max / divisions;

            // Then we just divide it into N divisions and we are done!
            for (var division = 0; division < divisions; division++)
            {
                var bucket = step * (division + 1);

                // The math we do can make it incur some tiny avoidable error due to floating point gremlins.
                // We use decimal for the path to preserve as much accuracy as we can, before finally converting to double.
                // It will not fix 100% of the cases where we end up with 0.0000000000000000000000000000001 offset but it helps a lot.
                var candidate = (double)bucket;

                // Depending on the number of divisions, it may be that divisions from different powers overlap.
                // For example, a division into 20 would include:
                // 19th value in the 0th power: 9.5 (10/20*19=9.5)
                // 1st value in the 1st power: 5 (100/20*1 = 5)
                // To avoid this being a problem, we simply constrain all values to be increasing.
                if (buckets.Count > 0 && buckets[buckets.Count - 1] >= candidate)
                    continue; // Skip this one, it is not greater.

                buckets.Add(candidate);
            }
        }

        return [.. buckets];
    }
400

401
    // Every child contributes one sum series, one count series and one series per bucket.
    internal override int TimeseriesCount => ChildCount * (_buckets.Length + 2);
403
}

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.