istio

Форк
0
/
leaderelection_test.go 
382 строки · 11.4 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package leaderelection
16

17
import (
18
	"context"
19
	"fmt"
20
	"testing"
21
	"time"
22

23
	"go.uber.org/atomic"
24
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25
	"k8s.io/apimachinery/pkg/runtime"
26
	"k8s.io/client-go/kubernetes"
27
	"k8s.io/client-go/kubernetes/fake"
28
	k8stesting "k8s.io/client-go/testing"
29

30
	"istio.io/istio/pkg/revisions"
31
	"istio.io/istio/pkg/test/util/retry"
32
)
33

34
// testLock is the base election ID used by the tests in this file; per-revision
// elections append "-<revision>" to it, and it is also the ConfigMap name that
// TestLeaderElectionConfigMapRemoved deletes.
const testLock = "test-lock"
35

36
func createElection(t *testing.T,
37
	name string, revision string,
38
	watcher revisions.DefaultWatcher,
39
	expectLeader bool,
40
	client kubernetes.Interface, fns ...func(stop <-chan struct{}),
41
) (*LeaderElection, chan struct{}) {
42
	t.Helper()
43
	return createElectionMulticluster(t, name, revision, false, false, watcher, expectLeader, client, fns...)
44
}
45

46
func createPerRevisionElection(t *testing.T,
47
	name string, revision string,
48
	watcher revisions.DefaultWatcher,
49
	expectLeader bool,
50
	client kubernetes.Interface,
51
) (*LeaderElection, chan struct{}) {
52
	t.Helper()
53
	return createElectionMulticluster(t, name, revision, false, true, watcher, expectLeader, client)
54
}
55

56
func createElectionMulticluster(t *testing.T,
57
	name, revision string,
58
	remote, perRevision bool,
59
	watcher revisions.DefaultWatcher,
60
	expectLeader bool,
61
	client kubernetes.Interface, fns ...func(stop <-chan struct{}),
62
) (*LeaderElection, chan struct{}) {
63
	t.Helper()
64
	lockName := testLock
65
	if perRevision {
66
		lockName += "-" + revision
67
	}
68
	l := &LeaderElection{
69
		namespace:      "ns",
70
		name:           name,
71
		electionID:     lockName,
72
		client:         client,
73
		revision:       revision,
74
		remote:         remote,
75
		defaultWatcher: watcher,
76
		perRevision:    perRevision,
77
		ttl:            time.Second,
78
		cycle:          atomic.NewInt32(0),
79
		enabled:        true,
80
	}
81
	gotLeader := make(chan struct{})
82
	l.AddRunFunction(func(stop <-chan struct{}) {
83
		gotLeader <- struct{}{}
84
	})
85
	for _, fn := range fns {
86
		l.AddRunFunction(fn)
87
	}
88
	stop := make(chan struct{})
89
	go l.Run(stop)
90

91
	retry.UntilOrFail(t, func() bool {
92
		return l.isLeader() == expectLeader
93
	}, retry.Converge(5), retry.Delay(time.Millisecond*100), retry.Timeout(time.Second*10))
94
	return l, stop
95
}
96

97
type fakeDefaultWatcher struct {
98
	defaultRevision string
99
}
100

101
func (w *fakeDefaultWatcher) Run(stop <-chan struct{}) {
102
}
103

104
func (w *fakeDefaultWatcher) HasSynced() bool {
105
	return true
106
}
107

108
func (w *fakeDefaultWatcher) GetDefault() string {
109
	return w.defaultRevision
110
}
111

112
func (w *fakeDefaultWatcher) AddHandler(handler revisions.DefaultHandler) {
113
	panic("unimplemented")
114
}
115

116
func TestLeaderElection(t *testing.T) {
117
	client := fake.NewSimpleClientset()
118
	watcher := &fakeDefaultWatcher{}
119
	// First pod becomes the leader
120
	_, stop := createElection(t, "pod1", "", watcher, true, client)
121
	// A new pod is not the leader
122
	_, stop2 := createElection(t, "pod2", "", watcher, false, client)
123
	close(stop2)
124
	close(stop)
125
}
126

127
func TestPerRevisionElection(t *testing.T) {
128
	client := fake.NewSimpleClientset()
129
	watcher := &fakeDefaultWatcher{"foo"}
130
	// First pod becomes the leader
131
	_, stop := createPerRevisionElection(t, "pod1", "foo", watcher, true, client)
132
	// A new pod is not the leader
133
	_, stop2 := createPerRevisionElection(t, "pod2", "foo", watcher, false, client)
134
	close(stop2)
135
	close(stop)
136
	t.Log("drop")
137
	// After leader is lost, we can take over
138
	_, stop3 := createPerRevisionElection(t, "pod2", "foo", watcher, true, client)
139
	// Other revisions are independent
140
	_, stop4 := createPerRevisionElection(t, "pod4", "not-foo", watcher, true, client)
141
	close(stop3)
142
	close(stop4)
143
}
144

