kuma
497 строк · 16.3 Кб
1package meshmetric
2
3import (
4"encoding/json"
5"fmt"
6"net"
7
8. "github.com/onsi/ginkgo/v2"
9. "github.com/onsi/gomega"
10
11mads "github.com/kumahq/kuma/api/observability/v1"
12"github.com/kumahq/kuma/pkg/plugins/policies/meshmetric/api/v1alpha1"
13. "github.com/kumahq/kuma/test/framework"
14"github.com/kumahq/kuma/test/framework/client"
15"github.com/kumahq/kuma/test/framework/deployments/democlient"
16"github.com/kumahq/kuma/test/framework/deployments/otelcollector"
17"github.com/kumahq/kuma/test/framework/deployments/testserver"
18"github.com/kumahq/kuma/test/framework/envs/kubernetes"
19)
20
21func BasicMeshMetricForMesh(policyName string, mesh string) InstallFunc {
22meshMetric := fmt.Sprintf(`
23apiVersion: kuma.io/v1alpha1
24kind: MeshMetric
25metadata:
26name: %s
27namespace: %s
28labels:
29kuma.io/mesh: %s
30spec:
31targetRef:
32kind: Mesh
33default:
34backends:
35- type: Prometheus
36prometheus:
37port: 8080
38path: /metrics
39tls:
40mode: Disabled
41`, policyName, Config.KumaNamespace, mesh)
42return YamlK8s(meshMetric)
43}
44
45func MeshMetricMultiplePrometheusBackends(policyName string, mesh string, firstPrometheus string, secondPrometheus string) InstallFunc {
46meshMetric := fmt.Sprintf(`
47apiVersion: kuma.io/v1alpha1
48kind: MeshMetric
49metadata:
50name: %s
51namespace: %s
52labels:
53kuma.io/mesh: %s
54spec:
55targetRef:
56kind: Mesh
57default:
58backends:
59- type: Prometheus
60prometheus:
61clientId: %s
62port: 8080
63path: /metrics
64tls:
65mode: Disabled
66- type: Prometheus
67prometheus:
68clientId: %s
69port: 8081
70path: /metrics
71tls:
72mode: Disabled
73`, policyName, Config.KumaNamespace, mesh, firstPrometheus, secondPrometheus)
74return YamlK8s(meshMetric)
75}
76
77func MeshMetricWithSpecificPrometheusClientId(policyName string, mesh string, clientId string) InstallFunc {
78meshMetric := fmt.Sprintf(`
79apiVersion: kuma.io/v1alpha1
80kind: MeshMetric
81metadata:
82name: %s
83namespace: %s
84labels:
85kuma.io/mesh: %s
86spec:
87targetRef:
88kind: Mesh
89default:
90backends:
91- type: Prometheus
92prometheus:
93clientId: %s
94port: 8080
95path: /metrics
96tls:
97mode: Disabled
98`, policyName, Config.KumaNamespace, mesh, clientId)
99return YamlK8s(meshMetric)
100}
101
102func MeshMetricWithSpecificPrometheusBackendForMeshService(mesh string, clientId string, serviceName string) InstallFunc {
103meshMetric := fmt.Sprintf(`
104apiVersion: kuma.io/v1alpha1
105kind: MeshMetric
106metadata:
107name: mesh-metric-2
108namespace: %s
109labels:
110kuma.io/mesh: %s
111spec:
112targetRef:
113kind: MeshService
114name: %s
115default:
116backends:
117- type: Prometheus
118prometheus:
119clientId: %s
120port: 8080
121path: /metrics
122tls:
123mode: Disabled
124`, Config.KumaNamespace, mesh, serviceName, clientId)
125return YamlK8s(meshMetric)
126}
127
128func MeshMetricWithApplicationForMesh(policyName, mesh, path string) InstallFunc {
129meshMetric := fmt.Sprintf(`
130apiVersion: kuma.io/v1alpha1
131kind: MeshMetric
132metadata:
133name: %s
134namespace: %s
135labels:
136kuma.io/mesh: %s
137spec:
138targetRef:
139kind: Mesh
140default:
141applications:
142- path: "%s"
143port: 80
144backends:
145- type: Prometheus
146prometheus:
147port: 8080
148path: /metrics
149tls:
150mode: Disabled
151`, policyName, Config.KumaNamespace, mesh, path)
152return YamlK8s(meshMetric)
153}
154
155func MeshMetricWithOpenTelemetryBackend(mesh, openTelemetryEndpoint string) InstallFunc {
156meshMetric := fmt.Sprintf(`
157apiVersion: kuma.io/v1alpha1
158kind: MeshMetric
159metadata:
160name: otel-metrics
161namespace: %s
162labels:
163kuma.io/mesh: %s
164spec:
165targetRef:
166kind: Mesh
167default:
168backends:
169- type: OpenTelemetry
170openTelemetry:
171endpoint: %s
172`, Config.KumaNamespace, mesh, openTelemetryEndpoint)
173return YamlK8s(meshMetric)
174}
175
176func MeshMetricWithOpenTelemetryAndPrometheusBackend(mesh, openTelemetryEndpoint string) InstallFunc {
177meshMetric := fmt.Sprintf(`
178apiVersion: kuma.io/v1alpha1
179kind: MeshMetric
180metadata:
181name: otel-metrics
182namespace: %s
183labels:
184kuma.io/mesh: %s
185spec:
186targetRef:
187kind: Mesh
188default:
189backends:
190- type: OpenTelemetry
191openTelemetry:
192endpoint: %s
193- type: Prometheus
194prometheus:
195port: 8080
196path: /metrics
197tls:
198mode: Disabled
199`, Config.KumaNamespace, mesh, openTelemetryEndpoint)
200return YamlK8s(meshMetric)
201}
202
203func MeshMetric() {
204const (
205namespace = "meshmetric"
206observabilityNamespace = "observability"
207mainMesh = "main-metrics-mesh"
208mainPrometheusId = "main-prometheus"
209secondaryMesh = "secondary-metrics-mesh"
210secondaryPrometheusId = "secondary-prometheus"
211)
212
213BeforeAll(func() {
214err := NewClusterSetup().
215Install(MeshKubernetes(mainMesh)).
216Install(MeshKubernetes(secondaryMesh)).
217Install(NamespaceWithSidecarInjection(namespace)).
218Install(Namespace(observabilityNamespace)).
219Install(otelcollector.Install(
220otelcollector.WithNamespace(observabilityNamespace),
221otelcollector.WithIPv6(Config.IPV6),
222)).
223Install(democlient.Install(democlient.WithNamespace(observabilityNamespace))).
224Setup(kubernetes.Cluster)
225Expect(err).To(Succeed())
226
227for i := 0; i < 2; i++ {
228Expect(
229kubernetes.Cluster.Install(testserver.Install(
230testserver.WithName(fmt.Sprintf("test-server-%d", i)),
231testserver.WithMesh(mainMesh),
232testserver.WithNamespace(namespace),
233)),
234).To(Succeed())
235}
236for i := 2; i < 4; i++ {
237Expect(
238kubernetes.Cluster.Install(testserver.Install(
239testserver.WithName(fmt.Sprintf("test-server-%d", i)),
240testserver.WithMesh(secondaryMesh),
241testserver.WithNamespace(namespace),
242)),
243).To(Succeed())
244}
245})
246
247E2EAfterEach(func() {
248Expect(DeleteMeshResources(kubernetes.Cluster, mainMesh, v1alpha1.MeshMetricResourceTypeDescriptor)).To(Succeed())
249Expect(DeleteMeshResources(kubernetes.Cluster, secondaryMesh, v1alpha1.MeshMetricResourceTypeDescriptor)).To(Succeed())
250})
251
252E2EAfterAll(func() {
253Expect(kubernetes.Cluster.TriggerDeleteNamespace(namespace)).To(Succeed())
254Expect(kubernetes.Cluster.DeleteMesh(mainMesh)).To(Succeed())
255Expect(kubernetes.Cluster.DeleteMesh(secondaryMesh)).To(Succeed())
256})
257
258It("Basic MeshMetric policy exposes Envoy metrics on correct port", func() {
259// given
260Expect(kubernetes.Cluster.Install(BasicMeshMetricForMesh("mesh-policy", mainMesh))).To(Succeed())
261podIp, err := PodIPOfApp(kubernetes.Cluster, "test-server-0", namespace)
262Expect(err).ToNot(HaveOccurred())
263
264// then
265Eventually(func(g Gomega) {
266stdout, _, err := client.CollectResponse(
267kubernetes.Cluster, "test-server-0", "http://"+net.JoinHostPort(podIp, "8080")+"/metrics",
268client.FromKubernetesPod(namespace, "test-server-0"),
269)
270
271g.Expect(err).ToNot(HaveOccurred())
272g.Expect(stdout).ToNot(BeNil())
273// metric from envoy
274g.Expect(stdout).To(ContainSubstring("envoy_http_downstream_rq_xx"))
275}).Should(Succeed())
276})
277
278It("MeshMetric policy with multiple Prometheus backends", func() {
279// given
280Expect(kubernetes.Cluster.Install(MeshMetricMultiplePrometheusBackends("mesh-policy", mainMesh, mainPrometheusId, secondaryPrometheusId))).To(Succeed())
281podIp, err := PodIPOfApp(kubernetes.Cluster, "test-server-0", namespace)
282Expect(err).ToNot(HaveOccurred())
283
284// then
285Eventually(func(g Gomega) {
286// main Prometheus backend
287stdout, _, err := client.CollectResponse(
288kubernetes.Cluster, "test-server-0", "http://"+net.JoinHostPort(podIp, "8080")+"/metrics",
289client.FromKubernetesPod(namespace, "test-server-0"),
290)
291
292g.Expect(err).ToNot(HaveOccurred())
293g.Expect(stdout).ToNot(BeNil())
294g.Expect(stdout).To(ContainSubstring("envoy_http_downstream_rq_xx"))
295
296// secondary Prometheus backend
297stdout, _, err = client.CollectResponse(
298kubernetes.Cluster, "test-server-0", "http://"+net.JoinHostPort(podIp, "8081")+"/metrics",
299client.FromKubernetesPod(namespace, "test-server-0"),
300)
301
302g.Expect(err).ToNot(HaveOccurred())
303g.Expect(stdout).ToNot(BeNil())
304// metric from envoy
305g.Expect(stdout).To(ContainSubstring("envoy_http_downstream_rq_xx"))
306}).Should(Succeed())
307})
308
309It("MeshMetric policy with dynamic configuration and application aggregation correctly exposes aggregated metrics", func() {
310// given
311Expect(kubernetes.Cluster.Install(MeshMetricWithApplicationForMesh("dynamic-config", mainMesh, "/path-stats"))).To(Succeed())
312podIp, err := PodIPOfApp(kubernetes.Cluster, "test-server-0", namespace)
313Expect(err).ToNot(HaveOccurred())
314
315// then
316Eventually(func(g Gomega) {
317stdout, _, err := client.CollectResponse(
318kubernetes.Cluster, "test-server-0", "http://"+net.JoinHostPort(podIp, "8080")+"/metrics",
319client.FromKubernetesPod(namespace, "test-server-0"),
320)
321
322g.Expect(err).ToNot(HaveOccurred())
323g.Expect(stdout).ToNot(BeNil())
324// metric from envoy
325g.Expect(stdout).To(ContainSubstring("envoy_http_downstream_rq_xx"))
326g.Expect(stdout).To(ContainSubstring("path-stats"))
327}, "1m", "1s").Should(Succeed())
328
329// update policy config and check if changes was applied on DPP
330Expect(kubernetes.Cluster.Install(MeshMetricWithApplicationForMesh("dynamic-config", mainMesh, "/app-stats"))).To(Succeed())
331
332// then
333Eventually(func(g Gomega) {
334stdout, _, err := client.CollectResponse(
335kubernetes.Cluster, "test-server-0", "http://"+net.JoinHostPort(podIp, "8080")+"/metrics",
336client.FromKubernetesPod(namespace, "test-server-0"),
337)
338
339g.Expect(err).ToNot(HaveOccurred())
340g.Expect(stdout).ToNot(BeNil())
341// metric from envoy
342g.Expect(stdout).To(ContainSubstring("envoy_http_downstream_rq_xx"))
343g.Expect(stdout).ToNot(ContainSubstring("path-stats"))
344g.Expect(stdout).To(ContainSubstring("app-stats"))
345}, "1m", "1s").Should(Succeed())
346})
347
348It("MADS server response contains DPPs from all meshes when prometheus client id is empty", func() {
349// given
350Expect(kubernetes.Cluster.Install(BasicMeshMetricForMesh("main-mesh-policy", mainMesh))).To(Succeed())
351Expect(kubernetes.Cluster.Install(BasicMeshMetricForMesh("secondary-mesh-policy", secondaryMesh))).To(Succeed())
352
353// then
354Eventually(func(g Gomega) {
355assignment, err := kubernetes.Cluster.GetKuma().GetMonitoringAssignment(mainPrometheusId)
356g.Expect(err).ToNot(HaveOccurred())
357
358madsResponse := MonitoringAssignmentResponse{}
359g.Expect(json.Unmarshal([]byte(assignment), &madsResponse)).To(Succeed())
360// all DPPs from both meshes in single MADS response
361g.Expect(getServicesFrom(madsResponse)).To(ConsistOf(
362"test-server-0_meshmetric_svc_80", "test-server-1_meshmetric_svc_80", "test-server-2_meshmetric_svc_80", "test-server-3_meshmetric_svc_80",
363))
364}).Should(Succeed())
365
366// and same response for secondary backend
367Eventually(func(g Gomega) {
368assignment, err := kubernetes.Cluster.GetKuma().GetMonitoringAssignment(secondaryPrometheusId)
369g.Expect(err).ToNot(HaveOccurred())
370
371madsResponse := MonitoringAssignmentResponse{}
372g.Expect(json.Unmarshal([]byte(assignment), &madsResponse)).To(Succeed())
373// all DPPs from both meshes in single MADS response
374g.Expect(getServicesFrom(madsResponse)).To(ConsistOf(
375"test-server-0_meshmetric_svc_80", "test-server-1_meshmetric_svc_80", "test-server-2_meshmetric_svc_80", "test-server-3_meshmetric_svc_80",
376))
377}).Should(Succeed())
378})
379
380It("MADS server response contains DPPs from corresponding meshes when prometheus client id is set", func() {
381// given
382Expect(kubernetes.Cluster.Install(MeshMetricWithSpecificPrometheusClientId("main-mesh-policy", mainMesh, mainPrometheusId))).To(Succeed())
383Expect(kubernetes.Cluster.Install(MeshMetricWithSpecificPrometheusClientId("secondary-mesh-policy", secondaryMesh, secondaryPrometheusId))).To(Succeed())
384
385// then
386Eventually(func(g Gomega) {
387assignment, err := kubernetes.Cluster.GetKuma().GetMonitoringAssignment(mainPrometheusId)
388g.Expect(err).ToNot(HaveOccurred())
389
390madsResponse := MonitoringAssignmentResponse{}
391g.Expect(json.Unmarshal([]byte(assignment), &madsResponse)).To(Succeed())
392// all DPPs from primaryMesh for primary Prometheus backend
393g.Expect(getServicesFrom(madsResponse)).To(ConsistOf(
394"test-server-0_meshmetric_svc_80", "test-server-1_meshmetric_svc_80",
395))
396}).Should(Succeed())
397
398// and
399Eventually(func(g Gomega) {
400assignment, err := kubernetes.Cluster.GetKuma().GetMonitoringAssignment(secondaryPrometheusId)
401g.Expect(err).ToNot(HaveOccurred())
402
403madsResponse := MonitoringAssignmentResponse{}
404g.Expect(json.Unmarshal([]byte(assignment), &madsResponse)).To(Succeed())
405// all DPPs from secondaryMesh for secondary Prometheus backend
406g.Expect(getServicesFrom(madsResponse)).To(ConsistOf(
407"test-server-2_meshmetric_svc_80", "test-server-3_meshmetric_svc_80",
408))
409}).Should(Succeed())
410})
411
412It("override MADS response for single DPP in mesh", func() {
413// given
414Expect(kubernetes.Cluster.Install(MeshMetricWithSpecificPrometheusClientId("main-mesh-policy", mainMesh, mainPrometheusId))).To(Succeed())
415Expect(kubernetes.Cluster.Install(MeshMetricWithSpecificPrometheusBackendForMeshService(mainMesh, secondaryPrometheusId, "test-server-1_meshmetric_svc_80"))).To(Succeed())
416
417// then
418Eventually(func(g Gomega) {
419assignment, err := kubernetes.Cluster.GetKuma().GetMonitoringAssignment(mainPrometheusId)
420g.Expect(err).ToNot(HaveOccurred())
421
422madsResponse := MonitoringAssignmentResponse{}
423g.Expect(json.Unmarshal([]byte(assignment), &madsResponse)).To(Succeed())
424// two DPPs configured by Mesh targetRef
425g.Expect(getServicesFrom(madsResponse)).To(ConsistOf("test-server-0_meshmetric_svc_80"))
426}).Should(Succeed())
427
428// and
429Eventually(func(g Gomega) {
430assignment, err := kubernetes.Cluster.GetKuma().GetMonitoringAssignment(secondaryPrometheusId)
431g.Expect(err).ToNot(HaveOccurred())
432
433madsResponse := MonitoringAssignmentResponse{}
434g.Expect(json.Unmarshal([]byte(assignment), &madsResponse)).To(Succeed())
435// single DPP overridden by MeshService targetRef
436g.Expect(getServicesFrom(madsResponse)).To(ConsistOf("test-server-1_meshmetric_svc_80"))
437}).Should(Succeed())
438})
439
440It("MeshMetric with OpenTelemetry enabled", func() {
441// given
442openTelemetryCollector := otelcollector.From(kubernetes.Cluster)
443Expect(kubernetes.Cluster.Install(MeshMetricWithOpenTelemetryBackend(mainMesh, openTelemetryCollector.CollectorEndpoint()))).To(Succeed())
444
445// then
446Eventually(func(g Gomega) {
447stdout, _, err := client.CollectResponse(
448kubernetes.Cluster, "demo-client", openTelemetryCollector.ExporterEndpoint(),
449client.FromKubernetesPod(observabilityNamespace, "demo-client"),
450client.WithVerbose(),
451)
452g.Expect(err).ToNot(HaveOccurred())
453g.Expect(stdout).To(ContainSubstring("envoy_cluster_external_upstream_rq_time_bucket"))
454}, "2m", "3s").Should(Succeed())
455})
456
457It("MeshMetric with OpenTelemetry and Prometheus enabled", func() {
458// given
459openTelemetryCollector := otelcollector.From(kubernetes.Cluster)
460testServerIp, err := PodIPOfApp(kubernetes.Cluster, "test-server-0", namespace)
461Expect(err).ToNot(HaveOccurred())
462Expect(kubernetes.Cluster.Install(MeshMetricWithOpenTelemetryAndPrometheusBackend(mainMesh, openTelemetryCollector.CollectorEndpoint()))).To(Succeed())
463
464// then
465Eventually(func(g Gomega) {
466// metrics from OpenTelemetry
467stdout, _, err := client.CollectResponse(
468kubernetes.Cluster, "demo-client", openTelemetryCollector.ExporterEndpoint(),
469client.FromKubernetesPod(observabilityNamespace, "demo-client"),
470client.WithVerbose(),
471)
472g.Expect(err).ToNot(HaveOccurred())
473g.Expect(stdout).To(ContainSubstring("envoy_cluster_external_upstream_rq_time_bucket"))
474
475// metrics from Prometheus
476stdout, _, err = client.CollectResponse(
477kubernetes.Cluster, "test-server-0", "http://"+net.JoinHostPort(testServerIp, "8080")+"/metrics",
478client.FromKubernetesPod(namespace, "test-server-0"),
479)
480g.Expect(err).ToNot(HaveOccurred())
481g.Expect(stdout).ToNot(BeNil())
482g.Expect(stdout).To(ContainSubstring("envoy_http_downstream_rq_xx"))
483}, "2m", "3s").Should(Succeed())
484})
485}
486
487func getServicesFrom(response MonitoringAssignmentResponse) []string {
488var services []string
489for _, assignment := range response.Resources {
490services = append(services, assignment.Service)
491}
492return services
493}
494
495type MonitoringAssignmentResponse struct {
496Resources []*mads.MonitoringAssignment `json:"resources"`
497}
498