kuma

Форк
0
/
k8s_controlplane.go 
402 строки · 10.2 Кб
1
package framework
2

3
import (
4
	"bytes"
5
	"crypto/tls"
6
	"encoding/json"
7
	"fmt"
8
	"net"
9
	"net/http"
10
	"strconv"
11
	"strings"
12

13
	http_helper "github.com/gruntwork-io/terratest/modules/http-helper"
14
	"github.com/gruntwork-io/terratest/modules/k8s"
15
	"github.com/gruntwork-io/terratest/modules/retry"
16
	"github.com/gruntwork-io/terratest/modules/testing"
17
	"github.com/pkg/errors"
18
	v1 "k8s.io/api/core/v1"
19
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20

21
	"github.com/kumahq/kuma/pkg/config/core"
22
	"github.com/kumahq/kuma/test/framework/kumactl"
23
)
24

25
var _ ControlPlane = &K8sControlPlane{}
26

27
type K8sControlPlane struct {
28
	t          testing.TestingT
29
	mode       core.CpMode
30
	name       string
31
	kubeconfig string
32
	kumactl    *kumactl.KumactlOptions
33
	cluster    *K8sCluster
34
	portFwd    PortFwd
35
	madsFwd    PortFwd
36
	verbose    bool
37
	replicas   int
38
	apiHeaders []string
39
}
40

41
func NewK8sControlPlane(
42
	t testing.TestingT,
43
	mode core.CpMode,
44
	clusterName string,
45
	kubeconfig string,
46
	cluster *K8sCluster,
47
	verbose bool,
48
	replicas int,
49
	apiHeaders []string,
50
) *K8sControlPlane {
51
	name := clusterName + "-" + mode
52
	return &K8sControlPlane{
53
		t:          t,
54
		mode:       mode,
55
		name:       name,
56
		kubeconfig: kubeconfig,
57
		kumactl:    NewKumactlOptionsE2E(t, name, verbose),
58
		cluster:    cluster,
59
		verbose:    verbose,
60
		replicas:   replicas,
61
		apiHeaders: apiHeaders,
62
	}
63
}
64

65
func (c *K8sControlPlane) GetName() string {
66
	return c.name
67
}
68

69
func (c *K8sControlPlane) GetKubectlOptions(namespace ...string) *k8s.KubectlOptions {
70
	options := &k8s.KubectlOptions{
71
		ConfigPath: c.kubeconfig,
72
	}
73
	for _, ns := range namespace {
74
		options.Namespace = ns
75
		break
76
	}
77

78
	return options
79
}
80

81
func (c *K8sControlPlane) PortForwardKumaCP() {
82
	kumaCpPods := c.GetKumaCPPods()
83
	// There could be multiple pods still starting so pick one that's available already
84
	for i := range kumaCpPods {
85
		if k8s.IsPodAvailable(&kumaCpPods[i]) {
86
			c.portFwd.apiServerTunnel = k8s.NewTunnel(c.GetKubectlOptions(Config.KumaNamespace), k8s.ResourceTypePod, kumaCpPods[i].Name, 0, 5681)
87
			c.portFwd.apiServerTunnel.ForwardPort(c.t)
88
			c.portFwd.ApiServerEndpoint = c.portFwd.apiServerTunnel.Endpoint()
89

90
			c.madsFwd.apiServerTunnel = k8s.NewTunnel(c.GetKubectlOptions(Config.KumaNamespace), k8s.ResourceTypePod, kumaCpPods[i].Name, 0, 5676)
91
			c.madsFwd.apiServerTunnel.ForwardPort(c.t)
92
			c.madsFwd.ApiServerEndpoint = c.madsFwd.apiServerTunnel.Endpoint()
93
			return
94
		}
95
	}
96
	c.t.Fatalf("Failed finding an available pod, allPods: %v", kumaCpPods)
97
}
98

