prometheus-net
317 строк · 14.5 Кб
1#if !NET
2using System.Globalization;
3
4namespace Prometheus;
5
6/// <remarks>
7/// Does NOT take ownership of the stream - caller remains the boss.
8/// </remarks>
9internal sealed class TextSerializer : IMetricsSerializer
10{
11internal static readonly byte[] NewLine = [(byte)'\n'];
12internal static readonly byte[] Quote = [(byte)'"'];
13internal static readonly byte[] Equal = [(byte)'='];
14internal static readonly byte[] Comma = [(byte)','];
15internal static readonly byte[] Underscore = [(byte)'_'];
16internal static readonly byte[] LeftBrace = [(byte)'{'];
17internal static readonly byte[] RightBraceSpace = [(byte)'}', (byte)' '];
18internal static readonly byte[] Space = [(byte)' '];
19internal static readonly byte[] SpaceHashSpaceLeftBrace = [(byte)' ', (byte)'#', (byte)' ', (byte)'{'];
20internal static readonly byte[] PositiveInfinity = "+Inf"u8.ToArray();
21internal static readonly byte[] NegativeInfinity = "-Inf"u8.ToArray();
22internal static readonly byte[] NotANumber = "NaN"u8.ToArray();
23internal static readonly byte[] DotZero = ".0"u8.ToArray();
24internal static readonly byte[] FloatPositiveOne = "1.0"u8.ToArray();
25internal static readonly byte[] FloatZero = "0.0"u8.ToArray();
26internal static readonly byte[] FloatNegativeOne = "-1.0"u8.ToArray();
27internal static readonly byte[] IntPositiveOne = "1"u8.ToArray();
28internal static readonly byte[] IntZero = "0"u8.ToArray();
29internal static readonly byte[] IntNegativeOne = "-1"u8.ToArray();
30internal static readonly byte[] EofNewLine = "# EOF\n"u8.ToArray();
31internal static readonly byte[] HashHelpSpace = "# HELP "u8.ToArray();
32internal static readonly byte[] NewlineHashTypeSpace = "\n# TYPE "u8.ToArray();
33
34internal static readonly byte[] Unknown = "unknown"u8.ToArray();
35
36internal static readonly Dictionary<MetricType, byte[]> MetricTypeToBytes = new()
37{
38{ MetricType.Gauge, "gauge"u8.ToArray() },
39{ MetricType.Counter, "counter"u8.ToArray() },
40{ MetricType.Histogram, "histogram"u8.ToArray() },
41{ MetricType.Summary, "summary"u8.ToArray() },
42};
43
44private static readonly char[] DotEChar = ['.', 'e'];
45
46public TextSerializer(Stream stream, ExpositionFormat fmt = ExpositionFormat.PrometheusText)
47{
48_expositionFormat = fmt;
49_stream = new Lazy<Stream>(() => AddStreamBuffering(stream));
50}
51
52// Enables delay-loading of the stream, because touching stream in HTTP handler triggers some behavior.
53public TextSerializer(Func<Stream> streamFactory,
54ExpositionFormat fmt = ExpositionFormat.PrometheusText)
55{
56_expositionFormat = fmt;
57_stream = new Lazy<Stream>(() => AddStreamBuffering(streamFactory()));
58}
59
60/// <summary>
61/// Ensures that writes to the stream are buffered, meaning we do not emit individual "write 1 byte" calls to the stream.
62/// This has been rumored by some users to be relevant in their scenarios (though never with solid evidence or repro steps).
63/// However, we can easily simulate this via the serialization benchmark through named pipes - they are super slow if writing
64/// individual characters. It is a reasonable assumption that this limitation is also true elsewhere, at least on some OS/platform.
65/// </summary>
66private static Stream AddStreamBuffering(Stream inner)
67{
68return new BufferedStream(inner);
69}
70
71public async Task FlushAsync(CancellationToken cancel)
72{
73// If we never opened the stream, we don't touch it on flush.
74if (!_stream.IsValueCreated)
75return;
76
77await _stream.Value.FlushAsync(cancel);
78}
79
80private readonly Lazy<Stream> _stream;
81
82public async ValueTask WriteFamilyDeclarationAsync(string name, byte[] nameBytes, byte[] helpBytes, MetricType type,
83byte[] typeBytes, CancellationToken cancel)
84{
85var nameLen = nameBytes.Length;
86if (_expositionFormat == ExpositionFormat.OpenMetricsText && type == MetricType.Counter)
87{
88if (name.EndsWith("_total"))
89{
90nameLen -= 6; // in OpenMetrics the counter name does not include the _total prefix.
91}
92else
93{
94typeBytes = Unknown; // if the total prefix is missing the _total prefix it is out of spec
95}
96}
97
98await _stream.Value.WriteAsync(HashHelpSpace, 0, HashHelpSpace.Length, cancel);
99await _stream.Value.WriteAsync(nameBytes, 0, nameLen, cancel);
100// The space after the name in "HELP" is mandatory as per ABNF, even if there is no help text.
101await _stream.Value.WriteAsync(Space, 0, Space.Length, cancel);
102if (helpBytes.Length > 0)
103{
104await _stream.Value.WriteAsync(helpBytes, 0, helpBytes.Length, cancel);
105}
106await _stream.Value.WriteAsync(NewlineHashTypeSpace, 0, NewlineHashTypeSpace.Length, cancel);
107await _stream.Value.WriteAsync(nameBytes, 0, nameLen, cancel);
108await _stream.Value.WriteAsync(Space, 0, Space.Length, cancel);
109await _stream.Value.WriteAsync(typeBytes, 0, typeBytes.Length, cancel);
110await _stream.Value.WriteAsync(NewLine, 0, NewLine.Length, cancel);
111}
112
113public async ValueTask WriteEnd(CancellationToken cancel)
114{
115if (_expositionFormat == ExpositionFormat.OpenMetricsText)
116await _stream.Value.WriteAsync(EofNewLine, 0, EofNewLine.Length, cancel);
117}
118
119public async ValueTask WriteMetricPointAsync(byte[] name, byte[] flattenedLabels, CanonicalLabel canonicalLabel,
120double value, ObservedExemplar exemplar, byte[]? suffix, CancellationToken cancel)
121{
122await WriteIdentifierPartAsync(name, flattenedLabels, canonicalLabel, suffix, cancel);
123
124await WriteValue(value, cancel);
125if (_expositionFormat == ExpositionFormat.OpenMetricsText && exemplar.IsValid)
126{
127await WriteExemplarAsync(cancel, exemplar);
128}
129
130await _stream.Value.WriteAsync(NewLine, 0, NewLine.Length, cancel);
131}
132
133public async ValueTask WriteMetricPointAsync(byte[] name, byte[] flattenedLabels, CanonicalLabel canonicalLabel,
134long value, ObservedExemplar exemplar, byte[]? suffix, CancellationToken cancel)
135{
136await WriteIdentifierPartAsync(name, flattenedLabels, canonicalLabel, suffix, cancel);
137
138await WriteValue(value, cancel);
139if (_expositionFormat == ExpositionFormat.OpenMetricsText && exemplar.IsValid)
140{
141await WriteExemplarAsync(cancel, exemplar);
142}
143
144await _stream.Value.WriteAsync(NewLine, 0, NewLine.Length, cancel);
145}
146
147private async Task WriteExemplarAsync(CancellationToken cancel, ObservedExemplar exemplar)
148{
149await _stream.Value.WriteAsync(SpaceHashSpaceLeftBrace, 0, SpaceHashSpaceLeftBrace.Length, cancel);
150for (var i = 0; i < exemplar.Labels!.Length; i++)
151{
152if (i > 0)
153await _stream.Value.WriteAsync(Comma, 0, Comma.Length, cancel);
154
155await WriteLabel(exemplar.Labels![i].KeyBytes, PrometheusConstants.ExemplarEncoding.GetBytes(exemplar.Labels![i].Value), cancel);
156}
157
158await _stream.Value.WriteAsync(RightBraceSpace, 0, RightBraceSpace.Length, cancel);
159await WriteValue(exemplar.Value, cancel);
160await _stream.Value.WriteAsync(Space, 0, Space.Length, cancel);
161await WriteValue(exemplar.Timestamp, cancel);
162}
163
164private async Task WriteLabel(byte[] label, byte[] value, CancellationToken cancel)
165{
166await _stream.Value.WriteAsync(label, 0, label.Length, cancel);
167await _stream.Value.WriteAsync(Equal, 0, Equal.Length, cancel);
168await _stream.Value.WriteAsync(Quote, 0, Quote.Length, cancel);
169await _stream.Value.WriteAsync(value, 0, value.Length, cancel);
170await _stream.Value.WriteAsync(Quote, 0, Quote.Length, cancel);
171}
172
173private async Task WriteValue(double value, CancellationToken cancel)
174{
175if (_expositionFormat == ExpositionFormat.OpenMetricsText)
176{
177switch (value)
178{
179case 0:
180await _stream.Value.WriteAsync(FloatZero, 0, FloatZero.Length, cancel);
181return;
182case 1:
183await _stream.Value.WriteAsync(FloatPositiveOne, 0, FloatPositiveOne.Length, cancel);
184return;
185case -1:
186await _stream.Value.WriteAsync(FloatNegativeOne, 0, FloatNegativeOne.Length, cancel);
187return;
188case double.PositiveInfinity:
189await _stream.Value.WriteAsync(PositiveInfinity, 0, PositiveInfinity.Length, cancel);
190return;
191case double.NegativeInfinity:
192await _stream.Value.WriteAsync(NegativeInfinity, 0, NegativeInfinity.Length, cancel);
193return;
194case double.NaN:
195await _stream.Value.WriteAsync(NotANumber, 0, NotANumber.Length, cancel);
196return;
197}
198}
199
200var valueAsString = value.ToString("g", CultureInfo.InvariantCulture);
201
202var numBytes = PrometheusConstants.ExportEncoding.GetBytes(valueAsString, 0, valueAsString.Length, _stringBytesBuffer, 0);
203await _stream.Value.WriteAsync(_stringBytesBuffer, 0, numBytes, cancel);
204
205// In certain places (e.g. "le" label) we need floating point values to actually have the decimal point in them for OpenMetrics.
206if (_expositionFormat == ExpositionFormat.OpenMetricsText && valueAsString.IndexOfAny(DotEChar) == -1 /* did not contain .|e */)
207await _stream.Value.WriteAsync(DotZero, 0, DotZero.Length, cancel);
208}
209
210private async Task WriteValue(long value, CancellationToken cancel)
211{
212if (_expositionFormat == ExpositionFormat.OpenMetricsText)
213{
214switch (value)
215{
216case 0:
217await _stream.Value.WriteAsync(IntZero, 0, IntZero.Length, cancel);
218return;
219case 1:
220await _stream.Value.WriteAsync(IntPositiveOne, 0, IntPositiveOne.Length, cancel);
221return;
222case -1:
223await _stream.Value.WriteAsync(IntNegativeOne, 0, IntNegativeOne.Length, cancel);
224return;
225}
226}
227
228var valueAsString = value.ToString("D", CultureInfo.InvariantCulture);
229var numBytes = PrometheusConstants.ExportEncoding.GetBytes(valueAsString, 0, valueAsString.Length, _stringBytesBuffer, 0);
230await _stream.Value.WriteAsync(_stringBytesBuffer, 0, numBytes, cancel);
231}
232
233// Reuse a buffer to do the serialization and UTF-8 encoding.
234// Size limit guided by https://stackoverflow.com/questions/21146544/what-is-the-maximum-length-of-double-tostringd
235private readonly char[] _stringCharsBuffer = new char[32];
236private readonly byte[] _stringBytesBuffer = new byte[32];
237
238private readonly ExpositionFormat _expositionFormat;
239
240/// <summary>
241/// Creates a metric identifier, with an optional name postfix and an optional extra label to append to the end.
242/// familyname_postfix{labelkey1="labelvalue1",labelkey2="labelvalue2"}
243/// Note: Terminates with a SPACE
244/// </summary>
245private async Task WriteIdentifierPartAsync(byte[] name, byte[] flattenedLabels,
246CanonicalLabel canonicalLabel, byte[]? suffix, CancellationToken cancel)
247{
248await _stream.Value.WriteAsync(name, 0, name.Length, cancel);
249if (suffix != null && suffix.Length > 0)
250{
251await _stream.Value.WriteAsync(Underscore, 0, Underscore.Length, cancel);
252await _stream.Value.WriteAsync(suffix, 0, suffix.Length, cancel);
253}
254
255if (flattenedLabels.Length > 0 || canonicalLabel.IsNotEmpty)
256{
257await _stream.Value.WriteAsync(LeftBrace, 0, LeftBrace.Length, cancel);
258if (flattenedLabels.Length > 0)
259{
260await _stream.Value.WriteAsync(flattenedLabels, 0, flattenedLabels.Length, cancel);
261}
262
263// Extra labels go to the end (i.e. they are deepest to inherit from).
264if (canonicalLabel.IsNotEmpty)
265{
266if (flattenedLabels.Length > 0)
267{
268await _stream.Value.WriteAsync(Comma, 0, Comma.Length, cancel);
269}
270
271await _stream.Value.WriteAsync(canonicalLabel.Name, 0, canonicalLabel.Name.Length, cancel);
272await _stream.Value.WriteAsync(Equal, 0, Equal.Length, cancel);
273await _stream.Value.WriteAsync(Quote, 0, Quote.Length, cancel);
274if (_expositionFormat == ExpositionFormat.OpenMetricsText)
275await _stream.Value.WriteAsync(
276canonicalLabel.OpenMetrics, 0, canonicalLabel.OpenMetrics.Length, cancel);
277else
278await _stream.Value.WriteAsync(
279canonicalLabel.Prometheus, 0, canonicalLabel.Prometheus.Length, cancel);
280await _stream.Value.WriteAsync(Quote, 0, Quote.Length, cancel);
281}
282
283await _stream.Value.WriteAsync(RightBraceSpace, 0, RightBraceSpace.Length, cancel);
284}
285else
286{
287await _stream.Value.WriteAsync(Space, 0, Space.Length, cancel);
288}
289}
290
291/// <summary>
292/// Encode the special variable in regular Prometheus form and also return a OpenMetrics variant, these can be
293/// the same.
294/// see: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#considerations-canonical-numbers
295/// </summary>
296internal static CanonicalLabel EncodeValueAsCanonicalLabel(byte[] name, double value)
297{
298if (double.IsPositiveInfinity(value))
299return new CanonicalLabel(name, PositiveInfinity, PositiveInfinity);
300
301var valueAsString = value.ToString("g", CultureInfo.InvariantCulture);
302var prometheusBytes = PrometheusConstants.ExportEncoding.GetBytes(valueAsString);
303
304var openMetricsBytes = prometheusBytes;
305
306// Identify whether the original value is floating-point, by checking for presence of the 'e' or '.' characters.
307if (valueAsString.IndexOfAny(DotEChar) == -1)
308{
309// OpenMetrics requires labels containing numeric values to be expressed in floating point format.
310// If all we find is an integer, we add a ".0" to the end to make it a floating point value.
311openMetricsBytes = PrometheusConstants.ExportEncoding.GetBytes(valueAsString + ".0");
312}
313
314return new CanonicalLabel(name, prometheusBytes, openMetricsBytes);
315}
316}
317#endif