prometheus-net

Форк
0
/
MetricPusher.cs 
179 строк · 7.1 Кб
1
using System.Diagnostics;
2
using System.Text;
3

4
namespace Prometheus;
5

6
/// <summary>
/// A metric server that regularly pushes metrics to a Prometheus PushGateway.
/// </summary>
/// <remarks>
/// The push target URL is composed once, in the constructor, from the endpoint, the job name, the optional
/// instance name and any additional labels. Each push sends the text export of the configured registry
/// using HTTP POST, or HTTP PUT when replace-on-push is requested.
/// </remarks>
public class MetricPusher : MetricHandler
{
    // Fallback client used when the options do not supply their own HttpClient provider.
    private static readonly HttpClient _singletonHttpClient = new();

    private readonly TimeSpan _pushInterval;
    private readonly HttpMethod _method;
    private readonly Uri _targetUrl;
    private readonly Func<HttpClient> _httpClientProvider;

    private readonly CollectorRegistry _registry;
    private readonly Action<Exception>? _onError;

    /// <summary>
    /// Creates a pusher from individual settings. This is a convenience wrapper that forwards everything to
    /// <see cref="MetricPusher(MetricPusherOptions)"/>.
    /// </summary>
    /// <param name="endpoint">Base URL of the push target; trailing slashes are trimmed before the path is appended.</param>
    /// <param name="job">Job name, appended to the URL as <c>/job/{job}</c>.</param>
    /// <param name="instance">Optional instance name, appended as <c>/instance/{instance}</c> when not empty.</param>
    /// <param name="intervalMilliseconds">Interval between pushes, in milliseconds. Must be greater than zero.</param>
    /// <param name="additionalLabels">Optional (name, value) pairs, each appended to the URL as <c>/{name}/{value}</c>.</param>
    /// <param name="registry">Registry to export; the default registry is used when null.</param>
    /// <param name="pushReplace">When true, pushes use HTTP PUT instead of HTTP POST.</param>
    public MetricPusher(string endpoint, string job, string? instance = null, long intervalMilliseconds = 1000, IEnumerable<Tuple<string, string>>? additionalLabels = null, CollectorRegistry? registry = null, bool pushReplace = false) : this(new MetricPusherOptions
    {
        Endpoint = endpoint,
        Job = job,
        Instance = instance,
        IntervalMilliseconds = intervalMilliseconds,
        AdditionalLabels = additionalLabels,
        Registry = registry,
        ReplaceOnPush = pushReplace,
    })
    {
    }

