istio

Форк
0
/
client_test.go 
428 строк · 13.2 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package crdclient
16

17
import (
18
	"fmt"
19
	"reflect"
20
	"testing"
21
	"time"
22

23
	"go.uber.org/atomic"
24
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25
	"k8s.io/apimachinery/pkg/types"
26

27
	"istio.io/api/meta/v1alpha1"
28
	"istio.io/api/networking/v1alpha3"
29
	clientnetworkingv1alpha3 "istio.io/client-go/pkg/apis/networking/v1alpha3"
30
	"istio.io/istio/pilot/pkg/model"
31
	"istio.io/istio/pkg/config"
32
	"istio.io/istio/pkg/config/schema/collection"
33
	"istio.io/istio/pkg/config/schema/collections"
34
	"istio.io/istio/pkg/config/schema/gvk"
35
	"istio.io/istio/pkg/config/schema/resource"
36
	"istio.io/istio/pkg/kube"
37
	"istio.io/istio/pkg/kube/controllers"
38
	"istio.io/istio/pkg/kube/kclient/clienttest"
39
	"istio.io/istio/pkg/kube/kubetypes"
40
	"istio.io/istio/pkg/slices"
41
	"istio.io/istio/pkg/test"
42
	"istio.io/istio/pkg/test/util/assert"
43
	"istio.io/istio/pkg/test/util/retry"
44
)
45

46
func makeClient(t *testing.T, schemas collection.Schemas, f kubetypes.DynamicObjectFilter) (model.ConfigStoreController, kube.CLIClient) {
47
	fake := kube.NewFakeClient()
48
	if f != nil {
49
		kube.SetObjectFilter(fake, f)
50
	}
51
	for _, s := range schemas.All() {
52
		clienttest.MakeCRD(t, fake, s.GroupVersionResource())
53
	}
54
	stop := test.NewStop(t)
55
	config := New(fake, Option{})
56
	go config.Run(stop)
57
	fake.RunAndWait(stop)
58
	kube.WaitForCacheSync("test", stop, config.HasSynced)
59
	return config, fake
60
}
61

62
func createResource(t *testing.T, store model.ConfigStoreController, r resource.Schema, configMeta config.Meta) config.Spec {
63
	pb, err := r.NewInstance()
64
	if err != nil {
65
		t.Fatal(err)
66
	}
67

68
	if _, err := store.Create(config.Config{
69
		Meta: configMeta,
70
		Spec: pb,
71
	}); err != nil {
72
		t.Fatalf("Create => got %v", err)
73
	}
74

75
	return pb
76
}
77

78
// Ensure that the client can run without CRDs present
79
func TestClientNoCRDs(t *testing.T) {
80
	schema := collection.NewSchemasBuilder().MustAdd(collections.Sidecar).Build()
81
	store, _ := makeClient(t, schema, nil)
82
	retry.UntilOrFail(t, store.HasSynced, retry.Timeout(time.Second))
83
	r := collections.VirtualService
84
	configMeta := config.Meta{
85
		Name:             "name",
86
		Namespace:        "ns",
87
		GroupVersionKind: r.GroupVersionKind(),
88
	}
89
	createResource(t, store, r, configMeta)
90

91
	retry.UntilSuccessOrFail(t, func() error {
92
		l := store.List(r.GroupVersionKind(), configMeta.Namespace)
93
		if len(l) != 0 {
94
			return fmt.Errorf("expected no items returned for unknown CRD, got %v", l)
95
		}
96
		return nil
97
	}, retry.Timeout(time.Second*5), retry.Converge(5))
98
	retry.UntilOrFail(t, func() bool {
99
		return store.Get(r.GroupVersionKind(), configMeta.Name, configMeta.Namespace) == nil
100
	}, retry.Message("expected no items returned for unknown CRD"), retry.Timeout(time.Second*5), retry.Converge(5))
101
}
102

