kuma

Форк
0
497 строк · 16.3 Кб
1
package meshmetric
2

3
import (
4
	"encoding/json"
5
	"fmt"
6
	"net"
7

8
	. "github.com/onsi/ginkgo/v2"
9
	. "github.com/onsi/gomega"
10

11
	mads "github.com/kumahq/kuma/api/observability/v1"
12
	"github.com/kumahq/kuma/pkg/plugins/policies/meshmetric/api/v1alpha1"
13
	. "github.com/kumahq/kuma/test/framework"
14
	"github.com/kumahq/kuma/test/framework/client"
15
	"github.com/kumahq/kuma/test/framework/deployments/democlient"
16
	"github.com/kumahq/kuma/test/framework/deployments/otelcollector"
17
	"github.com/kumahq/kuma/test/framework/deployments/testserver"
18
	"github.com/kumahq/kuma/test/framework/envs/kubernetes"
19
)
20

21
// BasicMeshMetricForMesh renders a MeshMetric manifest named policyName in
// Config.KumaNamespace, labelled with the given mesh, and hands it to YamlK8s.
// The policy targets the whole Mesh and declares a single Prometheus backend
// on port 8080, path /metrics, with TLS disabled.
func BasicMeshMetricForMesh(policyName string, mesh string) InstallFunc {
	const policyTemplate = `
apiVersion: kuma.io/v1alpha1
kind: MeshMetric
metadata:
  name: %s
  namespace: %s
  labels:
    kuma.io/mesh: %s
spec:
  targetRef:
    kind: Mesh
  default:
    backends:
      - type: Prometheus
        prometheus: 
          port: 8080
          path: /metrics
          tls:
            mode: Disabled
`
	return YamlK8s(fmt.Sprintf(policyTemplate, policyName, Config.KumaNamespace, mesh))
}
44

45
// MeshMetricMultiplePrometheusBackends renders a Mesh-wide MeshMetric manifest
// with two Prometheus backends and hands it to YamlK8s. The first backend uses
// client id firstPrometheus on port 8080, the second uses secondPrometheus on
// port 8081; both serve /metrics with TLS disabled. The resource is created in
// Config.KumaNamespace and labelled with the given mesh.
func MeshMetricMultiplePrometheusBackends(policyName string, mesh string, firstPrometheus string, secondPrometheus string) InstallFunc {
	const policyTemplate = `
apiVersion: kuma.io/v1alpha1
kind: MeshMetric
metadata:
  name: %s
  namespace: %s
  labels:
    kuma.io/mesh: %s
spec:
  targetRef:
    kind: Mesh
  default:
    backends:
      - type: Prometheus
        prometheus: 
          clientId: %s
          port: 8080
          path: /metrics
          tls:
            mode: Disabled
      - type: Prometheus
        prometheus: 
          clientId: %s
          port: 8081
          path: /metrics
          tls:
            mode: Disabled
`
	manifest := fmt.Sprintf(
		policyTemplate,
		policyName, Config.KumaNamespace, mesh,
		firstPrometheus, secondPrometheus,
	)
	return YamlK8s(manifest)
}
76

77
// MeshMetricWithSpecificPrometheusClientId renders a Mesh-wide MeshMetric
// manifest whose single Prometheus backend (port 8080, path /metrics, TLS
// disabled) carries the given clientId, and hands it to YamlK8s. The resource
// is named policyName, placed in Config.KumaNamespace and labelled with mesh.
func MeshMetricWithSpecificPrometheusClientId(policyName string, mesh string, clientId string) InstallFunc {
	const policyTemplate = `
apiVersion: kuma.io/v1alpha1
kind: MeshMetric
metadata:
  name: %s
  namespace: %s
  labels:
    kuma.io/mesh: %s
spec:
  targetRef:
    kind: Mesh
  default:
    backends:
      - type: Prometheus
        prometheus: 
          clientId: %s
          port: 8080
          path: /metrics
          tls:
            mode: Disabled
`
	return YamlK8s(fmt.Sprintf(policyTemplate, policyName, Config.KumaNamespace, mesh, clientId))
}
101

