prometheus-net
179 строк · 7.1 Кб
1using System.Diagnostics;2using System.Text;3
4namespace Prometheus;5
/// <summary>
/// A metric server that regularly pushes metrics to a Prometheus PushGateway.
/// </summary>
/// <remarks>
/// Each push serializes the configured registry in text format and sends it to a URL of the form
/// <c>{endpoint}/job/{job}[/instance/{instance}][/{label}/{value}...]</c>.
/// The HTTP method is PUT when <c>ReplaceOnPush</c> is set and POST otherwise.
/// After cancellation is requested, one more push is attempted so that the final state gets published.
/// </remarks>
public class MetricPusher : MetricHandler
{
    private readonly TimeSpan _pushInterval;
    private readonly HttpMethod _method;
    private readonly Uri _targetUrl;
    private readonly Func<HttpClient> _httpClientProvider;

    /// <summary>
    /// Creates a pusher from individual settings by forwarding them to
    /// <see cref="MetricPusher(MetricPusherOptions)"/>.
    /// </summary>
    /// <param name="endpoint">Base URL of the push endpoint; trailing slashes are trimmed.</param>
    /// <param name="job">Value placed in the <c>/job/</c> segment of the push URL.</param>
    /// <param name="instance">Optional value for an <c>/instance/</c> segment of the push URL.</param>
    /// <param name="intervalMilliseconds">Target time between the start of consecutive pushes, in milliseconds.</param>
    /// <param name="additionalLabels">Optional (name, value) pairs appended to the push URL as extra path segments.</param>
    /// <param name="registry">Registry to export; <c>Metrics.DefaultRegistry</c> is used when null.</param>
    /// <param name="pushReplace">When true, pushes use HTTP PUT instead of HTTP POST.</param>
    public MetricPusher(string endpoint, string job, string? instance = null, long intervalMilliseconds = 1000, IEnumerable<Tuple<string, string>>? additionalLabels = null, CollectorRegistry? registry = null, bool pushReplace = false) : this(new MetricPusherOptions
    {
        Endpoint = endpoint,
        Job = job,
        Instance = instance,
        IntervalMilliseconds = intervalMilliseconds,
        AdditionalLabels = additionalLabels,
        Registry = registry,
        ReplaceOnPush = pushReplace,
    })
    {
    }

    /// <summary>
    /// Creates a pusher from an options object, validating the options and precomputing the target URL.
    /// </summary>
    /// <param name="options">Pusher configuration.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> is null, or its <c>Endpoint</c> or <c>Job</c> is null or empty.
    /// </exception>
    /// <exception cref="ArgumentException">
    /// <c>IntervalMilliseconds</c> is not greater than zero, or the assembled push URL is not a valid absolute URL.
    /// </exception>
    /// <exception cref="NotSupportedException">
    /// An entry in <c>AdditionalLabels</c> is null or has a null/empty name or value.
    /// </exception>
    public MetricPusher(MetricPusherOptions options)
    {
        // Fail with a descriptive exception instead of a NullReferenceException on the first property access.
        if (options == null)
            throw new ArgumentNullException(nameof(options));

        if (string.IsNullOrEmpty(options.Endpoint))
            throw new ArgumentNullException(nameof(options.Endpoint));

        if (string.IsNullOrEmpty(options.Job))
            throw new ArgumentNullException(nameof(options.Job));

        if (options.IntervalMilliseconds <= 0)
            throw new ArgumentException("Interval must be greater than zero", nameof(options.IntervalMilliseconds));

        _registry = options.Registry ?? Metrics.DefaultRegistry;

        // Fall back to a process-wide shared client when the caller does not supply a provider.
        _httpClientProvider = options.HttpClientProvider ?? (() => _singletonHttpClient);

        StringBuilder sb = new StringBuilder(string.Format("{0}/job/{1}", options.Endpoint!.TrimEnd('/'), options.Job));
        if (!string.IsNullOrEmpty(options.Instance))
            sb.AppendFormat("/instance/{0}", options.Instance);

        if (options.AdditionalLabels != null)
        {
            foreach (var pair in options.AdditionalLabels)
            {
                if (pair == null || string.IsNullOrEmpty(pair.Item1) || string.IsNullOrEmpty(pair.Item2))
                    throw new NotSupportedException($"Invalid {nameof(MetricPusher)} additional label: ({pair?.Item1}):({pair?.Item2})");

                sb.AppendFormat("/{0}/{1}", pair.Item1, pair.Item2);
            }
        }

        if (!Uri.TryCreate(sb.ToString(), UriKind.Absolute, out var targetUrl) || targetUrl == null)
        {
            throw new ArgumentException("Endpoint must be a valid url", nameof(options.Endpoint));
        }

        _targetUrl = targetUrl;

        _pushInterval = TimeSpan.FromMilliseconds(options.IntervalMilliseconds);
        _onError = options.OnError;

        _method = options.ReplaceOnPush ? HttpMethod.Put : HttpMethod.Post;
    }