99
func (c *K8sControlPlane) ClosePortForwards() {
100
	if c.portFwd.apiServerTunnel != nil {
101
		c.portFwd.apiServerTunnel.Close()
102
	}
103
}
104

105
func (c *K8sControlPlane) GetKumaCPPods() []v1.Pod {
106
	return k8s.ListPods(c.t,
107
		c.GetKubectlOptions(Config.KumaNamespace),
108
		metav1.ListOptions{
109
			LabelSelector: "app=" + Config.KumaServiceName,
110
		},
111
	)
112
}
113

114
func (c *K8sControlPlane) GetKumaCPSvc() v1.Service {
115
	return k8s.ListServices(c.t,
116
		c.GetKubectlOptions(Config.KumaNamespace),
117
		metav1.ListOptions{
118
			FieldSelector: "metadata.name=" + Config.KumaServiceName,
119
		},
120
	)[0]
121
}
122

123
func (c *K8sControlPlane) GetKumaCPSyncSvc() v1.Service {
124
	return k8s.ListServices(c.t,
125
		c.GetKubectlOptions(Config.KumaNamespace),
126
		metav1.ListOptions{
127
			FieldSelector: "metadata.name=" + Config.KumaGlobalZoneSyncServiceName,
128
		},
129
	)[0]
130
}
131

132
func (c *K8sControlPlane) VerifyKumaCtl() error {
133
	if c.portFwd.ApiServerEndpoint == "" {
134
		return errors.Errorf("API port not forwarded")
135
	}
136

137
	return c.kumactl.RunKumactl("get", "meshes")
138
}
139

140
func (c *K8sControlPlane) VerifyKumaREST() error {
141
	headers := map[string]string{}
142
	for _, header := range c.apiHeaders {
143
		res := strings.Split(header, "=")
144
		headers[res[0]] = res[1]
145
	}
146
	_, err := http_helper.HTTPDoWithRetryE(
147
		c.t,
148
		"GET",
149
		c.GetGlobalStatusAPI(),
150
		nil,
151
		headers,
152
		http.StatusOK,
153
		DefaultRetries,
154
		DefaultTimeout,
155
		&tls.Config{MinVersion: tls.VersionTLS12},
156
	)
157
	return err
158
}
159

160
func (c *K8sControlPlane) VerifyKumaGUI() error {
161
	if c.mode == core.Zone {
162
		return nil
163
	}
164

165
	return http_helper.HttpGetWithRetryWithCustomValidationE(
166
		c.t,
167
		c.GetAPIServerAddress()+"/gui",
168
		&tls.Config{MinVersion: tls.VersionTLS12},
169
		3,
170
		DefaultTimeout,
171
		func(statusCode int, body string) bool {
172
			return statusCode == http.StatusOK
173
		},
174
	)
175
}
176

177
func (c *K8sControlPlane) PortFwd() PortFwd {
178
	return c.portFwd
179
}
180

181
func (c *K8sControlPlane) MadsPortFwd() PortFwd {
182
	return c.madsFwd
183
}
184

185
func (c *K8sControlPlane) FinalizeAdd() error {
186
	c.PortForwardKumaCP()
187
	return c.FinalizeAddWithPortFwd(c.portFwd, c.madsFwd)
188
}
189

190
func (c *K8sControlPlane) FinalizeAddWithPortFwd(portFwd PortFwd, madsPortForward PortFwd) error {
191
	c.portFwd = portFwd
192
	c.madsFwd = madsPortForward
193
	if !c.cluster.opts.setupKumactl {
194
		return nil
195
	}
196

197
	var token string
198
	t, err := c.retrieveAdminToken()
199
	if err != nil {
200
		return err
201
	}
202
	token = t
203
	return c.kumactl.KumactlConfigControlPlanesAdd(c.name, c.GetAPIServerAddress(), token, c.apiHeaders)
204
}
205