102
// MeshMetricWithSpecificPrometheusBackendForMeshService renders a MeshMetric
// manifest with the fixed name "mesh-metric-2" whose targetRef is the
// MeshService serviceName, and hands it to YamlK8s. Its single Prometheus
// backend (port 8080, path /metrics, TLS disabled) carries the given clientId.
// The resource is placed in Config.KumaNamespace and labelled with mesh.
func MeshMetricWithSpecificPrometheusBackendForMeshService(mesh string, clientId string, serviceName string) InstallFunc {
	const policyTemplate = `
apiVersion: kuma.io/v1alpha1
kind: MeshMetric
metadata:
  name: mesh-metric-2
  namespace: %s
  labels:
    kuma.io/mesh: %s
spec:
  targetRef:
    kind: MeshService
    name: %s
  default:
    backends:
      - type: Prometheus
        prometheus: 
          clientId: %s
          port: 8080
          path: /metrics
          tls:
            mode: Disabled
`
	// The template takes the service name before the client id, which is the
	// reverse of this function's parameter order.
	return YamlK8s(fmt.Sprintf(policyTemplate, Config.KumaNamespace, mesh, serviceName, clientId))
}
127

128
// MeshMetricWithApplicationForMesh renders a Mesh-wide MeshMetric manifest that
// declares one application entry at the given path on port 80, alongside a
// Prometheus backend on port 8080, path /metrics, with TLS disabled, and hands
// it to YamlK8s. The resource is named policyName, placed in
// Config.KumaNamespace and labelled with mesh.
func MeshMetricWithApplicationForMesh(policyName, mesh, path string) InstallFunc {
	const policyTemplate = `
apiVersion: kuma.io/v1alpha1
kind: MeshMetric
metadata:
  name: %s
  namespace: %s
  labels:
    kuma.io/mesh: %s
spec:
  targetRef:
    kind: Mesh
  default:
    applications:
      - path: "%s"
        port: 80
    backends:
      - type: Prometheus
        prometheus: 
          port: 8080
          path: /metrics
          tls:
            mode: Disabled
`
	return YamlK8s(fmt.Sprintf(policyTemplate, policyName, Config.KumaNamespace, mesh, path))
}
154

155
// MeshMetricWithOpenTelemetryBackend renders a Mesh-wide MeshMetric manifest
// with the fixed name "otel-metrics" and a single OpenTelemetry backend that
// points at openTelemetryEndpoint, and hands it to YamlK8s. The resource is
// placed in Config.KumaNamespace and labelled with mesh.
func MeshMetricWithOpenTelemetryBackend(mesh, openTelemetryEndpoint string) InstallFunc {
	const policyTemplate = `
apiVersion: kuma.io/v1alpha1
kind: MeshMetric
metadata:
  name: otel-metrics
  namespace: %s
  labels:
    kuma.io/mesh: %s
spec:
  targetRef:
    kind: Mesh
  default:
    backends:
      - type: OpenTelemetry
        openTelemetry: 
          endpoint: %s
`
	return YamlK8s(fmt.Sprintf(policyTemplate, Config.KumaNamespace, mesh, openTelemetryEndpoint))
}
175

176
// MeshMetricWithOpenTelemetryAndPrometheusBackend renders a Mesh-wide
// MeshMetric manifest with the fixed name "otel-metrics" and two backends, and
// hands it to YamlK8s: an OpenTelemetry backend pointing at
// openTelemetryEndpoint, and a Prometheus backend on port 8080, path /metrics,
// with TLS disabled. The resource is placed in Config.KumaNamespace and
// labelled with mesh.
func MeshMetricWithOpenTelemetryAndPrometheusBackend(mesh, openTelemetryEndpoint string) InstallFunc {
	const policyTemplate = `
apiVersion: kuma.io/v1alpha1
kind: MeshMetric
metadata:
  name: otel-metrics
  namespace: %s
  labels:
    kuma.io/mesh: %s
spec:
  targetRef:
    kind: Mesh
  default:
    backends:
      - type: OpenTelemetry
        openTelemetry: 
          endpoint: %s
      - type: Prometheus
        prometheus: 
          port: 8080
          path: /metrics
          tls:
            mode: Disabled
`
	return YamlK8s(fmt.Sprintf(policyTemplate, Config.KumaNamespace, mesh, openTelemetryEndpoint))
}
202