145
func TestPrioritizedLeaderElection(t *testing.T) {
146
	client := fake.NewSimpleClientset()
147
	watcher := &fakeDefaultWatcher{defaultRevision: "red"}
148

149
	// First pod, revision "green" becomes the leader, but is not the default revision
150
	_, stop := createElection(t, "pod1", "green", watcher, true, client)
151
	// Second pod, revision "red", steals the leader lock from "green" since it is the default revision
152
	_, stop2 := createElection(t, "pod2", "red", watcher, true, client)
153
	// Third pod with revision "red" comes in and cannot take the lock since another revision with "red" has it
154
	_, stop3 := createElection(t, "pod3", "red", watcher, false, client)
155
	// Fourth pod with revision "green" cannot take the lock since a revision with "red" has it.
156
	_, stop4 := createElection(t, "pod4", "green", watcher, false, client)
157
	close(stop2)
158
	close(stop3)
159
	close(stop4)
160
	// Now that revision "green" has stopped acting as leader, revision "red" should be able to claim lock.
161
	_, stop5 := createElection(t, "pod2", "red", watcher, true, client)
162
	close(stop5)
163
	close(stop)
164
	// Revision "green" can reclaim once "red" releases.
165
	_, stop6 := createElection(t, "pod4", "green", watcher, true, client)
166
	close(stop6)
167
}
168

169
func TestMulticlusterLeaderElection(t *testing.T) {
170
	client := fake.NewSimpleClientset()
171
	watcher := &fakeDefaultWatcher{}
172
	// First remote pod becomes the leader
173
	_, stop := createElectionMulticluster(t, "pod1", "", true, false, watcher, true, client)
174
	// A new local pod should become leader
175
	_, stop2 := createElectionMulticluster(t, "pod2", "", false, false, watcher, true, client)
176
	// A new remote pod cannot become leader
177
	_, stop3 := createElectionMulticluster(t, "pod3", "", true, false, watcher, false, client)
178
	close(stop3)
179
	close(stop2)
180
	close(stop)
181
}
182

183
func TestPrioritizedMulticlusterLeaderElection(t *testing.T) {
184
	client := fake.NewSimpleClientset()
185
	watcher := &fakeDefaultWatcher{defaultRevision: "red"}
186

187
	// First pod, revision "green" becomes the remote leader
188
	_, stop := createElectionMulticluster(t, "pod1", "green", true, false, watcher, true, client)
189
	// Second pod, revision "red", steals the leader lock from "green" since it is the default revision
190
	_, stop2 := createElectionMulticluster(t, "pod2", "red", true, false, watcher, true, client)
191
	// Third pod with revision "red" comes in and can take the lock since it is a local revision "red"
192
	_, stop3 := createElectionMulticluster(t, "pod3", "red", false, false, watcher, true, client)
193
	// Fourth pod with revision "red" cannot take the lock since it is remote
194
	_, stop4 := createElectionMulticluster(t, "pod4", "red", true, false, watcher, false, client)
195
	close(stop4)
196
	close(stop3)
197
	close(stop2)
198
	close(stop)
199
}
200

201
// SimpleRevisionComparison is the older key-comparison implementation, kept for
// interoperability testing. It reports true only when l's revision differs from
// currentLeaderRevision, a default revision is set (non-empty), and l's
// revision is that default.
func SimpleRevisionComparison(currentLeaderRevision string, l *LeaderElection) bool {
	def := l.defaultWatcher.GetDefault()
	switch {
	case l.revision == currentLeaderRevision:
		return false
	case def == "":
		// An empty default revision means no default has been set.
		return false
	default:
		return def == l.revision
	}
}
208

209
// LeaderComparison is the signature shared by the comparison functions returned
// from instance.GetComp: it takes a key string and a candidate election and
// returns a bool. checkCycles treats a true result as "the candidate may follow
// the instance that produced the key" and recurses into the candidate.
type LeaderComparison func(string, *LeaderElection) bool
210

211
type instance struct {
212
	revision string
213
	remote   bool
214
	comp     string
215
}
216

217
func (i instance) GetComp() (LeaderComparison, string) {
218
	key := i.revision
219
	switch i.comp {
220
	case "location":
221
		if i.remote {
222
			key = remoteIstiodPrefix + key
223
		}
224
		return LocationPrioritizedComparison, key
225
	case "simple":
226
		return SimpleRevisionComparison, key
227
	default:
228
		panic("unknown comparison type")
229
	}
230
}
231

232
// TestPrioritizationCycles
233
func TestPrioritizationCycles(t *testing.T) {
234
	cases := []instance{}
235
	for _, rev := range []string{"", "default", "not-default"} {
236
		for _, loc := range []bool{false, true} {
237
			for _, comp := range []string{"location", "simple"} {
238
				cases = append(cases, instance{
239
					revision: rev,
240
					remote:   loc,
241
					comp:     comp,
242
				})
243
			}
244
		}
245
	}
246

247
	for _, start := range cases {
248
		t.Run(fmt.Sprint(start), func(t *testing.T) {
249
			checkCycles(t, start, cases, nil)
250
		})
251
	}
252
}
253