206
func (c *K8sControlPlane) retrieveAdminToken() (string, error) {
207
	if authnType, exist := c.cluster.opts.env["KUMA_API_SERVER_AUTHN_TYPE"]; exist && authnType != "tokens" {
208
		return "", nil
209
	}
210
	if c.cluster.opts.helmOpts["controlPlane.environment"] == "universal" {
211
		body, err := http_helper.HTTPDoWithRetryWithOptionsE(c.t, http_helper.HttpDoOptions{
212
			Method:    "GET",
213
			Url:       c.GetAPIServerAddress() + "/global-secrets/admin-user-token",
214
			TlsConfig: &tls.Config{MinVersion: tls.VersionTLS12},
215
			Body:      bytes.NewReader([]byte{}),
216
		}, http.StatusOK, DefaultRetries, DefaultTimeout)
217
		if err != nil {
218
			return "", err
219
		}
220
		return ExtractSecretDataFromResponse(body)
221
	}
222

223
	return retry.DoWithRetryE(c.t, "generating DP token", DefaultRetries, DefaultTimeout, func() (string, error) {
224
		sec, err := k8s.GetSecretE(c.t, c.GetKubectlOptions(Config.KumaNamespace), "admin-user-token")
225
		if err != nil {
226
			return "", err
227
		}
228
		return string(sec.Data["value"]), nil
229
	})
230
}
231

232
func (c *K8sControlPlane) InstallCP(args ...string) (string, error) {
233
	// store the kumactl environment
234
	oldEnv := c.kumactl.Env
235
	c.kumactl.Env["KUBECONFIG"] = c.GetKubectlOptions().ConfigPath
236
	defer func() {
237
		c.kumactl.Env = oldEnv // restore kumactl environment
238
	}()
239
	return c.kumactl.KumactlInstallCP(args...)
240
}
241

242
func (c *K8sControlPlane) GetKDSInsecureServerAddress() string {
243
	svc := c.GetKumaCPSyncSvc()
244
	return "grpc://" + c.getKumaCPAddress(svc, "global-zone-sync")
245
}
246

247
func (c *K8sControlPlane) GetKDSServerAddress() string {
248
	svc := c.GetKumaCPSyncSvc()
249
	return "grpcs://" + c.getKumaCPAddress(svc, "global-zone-sync")
250
}
251

252
func (c *K8sControlPlane) GetXDSServerAddress() string {
253
	svc := c.GetKumaCPSvc()
254
	return c.getKumaCPAddress(svc, "dp-server")
255
}
256

257
// A naive implementation to find the Host & Port where a Service is exposing a
258
// CP port.
259
func (c *K8sControlPlane) getKumaCPAddress(svc v1.Service, portName string) string {
260
	var svcPort v1.ServicePort
261
	for _, port := range svc.Spec.Ports {
262
		if port.Name == portName {
263
			svcPort = port
264
		}
265
	}
266

267
	var address string
268
	var portNumber int32
269

270
	// As EKS and AWS generally returns dns records of load balancers instead of
271
	// IP addresses, accessing this data (hostname) was only tested there,
272
	// so the env var was created for that purpose
273
	if Config.UseLoadBalancer {
274
		address = svc.Status.LoadBalancer.Ingress[0].IP
275

276
		if Config.UseHostnameInsteadOfIP {
277
			address = svc.Status.LoadBalancer.Ingress[0].Hostname
278
		}
279

280
		portNumber = svcPort.Port
281
	} else {
282
		pod := c.GetKumaCPPods()[0]
283
		address = pod.Status.HostIP
284

285
		portNumber = svcPort.NodePort
286
	}
287

288
	return net.JoinHostPort(
289
		address, strconv.FormatUint(uint64(portNumber), 10),
290
	)
291
}
292

293
func (c *K8sControlPlane) GetAPIServerAddress() string {
294
	if c.portFwd.ApiServerEndpoint == "" {
295
		panic("Port forward wasn't setup!")
296
	}
297
	return "http://" + c.portFwd.ApiServerEndpoint
298
}
299