103
// Ensure that the client can run without CRDs present, but then added later
104
func TestClientDelayedCRDs(t *testing.T) {
105
	// ns1 is allowed, ns2 is not
106
	f := kubetypes.NewStaticObjectFilter(func(obj interface{}) bool {
107
		// When an object is deleted, obj could be a DeletionFinalStateUnknown marker item.
108
		object := controllers.ExtractObject(obj)
109
		if object == nil {
110
			return false
111
		}
112
		ns := object.GetNamespace()
113
		return ns == "ns1"
114
	})
115
	schema := collection.NewSchemasBuilder().MustAdd(collections.Sidecar).Build()
116
	store, fake := makeClient(t, schema, f)
117
	retry.UntilOrFail(t, store.HasSynced, retry.Timeout(time.Second))
118
	r := collections.VirtualService
119

120
	// Create a virtual service
121
	configMeta1 := config.Meta{
122
		Name:             "name1",
123
		Namespace:        "ns1",
124
		GroupVersionKind: r.GroupVersionKind(),
125
	}
126
	createResource(t, store, r, configMeta1)
127

128
	configMeta2 := config.Meta{
129
		Name:             "name2",
130
		Namespace:        "ns2",
131
		GroupVersionKind: r.GroupVersionKind(),
132
	}
133
	createResource(t, store, r, configMeta2)
134

135
	retry.UntilSuccessOrFail(t, func() error {
136
		l := store.List(r.GroupVersionKind(), "")
137
		if len(l) != 0 {
138
			return fmt.Errorf("expected no items returned for unknown CRD")
139
		}
140
		return nil
141
	}, retry.Timeout(time.Second*5), retry.Converge(5))
142

143
	clienttest.MakeCRD(t, fake, r.GroupVersionResource())
144

145
	retry.UntilSuccessOrFail(t, func() error {
146
		l := store.List(r.GroupVersionKind(), "")
147
		if len(l) != 1 {
148
			return fmt.Errorf("expected items returned")
149
		}
150
		if l[0].Name != configMeta1.Name {
151
			return fmt.Errorf("expected `name1` returned")
152
		}
153
		return nil
154
	}, retry.Timeout(time.Second*10), retry.Converge(5))
155
}
156