254
// alreadyHit reports whether cur is equal to any element of chain.
func alreadyHit(cur instance, chain []instance) bool {
	for idx := range chain {
		if chain[idx] == cur {
			return true
		}
	}
	return false
}
262

263
func checkCycles(t *testing.T, start instance, cases []instance, chain []instance) {
264
	if alreadyHit(start, chain) {
265
		t.Fatalf("cycle on leader election: cur %v, chain %v", start, chain)
266
	}
267
	for _, nextHop := range cases {
268
		next := LeaderElection{
269
			remote:         nextHop.remote,
270
			defaultWatcher: &fakeDefaultWatcher{defaultRevision: "default"},
271
			revision:       nextHop.revision,
272
		}
273
		cmpFunc, key := start.GetComp()
274
		if cmpFunc(key, &next) {
275
			nc := append([]instance{}, chain...)
276
			nc = append(nc, start)
277
			checkCycles(t, nextHop, cases, nc)
278
		}
279
	}
280
}
281

282
func TestLeaderElectionConfigMapRemoved(t *testing.T) {
283
	client := fake.NewSimpleClientset()
284
	watcher := &fakeDefaultWatcher{}
285
	_, stop := createElection(t, "pod1", "", watcher, true, client)
286
	if err := client.CoreV1().ConfigMaps("ns").Delete(context.TODO(), testLock, metav1.DeleteOptions{}); err != nil {
287
		t.Fatal(err)
288
	}
289
	retry.UntilSuccessOrFail(t, func() error {
290
		l, err := client.CoreV1().ConfigMaps("ns").List(context.TODO(), metav1.ListOptions{})
291
		if err != nil {
292
			return err
293
		}
294
		if len(l.Items) != 1 {
295
			return fmt.Errorf("got unexpected config map entry: %v", l.Items)
296
		}
297
		return nil
298
	})
299
	close(stop)
300
}
301

302
func TestLeaderElectionNoPermission(t *testing.T) {
303
	client := fake.NewSimpleClientset()
304
	watcher := &fakeDefaultWatcher{}
305
	allowRbac := atomic.NewBool(true)
306
	client.Fake.PrependReactor("update", "*", func(action k8stesting.Action) (bool, runtime.Object, error) {
307
		if allowRbac.Load() {
308
			return false, nil, nil
309
		}
310
		return true, nil, fmt.Errorf("nope, out of luck")
311
	})
312

313
	completions := atomic.NewInt32(0)
314
	l, stop := createElection(t, "pod1", "", watcher, true, client, func(stop <-chan struct{}) {
315
		completions.Add(1)
316
	})
317
	// Expect to run once
318
	expectInt(t, completions.Load, 1)
319

320
	// drop RBAC permissions to update the configmap
321
	// This simulates loosing an active lease
322
	allowRbac.Store(false)
323

324
	// We should start a new cycle at this point
325
	expectInt(t, l.cycle.Load, 2)
326

327
	// Add configmap permission back
328
	allowRbac.Store(true)
329

330
	// We should get the leader lock back
331
	expectInt(t, completions.Load, 2)
332

333
	close(stop)
334
}
335

336
func expectInt(t *testing.T, f func() int32, expected int32) {
337
	t.Helper()
338
	retry.UntilSuccessOrFail(t, func() error {
339
		got := f()
340
		if got != expected {
341
			return fmt.Errorf("unexpected count: %v, want %v", got, expected)
342
		}
343
		return nil
344
	}, retry.Timeout(time.Second))
345
}
346

347
func TestLeaderElectionDisabled(t *testing.T) {
348
	client := fake.NewSimpleClientset()
349
	watcher := &fakeDefaultWatcher{}
350
	// Prevent LeaderElection from creating a lease, so that the runFn only runs
351
	// if leader election is disabled.
352
	client.Fake.PrependReactor("*", "*", func(action k8stesting.Action) (bool, runtime.Object, error) {
353
		return true, nil, fmt.Errorf("nope, out of luck")
354
	})
355

356
	l := &LeaderElection{
357
		namespace:      "ns",
358
		name:           "disabled",
359
		enabled:        false,
360
		electionID:     testLock,
361
		client:         client,
362
		revision:       "",
363
		defaultWatcher: watcher,
364
		ttl:            time.Second,
365
		cycle:          atomic.NewInt32(0),
366
	}
367
	gotLeader := atomic.NewBool(false)
368
	l.AddRunFunction(func(stop <-chan struct{}) {
369
		gotLeader.Store(true)
370
	})
371
	stop := make(chan struct{})
372
	go l.Run(stop)
373
	t.Cleanup(func() {
374
		close(stop)
375
	})
376

377
	// Need to retry until Run() starts to execute in the goroutine.
378
	retry.UntilOrFail(t, gotLeader.Load, retry.Converge(5), retry.Delay(time.Millisecond*100), retry.Timeout(time.Second*10))
379
	if !l.isLeader() {
380
		t.Errorf("isLeader()=false, want true")
381
	}
382
}
383

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.