prometheus-net
403 строки · 16.4 Кб
1using System.Numerics;
2using System.Runtime.CompilerServices;
3
4#if NET7_0_OR_GREATER
5using System.Runtime.Intrinsics;
6using System.Runtime.Intrinsics.X86;
7#endif
8
9namespace Prometheus;
10
11/// <remarks>
12/// The histogram is thread-safe but not atomic - the sum of values and total count of events
13/// may not add up perfectly with bucket contents if new observations are made during a collection.
14/// </remarks>
15public sealed class Histogram : Collector<Histogram.Child>, IHistogram
16{
17private static readonly double[] DefaultBuckets = [.005, .01, .025, .05, .075, .1, .25, .5, .75, 1, 2.5, 5, 7.5, 10];
18
19private readonly double[] _buckets;
20
21#if NET7_0_OR_GREATER
22// For AVX, we need to align on 32 bytes and pin the memory. This is a buffer
23// with extra items that we can "skip" when using the data, for alignment purposes.
24private readonly double[] _bucketsAlignmentBuffer;
25// How many items from the start to skip.
26private readonly int _bucketsAlignmentBufferOffset;
27
28private const int AvxAlignBytes = 32;
29#endif
30
31// These labels go together with the buckets, so we do not need to allocate them for every child.
32private readonly CanonicalLabel[] _leLabels;
33
34private static readonly byte[] LeLabelName = "le"u8.ToArray();
35
/// <summary>
/// Creates the histogram, validating the bucket layout and precomputing the per-bucket "le" labels.
/// </summary>
/// <param name="buckets">
/// Upper bounds of the buckets, in strictly increasing order. When null, <c>DefaultBuckets</c> is used.
/// A final +Inf bucket is appended automatically if the last bound is not already +Inf.
/// </param>
/// <exception cref="ArgumentException">
/// Thrown if "le" is used as an instance label name, if there are no buckets,
/// or if the bucket bounds are not strictly increasing (this includes any NaN bound).
/// </exception>
internal Histogram(string name, string help, StringSequence instanceLabelNames, LabelSequence staticLabels, bool suppressInitialValue, double[]? buckets, ExemplarBehavior exemplarBehavior)
    : base(name, help, instanceLabelNames, staticLabels, suppressInitialValue, exemplarBehavior)
{
    if (instanceLabelNames.Contains("le"))
    {
        throw new ArgumentException("'le' is a reserved label name");
    }

    var requestedBuckets = buckets ?? DefaultBuckets;

    if (requestedBuckets.Length == 0)
    {
        throw new ArgumentException("Histogram must have at least one bucket");
    }

    // We always take a private copy so that later changes to the caller's array cannot
    // desynchronize the bounds from the precomputed labels and the AVX-aligned copy below.
    if (double.IsPositiveInfinity(requestedBuckets[requestedBuckets.Length - 1]))
    {
        _buckets = [.. requestedBuckets];
    }
    else
    {
        _buckets = [.. requestedBuckets, double.PositiveInfinity];
    }

    for (var i = 1; i < _buckets.Length; i++)
    {
        // Written as a negated ">" so that NaN bounds are rejected too:
        // every comparison with NaN is false, so a "<=" check would silently accept them.
        if (!(_buckets[i] > _buckets[i - 1]))
        {
            throw new ArgumentException("Bucket values must be increasing");
        }
    }

    _leLabels = new CanonicalLabel[_buckets.Length];
    for (var i = 0; i < _buckets.Length; i++)
    {
        _leLabels[i] = TextSerializer.EncodeValueAsCanonicalLabel(LeLabelName, _buckets[i]);
    }

#if NET7_0_OR_GREATER
    if (Avx.IsSupported)
    {
        // Over-allocate by one AVX vector worth of items so that we can always find a 32-byte aligned start inside the buffer.
        _bucketsAlignmentBuffer = GC.AllocateUninitializedArray<double>(_buckets.Length + (AvxAlignBytes / sizeof(double)), pinned: true);

        unsafe
        {
            var pointer = (nuint)Unsafe.AsPointer(ref _bucketsAlignmentBuffer[0]);
            var pointerTooFarByBytes = pointer % AvxAlignBytes;
            var bytesUntilNextAlignedPosition = (AvxAlignBytes - pointerTooFarByBytes) % AvxAlignBytes;

            if (bytesUntilNextAlignedPosition % sizeof(double) != 0)
            {
                throw new System.Diagnostics.UnreachableException("Unreachable code reached - all double[] allocations are expected to be at least 8-aligned.");
            }

            _bucketsAlignmentBufferOffset = (int)(bytesUntilNextAlignedPosition / sizeof(double));
        }

        Array.Copy(_buckets, 0, _bucketsAlignmentBuffer, _bucketsAlignmentBufferOffset, _buckets.Length);
    }
    else
    {
        // Without AVX the aligned copy is never read, so an empty array is enough to satisfy the readonly field.
        _bucketsAlignmentBuffer = [];
    }
#endif
}
95
/// <summary>
/// Factory hook used by the base collector: creates a histogram child bound to this histogram.
/// </summary>
private protected override Child NewChild(LabelSequence instanceLabels, LabelSequence flattenedLabels, bool publish, ExemplarBehavior exemplarBehavior)
    => new(this, instanceLabels, flattenedLabels, publish, exemplarBehavior);
100
101public sealed class Child : ChildBase, IHistogram
102{
/// <summary>
/// Creates a child with one counter and one exemplar slot per bucket of the parent histogram.
/// </summary>
internal Child(Histogram parent, LabelSequence instanceLabels, LabelSequence flattenedLabels, bool publish, ExemplarBehavior exemplarBehavior)
    : base(parent, instanceLabels, flattenedLabels, publish, exemplarBehavior)
{
    Parent = parent;

    var bucketCount = parent._buckets.Length;

    _bucketCounts = new ThreadSafeLong[bucketCount];
    _exemplars = new ObservedExemplar[bucketCount];

    // Every slot starts out holding the shared "no exemplar" marker.
    for (var slot = 0; slot < bucketCount; slot++)
    {
        _exemplars[slot] = ObservedExemplar.Empty;
    }
}
116
117internal new readonly Histogram Parent;
118
119private ThreadSafeDouble _sum = new(0.0D);
120private readonly ThreadSafeLong[] _bucketCounts;
121private static readonly byte[] SumSuffix = "sum"u8.ToArray();
122private static readonly byte[] CountSuffix = "count"u8.ToArray();
123private static readonly byte[] BucketSuffix = "bucket"u8.ToArray();
124private readonly ObservedExemplar[] _exemplars;
125
#if NET
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
#endif
/// <summary>
/// Writes this child's data points to the serializer: first the sum, then the count,
/// then one cumulative data point per bucket in order of increasing upper bound.
/// </summary>
private protected override async ValueTask CollectAndSerializeImplAsync(IMetricsSerializer serializer,
    CancellationToken cancel)
{
    // The sum and count points carry neither an "le" label nor an exemplar.
    await serializer.WriteMetricPointAsync(
        Parent.NameBytes,
        FlattenedLabelsBytes,
        CanonicalLabel.Empty,
        _sum.Value,
        ObservedExemplar.Empty,
        SumSuffix,
        cancel);

    await serializer.WriteMetricPointAsync(
        Parent.NameBytes,
        FlattenedLabelsBytes,
        CanonicalLabel.Empty,
        Count,
        ObservedExemplar.Empty,
        CountSuffix,
        cancel);

    // Bucket values are cumulative: each bucket reports its own count plus the counts of all lower buckets.
    long runningTotal = 0;

    for (var bucketIndex = 0; bucketIndex < _bucketCounts.Length; bucketIndex++)
    {
        // The exemplar is borrowed for the duration of the write and handed back afterwards.
        var borrowed = BorrowExemplar(ref _exemplars[bucketIndex]);

        runningTotal += _bucketCounts[bucketIndex].Value;

        await serializer.WriteMetricPointAsync(
            Parent.NameBytes,
            FlattenedLabelsBytes,
            Parent._leLabels[bucketIndex],
            runningTotal,
            borrowed,
            BucketSuffix,
            cancel);

        ReturnBorrowedExemplar(ref _exemplars[bucketIndex], borrowed);
    }
}
171
/// <summary>
/// The running sum of all observed values (each value weighted by its observation count).
/// </summary>
public double Sum
{
    get { return _sum.Value; }
}

/// <summary>
/// The total number of observations, obtained by adding up the counters of all buckets.
/// </summary>
public long Count
{
    get
    {
        var observations = 0L;

        for (var i = 0; i < _bucketCounts.Length; i++)
        {
            observations += _bucketCounts[i].Value;
        }

        return observations;
    }
}
186
/// <summary>
/// Records a single observation of <paramref name="val"/> together with the given exemplar (if any).
/// </summary>
public void Observe(double val, Exemplar? exemplarLabels)
{
    ObserveInternal(val, 1, exemplarLabels);
}

/// <summary>
/// Records a single observation of <paramref name="val"/>.
/// </summary>
public void Observe(double val)
{
    ObserveInternal(val, 1, null);
}

/// <summary>
/// Records <paramref name="count"/> observations that all have the value <paramref name="val"/>.
/// </summary>
public void Observe(double val, long count)
{
    ObserveInternal(val, count, null);
}
192
/// <summary>
/// Shared implementation of all Observe() overloads: increments the matching bucket,
/// records the exemplar (if there is one), updates the sum and publishes the child.
/// </summary>
private void ObserveInternal(double val, long count, Exemplar? exemplar)
{
    // NaN observations are ignored entirely.
    if (double.IsNaN(val))
        return;

    // If the caller did not supply an exemplar, fall back to the default one for this value.
    exemplar ??= GetDefaultExemplar(val);

    var targetBucket = GetBucketIndex(val);
    _bucketCounts[targetBucket].Add(count);

    if (exemplar?.Length > 0)
    {
        RecordExemplar(exemplar, ref _exemplars[targetBucket], val);
    }

    _sum.Add(val * count);

    Publish();
}
213
/// <summary>
/// Finds the index of the first bucket whose upper bound is greater than or equal to <paramref name="val"/>.
/// Uses the AVX implementation when the hardware supports it, otherwise a plain linear scan.
/// </summary>
private int GetBucketIndex(double val)
{
#if NET7_0_OR_GREATER
    if (Avx.IsSupported)
    {
        return GetBucketIndexAvx(val);
    }
#endif

    var upperBounds = Parent._buckets;
    var index = 0;

    while (index < upperBounds.Length)
    {
        if (val <= upperBounds[index])
        {
            return index;
        }

        index++;
    }

    throw new Exception("Unreachable code reached.");
}
229
#if NET7_0_OR_GREATER
/// <summary>
/// AVX allows us to perform 4 comparisons at the same time when finding the right bucket to increment.
/// The total speedup is not 4x due to various overheads but it's still 10-30% (more for wider histograms).
/// </summary>
/// <param name="val">The observed value to find a bucket for.</param>
/// <returns>Index of the first bucket whose upper bound is greater than or equal to <paramref name="val"/>.</returns>
private unsafe int GetBucketIndexAvx(double val)
{
    var bucketCount = Parent._buckets.Length;

    // AVX operates on vectors of N buckets, so if the total is not divisible by N we need to check the tail manually.
    var vectorizedCount = bucketCount - (bucketCount % Vector256<double>.Count);

    if (vectorizedCount > 0)
    {
        // Both of these are loop-invariant, so we compute them once instead of on every iteration.
        var valVector = Vector256.Create(val);

        // The buckets are permanently pinned, no need to re-pin them here.
        // This points at the first 32-byte aligned item; stepping by whole vectors keeps every load aligned.
        var alignedBounds = (double*)Unsafe.AsPointer(ref Parent._bucketsAlignmentBuffer[Parent._bucketsAlignmentBufferOffset]);

        for (var i = 0; i < vectorizedCount; i += Vector256<double>.Count)
        {
            var boundVector = Avx.LoadAlignedVector256(alignedBounds + i);

            var mask = Avx.CompareLessThanOrEqual(valVector, boundVector);

            // Condenses the mask vector into a 32-bit integer where one bit represents one vector element.
            var moveMask = Avx.MoveMask(mask);

            if (moveMask == 0)
            {
                continue; // All bits are zero, so we did not find a match in this block.
            }

            // The lowest set bit is the first bucket in this block that can hold the value.
            return i + BitOperations.TrailingZeroCount(moveMask);
        }
    }

    for (var i = vectorizedCount; i < bucketCount; i++)
    {
        if (val <= Parent._buckets[i])
        {
            return i;
        }
    }

    throw new System.Diagnostics.UnreachableException("Unreachable code reached.");
}
#endif
270}
271
/// <summary>Identifies this collector as a histogram.</summary>
internal override MetricType Type
{
    get { return MetricType.Histogram; }
}

// The members below forward to the unlabelled child of this histogram.

/// <summary>Sum of the values observed on the unlabelled child.</summary>
public double Sum
{
    get { return Unlabelled.Sum; }
}

/// <summary>Number of observations made on the unlabelled child.</summary>
public long Count
{
    get { return Unlabelled.Count; }
}

/// <summary>Records a single observation on the unlabelled child.</summary>
public void Observe(double val)
{
    Unlabelled.Observe(val, 1);
}

/// <summary>Records <paramref name="count"/> observations of the same value on the unlabelled child.</summary>
public void Observe(double val, long count)
{
    Unlabelled.Observe(val, count);
}

/// <summary>Records a single observation with an optional exemplar on the unlabelled child.</summary>
public void Observe(double val, Exemplar? exemplar)
{
    Unlabelled.Observe(val, exemplar);
}

/// <summary>Publishes the unlabelled child.</summary>
public void Publish()
{
    Unlabelled.Publish();
}

/// <summary>Unpublishes the unlabelled child.</summary>
public void Unpublish()
{
    Unlabelled.Unpublish();
}
281
// From https://github.com/prometheus/client_golang/blob/master/prometheus/histogram.go
/// <summary>
/// Creates '<paramref name="count"/>' buckets, where the lowest bucket has an
/// upper bound of '<paramref name="start"/>' and each following bucket's upper bound is '<paramref name="factor"/>'
/// times the previous bucket's upper bound.
/// </summary>
/// <param name="start">The upper bound of the lowest bucket. Must be positive.</param>
/// <param name="factor">The factor to increase the upper bound of subsequent buckets. Must be greater than 1.</param>
/// <param name="count">The number of buckets to create. Must be positive.</param>
/// <returns>An array of '<paramref name="count"/>' increasing upper bounds.</returns>
/// <exception cref="ArgumentException">
/// Thrown if '<paramref name="count"/>' is 0 or negative, if '<paramref name="start"/>' is 0 or negative,
/// or if '<paramref name="factor"/>' is less than or equal 1.
/// </exception>
/// <exception cref="OverflowException">
/// Thrown if one of the requested bounds (or an operand needed to compute it) does not fit into a decimal.
/// </exception>
public static double[] ExponentialBuckets(double start, double factor, int count)
{
    if (count <= 0) throw new ArgumentException($"{nameof(ExponentialBuckets)} needs a positive {nameof(count)}");
    if (start <= 0) throw new ArgumentException($"{nameof(ExponentialBuckets)} needs a positive {nameof(start)}");
    if (factor <= 1) throw new ArgumentException($"{nameof(ExponentialBuckets)} needs a {nameof(factor)} greater than 1");

    // The math we do can make it incur some tiny avoidable error due to floating point gremlins.
    // We use decimal for the path to preserve as much accuracy as we can, before finally converting to double.
    // It will not fix 100% of the cases where we end up with 0.0000000000000000000000000000001 offset but it helps a lot.

    var buckets = new double[count];

    var next = (decimal)start;
    buckets[0] = (double)next;

    if (count > 1)
    {
        var multiplier = (decimal)factor;

        // We only ever compute a bound that is actually stored. Computing one extra bound after
        // the last bucket could overflow decimal even though every requested bound fits.
        for (var i = 1; i < buckets.Length; i++)
        {
            next *= multiplier;
            buckets[i] = (double)next;
        }
    }

    return buckets;
}
315
// From https://github.com/prometheus/client_golang/blob/master/prometheus/histogram.go
/// <summary>
/// Creates '<paramref name="count"/>' buckets, where the lowest bucket has an
/// upper bound of '<paramref name="start"/>' and each following bucket's upper bound is the upper bound of the
/// previous bucket, incremented by '<paramref name="width"/>'.
/// </summary>
/// <param name="start">The upper bound of the lowest bucket.</param>
/// <param name="width">The width of each bucket (distance between lower and upper bound).</param>
/// <param name="count">The number of buckets to create. Must be positive.</param>
/// <returns>An array of '<paramref name="count"/>' upper bounds.</returns>
/// <exception cref="ArgumentException">Thrown if '<paramref name="count"/>' is 0 or negative.</exception>
/// <exception cref="OverflowException">
/// Thrown if one of the requested bounds (or an operand needed to compute it) does not fit into a decimal.
/// </exception>
public static double[] LinearBuckets(double start, double width, int count)
{
    if (count <= 0) throw new ArgumentException($"{nameof(LinearBuckets)} needs a positive {nameof(count)}");

    // The math we do can make it incur some tiny avoidable error due to floating point gremlins.
    // We use decimal for the path to preserve as much accuracy as we can, before finally converting to double.
    // It will not fix 100% of the cases where we end up with 0.0000000000000000000000000000001 offset but it helps a lot.

    var buckets = new double[count];

    var next = (decimal)start;
    buckets[0] = (double)next;

    if (count > 1)
    {
        var step = (decimal)width;

        // We only ever compute a bound that is actually stored. Computing one extra bound after
        // the last bucket could overflow decimal even though every requested bound fits.
        for (var i = 1; i < buckets.Length; i++)
        {
            next += step;
            buckets[i] = (double)next;
        }
    }

    return buckets;
}
346
/// <summary>
/// Divides each power of 10 into N divisions.
/// </summary>
/// <param name="startPower">The starting range includes 10 raised to this power.</param>
/// <param name="endPower">The ranges end with 10 raised to this power (this no longer starts a new range).</param>
/// <param name="divisions">How many divisions to divide each range into.</param>
/// <returns>The strictly increasing upper bounds of the generated buckets.</returns>
/// <exception cref="ArgumentException">Thrown if <paramref name="startPower"/> is not less than <paramref name="endPower"/>.</exception>
/// <exception cref="ArgumentOutOfRangeException">Thrown if <paramref name="divisions"/> is 0 or negative.</exception>
/// <remarks>
/// For example, with startPower=-1, endPower=2, divisions=4 we would get:
/// 10^-1 == 0.1 which defines our starting range, giving buckets: 0.25, 0.5, 0.75, 1.0
/// 10^0 == 1 which is the next range, giving buckets: 2.5, 5, 7.5, 10
/// 10^1 == 10 which is the next range, giving buckets: 25, 50, 75, 100
/// 10^2 == 100 which is the end and the top level of the preceding range.
/// Giving total buckets: 0.25, 0.5, 0.75, 1.0, 2.5, 5, 7.5, 10, 25, 50, 75, 100
/// </remarks>
public static double[] PowersOfTenDividedBuckets(int startPower, int endPower, int divisions)
{
    if (startPower >= endPower)
    {
        throw new ArgumentException($"{nameof(startPower)} must be less than {nameof(endPower)}.", nameof(startPower));
    }

    if (divisions <= 0)
    {
        // Unlike ArgumentException, this constructor takes the parameter name first and the message second.
        throw new ArgumentOutOfRangeException(nameof(divisions), $"{nameof(divisions)} must be a positive integer.");
    }

    var buckets = new List<double>();

    for (var powerOfTen = startPower; powerOfTen < endPower; powerOfTen++)
    {
        // This gives us the upper bound (the start of the next range).
        var max = (decimal)Math.Pow(10, powerOfTen + 1);

        // The math we do can make it incur some tiny avoidable error due to floating point gremlins.
        // We use decimal for the path to preserve as much accuracy as we can, before finally converting to double.
        // It will not fix 100% of the cases where we end up with 0.0000000000000000000000000000001 offset but it helps a lot.
        var divisionSize = max / divisions;

        // Then we just divide it into N divisions and we are done!
        for (var division = 0; division < divisions; division++)
        {
            var candidate = (double)(divisionSize * (division + 1));

            // Depending on the number of divisions, it may be that divisions from different powers overlap.
            // For example, a division into 20 would include:
            // 19th value in the 0th power: 9.5 (10/20*19=9.5)
            // 1st value in the 1st power: 5 (100/20*1 = 5)
            // To avoid this being a problem, we simply constrain all values to be increasing.
            if (buckets.Count > 0 && buckets[buckets.Count - 1] >= candidate)
            {
                continue; // Skip this one, it is not greater.
            }

            buckets.Add(candidate);
        }
    }

    return [.. buckets];
}
400
// Each child exposes one sum series, one count series and one series per bucket.
internal override int TimeseriesCount
{
    get
    {
        const int sumAndCountSeries = 2;

        return ChildCount * (sumAndCountSeries + _buckets.Length);
    }
}
403}