203
func MeshMetric() {
204
	const (
205
		namespace              = "meshmetric"
206
		observabilityNamespace = "observability"
207
		mainMesh               = "main-metrics-mesh"
208
		mainPrometheusId       = "main-prometheus"
209
		secondaryMesh          = "secondary-metrics-mesh"
210
		secondaryPrometheusId  = "secondary-prometheus"
211
	)
212

213
	BeforeAll(func() {
214
		err := NewClusterSetup().
215
			Install(MeshKubernetes(mainMesh)).
216
			Install(MeshKubernetes(secondaryMesh)).
217
			Install(NamespaceWithSidecarInjection(namespace)).
218
			Install(Namespace(observabilityNamespace)).
219
			Install(otelcollector.Install(
220
				otelcollector.WithNamespace(observabilityNamespace),
221
				otelcollector.WithIPv6(Config.IPV6),
222
			)).
223
			Install(democlient.Install(democlient.WithNamespace(observabilityNamespace))).
224
			Setup(kubernetes.Cluster)
225
		Expect(err).To(Succeed())
226

227
		for i := 0; i < 2; i++ {
228
			Expect(
229
				kubernetes.Cluster.Install(testserver.Install(
230
					testserver.WithName(fmt.Sprintf("test-server-%d", i)),
231
					testserver.WithMesh(mainMesh),
232
					testserver.WithNamespace(namespace),
233
				)),
234
			).To(Succeed())
235
		}
236
		for i := 2; i < 4; i++ {
237
			Expect(
238
				kubernetes.Cluster.Install(testserver.Install(
239
					testserver.WithName(fmt.Sprintf("test-server-%d", i)),
240
					testserver.WithMesh(secondaryMesh),
241
					testserver.WithNamespace(namespace),
242
				)),
243
			).To(Succeed())
244
		}
245
	})
246

247
	E2EAfterEach(func() {
248
		Expect(DeleteMeshResources(kubernetes.Cluster, mainMesh, v1alpha1.MeshMetricResourceTypeDescriptor)).To(Succeed())
249
		Expect(DeleteMeshResources(kubernetes.Cluster, secondaryMesh, v1alpha1.MeshMetricResourceTypeDescriptor)).To(Succeed())
250
	})
251

252
	E2EAfterAll(func() {
253
		Expect(kubernetes.Cluster.TriggerDeleteNamespace(namespace)).To(Succeed())
254
		Expect(kubernetes.Cluster.DeleteMesh(mainMesh)).To(Succeed())
255
		Expect(kubernetes.Cluster.DeleteMesh(secondaryMesh)).To(Succeed())
256
	})
257

258
	It("Basic MeshMetric policy exposes Envoy metrics on correct port", func() {
259
		// given
260
		Expect(kubernetes.Cluster.Install(BasicMeshMetricForMesh("mesh-policy", mainMesh))).To(Succeed())
261
		podIp, err := PodIPOfApp(kubernetes.Cluster, "test-server-0", namespace)
262
		Expect(err).ToNot(HaveOccurred())
263

264
		// then
265
		Eventually(func(g Gomega) {
266
			stdout, _, err := client.CollectResponse(
267
				kubernetes.Cluster, "test-server-0", "http://"+net.JoinHostPort(podIp, "8080")+"/metrics",
268
				client.FromKubernetesPod(namespace, "test-server-0"),
269
			)
270

271
			g.Expect(err).ToNot(HaveOccurred())
272
			g.Expect(stdout).ToNot(BeNil())
273
			// metric from envoy
274
			g.Expect(stdout).To(ContainSubstring("envoy_http_downstream_rq_xx"))
275
		}).Should(Succeed())
276
	})
277

278
	It("MeshMetric policy with multiple Prometheus backends", func() {
279
		// given
280
		Expect(kubernetes.Cluster.Install(MeshMetricMultiplePrometheusBackends("mesh-policy", mainMesh, mainPrometheusId, secondaryPrometheusId))).To(Succeed())
281
		podIp, err := PodIPOfApp(kubernetes.Cluster, "test-server-0", namespace)
282
		Expect(err).ToNot(HaveOccurred())
283

284
		// then
285
		Eventually(func(g Gomega) {
286
			// main Prometheus backend
287
			stdout, _, err := client.CollectResponse(
288
				kubernetes.Cluster, "test-server-0", "http://"+net.JoinHostPort(podIp, "8080")+"/metrics",
289
				client.FromKubernetesPod(namespace, "test-server-0"),
290
			)
291

292
			g.Expect(err).ToNot(HaveOccurred())
293
			g.Expect(stdout).ToNot(BeNil())
294
			g.Expect(stdout).To(ContainSubstring("envoy_http_downstream_rq_xx"))
295

296
			// secondary Prometheus backend
297
			stdout, _, err = client.CollectResponse(
298
				kubernetes.Cluster, "test-server-0", "http://"+net.JoinHostPort(podIp, "8081")+"/metrics",
299
				client.FromKubernetesPod(namespace, "test-server-0"),
300
			)
301

302
			g.Expect(err).ToNot(HaveOccurred())
303
			g.Expect(stdout).ToNot(BeNil())
304
			// metric from envoy
305
			g.Expect(stdout).To(ContainSubstring("envoy_http_downstream_rq_xx"))
306
		}).Should(Succeed())
307
	})
308

309
	It("MeshMetric policy with dynamic configuration and application aggregation correctly exposes aggregated metrics", func() {
310
		// given
311
		Expect(kubernetes.Cluster.Install(MeshMetricWithApplicationForMesh("dynamic-config", mainMesh, "/path-stats"))).To(Succeed())
312
		podIp, err := PodIPOfApp(kubernetes.Cluster, "test-server-0", namespace)
313
		Expect(err).ToNot(HaveOccurred())
314

315
		// then
316
		Eventually(func(g Gomega) {
317
			stdout, _, err := client.CollectResponse(
318
				kubernetes.Cluster, "test-server-0", "http://"+net.JoinHostPort(podIp, "8080")+"/metrics",
319
				client.FromKubernetesPod(namespace, "test-server-0"),
320
			)
321

322
			g.Expect(err).ToNot(HaveOccurred())
323
			g.Expect(stdout).ToNot(BeNil())
324
			// metric from envoy
325
			g.Expect(stdout).To(ContainSubstring("envoy_http_downstream_rq_xx"))
326
			g.Expect(stdout).To(ContainSubstring("path-stats"))
327
		}, "1m", "1s").Should(Succeed())
328

329
		// update policy config and check if changes was applied on DPP
330
		Expect(kubernetes.Cluster.Install(MeshMetricWithApplicationForMesh("dynamic-config", mainMesh, "/app-stats"))).To(Succeed())
331

332
		// then
333
		Eventually(func(g Gomega) {
334
			stdout, _, err := client.CollectResponse(
335
				kubernetes.Cluster, "test-server-0", "http://"+net.JoinHostPort(podIp, "8080")+"/metrics",
336
				client.FromKubernetesPod(namespace, "test-server-0"),
337
			)
338

339
			g.Expect(err).ToNot(HaveOccurred())
340
			g.Expect(stdout).ToNot(BeNil())
341
			// metric from envoy
342
			g.Expect(stdout).To(ContainSubstring("envoy_http_downstream_rq_xx"))
343
			g.Expect(stdout).ToNot(ContainSubstring("path-stats"))
344
			g.Expect(stdout).To(ContainSubstring("app-stats"))
345
		}, "1m", "1s").Should(Succeed())
346
	})
347

348
	It("MADS server response contains DPPs from all meshes when prometheus client id is empty", func() {
349
		// given
350
		Expect(kubernetes.Cluster.Install(BasicMeshMetricForMesh("main-mesh-policy", mainMesh))).To(Succeed())
351
		Expect(kubernetes.Cluster.Install(BasicMeshMetricForMesh("secondary-mesh-policy", secondaryMesh))).To(Succeed())
352

353
		// then
354
		Eventually(func(g Gomega) {
355
			assignment, err := kubernetes.Cluster.GetKuma().GetMonitoringAssignment(mainPrometheusId)
356
			g.Expect(err).ToNot(HaveOccurred())
357

358
			madsResponse := MonitoringAssignmentResponse{}
359
			g.Expect(json.Unmarshal([]byte(assignment), &madsResponse)).To(Succeed())
360
			// all DPPs from both meshes in single MADS response
361
			g.Expect(getServicesFrom(madsResponse)).To(ConsistOf(
362
				"test-server-0_meshmetric_svc_80", "test-server-1_meshmetric_svc_80", "test-server-2_meshmetric_svc_80", "test-server-3_meshmetric_svc_80",
363
			))
364
		}).Should(Succeed())
365

366
		// and same response for secondary backend
367
		Eventually(func(g Gomega) {
368
			assignment, err := kubernetes.Cluster.GetKuma().GetMonitoringAssignment(secondaryPrometheusId)
369
			g.Expect(err).ToNot(HaveOccurred())
370

371
			madsResponse := MonitoringAssignmentResponse{}
372
			g.Expect(json.Unmarshal([]byte(assignment), &madsResponse)).To(Succeed())
373
			// all DPPs from both meshes in single MADS response
374
			g.Expect(getServicesFrom(madsResponse)).To(ConsistOf(
375
				"test-server-0_meshmetric_svc_80", "test-server-1_meshmetric_svc_80", "test-server-2_meshmetric_svc_80", "test-server-3_meshmetric_svc_80",
376
			))
377
		}).Should(Succeed())
378
	})
379

380
	It("MADS server response contains DPPs from corresponding meshes when prometheus client id is set", func() {
381
		// given
382
		Expect(kubernetes.Cluster.Install(MeshMetricWithSpecificPrometheusClientId("main-mesh-policy", mainMesh, mainPrometheusId))).To(Succeed())
383
		Expect(kubernetes.Cluster.Install(MeshMetricWithSpecificPrometheusClientId("secondary-mesh-policy", secondaryMesh, secondaryPrometheusId))).To(Succeed())
384

385
		// then
386
		Eventually(func(g Gomega) {
387
			assignment, err := kubernetes.Cluster.GetKuma().GetMonitoringAssignment(mainPrometheusId)
388
			g.Expect(err).ToNot(HaveOccurred())
389

390
			madsResponse := MonitoringAssignmentResponse{}
391
			g.Expect(json.Unmarshal([]byte(assignment), &madsResponse)).To(Succeed())
392
			// all DPPs from primaryMesh for primary Prometheus backend
393
			g.Expect(getServicesFrom(madsResponse)).To(ConsistOf(
394
				"test-server-0_meshmetric_svc_80", "test-server-1_meshmetric_svc_80",
395
			))
396
		}).Should(Succeed())
397

398
		// and
399
		Eventually(func(g Gomega) {
400
			assignment, err := kubernetes.Cluster.GetKuma().GetMonitoringAssignment(secondaryPrometheusId)
401
			g.Expect(err).ToNot(HaveOccurred())
402

403
			madsResponse := MonitoringAssignmentResponse{}
404
			g.Expect(json.Unmarshal([]byte(assignment), &madsResponse)).To(Succeed())
405
			// all DPPs from secondaryMesh for secondary Prometheus backend
406
			g.Expect(getServicesFrom(madsResponse)).To(ConsistOf(
407
				"test-server-2_meshmetric_svc_80", "test-server-3_meshmetric_svc_80",
408
			))
409
		}).Should(Succeed())
410
	})
411

412
	It("override MADS response for single DPP in mesh", func() {
413
		// given
414
		Expect(kubernetes.Cluster.Install(MeshMetricWithSpecificPrometheusClientId("main-mesh-policy", mainMesh, mainPrometheusId))).To(Succeed())
415
		Expect(kubernetes.Cluster.Install(MeshMetricWithSpecificPrometheusBackendForMeshService(mainMesh, secondaryPrometheusId, "test-server-1_meshmetric_svc_80"))).To(Succeed())
416

417
		// then
418
		Eventually(func(g Gomega) {
419
			assignment, err := kubernetes.Cluster.GetKuma().GetMonitoringAssignment(mainPrometheusId)
420
			g.Expect(err).ToNot(HaveOccurred())
421

422
			madsResponse := MonitoringAssignmentResponse{}
423
			g.Expect(json.Unmarshal([]byte(assignment), &madsResponse)).To(Succeed())
424
			// two DPPs configured by Mesh targetRef
425
			g.Expect(getServicesFrom(madsResponse)).To(ConsistOf("test-server-0_meshmetric_svc_80"))
426
		}).Should(Succeed())
427

428
		// and
429
		Eventually(func(g Gomega) {
430
			assignment, err := kubernetes.Cluster.GetKuma().GetMonitoringAssignment(secondaryPrometheusId)
431
			g.Expect(err).ToNot(HaveOccurred())
432

433
			madsResponse := MonitoringAssignmentResponse{}
434
			g.Expect(json.Unmarshal([]byte(assignment), &madsResponse)).To(Succeed())
435
			// single DPP overridden by MeshService targetRef
436
			g.Expect(getServicesFrom(madsResponse)).To(ConsistOf("test-server-1_meshmetric_svc_80"))
437
		}).Should(Succeed())
438
	})
439

440
	It("MeshMetric with OpenTelemetry enabled", func() {
441
		// given
442
		openTelemetryCollector := otelcollector.From(kubernetes.Cluster)
443
		Expect(kubernetes.Cluster.Install(MeshMetricWithOpenTelemetryBackend(mainMesh, openTelemetryCollector.CollectorEndpoint()))).To(Succeed())
444

445
		// then
446
		Eventually(func(g Gomega) {
447
			stdout, _, err := client.CollectResponse(
448
				kubernetes.Cluster, "demo-client", openTelemetryCollector.ExporterEndpoint(),
449
				client.FromKubernetesPod(observabilityNamespace, "demo-client"),
450
				client.WithVerbose(),
451
			)
452
			g.Expect(err).ToNot(HaveOccurred())
453
			g.Expect(stdout).To(ContainSubstring("envoy_cluster_external_upstream_rq_time_bucket"))
454
		}, "2m", "3s").Should(Succeed())
455
	})
456

457
	It("MeshMetric with OpenTelemetry and Prometheus enabled", func() {
458
		// given
459
		openTelemetryCollector := otelcollector.From(kubernetes.Cluster)
460
		testServerIp, err := PodIPOfApp(kubernetes.Cluster, "test-server-0", namespace)
461
		Expect(err).ToNot(HaveOccurred())
462
		Expect(kubernetes.Cluster.Install(MeshMetricWithOpenTelemetryAndPrometheusBackend(mainMesh, openTelemetryCollector.CollectorEndpoint()))).To(Succeed())
463

464
		// then
465
		Eventually(func(g Gomega) {
466
			// metrics from OpenTelemetry
467
			stdout, _, err := client.CollectResponse(
468
				kubernetes.Cluster, "demo-client", openTelemetryCollector.ExporterEndpoint(),
469
				client.FromKubernetesPod(observabilityNamespace, "demo-client"),
470
				client.WithVerbose(),
471
			)
472
			g.Expect(err).ToNot(HaveOccurred())
473
			g.Expect(stdout).To(ContainSubstring("envoy_cluster_external_upstream_rq_time_bucket"))
474

475
			// metrics from Prometheus
476
			stdout, _, err = client.CollectResponse(
477
				kubernetes.Cluster, "test-server-0", "http://"+net.JoinHostPort(testServerIp, "8080")+"/metrics",
478
				client.FromKubernetesPod(namespace, "test-server-0"),
479
			)
480
			g.Expect(err).ToNot(HaveOccurred())
481
			g.Expect(stdout).ToNot(BeNil())
482
			g.Expect(stdout).To(ContainSubstring("envoy_http_downstream_rq_xx"))
483
		}, "2m", "3s").Should(Succeed())
484
	})
485
}
486

487
// getServicesFrom returns the Service field of every resource in response, in
// the order the resources appear. The result is a nil slice when response
// holds no resources.
func getServicesFrom(response MonitoringAssignmentResponse) []string {
	var names []string
	for idx := range response.Resources {
		names = append(names, response.Resources[idx].Service)
	}
	return names
}
494

495
type MonitoringAssignmentResponse struct {
496
	Resources []*mads.MonitoringAssignment `json:"resources"`
497
}
498

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.