13
http_helper "github.com/gruntwork-io/terratest/modules/http-helper"
14
"github.com/gruntwork-io/terratest/modules/k8s"
15
"github.com/gruntwork-io/terratest/modules/retry"
16
"github.com/gruntwork-io/terratest/modules/testing"
17
"github.com/pkg/errors"
18
v1 "k8s.io/api/core/v1"
19
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
21
"github.com/kumahq/kuma/pkg/config/core"
22
"github.com/kumahq/kuma/test/framework/kumactl"
25
var _ ControlPlane = &K8sControlPlane{}
27
type K8sControlPlane struct {
32
kumactl *kumactl.KumactlOptions
41
func NewK8sControlPlane(
51
name := clusterName + "-" + mode
52
return &K8sControlPlane{
56
kubeconfig: kubeconfig,
57
kumactl: NewKumactlOptionsE2E(t, name, verbose),
61
apiHeaders: apiHeaders,
65
func (c *K8sControlPlane) GetName() string {
69
func (c *K8sControlPlane) GetKubectlOptions(namespace ...string) *k8s.KubectlOptions {
70
options := &k8s.KubectlOptions{
71
ConfigPath: c.kubeconfig,
73
for _, ns := range namespace {
74
options.Namespace = ns
81
func (c *K8sControlPlane) PortForwardKumaCP() {
82
kumaCpPods := c.GetKumaCPPods()
83
// There could be multiple pods still starting so pick one that's available already
84
for i := range kumaCpPods {
85
if k8s.IsPodAvailable(&kumaCpPods[i]) {
86
c.portFwd.apiServerTunnel = k8s.NewTunnel(c.GetKubectlOptions(Config.KumaNamespace), k8s.ResourceTypePod, kumaCpPods[i].Name, 0, 5681)
87
c.portFwd.apiServerTunnel.ForwardPort(c.t)
88
c.portFwd.ApiServerEndpoint = c.portFwd.apiServerTunnel.Endpoint()
90
c.madsFwd.apiServerTunnel = k8s.NewTunnel(c.GetKubectlOptions(Config.KumaNamespace), k8s.ResourceTypePod, kumaCpPods[i].Name, 0, 5676)
91
c.madsFwd.apiServerTunnel.ForwardPort(c.t)
92
c.madsFwd.ApiServerEndpoint = c.madsFwd.apiServerTunnel.Endpoint()
96
c.t.Fatalf("Failed finding an available pod, allPods: %v", kumaCpPods)
99
func (c *K8sControlPlane) ClosePortForwards() {
100
if c.portFwd.apiServerTunnel != nil {
101
c.portFwd.apiServerTunnel.Close()
105
func (c *K8sControlPlane) GetKumaCPPods() []v1.Pod {
106
return k8s.ListPods(c.t,
107
c.GetKubectlOptions(Config.KumaNamespace),
109
LabelSelector: "app=" + Config.KumaServiceName,
114
func (c *K8sControlPlane) GetKumaCPSvc() v1.Service {
115
return k8s.ListServices(c.t,
116
c.GetKubectlOptions(Config.KumaNamespace),
118
FieldSelector: "metadata.name=" + Config.KumaServiceName,
123
func (c *K8sControlPlane) GetKumaCPSyncSvc() v1.Service {
124
return k8s.ListServices(c.t,
125
c.GetKubectlOptions(Config.KumaNamespace),
127
FieldSelector: "metadata.name=" + Config.KumaGlobalZoneSyncServiceName,
132
func (c *K8sControlPlane) VerifyKumaCtl() error {
133
if c.portFwd.ApiServerEndpoint == "" {
134
return errors.Errorf("API port not forwarded")
137
return c.kumactl.RunKumactl("get", "meshes")
140
func (c *K8sControlPlane) VerifyKumaREST() error {
141
headers := map[string]string{}
142
for _, header := range c.apiHeaders {
143
res := strings.Split(header, "=")
144
headers[res[0]] = res[1]
146
_, err := http_helper.HTTPDoWithRetryE(
149
c.GetGlobalStatusAPI(),
155
&tls.Config{MinVersion: tls.VersionTLS12},
160
func (c *K8sControlPlane) VerifyKumaGUI() error {
161
if c.mode == core.Zone {
165
return http_helper.HttpGetWithRetryWithCustomValidationE(
167
c.GetAPIServerAddress()+"/gui",
168
&tls.Config{MinVersion: tls.VersionTLS12},
171
func(statusCode int, body string) bool {
172
return statusCode == http.StatusOK
177
func (c *K8sControlPlane) PortFwd() PortFwd {
181
func (c *K8sControlPlane) MadsPortFwd() PortFwd {
185
func (c *K8sControlPlane) FinalizeAdd() error {
186
c.PortForwardKumaCP()
187
return c.FinalizeAddWithPortFwd(c.portFwd, c.madsFwd)
190
func (c *K8sControlPlane) FinalizeAddWithPortFwd(portFwd PortFwd, madsPortForward PortFwd) error {
192
c.madsFwd = madsPortForward
193
if !c.cluster.opts.setupKumactl {
198
t, err := c.retrieveAdminToken()
203
return c.kumactl.KumactlConfigControlPlanesAdd(c.name, c.GetAPIServerAddress(), token, c.apiHeaders)
206
func (c *K8sControlPlane) retrieveAdminToken() (string, error) {
207
if authnType, exist := c.cluster.opts.env["KUMA_API_SERVER_AUTHN_TYPE"]; exist && authnType != "tokens" {
210
if c.cluster.opts.helmOpts["controlPlane.environment"] == "universal" {
211
body, err := http_helper.HTTPDoWithRetryWithOptionsE(c.t, http_helper.HttpDoOptions{
213
Url: c.GetAPIServerAddress() + "/global-secrets/admin-user-token",
214
TlsConfig: &tls.Config{MinVersion: tls.VersionTLS12},
215
Body: bytes.NewReader([]byte{}),
216
}, http.StatusOK, DefaultRetries, DefaultTimeout)
220
return ExtractSecretDataFromResponse(body)
223
return retry.DoWithRetryE(c.t, "generating DP token", DefaultRetries, DefaultTimeout, func() (string, error) {
224
sec, err := k8s.GetSecretE(c.t, c.GetKubectlOptions(Config.KumaNamespace), "admin-user-token")
228
return string(sec.Data["value"]), nil
232
func (c *K8sControlPlane) InstallCP(args ...string) (string, error) {
233
// store the kumactl environment
234
oldEnv := c.kumactl.Env
235
c.kumactl.Env["KUBECONFIG"] = c.GetKubectlOptions().ConfigPath
237
c.kumactl.Env = oldEnv // restore kumactl environment
239
return c.kumactl.KumactlInstallCP(args...)
242
func (c *K8sControlPlane) GetKDSInsecureServerAddress() string {
243
svc := c.GetKumaCPSyncSvc()
244
return "grpc://" + c.getKumaCPAddress(svc, "global-zone-sync")
247
func (c *K8sControlPlane) GetKDSServerAddress() string {
248
svc := c.GetKumaCPSyncSvc()
249
return "grpcs://" + c.getKumaCPAddress(svc, "global-zone-sync")
252
func (c *K8sControlPlane) GetXDSServerAddress() string {
253
svc := c.GetKumaCPSvc()
254
return c.getKumaCPAddress(svc, "dp-server")
257
// A naive implementation to find the Host & Port where a Service is exposing a
259
func (c *K8sControlPlane) getKumaCPAddress(svc v1.Service, portName string) string {
260
var svcPort v1.ServicePort
261
for _, port := range svc.Spec.Ports {
262
if port.Name == portName {
270
// As EKS and AWS generally returns dns records of load balancers instead of
271
// IP addresses, accessing this data (hostname) was only tested there,
272
// so the env var was created for that purpose
273
if Config.UseLoadBalancer {
274
address = svc.Status.LoadBalancer.Ingress[0].IP
276
if Config.UseHostnameInsteadOfIP {
277
address = svc.Status.LoadBalancer.Ingress[0].Hostname
280
portNumber = svcPort.Port
282
pod := c.GetKumaCPPods()[0]
283
address = pod.Status.HostIP
285
portNumber = svcPort.NodePort
288
return net.JoinHostPort(
289
address, strconv.FormatUint(uint64(portNumber), 10),
293
func (c *K8sControlPlane) GetAPIServerAddress() string {
294
if c.portFwd.ApiServerEndpoint == "" {
295
panic("Port forward wasn't setup!")
297
return "http://" + c.portFwd.ApiServerEndpoint
300
func (c *K8sControlPlane) GetMetrics() (string, error) {
301
panic("not implemented")
304
func (c *K8sControlPlane) GetMonitoringAssignment(clientId string) (string, error) {
305
if c.madsFwd.ApiServerEndpoint == "" {
306
panic("MADS port forward wasn't setup!")
308
madsEndpoint := "http://" + c.madsFwd.ApiServerEndpoint
310
return http_helper.HTTPDoWithRetryE(
313
madsEndpoint+"/v3/discovery:monitoringassignments",
314
[]byte(fmt.Sprintf(`{"type_url": "type.googleapis.com/kuma.observability.v1.MonitoringAssignment","node": {"id": "%s"}}`, clientId)),
316
"content-type": "application/json",
321
&tls.Config{MinVersion: tls.VersionTLS12},
325
func (c *K8sControlPlane) Exec(cmd ...string) (string, string, error) {
326
pod := c.GetKumaCPPods()[0]
327
return c.cluster.Exec(pod.Namespace, pod.Name, "", cmd...)
330
func (c *K8sControlPlane) GetGlobalStatusAPI() string {
331
return c.GetAPIServerAddress() + "/status/zones"
334
func (c *K8sControlPlane) generateToken(
338
token, err := c.retrieveAdminToken()
343
return http_helper.HTTPDoWithRetryE(
346
c.GetAPIServerAddress()+"/tokens"+tokenPath,
349
"content-type": "application/json",
350
"authorization": "Bearer " + token,
355
&tls.Config{MinVersion: tls.VersionTLS12},
359
func (c *K8sControlPlane) GenerateDpToken(mesh, service string) (string, error) {
361
if service == "ingress" {
366
`{"mesh": "%s", "type": "%s", "tags": {"kuma.io/service": ["%s"]}}`,
372
return c.generateToken("", data)
375
func (c *K8sControlPlane) GenerateZoneIngressToken(zone string) (string, error) {
376
data := fmt.Sprintf(`{"zone": "%s", "scope": ["ingress"]}`, zone)
378
return c.generateToken("/zone", data)
381
func (c *K8sControlPlane) GenerateZoneIngressLegacyToken(zone string) (string, error) {
382
data := fmt.Sprintf(`{"zone": "%s"}`, zone)
384
return c.generateToken("/zone-ingress", data)
387
func (c *K8sControlPlane) GenerateZoneEgressToken(zone string) (string, error) {
388
data := fmt.Sprintf(`{"zone": "%s", "scope": ["egress"]}`, zone)
390
return c.generateToken("/zone", data)
393
func (c *K8sControlPlane) GenerateZoneToken(zone string, scope []string) (string, error) {
394
scopeJson, err := json.Marshal(scope)
399
data := fmt.Sprintf(`'{"zone": "%s", "scope": %s}'`, zone, scopeJson)
401
return c.generateToken("/zone", data)