157
// CheckIstioConfigTypes validates that an empty store can do CRUD operators on all given types
158
func TestClient(t *testing.T) {
159
	store, _ := makeClient(t, collections.PilotGatewayAPI().Union(collections.Kube), nil)
160
	configName := "test"
161
	configNamespace := "test-ns"
162
	timeout := retry.Timeout(time.Millisecond * 200)
163
	for _, r := range collections.PilotGatewayAPI().All() {
164
		name := r.Kind()
165
		t.Run(name, func(t *testing.T) {
166
			configMeta := config.Meta{
167
				GroupVersionKind: r.GroupVersionKind(),
168
				Name:             configName,
169
			}
170
			if !r.IsClusterScoped() {
171
				configMeta.Namespace = configNamespace
172
			}
173
			pb := createResource(t, store, r, configMeta)
174

175
			// Kubernetes is eventually consistent, so we allow a short time to pass before we get
176
			retry.UntilSuccessOrFail(t, func() error {
177
				cfg := store.Get(r.GroupVersionKind(), configName, configMeta.Namespace)
178
				if cfg == nil || !reflect.DeepEqual(cfg.Meta, configMeta) {
179
					return fmt.Errorf("get(%v) => got unexpected object %v", name, cfg)
180
				}
181
				return nil
182
			}, timeout)
183

184
			// Validate it shows up in List
185
			retry.UntilSuccessOrFail(t, func() error {
186
				cfgs := store.List(r.GroupVersionKind(), configMeta.Namespace)
187
				if len(cfgs) != 1 {
188
					return fmt.Errorf("expected 1 config, got %v", len(cfgs))
189
				}
190
				for _, cfg := range cfgs {
191
					if !reflect.DeepEqual(cfg.Meta, configMeta) {
192
						return fmt.Errorf("get(%v) => got %v", name, cfg)
193
					}
194
				}
195
				return nil
196
			}, timeout)
197

198
			// check we can update object metadata
199
			annotations := map[string]string{
200
				"foo": "bar",
201
			}
202
			configMeta.Annotations = annotations
203
			if _, err := store.Update(config.Config{
204
				Meta: configMeta,
205
				Spec: pb,
206
			}); err != nil {
207
				t.Errorf("Unexpected Error in Update -> %v", err)
208
			}
209
			if r.StatusKind() != "" {
210
				stat, err := r.Status()
211
				if err != nil {
212
					t.Fatal(err)
213
				}
214
				if _, err := store.UpdateStatus(config.Config{
215
					Meta:   configMeta,
216
					Status: stat,
217
				}); err != nil {
218
					t.Errorf("Unexpected Error in Update -> %v", err)
219
				}
220
			}
221
			var cfg *config.Config
222
			// validate it is updated
223
			retry.UntilSuccessOrFail(t, func() error {
224
				cfg = store.Get(r.GroupVersionKind(), configName, configMeta.Namespace)
225
				if cfg == nil || !reflect.DeepEqual(cfg.Meta, configMeta) {
226
					return fmt.Errorf("get(%v) => got unexpected object %v", name, cfg)
227
				}
228
				return nil
229
			})
230

231
			// check we can patch items
232
			var patchedCfg config.Config
233
			if _, err := store.(*Client).Patch(*cfg, func(cfg config.Config) (config.Config, types.PatchType) {
234
				cfg.Annotations["fizz"] = "buzz"
235
				patchedCfg = cfg
236
				return cfg, types.JSONPatchType
237
			}); err != nil {
238
				t.Errorf("unexpected err in Patch: %v", err)
239
			}
240
			// validate it is updated
241
			retry.UntilSuccessOrFail(t, func() error {
242
				cfg := store.Get(r.GroupVersionKind(), configName, configMeta.Namespace)
243
				if cfg == nil || !reflect.DeepEqual(cfg.Meta, patchedCfg.Meta) {
244
					return fmt.Errorf("get(%v) => got unexpected object %v", name, cfg)
245
				}
246
				return nil
247
			})
248

249
			// Check we can remove items
250
			if err := store.Delete(r.GroupVersionKind(), configName, configNamespace, nil); err != nil {
251
				t.Fatalf("failed to delete: %v", err)
252
			}
253
			retry.UntilSuccessOrFail(t, func() error {
254
				cfg := store.Get(r.GroupVersionKind(), configName, configNamespace)
255
				if cfg != nil {
256
					return fmt.Errorf("get(%v) => got %v, expected item to be deleted", name, cfg)
257
				}
258
				return nil
259
			}, timeout)
260
		})
261
	}
262

263
	t.Run("update status", func(t *testing.T) {
264
		r := collections.WorkloadGroup
265
		name := "name1"
266
		namespace := "bar"
267
		cfgMeta := config.Meta{
268
			GroupVersionKind: r.GroupVersionKind(),
269
			Name:             name,
270
		}
271
		if !r.IsClusterScoped() {
272
			cfgMeta.Namespace = namespace
273
		}
274
		pb := &v1alpha3.WorkloadGroup{Probe: &v1alpha3.ReadinessProbe{PeriodSeconds: 6}}
275
		if _, err := store.Create(config.Config{
276
			Meta: cfgMeta,
277
			Spec: config.Spec(pb),
278
		}); err != nil {
279
			t.Fatalf("Create bad: %v", err)
280
		}
281

282
		retry.UntilSuccessOrFail(t, func() error {
283
			cfg := store.Get(r.GroupVersionKind(), name, cfgMeta.Namespace)
284
			if cfg == nil {
285
				return fmt.Errorf("cfg shouldn't be nil :(")
286
			}
287
			if !reflect.DeepEqual(cfg.Meta, cfgMeta) {
288
				return fmt.Errorf("something is deeply wrong....., %v", cfg.Meta)
289
			}
290
			return nil
291
		})
292

293
		stat := &v1alpha1.IstioStatus{
294
			Conditions: []*v1alpha1.IstioCondition{
295
				{
296
					Type:    "Health",
297
					Message: "heath is badd",
298
				},
299
			},
300
		}
301

302
		if _, err := store.UpdateStatus(config.Config{
303
			Meta:   cfgMeta,
304
			Spec:   config.Spec(pb),
305
			Status: config.Status(stat),
306
		}); err != nil {
307
			t.Errorf("bad: %v", err)
308
		}
309

310
		retry.UntilSuccessOrFail(t, func() error {
311
			cfg := store.Get(r.GroupVersionKind(), name, cfgMeta.Namespace)
312
			if cfg == nil {
313
				return fmt.Errorf("cfg can't be nil")
314
			}
315
			if !reflect.DeepEqual(cfg.Status, stat) {
316
				return fmt.Errorf("status %v does not match %v", cfg.Status, stat)
317
			}
318
			return nil
319
		})
320
	})
321
}
322