    private static readonly HttpClient _singletonHttpClient = new();

    private readonly CollectorRegistry _registry;
    private readonly Action<Exception>? _onError;

    /// <summary>
    /// Starts the background loop that pushes the registry contents at the configured interval.
    /// </summary>
    /// <param name="cancel">
    /// Signals the loop to stop. One more push is attempted after cancellation is observed before the loop exits.
    /// </param>
    /// <returns>A task that completes when the push loop has exited.</returns>
    protected override Task StartServer(CancellationToken cancel)
    {
        // Start the server processing loop asynchronously in the background.
        return Task.Run(async delegate
        {
            // We do 1 final push after we get cancelled, to ensure that we publish the final state.
            var pushingFinalState = false;

            while (true)
            {
                // We schedule approximately at the configured interval. There may be some small accumulation for the
                // part of the loop we do not measure but it is close enough to be acceptable for all practical scenarios.
                var duration = ValueStopwatch.StartNew();

                try
                {
                    var httpClient = _httpClientProvider();

                    // Request and response are disposed on every iteration so a long-running pusher
                    // does not accumulate undisposed HTTP message objects.
                    using var request = new HttpRequestMessage
                    {
                        Method = _method,
                        RequestUri = _targetUrl,
                        // We use a copy-pasted implementation of PushStreamContent here to avoid taking a dependency on the old ASP.NET Web API where it lives.
                        Content = new PushStreamContentInternal(async (stream, content, context) =>
                        {
                            try
                            {
                                // Do not pass CT because we only want to cancel after pushing, so a flush is always performed.
                                await _registry.CollectAndExportAsTextAsync(stream, default);
                            }
                            finally
                            {
                                stream.Close();
                            }
                        }, PrometheusConstants.ExporterContentTypeValue),
                    };

                    using var response = await httpClient.SendAsync(request);

                    // If anything goes wrong, we want to get at least an entry in the trace log.
                    response.EnsureSuccessStatusCode();
                }
                catch (ScrapeFailedException ex)
                {
                    // We do not consider failed scrapes a reportable error since the user code that raises the failure should be the one logging it.
                    Trace.WriteLine($"Skipping metrics push due to failed scrape: {ex.Message}");
                }
                catch (Exception ex)
                {
                    HandleFailedPush(ex);
                }

                if (cancel.IsCancellationRequested)
                {
                    if (pushingFinalState)
                    {
                        // Final push completed, time to pack up our things and go home.
                        break;
                    }

                    // Continue for one more loop to push the final state.
                    // We do this because it might be that we were stopped while in the middle of a push.
                    pushingFinalState = true;
                    continue;
                }

                var sleepTime = _pushInterval - duration.GetElapsedTime();

                // Sleep until the interval elapses or the pusher is asked to shut down.
                if (sleepTime > TimeSpan.Zero)
                {
                    try
                    {
                        await Task.Delay(sleepTime, cancel);
                    }
                    catch (OperationCanceledException) when (cancel.IsCancellationRequested)
                    {
                        // The task was cancelled.
                        // We continue the loop here to ensure final state gets pushed.
                        pushingFinalState = true;
                        continue;
                    }
                }
            }
        });
    }

    /// <summary>
    /// Reports a failed push to the configured error callback, or to the trace log when no callback is set.
    /// </summary>
    /// <param name="ex">The exception raised by the failed push attempt.</param>
    private void HandleFailedPush(Exception ex)
    {
        if (_onError != null)
        {
            // Asynchronous because we don't trust the callee to be fast.
            Task.Run(() => _onError(ex));
        }
        else
        {
            // If there is no error handler registered, we write to trace to at least hopefully get some attention to the problem.
            Trace.WriteLine($"Error in MetricPusher: {ex}");
        }
    }
}
180