    /// <summary>
    /// Creates a pusher from an options object, validating the options and composing the push target URL.
    /// </summary>
    /// <param name="options">Pusher configuration.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> is null, or its endpoint or job is null or empty.
    /// </exception>
    /// <exception cref="ArgumentException">
    /// The interval is not greater than zero, or the composed URL is not a valid absolute URL.
    /// </exception>
    /// <exception cref="NotSupportedException">
    /// An additional label pair is null or has an empty name or value.
    /// </exception>
    public MetricPusher(MetricPusherOptions options)
    {
        // Fail with a descriptive exception instead of a NullReferenceException on the first property access.
        if (options == null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        if (string.IsNullOrEmpty(options.Endpoint))
        {
            throw new ArgumentNullException(nameof(options.Endpoint));
        }

        if (string.IsNullOrEmpty(options.Job))
        {
            throw new ArgumentNullException(nameof(options.Job));
        }

        if (options.IntervalMilliseconds <= 0)
        {
            throw new ArgumentException("Interval must be greater than zero", nameof(options.IntervalMilliseconds));
        }

        _registry = options.Registry ?? Metrics.DefaultRegistry;

        _httpClientProvider = options.HttpClientProvider ?? (() => _singletonHttpClient);

        // Compose {endpoint}/job/{job}[/instance/{instance}][/{labelName}/{labelValue}]...
        // Values are inserted verbatim (no escaping is applied here).
        var sb = new StringBuilder(string.Format("{0}/job/{1}", options.Endpoint!.TrimEnd('/'), options.Job));

        if (!string.IsNullOrEmpty(options.Instance))
        {
            sb.Append("/instance/").Append(options.Instance);
        }

        if (options.AdditionalLabels != null)
        {
            foreach (var pair in options.AdditionalLabels)
            {
                if (pair == null || string.IsNullOrEmpty(pair.Item1) || string.IsNullOrEmpty(pair.Item2))
                {
                    throw new NotSupportedException($"Invalid {nameof(MetricPusher)} additional label: ({pair?.Item1}):({pair?.Item2})");
                }

                sb.Append('/').Append(pair.Item1).Append('/').Append(pair.Item2);
            }
        }

        if (!Uri.TryCreate(sb.ToString(), UriKind.Absolute, out var targetUrl) || targetUrl == null)
        {
            throw new ArgumentException("Endpoint must be a valid url", nameof(options.Endpoint));
        }

        _targetUrl = targetUrl;

        _pushInterval = TimeSpan.FromMilliseconds(options.IntervalMilliseconds);
        _onError = options.OnError;

        _method = options.ReplaceOnPush ? HttpMethod.Put : HttpMethod.Post;
    }

    /// <summary>
    /// Starts the background loop that pushes the registry contents at approximately the configured interval.
    /// </summary>
    /// <param name="cancel">
    /// Signals the loop to stop. After cancellation is observed, one more push is performed so that the
    /// final state is published.
    /// </param>
    /// <returns>A task that completes once the final push after cancellation has finished.</returns>
    protected override Task StartServer(CancellationToken cancel)
    {
        // Start the server processing loop asynchronously in the background.
        return Task.Run(async delegate
        {
            // We do 1 final push after we get cancelled, to ensure that we publish the final state.
            var pushingFinalState = false;

            while (true)
            {
                // We schedule approximately at the configured interval. There may be some small accumulation for the
                // part of the loop we do not measure but it is close enough to be acceptable for all practical scenarios.
                var duration = ValueStopwatch.StartNew();

                try
                {
                    var httpClient = _httpClientProvider();

                    // Dispose the request (and its content) and the response on every iteration,
                    // whether the push succeeds or throws, instead of leaving them to the finalizer.
                    using var request = new HttpRequestMessage
                    {
                        Method = _method,
                        RequestUri = _targetUrl,
                        // We use a copy-pasted implementation of PushStreamContent here to avoid taking a dependency on the old ASP.NET Web API where it lives.
                        Content = new PushStreamContentInternal(async (stream, content, context) =>
                        {
                            try
                            {
                                // Do not pass CT because we only want to cancel after pushing, so a flush is always performed.
                                await _registry.CollectAndExportAsTextAsync(stream, default);
                            }
                            finally
                            {
                                stream.Close();
                            }
                        }, PrometheusConstants.ExporterContentTypeValue),
                    };

                    using var response = await httpClient.SendAsync(request);

                    // If anything goes wrong, we want to get at least an entry in the trace log.
                    response.EnsureSuccessStatusCode();
                }
                catch (ScrapeFailedException ex)
                {
                    // We do not consider failed scrapes a reportable error since the user code that raises the failure should be the one logging it.
                    Trace.WriteLine($"Skipping metrics push due to failed scrape: {ex.Message}");
                }
                catch (Exception ex)
                {
                    HandleFailedPush(ex);
                }

                if (cancel.IsCancellationRequested)
                {
                    if (pushingFinalState)
                    {
                        // Final push completed, time to pack up our things and go home.
                        break;
                    }

                    // Continue for one more loop to push the final state.
                    // We do this because it might be that we were stopped while in the middle of a push.
                    pushingFinalState = true;
                    continue;
                }

                var sleepTime = _pushInterval - duration.GetElapsedTime();

                // Sleep until the interval elapses or the pusher is asked to shut down.
                // If the push itself took longer than the interval, start the next push immediately.
                if (sleepTime > TimeSpan.Zero)
                {
                    try
                    {
                        await Task.Delay(sleepTime, cancel);
                    }
                    catch (OperationCanceledException) when (cancel.IsCancellationRequested)
                    {
                        // The task was cancelled.
                        // We continue the loop here to ensure final state gets pushed.
                        pushingFinalState = true;
                        continue;
                    }
                }
            }
        });
    }

    /// <summary>
    /// Reports a failed push to the configured error callback, or writes it to the trace log when no
    /// callback is registered.
    /// </summary>
    /// <param name="ex">The exception raised by the failed push attempt.</param>
    private void HandleFailedPush(Exception ex)
    {
        if (_onError != null)
        {
            // Asynchronous because we don't trust the callee to be fast.
            Task.Run(() => _onError(ex));
        }
        else
        {
            // If there is no error handler registered, we write to trace to at least hopefully get some attention to the problem.
            Trace.WriteLine(string.Format("Error in MetricPusher: {0}", ex));
        }
    }
}
180

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.