crossplane
121 строка · 4.2 Кб
1/*
2Copyright 2023 The Crossplane Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License"); you may not use
5this file except in compliance with the License. You may obtain a copy of the
6License at
7
8http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software distributed
11under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
12CONDITIONS OF ANY KIND, either express or implied. See the License for the
13specific language governing permissions and limitations under the License.
14*/
15
16package xfn
17
18import (
19"context"
20"time"
21
22"github.com/prometheus/client_golang/prometheus"
23"google.golang.org/grpc"
24"google.golang.org/grpc/status"
25
26"github.com/crossplane/crossplane/apis/apiextensions/fn/proto/v1beta1"
27)
28
29// Metrics are requests, errors, and duration (RED) metrics for composition
30// function runs.
31type Metrics struct {
32requests *prometheus.CounterVec
33responses *prometheus.CounterVec
34duration *prometheus.HistogramVec
35}
36
37// NewMetrics creates metrics for composition function runs.
38func NewMetrics() *Metrics {
39return &Metrics{
40requests: prometheus.NewCounterVec(prometheus.CounterOpts{
41Subsystem: "composition",
42Name: "run_function_request_total",
43Help: "Total number of RunFunctionRequests sent.",
44}, []string{"function_name", "function_package", "grpc_target"}),
45
46responses: prometheus.NewCounterVec(prometheus.CounterOpts{
47Subsystem: "composition",
48Name: "run_function_response_total",
49Help: "Total number of RunFunctionResponses received.",
50}, []string{"function_name", "function_package", "grpc_target", "grpc_code", "result_severity"}),
51
52duration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
53Subsystem: "composition",
54Name: "run_function_seconds",
55Help: "Histogram of RunFunctionResponse latency (seconds).",
56Buckets: prometheus.DefBuckets,
57}, []string{"function_name", "function_package", "grpc_target", "grpc_code", "result_severity"}),
58}
59}
60
61// Describe sends the super-set of all possible descriptors of metrics
62// collected by this Collector to the provided channel and returns once
63// the last descriptor has been sent.
64func (m *Metrics) Describe(ch chan<- *prometheus.Desc) {
65m.requests.Describe(ch)
66m.responses.Describe(ch)
67m.duration.Describe(ch)
68}
69
70// Collect is called by the Prometheus registry when collecting
71// metrics. The implementation sends each collected metric via the
72// provided channel and returns once the last metric has been sent.
73func (m *Metrics) Collect(ch chan<- prometheus.Metric) {
74m.requests.Collect(ch)
75m.responses.Collect(ch)
76m.duration.Collect(ch)
77}
78
79// CreateInterceptor returns a gRPC UnaryClientInterceptor for the named
80// function. The supplied package (pkg) should be the package's OCI reference.
81func (m *Metrics) CreateInterceptor(name, pkg string) grpc.UnaryClientInterceptor {
82return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
83l := prometheus.Labels{"function_name": name, "function_package": pkg, "grpc_target": cc.Target()}
84
85m.requests.With(l).Inc()
86
87start := time.Now()
88err := invoker(ctx, method, req, reply, cc, opts...)
89duration := time.Since(start)
90
91s, _ := status.FromError(err)
92l["grpc_code"] = s.Code().String()
93
94// We consider the 'severity' of the response to be that of the most
95// severe result in the response. A response with no results, or only
96// normal results, has severity "Normal". A response with warnings, but
97// no fatal results, has severity "Warning". A response with fatal
98// results has severity "Fatal".
99l["result_severity"] = "Normal"
100if rsp, ok := reply.(*v1beta1.RunFunctionResponse); ok {
101for _, r := range rsp.GetResults() {
102// Keep iterating if we see a warning result - we might still
103// see a fatal result.
104if r.GetSeverity() == v1beta1.Severity_SEVERITY_WARNING {
105l["result_severity"] = "Warning"
106}
107// Break if we see a fatal result, to ensure we don't downgrade
108// the severity to warning.
109if r.GetSeverity() == v1beta1.Severity_SEVERITY_FATAL {
110l["result_severity"] = "Fatal"
111break
112}
113}
114}
115
116m.responses.With(l).Inc()
117m.duration.With(l).Observe(duration.Seconds())
118
119return err
120}
121}
122