1
// Copyright Istio Authors
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
24
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25
"k8s.io/apimachinery/pkg/types"
27
"istio.io/api/meta/v1alpha1"
28
"istio.io/api/networking/v1alpha3"
29
clientnetworkingv1alpha3 "istio.io/client-go/pkg/apis/networking/v1alpha3"
30
"istio.io/istio/pilot/pkg/model"
31
"istio.io/istio/pkg/config"
32
"istio.io/istio/pkg/config/schema/collection"
33
"istio.io/istio/pkg/config/schema/collections"
34
"istio.io/istio/pkg/config/schema/gvk"
35
"istio.io/istio/pkg/config/schema/resource"
36
"istio.io/istio/pkg/kube"
37
"istio.io/istio/pkg/kube/controllers"
38
"istio.io/istio/pkg/kube/kclient/clienttest"
39
"istio.io/istio/pkg/kube/kubetypes"
40
"istio.io/istio/pkg/slices"
41
"istio.io/istio/pkg/test"
42
"istio.io/istio/pkg/test/util/assert"
43
"istio.io/istio/pkg/test/util/retry"
46
// makeClient sets up a config store on top of a fake Kubernetes client for use in tests.
// It installs the object filter f on the fake client, registers a CRD for every schema in
// schemas, builds the store with an empty Option, and waits on the store's HasSynced.
// Per its signature it hands back a model.ConfigStoreController and a kube.CLIClient.
func makeClient(t *testing.T, schemas collection.Schemas, f kubetypes.DynamicObjectFilter) (model.ConfigStoreController, kube.CLIClient) {
47
fake := kube.NewFakeClient()
49
// Apply the caller-supplied object filter to the fake client.
kube.SetObjectFilter(fake, f)
51
// Register a CRD on the fake client for each requested schema.
for _, s := range schemas.All() {
52
clienttest.MakeCRD(t, fake, s.GroupVersionResource())
54
// stop is obtained from the test helper; presumably it is closed when the test ends (confirm in pkg/test).
stop := test.NewStop(t)
55
config := New(fake, Option{})
58
// Wait on the store's HasSynced before the store is handed to the test.
kube.WaitForCacheSync("test", stop, config.HasSynced)
62
// createResource builds a new spec instance for schema r via r.NewInstance and issues a
// Create against store. The test is aborted with t.Fatalf when the create step reports an error.
// Per its signature it hands back a config.Spec.
func createResource(t *testing.T, store model.ConfigStoreController, r resource.Schema, configMeta config.Meta) config.Spec {
63
pb, err := r.NewInstance()
68
if _, err := store.Create(config.Config{
72
t.Fatalf("Create => got %v", err)
78
// TestClientNoCRDs ensures that the client can run without CRDs present.
// The store is built with only the Sidecar schema, so VirtualService is an unknown CRD
// here: after a VirtualService is created through the store, both List and Get are
// expected to keep returning nothing for it.
79
func TestClientNoCRDs(t *testing.T) {
80
schema := collection.NewSchemasBuilder().MustAdd(collections.Sidecar).Build()
81
store, _ := makeClient(t, schema, nil)
82
retry.UntilOrFail(t, store.HasSynced, retry.Timeout(time.Second))
83
// VirtualService is deliberately not part of the schema set built above.
r := collections.VirtualService
84
configMeta := config.Meta{
87
GroupVersionKind: r.GroupVersionKind(),
89
createResource(t, store, r, configMeta)
91
// List must report no items for the unknown CRD.
// retry.Converge(5) presumably demands repeated consecutive successes — confirm in the retry package.
retry.UntilSuccessOrFail(t, func() error {
92
l := store.List(r.GroupVersionKind(), configMeta.Namespace)
94
return fmt.Errorf("expected no items returned for unknown CRD, got %v", l)
97
}, retry.Timeout(time.Second*5), retry.Converge(5))
98
// Get must likewise return nil for the unknown CRD.
retry.UntilOrFail(t, func() bool {
99
return store.Get(r.GroupVersionKind(), configMeta.Name, configMeta.Namespace) == nil
100
}, retry.Message("expected no items returned for unknown CRD"), retry.Timeout(time.Second*5), retry.Converge(5))
103
// TestClientDelayedCRDs ensures that the client can run without CRDs present and then
// serves objects once the CRD is added later.
// Two VirtualServices are created while only the Sidecar CRD is registered; List must stay
// empty until the VirtualService CRD is created on the fake client, after which List is
// expected to return items with configMeta1's object first.
104
func TestClientDelayedCRDs(t *testing.T) {
105
// ns1 is allowed, ns2 is not
106
f := kubetypes.NewStaticObjectFilter(func(obj interface{}) bool {
107
// When an object is deleted, obj could be a DeletionFinalStateUnknown marker item.
108
object := controllers.ExtractObject(obj)
112
ns := object.GetNamespace()
115
schema := collection.NewSchemasBuilder().MustAdd(collections.Sidecar).Build()
116
store, fake := makeClient(t, schema, f)
117
retry.UntilOrFail(t, store.HasSynced, retry.Timeout(time.Second))
118
// VirtualService is not in the schema set above, so its CRD does not exist yet.
r := collections.VirtualService
120
// Create a virtual service
121
configMeta1 := config.Meta{
124
GroupVersionKind: r.GroupVersionKind(),
126
createResource(t, store, r, configMeta1)
128
// Create a second virtual service using configMeta2.
configMeta2 := config.Meta{
131
GroupVersionKind: r.GroupVersionKind(),
133
createResource(t, store, r, configMeta2)
135
// Phase 1: while the CRD is unknown, List (called with an empty namespace) must return nothing.
retry.UntilSuccessOrFail(t, func() error {
136
l := store.List(r.GroupVersionKind(), "")
138
return fmt.Errorf("expected no items returned for unknown CRD")
141
}, retry.Timeout(time.Second*5), retry.Converge(5))
143
// Phase 2: register the VirtualService CRD only now, after the objects were created.
clienttest.MakeCRD(t, fake, r.GroupVersionResource())
145
// List must now return items, and the first one must carry configMeta1's name.
retry.UntilSuccessOrFail(t, func() error {
146
l := store.List(r.GroupVersionKind(), "")
148
return fmt.Errorf("expected items returned")
150
if l[0].Name != configMeta1.Name {
151
return fmt.Errorf("expected `name1` returned")
154
}, retry.Timeout(time.Second*10), retry.Converge(5))
157
// TestClient validates that an empty store can do CRUD operations on all given types.
// For every schema in collections.PilotGatewayAPI() it creates a resource, reads it back
// via Get and List, updates its annotations (and its status when the schema has a status
// kind), patches it, and finally deletes it. A separate "update status" sub-test exercises
// UpdateStatus on a WorkloadGroup.
158
func TestClient(t *testing.T) {
159
store, _ := makeClient(t, collections.PilotGatewayAPI().Union(collections.Kube), nil)
161
configNamespace := "test-ns"
162
timeout := retry.Timeout(time.Millisecond * 200)
163
for _, r := range collections.PilotGatewayAPI().All() {
165
t.Run(name, func(t *testing.T) {
166
configMeta := config.Meta{
167
GroupVersionKind: r.GroupVersionKind(),
170
// Only namespaced schemas get a namespace; cluster-scoped ones leave it unset.
if !r.IsClusterScoped() {
171
configMeta.Namespace = configNamespace
173
pb := createResource(t, store, r, configMeta)
175
// Kubernetes is eventually consistent, so we retry for a short time until Get returns
// the object that was just created.
176
retry.UntilSuccessOrFail(t, func() error {
177
cfg := store.Get(r.GroupVersionKind(), configName, configMeta.Namespace)
178
if cfg == nil || !reflect.DeepEqual(cfg.Meta, configMeta) {
179
return fmt.Errorf("get(%v) => got unexpected object %v", name, cfg)
184
// Validate it shows up in List
185
retry.UntilSuccessOrFail(t, func() error {
186
cfgs := store.List(r.GroupVersionKind(), configMeta.Namespace)
188
return fmt.Errorf("expected 1 config, got %v", len(cfgs))
190
for _, cfg := range cfgs {
191
if !reflect.DeepEqual(cfg.Meta, configMeta) {
192
return fmt.Errorf("get(%v) => got %v", name, cfg)
198
// check we can update object metadata
199
annotations := map[string]string{
202
configMeta.Annotations = annotations
203
if _, err := store.Update(config.Config{
207
t.Errorf("Unexpected Error in Update -> %v", err)
209
// Schemas that declare a status kind additionally get their status updated.
if r.StatusKind() != "" {
210
stat, err := r.Status()
214
if _, err := store.UpdateStatus(config.Config{
218
t.Errorf("Unexpected Error in Update -> %v", err)
221
// cfg is declared outside the retry closure so the fetched object can be passed to Patch below.
var cfg *config.Config
222
// validate it is updated
223
retry.UntilSuccessOrFail(t, func() error {
224
cfg = store.Get(r.GroupVersionKind(), configName, configMeta.Namespace)
225
if cfg == nil || !reflect.DeepEqual(cfg.Meta, configMeta) {
226
return fmt.Errorf("get(%v) => got unexpected object %v", name, cfg)
231
// check we can patch items
232
var patchedCfg config.Config
233
// Patch is called on the concrete *Client, so the store is type-asserted here.
if _, err := store.(*Client).Patch(*cfg, func(cfg config.Config) (config.Config, types.PatchType) {
234
cfg.Annotations["fizz"] = "buzz"
236
return cfg, types.JSONPatchType
238
t.Errorf("unexpected err in Patch: %v", err)
240
// validate the patch is visible: the stored Meta must equal patchedCfg.Meta
241
retry.UntilSuccessOrFail(t, func() error {
242
cfg := store.Get(r.GroupVersionKind(), configName, configMeta.Namespace)
243
if cfg == nil || !reflect.DeepEqual(cfg.Meta, patchedCfg.Meta) {
244
return fmt.Errorf("get(%v) => got unexpected object %v", name, cfg)
249
// Check we can remove items
// NOTE(review): Delete and the Get below pass configNamespace, whereas the earlier calls pass
// configMeta.Namespace (left unset for cluster-scoped schemas) — confirm this difference is intended.
250
if err := store.Delete(r.GroupVersionKind(), configName, configNamespace, nil); err != nil {
251
t.Fatalf("failed to delete: %v", err)
253
retry.UntilSuccessOrFail(t, func() error {
254
cfg := store.Get(r.GroupVersionKind(), configName, configNamespace)
256
return fmt.Errorf("get(%v) => got %v, expected item to be deleted", name, cfg)
263
// Create a WorkloadGroup, write a status through UpdateStatus, and verify that the
// status read back via Get equals the one that was written.
t.Run("update status", func(t *testing.T) {
264
r := collections.WorkloadGroup
267
cfgMeta := config.Meta{
268
GroupVersionKind: r.GroupVersionKind(),
271
if !r.IsClusterScoped() {
272
cfgMeta.Namespace = namespace
274
pb := &v1alpha3.WorkloadGroup{Probe: &v1alpha3.ReadinessProbe{PeriodSeconds: 6}}
275
if _, err := store.Create(config.Config{
277
Spec: config.Spec(pb),
279
t.Fatalf("Create bad: %v", err)
282
// Wait until the created object is readable and its Meta matches what was submitted.
retry.UntilSuccessOrFail(t, func() error {
283
cfg := store.Get(r.GroupVersionKind(), name, cfgMeta.Namespace)
285
return fmt.Errorf("cfg shouldn't be nil :(")
287
if !reflect.DeepEqual(cfg.Meta, cfgMeta) {
288
return fmt.Errorf("something is deeply wrong....., %v", cfg.Meta)
293
// Status payload to write via UpdateStatus.
stat := &v1alpha1.IstioStatus{
294
Conditions: []*v1alpha1.IstioCondition{
297
Message: "heath is badd",
302
if _, err := store.UpdateStatus(config.Config{
304
Spec: config.Spec(pb),
305
Status: config.Status(stat),
307
t.Errorf("bad: %v", err)
310
// The status returned by Get must eventually equal the status written above.
retry.UntilSuccessOrFail(t, func() error {
311
cfg := store.Get(r.GroupVersionKind(), name, cfgMeta.Namespace)
313
return fmt.Errorf("cfg can't be nil")
315
if !reflect.DeepEqual(cfg.Status, stat) {
316
return fmt.Errorf("status %v does not match %v", cfg.Status, stat)
323
// TestClientInitialSyncSkipsOtherRevisions tests that the initial sync skips objects from other
// revisions. Nine ServiceEntries are created up front, cycling through three label sets; a store
// is then built per revision case and the configs delivered to its event handler during the
// initial sync are compared, order-insensitively, with the expected set for that revision.
325
func TestClientInitialSyncSkipsOtherRevisions(t *testing.T) {
326
fake := kube.NewFakeClient()
327
for _, s := range collections.Istio.All() {
328
clienttest.MakeCRD(t, fake, s.GroupVersionResource())
331
// Populate the client with some ServiceEntries such that 1/3 are in the default revision and
332
// 2/3 are in different revisions.
333
labels := []map[string]string{
335
{"istio.io/rev": "canary"},
336
{"istio.io/rev": "prod"},
338
var expectedNoRevision []config.Config
339
var expectedCanary []config.Config
340
var expectedProd []config.Config
341
for i := 0; i < 9; i++ {
342
// Cycle through the label sets so each one is used for a third of the objects.
selectedLabels := labels[i%len(labels)]
343
obj := &clientnetworkingv1alpha3.ServiceEntry{
344
ObjectMeta: metav1.ObjectMeta{
345
Name: fmt.Sprintf("test-service-entry-%d", i),
347
Labels: selectedLabels,
349
Spec: v1alpha3.ServiceEntry{},
352
clienttest.NewWriter[*clientnetworkingv1alpha3.ServiceEntry](t, fake).Create(obj)
353
// canary revision should receive only global objects and objects with the canary revision
354
if selectedLabels == nil || reflect.DeepEqual(selectedLabels, labels[1]) {
355
expectedCanary = append(expectedCanary, TranslateObject(obj, gvk.ServiceEntry, ""))
357
// prod revision should receive only global objects and objects with the prod revision
358
if selectedLabels == nil || reflect.DeepEqual(selectedLabels, labels[2]) {
359
expectedProd = append(expectedProd, TranslateObject(obj, gvk.ServiceEntry, ""))
361
// no revision should receive all objects
362
expectedNoRevision = append(expectedNoRevision, TranslateObject(obj, gvk.ServiceEntry, ""))
365
// Map from revision name to the configs a store for that revision is expected to see.
storeCases := map[string][]config.Config{
366
"": expectedNoRevision, // No revision specified, should receive all events.
367
"canary": expectedCanary, // Global SEs plus SEs from the canary revision should be received.
368
"prod": expectedProd, // Global SEs plus SEs from the prod revision should be received.
370
for rev, expected := range storeCases {
371
store := New(fake, Option{
375
// Collects every config the handler is called with for this store.
var cfgsAdded []config.Config
376
store.RegisterEventHandler(
378
func(old config.Config, curr config.Config, event model.Event) {
379
// Only add events are expected here; anything else fails the test.
// NOTE(review): t.Fatalf is only supported on the goroutine running the test; if this handler
// is invoked from another goroutine (TODO confirm), t.Errorf would be the safer call.
if event != model.EventAdd {
380
t.Fatalf("unexpected event: %v", event)
382
cfgsAdded = append(cfgsAdded, curr)
386
stop := test.NewStop(t)
387
fake.RunAndWait(stop)
390
kube.WaitForCacheSync("test", stop, store.HasSynced)
392
// The order of the events doesn't matter, so sort the two slices so the ordering is consistent
393
sortFunc := func(a config.Config) string {
396
slices.SortBy(cfgsAdded, sortFunc)
397
slices.SortBy(expected, sortFunc)
399
assert.Equal(t, expected, cfgsAdded)
403
// TestClientSync checks the ordering between event delivery and HasSynced: a ServiceEntry is
// written to the fake client before the store is created, a handler is registered for
// gvk.ServiceEntry, and once HasSynced has been waited on the events counter must equal 1.
func TestClientSync(t *testing.T) {
404
obj := &clientnetworkingv1alpha3.ServiceEntry{
405
ObjectMeta: metav1.ObjectMeta{
406
Name: "test-service-entry",
409
Spec: v1alpha3.ServiceEntry{},
411
fake := kube.NewFakeClient()
412
// The object is created before the store exists, so it is already present when the store starts.
clienttest.NewWriter[*clientnetworkingv1alpha3.ServiceEntry](t, fake).Create(obj)
413
for _, s := range collections.Pilot.All() {
414
clienttest.MakeCRD(t, fake, s.GroupVersionResource())
416
stop := test.NewStop(t)
417
c := New(fake, Option{})
419
// Counter checked at the end of the test; presumably bumped by the handler below (TODO confirm).
events := atomic.NewInt64(0)
420
c.RegisterEventHandler(gvk.ServiceEntry, func(c config.Config, c2 config.Config, event model.Event) {
424
fake.RunAndWait(stop)
425
kube.WaitForCacheSync("test", stop, c.HasSynced)
426
// This MUST have been called by the time HasSynced returns true
427
assert.Equal(t, events.Load(), 1)