300
func (c *K8sControlPlane) GetMetrics() (string, error) {
301
	panic("not implemented")
302
}
303

304
func (c *K8sControlPlane) GetMonitoringAssignment(clientId string) (string, error) {
305
	if c.madsFwd.ApiServerEndpoint == "" {
306
		panic("MADS port forward wasn't setup!")
307
	}
308
	madsEndpoint := "http://" + c.madsFwd.ApiServerEndpoint
309

310
	return http_helper.HTTPDoWithRetryE(
311
		c.t,
312
		"POST",
313
		madsEndpoint+"/v3/discovery:monitoringassignments",
314
		[]byte(fmt.Sprintf(`{"type_url": "type.googleapis.com/kuma.observability.v1.MonitoringAssignment","node": {"id": "%s"}}`, clientId)),
315
		map[string]string{
316
			"content-type": "application/json",
317
		},
318
		200,
319
		DefaultRetries,
320
		DefaultTimeout,
321
		&tls.Config{MinVersion: tls.VersionTLS12},
322
	)
323
}
324

325
func (c *K8sControlPlane) Exec(cmd ...string) (string, string, error) {
326
	pod := c.GetKumaCPPods()[0]
327
	return c.cluster.Exec(pod.Namespace, pod.Name, "", cmd...)
328
}
329

330
func (c *K8sControlPlane) GetGlobalStatusAPI() string {
331
	return c.GetAPIServerAddress() + "/status/zones"
332
}
333

334
func (c *K8sControlPlane) generateToken(
335
	tokenPath string,
336
	data string,
337
) (string, error) {
338
	token, err := c.retrieveAdminToken()
339
	if err != nil {
340
		return "", err
341
	}
342

343
	return http_helper.HTTPDoWithRetryE(
344
		c.t,
345
		"POST",
346
		c.GetAPIServerAddress()+"/tokens"+tokenPath,
347
		[]byte(data),
348
		map[string]string{
349
			"content-type":  "application/json",
350
			"authorization": "Bearer " + token,
351
		},
352
		200,
353
		DefaultRetries,
354
		DefaultTimeout,
355
		&tls.Config{MinVersion: tls.VersionTLS12},
356
	)
357
}
358

359
func (c *K8sControlPlane) GenerateDpToken(mesh, service string) (string, error) {
360
	var dpType string
361
	if service == "ingress" {
362
		dpType = "ingress"
363
	}
364

365
	data := fmt.Sprintf(
366
		`{"mesh": "%s", "type": "%s", "tags": {"kuma.io/service": ["%s"]}}`,
367
		mesh,
368
		dpType,
369
		service,
370
	)
371

372
	return c.generateToken("", data)
373
}
374

375
func (c *K8sControlPlane) GenerateZoneIngressToken(zone string) (string, error) {
376
	data := fmt.Sprintf(`{"zone": "%s", "scope": ["ingress"]}`, zone)
377

378
	return c.generateToken("/zone", data)
379
}
380

381
func (c *K8sControlPlane) GenerateZoneIngressLegacyToken(zone string) (string, error) {
382
	data := fmt.Sprintf(`{"zone": "%s"}`, zone)
383

384
	return c.generateToken("/zone-ingress", data)
385
}
386

387
func (c *K8sControlPlane) GenerateZoneEgressToken(zone string) (string, error) {
388
	data := fmt.Sprintf(`{"zone": "%s", "scope": ["egress"]}`, zone)
389

390
	return c.generateToken("/zone", data)
391
}
392

393
func (c *K8sControlPlane) GenerateZoneToken(zone string, scope []string) (string, error) {
394
	scopeJson, err := json.Marshal(scope)
395
	if err != nil {
396
		return "", err
397
	}
398

399
	data := fmt.Sprintf(`'{"zone": "%s", "scope": %s}'`, zone, scopeJson)
400

401
	return c.generateToken("/zone", data)
402
}
403

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.