323
// TestClientInitialSyncSkipsOtherRevisions tests that the initial sync skips objects from other
324
// revisions.
325
func TestClientInitialSyncSkipsOtherRevisions(t *testing.T) {
326
	fake := kube.NewFakeClient()
327
	for _, s := range collections.Istio.All() {
328
		clienttest.MakeCRD(t, fake, s.GroupVersionResource())
329
	}
330

331
	// Populate the client with some ServiceEntrys such that 1/3 are in the default revision and
332
	// 2/3 are in different revisions.
333
	labels := []map[string]string{
334
		nil,
335
		{"istio.io/rev": "canary"},
336
		{"istio.io/rev": "prod"},
337
	}
338
	var expectedNoRevision []config.Config
339
	var expectedCanary []config.Config
340
	var expectedProd []config.Config
341
	for i := 0; i < 9; i++ {
342
		selectedLabels := labels[i%len(labels)]
343
		obj := &clientnetworkingv1alpha3.ServiceEntry{
344
			ObjectMeta: metav1.ObjectMeta{
345
				Name:      fmt.Sprintf("test-service-entry-%d", i),
346
				Namespace: "test",
347
				Labels:    selectedLabels,
348
			},
349
			Spec: v1alpha3.ServiceEntry{},
350
		}
351

352
		clienttest.NewWriter[*clientnetworkingv1alpha3.ServiceEntry](t, fake).Create(obj)
353
		// canary revision should receive only global objects and objects with the canary revision
354
		if selectedLabels == nil || reflect.DeepEqual(selectedLabels, labels[1]) {
355
			expectedCanary = append(expectedCanary, TranslateObject(obj, gvk.ServiceEntry, ""))
356
		}
357
		// prod revision should receive only global objects and objects with the prod revision
358
		if selectedLabels == nil || reflect.DeepEqual(selectedLabels, labels[2]) {
359
			expectedProd = append(expectedProd, TranslateObject(obj, gvk.ServiceEntry, ""))
360
		}
361
		// no revision should receive all objects
362
		expectedNoRevision = append(expectedNoRevision, TranslateObject(obj, gvk.ServiceEntry, ""))
363
	}
364

365
	storeCases := map[string][]config.Config{
366
		"":       expectedNoRevision, // No revision specified, should receive all events.
367
		"canary": expectedCanary,     // Only SEs from the canary revision should be received.
368
		"prod":   expectedProd,       // Only SEs from the prod revision should be received.
369
	}
370
	for rev, expected := range storeCases {
371
		store := New(fake, Option{
372
			Revision: rev,
373
		})
374

375
		var cfgsAdded []config.Config
376
		store.RegisterEventHandler(
377
			gvk.ServiceEntry,
378
			func(old config.Config, curr config.Config, event model.Event) {
379
				if event != model.EventAdd {
380
					t.Fatalf("unexpected event: %v", event)
381
				}
382
				cfgsAdded = append(cfgsAdded, curr)
383
			},
384
		)
385

386
		stop := test.NewStop(t)
387
		fake.RunAndWait(stop)
388
		go store.Run(stop)
389

390
		kube.WaitForCacheSync("test", stop, store.HasSynced)
391

392
		// The order of the events doesn't matter, so sort the two slices so the ordering is consistent
393
		sortFunc := func(a config.Config) string {
394
			return a.Key()
395
		}
396
		slices.SortBy(cfgsAdded, sortFunc)
397
		slices.SortBy(expected, sortFunc)
398

399
		assert.Equal(t, expected, cfgsAdded)
400
	}
401
}
402

403
func TestClientSync(t *testing.T) {
404
	obj := &clientnetworkingv1alpha3.ServiceEntry{
405
		ObjectMeta: metav1.ObjectMeta{
406
			Name:      "test-service-entry",
407
			Namespace: "test",
408
		},
409
		Spec: v1alpha3.ServiceEntry{},
410
	}
411
	fake := kube.NewFakeClient()
412
	clienttest.NewWriter[*clientnetworkingv1alpha3.ServiceEntry](t, fake).Create(obj)
413
	for _, s := range collections.Pilot.All() {
414
		clienttest.MakeCRD(t, fake, s.GroupVersionResource())
415
	}
416
	stop := test.NewStop(t)
417
	c := New(fake, Option{})
418

419
	events := atomic.NewInt64(0)
420
	c.RegisterEventHandler(gvk.ServiceEntry, func(c config.Config, c2 config.Config, event model.Event) {
421
		events.Inc()
422
	})
423
	go c.Run(stop)
424
	fake.RunAndWait(stop)
425
	kube.WaitForCacheSync("test", stop, c.HasSynced)
426
	// This MUST have been called by the time HasSynced returns true
427
	assert.Equal(t, events.Load(), 1)
428
}
429

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.