istio

Форк
0
1323 строки · 42.5 Кб
1
/*
2
Copyright 2015 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
// nolint
18
package k8sleaderelection
19

20
import (
21
	"context"
22
	"encoding/json"
23
	"fmt"
24
	"sync"
25
	"testing"
26
	"time"
27

28
	coordinationv1 "k8s.io/api/coordination/v1"
29
	corev1 "k8s.io/api/core/v1"
30
	"k8s.io/apimachinery/pkg/api/equality"
31
	"k8s.io/apimachinery/pkg/api/errors"
32
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33
	"k8s.io/apimachinery/pkg/runtime"
34
	"k8s.io/apimachinery/pkg/util/diff"
35
	"k8s.io/client-go/kubernetes/fake"
36
	fakeclient "k8s.io/client-go/testing"
37
	"k8s.io/client-go/tools/record"
38
	"k8s.io/utils/clock"
39

40
	rl "istio.io/istio/pilot/pkg/leaderelection/k8sleaderelection/k8sresourcelock"
41
)
42

43
func createLockObject(t *testing.T, objectType, namespace, name string, record *rl.LeaderElectionRecord) (obj runtime.Object) {
44
	objectMeta := metav1.ObjectMeta{
45
		Namespace: namespace,
46
		Name:      name,
47
	}
48
	if record != nil {
49
		recordBytes, _ := json.Marshal(record)
50
		objectMeta.Annotations = map[string]string{
51
			rl.LeaderElectionRecordAnnotationKey: string(recordBytes),
52
		}
53
	}
54
	switch objectType {
55
	case "endpoints":
56
		obj = &corev1.Endpoints{ObjectMeta: objectMeta}
57
	case "configmaps":
58
		obj = &corev1.ConfigMap{ObjectMeta: objectMeta}
59
	case "leases":
60
		var spec coordinationv1.LeaseSpec
61
		if record != nil {
62
			spec = rl.LeaderElectionRecordToLeaseSpec(record)
63
		}
64
		obj = &coordinationv1.Lease{ObjectMeta: objectMeta, Spec: spec}
65
	default:
66
		t.Fatal("unexpected objType:" + objectType)
67
	}
68
	return
69
}
70

71
// Will test leader election using endpoints as the resource
72
func TestTryAcquireOrRenewEndpoints(t *testing.T) {
73
	testTryAcquireOrRenew(t, "endpoints")
74
}
75

76
type Reactor struct {
77
	verb       string
78
	objectType string
79
	reaction   fakeclient.ReactionFunc
80
}
81

82
func testTryAcquireOrRenew(t *testing.T, objectType string) {
83
	future := time.Now().Add(1000 * time.Hour)
84
	past := time.Now().Add(-1000 * time.Hour)
85

86
	tests := []struct {
87
		name           string
88
		observedRecord rl.LeaderElectionRecord
89
		observedTime   time.Time
90
		reactors       []Reactor
91

92
		key               string
93
		keyComparisonFunc KeyComparisonFunc
94

95
		expectSuccess    bool
96
		transitionLeader bool
97
		outHolder        string
98
	}{
99
		{
100
			name: "acquire from no object",
101
			reactors: []Reactor{
102
				{
103
					verb: "get",
104
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
105
						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
106
					},
107
				},
108
				{
109
					verb: "create",
110
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
111
						return true, action.(fakeclient.CreateAction).GetObject(), nil
112
					},
113
				},
114
			},
115
			expectSuccess: true,
116
			outHolder:     "baz",
117
		},
118
		{
119
			name: "acquire from object without annotations",
120
			reactors: []Reactor{
121
				{
122
					verb: "get",
123
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
124
						return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), nil), nil
125
					},
126
				},
127
				{
128
					verb: "update",
129
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
130
						return true, action.(fakeclient.CreateAction).GetObject(), nil
131
					},
132
				},
133
			},
134
			expectSuccess:    true,
135
			transitionLeader: true,
136
			outHolder:        "baz",
137
		},
138
		{
139
			name: "acquire from unled object",
140
			reactors: []Reactor{
141
				{
142
					verb: "get",
143
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
144
						return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{}), nil
145
					},
146
				},
147
				{
148
					verb: "update",
149
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
150
						return true, action.(fakeclient.CreateAction).GetObject(), nil
151
					},
152
				},
153
			},
154

155
			expectSuccess:    true,
156
			transitionLeader: true,
157
			outHolder:        "baz",
158
		},
159
		{
160
			name: "acquire from led, unacked object",
161
			reactors: []Reactor{
162
				{
163
					verb: "get",
164
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
165
						// nolint: lll
166
						return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
167
					},
168
				},
169
				{
170
					verb: "update",
171
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
172
						return true, action.(fakeclient.CreateAction).GetObject(), nil
173
					},
174
				},
175
			},
176
			observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
177
			observedTime:   past,
178

179
			expectSuccess:    true,
180
			transitionLeader: true,
181
			outHolder:        "baz",
182
		},
183
		{
184
			name: "acquire from empty led, acked object",
185
			reactors: []Reactor{
186
				{
187
					verb: "get",
188
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
189
						// nolint: lll
190
						return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: ""}), nil
191
					},
192
				},
193
				{
194
					verb: "update",
195
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
196
						return true, action.(fakeclient.CreateAction).GetObject(), nil
197
					},
198
				},
199
			},
200
			observedTime: future,
201

202
			expectSuccess:    true,
203
			transitionLeader: true,
204
			outHolder:        "baz",
205
		},
206
		{
207
			name: "don't acquire from led, acked object",
208
			reactors: []Reactor{
209
				{
210
					verb: "get",
211
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
212
						// nolint: lll
213
						return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
214
					},
215
				},
216
			},
217
			observedTime: future,
218

219
			expectSuccess: false,
220
			outHolder:     "bing",
221
		},
222
		{
223
			name: "don't acquire from led, acked object with key",
224
			reactors: []Reactor{
225
				{
226
					verb: "get",
227
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
228
						return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(),
229
							&rl.LeaderElectionRecord{HolderIdentity: "bing", HolderKey: "a"}), nil
230
					},
231
				},
232
			},
233
			observedTime: future,
234

235
			expectSuccess: false,
236
			outHolder:     "bing",
237
		},
238
		// Uncomment when https://github.com/kubernetes/kubernetes/pull/103442/files#r715818684 is resolved.
239
		//{
240
		//	name: "don't acquire from led, acked object with key when our key is smaller",
241
		//	reactors: []Reactor{
242
		//		{
243
		//			verb: "get",
244
		//			reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
245
		//				return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(),
246
		//					&rl.LeaderElectionRecord{HolderIdentity: "bing", HolderKey: "zzzz"}), nil
247
		//			},
248
		//		},
249
		//	},
250
		//	observedTime: future,
251
		//
252
		//	key: "aaa",
253
		//	keyComparisonFunc: func(existingKey string) bool {
254
		//		return "aaa" > existingKey
255
		//	},
256
		//
257
		//	expectSuccess: false,
258
		//	outHolder:     "bing",
259
		//},
260
		{
261
			name: "steal from led object with key when our key is larger",
262
			reactors: []Reactor{
263
				{
264
					verb: "get",
265
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
266
						// nolint: lll
267
						return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(),
268
							&rl.LeaderElectionRecord{HolderIdentity: "bing", HolderKey: "aaa"}), nil
269
					},
270
				},
271
				{
272
					verb: "update",
273
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
274
						return true, action.(fakeclient.CreateAction).GetObject(), nil
275
					},
276
				},
277
			},
278
			observedTime: future,
279

280
			key: "zzz",
281
			keyComparisonFunc: func(existingKey string) bool {
282
				return "zzz" > existingKey
283
			},
284

285
			transitionLeader: true,
286
			expectSuccess:    true,
287
			outHolder:        "baz",
288
		},
289
		{
290
			name: "handle led object with no key",
291
			reactors: []Reactor{
292
				{
293
					verb: "get",
294
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
295
						// nolint: lll
296
						return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(),
297
							&rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
298
					},
299
				},
300
			},
301
			observedTime: future,
302

303
			key: "zzz",
304
			keyComparisonFunc: func(existingKey string) bool {
305
				return existingKey != "" && "zzz" > existingKey
306
			},
307

308
			expectSuccess: false,
309
			outHolder:     "bing",
310
		},
311
		{
312
			name: "renew already acquired object",
313
			reactors: []Reactor{
314
				{
315
					verb: "get",
316
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
317
						// nolint: lll
318
						return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
319
					},
320
				},
321
				{
322
					verb: "update",
323
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
324
						return true, action.(fakeclient.CreateAction).GetObject(), nil
325
					},
326
				},
327
			},
328
			observedTime:   future,
329
			observedRecord: rl.LeaderElectionRecord{HolderIdentity: "baz"},
330

331
			expectSuccess: true,
332
			outHolder:     "baz",
333
		},
334
	}
335

336
	for i := range tests {
337
		test := &tests[i]
338
		t.Run(test.name, func(t *testing.T) {
339
			// OnNewLeader is called async so we have to wait for it.
340
			var wg sync.WaitGroup
341
			wg.Add(1)
342
			var reportedLeader string
343
			var lock rl.Interface
344

345
			objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"}
346
			resourceLockConfig := rl.ResourceLockConfig{
347
				Identity:      "baz",
348
				Key:           test.key,
349
				EventRecorder: &record.FakeRecorder{},
350
			}
351
			c := &fake.Clientset{}
352
			for _, reactor := range test.reactors {
353
				c.AddReactor(reactor.verb, objectType, reactor.reaction)
354
			}
355
			c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
356
				t.Errorf("unreachable action. testclient called too many times: %+v", action)
357
				return true, nil, fmt.Errorf("unreachable action")
358
			})
359

360
			switch objectType {
361
			case "endpoints":
362
				lock = &rl.EndpointsLock{
363
					EndpointsMeta: objectMeta,
364
					LockConfig:    resourceLockConfig,
365
					Client:        c.CoreV1(),
366
				}
367
			case "configmaps":
368
				lock = &rl.ConfigMapLock{
369
					ConfigMapMeta: objectMeta,
370
					LockConfig:    resourceLockConfig,
371
					Client:        c.CoreV1(),
372
				}
373
			case "leases":
374
				lock = &rl.LeaseLock{
375
					LeaseMeta:  objectMeta,
376
					LockConfig: resourceLockConfig,
377
					Client:     c.CoordinationV1(),
378
				}
379
			}
380

381
			lec := LeaderElectionConfig{
382
				Lock:          lock,
383
				KeyComparison: test.keyComparisonFunc,
384
				LeaseDuration: 10 * time.Second,
385
				Callbacks: LeaderCallbacks{
386
					OnNewLeader: func(l string) {
387
						defer wg.Done()
388
						reportedLeader = l
389
					},
390
				},
391
			}
392
			observedRawRecord := GetRawRecordOrDie(t, objectType, test.observedRecord)
393
			le := &LeaderElector{
394
				config:            lec,
395
				observedRecord:    test.observedRecord,
396
				observedRawRecord: observedRawRecord,
397
				observedTime:      test.observedTime,
398
				clock:             clock.RealClock{},
399
			}
400
			if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
401
				t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
402
			}
403

404
			le.observedRecord.AcquireTime = metav1.Time{}
405
			le.observedRecord.RenewTime = metav1.Time{}
406
			if le.observedRecord.HolderIdentity != test.outHolder {
407
				t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity)
408
			}
409
			if len(test.reactors) != len(c.Actions()) {
410
				t.Errorf("wrong number of api interactions")
411
			}
412
			if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 {
413
				t.Errorf("leader should have transitioned but did not")
414
			}
415
			if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 {
416
				t.Errorf("leader should not have transitioned but did")
417
			}
418

419
			le.maybeReportTransition()
420
			wg.Wait()
421
			if reportedLeader != test.outHolder {
422
				t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader)
423
			}
424
		})
425
	}
426
}
427

428
// Will test leader election using configmap as the resource
429
func TestTryAcquireOrRenewConfigMaps(t *testing.T) {
430
	testTryAcquireOrRenew(t, "configmaps")
431
}
432

433
// Will test leader election using lease as the resource
434
func TestTryAcquireOrRenewLeases(t *testing.T) {
435
	testTryAcquireOrRenew(t, "leases")
436
}
437

438
func TestLeaseSpecToLeaderElectionRecordRoundTrip(t *testing.T) {
439
	holderIdentity := "foo"
440
	leaseDurationSeconds := int32(10)
441
	leaseTransitions := int32(1)
442
	oldSpec := coordinationv1.LeaseSpec{
443
		HolderIdentity:       &holderIdentity,
444
		LeaseDurationSeconds: &leaseDurationSeconds,
445
		AcquireTime:          &metav1.MicroTime{Time: time.Now()},
446
		RenewTime:            &metav1.MicroTime{Time: time.Now()},
447
		LeaseTransitions:     &leaseTransitions,
448
	}
449

450
	oldRecord := rl.LeaseSpecToLeaderElectionRecord(&oldSpec)
451
	newSpec := rl.LeaderElectionRecordToLeaseSpec(oldRecord)
452

453
	if !equality.Semantic.DeepEqual(oldSpec, newSpec) {
454
		t.Errorf("diff: %v", diff.ObjectReflectDiff(oldSpec, newSpec))
455
	}
456

457
	newRecord := rl.LeaseSpecToLeaderElectionRecord(&newSpec)
458

459
	if !equality.Semantic.DeepEqual(oldRecord, newRecord) {
460
		t.Errorf("diff: %v", diff.ObjectReflectDiff(oldRecord, newRecord))
461
	}
462
}
463

464
func multiLockType(t *testing.T, objectType string) (primaryType, secondaryType string) {
465
	switch objectType {
466
	case rl.EndpointsLeasesResourceLock:
467
		return rl.EndpointsResourceLock, rl.LeasesResourceLock
468
	case rl.ConfigMapsLeasesResourceLock:
469
		return rl.ConfigMapsResourceLock, rl.LeasesResourceLock
470
	default:
471
		t.Fatal("unexpected objType:" + objectType)
472
	}
473
	return
474
}
475

476
func GetRawRecordOrDie(t *testing.T, objectType string, ler rl.LeaderElectionRecord) (ret []byte) {
477
	var err error
478
	switch objectType {
479
	case "endpoints", "configmaps", "leases":
480
		ret, err = json.Marshal(ler)
481
		if err != nil {
482
			t.Fatalf("lock %s get raw record %v failed: %v", objectType, ler, err)
483
		}
484
	case "endpointsleases", "configmapsleases":
485
		recordBytes, err := json.Marshal(ler)
486
		if err != nil {
487
			t.Fatalf("lock %s get raw record %v failed: %v", objectType, ler, err)
488
		}
489
		ret = rl.ConcatRawRecord(recordBytes, recordBytes)
490
	default:
491
		t.Fatal("unexpected objType:" + objectType)
492
	}
493
	return
494
}
495

496
func testTryAcquireOrRenewMultiLock(t *testing.T, objectType string) {
497
	future := time.Now().Add(1000 * time.Hour)
498
	past := time.Now().Add(-1000 * time.Hour)
499
	primaryType, secondaryType := multiLockType(t, objectType)
500
	tests := []struct {
501
		name              string
502
		observedRecord    rl.LeaderElectionRecord
503
		observedRawRecord []byte
504
		observedTime      time.Time
505
		reactors          []Reactor
506

507
		expectSuccess    bool
508
		transitionLeader bool
509
		outHolder        string
510
	}{
511
		{
512
			name: "acquire from no object",
513
			reactors: []Reactor{
514
				{
515
					verb:       "get",
516
					objectType: primaryType,
517
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
518
						// nolint: lll
519
						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
520
					},
521
				},
522
				{
523
					verb:       "create",
524
					objectType: primaryType,
525
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
526
						return true, action.(fakeclient.CreateAction).GetObject(), nil
527
					},
528
				},
529
				{
530
					verb:       "create",
531
					objectType: secondaryType,
532
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
533
						return true, action.(fakeclient.CreateAction).GetObject(), nil
534
					},
535
				},
536
			},
537
			expectSuccess: true,
538
			outHolder:     "baz",
539
		},
540
		{
541
			name: "acquire from unled old object",
542
			reactors: []Reactor{
543
				{
544
					verb:       "get",
545
					objectType: primaryType,
546
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
547
						// nolint: lll
548
						return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{}), nil
549
					},
550
				},
551
				{
552
					verb:       "get",
553
					objectType: secondaryType,
554
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
555
						// nolint: lll
556
						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
557
					},
558
				},
559
				{
560
					verb:       "update",
561
					objectType: primaryType,
562
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
563
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
564
					},
565
				},
566
				{
567
					verb:       "get",
568
					objectType: secondaryType,
569
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
570
						// nolint: lll
571
						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
572
					},
573
				},
574
				{
575
					verb:       "create",
576
					objectType: secondaryType,
577
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
578
						return true, action.(fakeclient.CreateAction).GetObject(), nil
579
					},
580
				},
581
			},
582
			expectSuccess:    true,
583
			transitionLeader: true,
584
			outHolder:        "baz",
585
		},
586
		{
587
			name: "acquire from unled transition object",
588
			reactors: []Reactor{
589
				{
590
					verb:       "get",
591
					objectType: primaryType,
592
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
593
						return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{}), nil
594
					},
595
				},
596
				{
597
					verb:       "get",
598
					objectType: secondaryType,
599
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
600
						return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{}), nil
601
					},
602
				},
603
				{
604
					verb:       "update",
605
					objectType: primaryType,
606
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
607
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
608
					},
609
				},
610
				{
611
					verb:       "get",
612
					objectType: secondaryType,
613
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
614
						return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{}), nil
615
					},
616
				},
617
				{
618
					verb:       "update",
619
					objectType: secondaryType,
620
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
621
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
622
					},
623
				},
624
			},
625
			expectSuccess:    true,
626
			transitionLeader: true,
627
			outHolder:        "baz",
628
		},
629
		{
630
			name: "acquire from led, unack old object",
631
			reactors: []Reactor{
632
				{
633
					verb:       "get",
634
					objectType: primaryType,
635
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
636
						// nolint: lll
637
						return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
638
					},
639
				},
640
				{
641
					verb:       "get",
642
					objectType: secondaryType,
643
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
644
						// nolint: lll
645
						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
646
					},
647
				},
648
				{
649
					verb:       "update",
650
					objectType: primaryType,
651
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
652
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
653
					},
654
				},
655
				{
656
					verb:       "get",
657
					objectType: secondaryType,
658
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
659
						// nolint: lll
660
						return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
661
					},
662
				},
663
				{
664
					verb:       "create",
665
					objectType: secondaryType,
666
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
667
						return true, action.(fakeclient.CreateAction).GetObject(), nil
668
					},
669
				},
670
			},
671
			observedRecord:    rl.LeaderElectionRecord{HolderIdentity: "bing"},
672
			observedRawRecord: GetRawRecordOrDie(t, primaryType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
673
			observedTime:      past,
674

675
			expectSuccess:    true,
676
			transitionLeader: true,
677
			outHolder:        "baz",
678
		},
679
		{
680
			name: "acquire from led, unack transition object",
681
			reactors: []Reactor{
682
				{
683
					verb:       "get",
684
					objectType: primaryType,
685
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
686
						// nolint: lll
687
						return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
688
					},
689
				},
690
				{
691
					verb:       "get",
692
					objectType: secondaryType,
693
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
694
						// nolint: lll
695
						return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
696
					},
697
				},
698
				{
699
					verb:       "update",
700
					objectType: primaryType,
701
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
702
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
703
					},
704
				},
705
				{
706
					verb:       "get",
707
					objectType: secondaryType,
708
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
709
						// nolint: lll
710
						return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
711
					},
712
				},
713
				{
714
					verb:       "update",
715
					objectType: secondaryType,
716
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
717
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
718
					},
719
				},
720
			},
721
			observedRecord:    rl.LeaderElectionRecord{HolderIdentity: "bing"},
722
			observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
723
			observedTime:      past,
724

725
			expectSuccess:    true,
726
			transitionLeader: true,
727
			outHolder:        "baz",
728
		},
729
		{
730
			name: "acquire from conflict led, ack transition object",
731
			reactors: []Reactor{
732
				{
733
					verb:       "get",
734
					objectType: primaryType,
735
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
736
						// nolint: lll
737
						return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
738
					},
739
				},
740
				{
741
					verb:       "get",
742
					objectType: secondaryType,
743
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
744
						// nolint: lll
745
						return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
746
					},
747
				},
748
			},
749
			observedRecord:    rl.LeaderElectionRecord{HolderIdentity: "bing"},
750
			observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
751
			observedTime:      future,
752

753
			expectSuccess: false,
754
			outHolder:     rl.UnknownLeader,
755
		},
756
		{
757
			name: "acquire from led, unack unknown object",
758
			reactors: []Reactor{
759
				{
760
					verb:       "get",
761
					objectType: primaryType,
762
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
763
						// nolint: lll
764
						return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}), nil
765
					},
766
				},
767
				{
768
					verb:       "get",
769
					objectType: secondaryType,
770
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
771
						// nolint: lll
772
						return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}), nil
773
					},
774
				},
775
				{
776
					verb:       "update",
777
					objectType: primaryType,
778
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
779
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
780
					},
781
				},
782
				{
783
					verb:       "get",
784
					objectType: secondaryType,
785
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
786
						// nolint: lll
787
						return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}), nil
788
					},
789
				},
790
				{
791
					verb:       "update",
792
					objectType: secondaryType,
793
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
794
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
795
					},
796
				},
797
			},
798
			observedRecord:    rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader},
799
			observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}),
800
			observedTime:      past,
801

802
			expectSuccess:    true,
803
			transitionLeader: true,
804
			outHolder:        "baz",
805
		},
806
		{
807
			name: "don't acquire from led, ack old object",
808
			reactors: []Reactor{
809
				{
810
					verb:       "get",
811
					objectType: primaryType,
812
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
813
						// nolint: lll
814
						return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
815
					},
816
				},
817
				{
818
					verb:       "get",
819
					objectType: secondaryType,
820
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
821
						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
822
					},
823
				},
824
			},
825
			observedRecord:    rl.LeaderElectionRecord{HolderIdentity: "bing"},
826
			observedRawRecord: GetRawRecordOrDie(t, primaryType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
827
			observedTime:      future,
828

829
			expectSuccess: false,
830
			outHolder:     "bing",
831
		},
832
		{
833
			name: "don't acquire from led, acked new object, observe new record",
834
			reactors: []Reactor{
835
				{
836
					verb:       "get",
837
					objectType: primaryType,
838
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
839
						// nolint: lll
840
						return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
841
					},
842
				},
843
				{
844
					verb:       "get",
845
					objectType: secondaryType,
846
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
847
						// nolint: lll
848
						return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
849
					},
850
				},
851
			},
852
			observedRecord:    rl.LeaderElectionRecord{HolderIdentity: "bing"},
853
			observedRawRecord: GetRawRecordOrDie(t, secondaryType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
854
			observedTime:      future,
855

856
			expectSuccess: false,
857
			outHolder:     rl.UnknownLeader,
858
		},
859
		{
860
			name: "don't acquire from led, acked new object, observe transition record",
861
			reactors: []Reactor{
862
				{
863
					verb:       "get",
864
					objectType: primaryType,
865
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
866
						// nolint: lll
867
						return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
868
					},
869
				},
870
				{
871
					verb:       "get",
872
					objectType: secondaryType,
873
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
874
						// nolint: lll
875
						return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
876
					},
877
				},
878
			},
879
			observedRecord:    rl.LeaderElectionRecord{HolderIdentity: "bing"},
880
			observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
881
			observedTime:      future,
882

883
			expectSuccess: false,
884
			outHolder:     "bing",
885
		},
886
		{
887
			name: "renew already required object",
888
			reactors: []Reactor{
889
				{
890
					verb:       "get",
891
					objectType: primaryType,
892
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
893
						// nolint: lll
894
						return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
895
					},
896
				},
897
				{
898
					verb:       "get",
899
					objectType: secondaryType,
900
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
901
						// nolint: lll
902
						return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
903
					},
904
				},
905
				{
906
					verb:       "update",
907
					objectType: primaryType,
908
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
909
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
910
					},
911
				},
912
				{
913
					verb:       "get",
914
					objectType: secondaryType,
915
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
916
						// nolint: lll
917
						return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
918
					},
919
				},
920
				{
921
					verb:       "update",
922
					objectType: secondaryType,
923
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
924
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
925
					},
926
				},
927
			},
928
			observedRecord:    rl.LeaderElectionRecord{HolderIdentity: "baz"},
929
			observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: "baz"}),
930
			observedTime:      future,
931

932
			expectSuccess: true,
933
			outHolder:     "baz",
934
		},
935
	}
936

937
	for i := range tests {
938
		test := &tests[i]
939
		t.Run(test.name, func(t *testing.T) {
940
			// OnNewLeader is called async so we have to wait for it.
941
			var wg sync.WaitGroup
942
			wg.Add(1)
943
			var reportedLeader string
944
			var lock rl.Interface
945

946
			objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"}
947
			resourceLockConfig := rl.ResourceLockConfig{
948
				Identity:      "baz",
949
				EventRecorder: &record.FakeRecorder{},
950
			}
951
			c := &fake.Clientset{}
952
			for _, reactor := range test.reactors {
953
				c.AddReactor(reactor.verb, reactor.objectType, reactor.reaction)
954
			}
955
			c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
956
				t.Errorf("unreachable action. testclient called too many times: %+v", action)
957
				return true, nil, fmt.Errorf("unreachable action")
958
			})
959

960
			switch objectType {
961
			case rl.EndpointsLeasesResourceLock:
962
				lock = &rl.MultiLock{
963
					Primary: &rl.EndpointsLock{
964
						EndpointsMeta: objectMeta,
965
						LockConfig:    resourceLockConfig,
966
						Client:        c.CoreV1(),
967
					},
968
					Secondary: &rl.LeaseLock{
969
						LeaseMeta:  objectMeta,
970
						LockConfig: resourceLockConfig,
971
						Client:     c.CoordinationV1(),
972
					},
973
				}
974
			case rl.ConfigMapsLeasesResourceLock:
975
				lock = &rl.MultiLock{
976
					Primary: &rl.ConfigMapLock{
977
						ConfigMapMeta: objectMeta,
978
						LockConfig:    resourceLockConfig,
979
						Client:        c.CoreV1(),
980
					},
981
					Secondary: &rl.LeaseLock{
982
						LeaseMeta:  objectMeta,
983
						LockConfig: resourceLockConfig,
984
						Client:     c.CoordinationV1(),
985
					},
986
				}
987
			}
988

989
			lec := LeaderElectionConfig{
990
				Lock:          lock,
991
				LeaseDuration: 10 * time.Second,
992
				Callbacks: LeaderCallbacks{
993
					OnNewLeader: func(l string) {
994
						defer wg.Done()
995
						reportedLeader = l
996
					},
997
				},
998
			}
999
			le := &LeaderElector{
1000
				config:            lec,
1001
				observedRecord:    test.observedRecord,
1002
				observedRawRecord: test.observedRawRecord,
1003
				observedTime:      test.observedTime,
1004
				clock:             clock.RealClock{},
1005
			}
1006
			if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
1007
				t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
1008
			}
1009

1010
			le.observedRecord.AcquireTime = metav1.Time{}
1011
			le.observedRecord.RenewTime = metav1.Time{}
1012
			if le.observedRecord.HolderIdentity != test.outHolder {
1013
				t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity)
1014
			}
1015
			if len(test.reactors) != len(c.Actions()) {
1016
				t.Errorf("wrong number of api interactions")
1017
			}
1018
			if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 {
1019
				t.Errorf("leader should have transitioned but did not")
1020
			}
1021
			if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 {
1022
				t.Errorf("leader should not have transitioned but did")
1023
			}
1024

1025
			le.maybeReportTransition()
1026
			wg.Wait()
1027
			if reportedLeader != test.outHolder {
1028
				t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader)
1029
			}
1030
		})
1031
	}
1032
}
1033

1034
// Will test leader election using endpointsleases as the resource
1035
func TestTryAcquireOrRenewEndpointsLeases(t *testing.T) {
1036
	testTryAcquireOrRenewMultiLock(t, "endpointsleases")
1037
}
1038

1039
// Will test leader election using configmapsleases as the resource
1040
func TestTryAcquireOrRenewConfigMapsLeases(t *testing.T) {
1041
	testTryAcquireOrRenewMultiLock(t, "configmapsleases")
1042
}
1043

1044
func testReleaseLease(t *testing.T, objectType string) {
1045
	tests := []struct {
1046
		name           string
1047
		observedRecord rl.LeaderElectionRecord
1048
		observedTime   time.Time
1049
		reactors       []Reactor
1050

1051
		expectSuccess    bool
1052
		transitionLeader bool
1053
		outHolder        string
1054
	}{
1055
		{
1056
			name: "release acquired lock from no object",
1057
			reactors: []Reactor{
1058
				{
1059
					verb:       "get",
1060
					objectType: objectType,
1061
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
1062
						// nolint: lll
1063
						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
1064
					},
1065
				},
1066
				{
1067
					verb:       "create",
1068
					objectType: objectType,
1069
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
1070
						return true, action.(fakeclient.CreateAction).GetObject(), nil
1071
					},
1072
				},
1073
				{
1074
					verb:       "update",
1075
					objectType: objectType,
1076
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
1077
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
1078
					},
1079
				},
1080
			},
1081
			expectSuccess: true,
1082
			outHolder:     "",
1083
		},
1084
	}
1085

1086
	for i := range tests {
1087
		test := &tests[i]
1088
		t.Run(test.name, func(t *testing.T) {
1089
			// OnNewLeader is called async so we have to wait for it.
1090
			var wg sync.WaitGroup
1091
			wg.Add(1)
1092
			var reportedLeader string
1093
			var lock rl.Interface
1094

1095
			objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"}
1096
			resourceLockConfig := rl.ResourceLockConfig{
1097
				Identity:      "baz",
1098
				EventRecorder: &record.FakeRecorder{},
1099
			}
1100
			c := &fake.Clientset{}
1101
			for _, reactor := range test.reactors {
1102
				c.AddReactor(reactor.verb, objectType, reactor.reaction)
1103
			}
1104
			c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
1105
				t.Errorf("unreachable action. testclient called too many times: %+v", action)
1106
				return true, nil, fmt.Errorf("unreachable action")
1107
			})
1108

1109
			switch objectType {
1110
			case "endpoints":
1111
				lock = &rl.EndpointsLock{
1112
					EndpointsMeta: objectMeta,
1113
					LockConfig:    resourceLockConfig,
1114
					Client:        c.CoreV1(),
1115
				}
1116
			case "configmaps":
1117
				lock = &rl.ConfigMapLock{
1118
					ConfigMapMeta: objectMeta,
1119
					LockConfig:    resourceLockConfig,
1120
					Client:        c.CoreV1(),
1121
				}
1122
			case "leases":
1123
				lock = &rl.LeaseLock{
1124
					LeaseMeta:  objectMeta,
1125
					LockConfig: resourceLockConfig,
1126
					Client:     c.CoordinationV1(),
1127
				}
1128
			}
1129

1130
			lec := LeaderElectionConfig{
1131
				Lock:          lock,
1132
				LeaseDuration: 10 * time.Second,
1133
				Callbacks: LeaderCallbacks{
1134
					OnNewLeader: func(l string) {
1135
						defer wg.Done()
1136
						reportedLeader = l
1137
					},
1138
				},
1139
			}
1140
			observedRawRecord := GetRawRecordOrDie(t, objectType, test.observedRecord)
1141
			le := &LeaderElector{
1142
				config:            lec,
1143
				observedRecord:    test.observedRecord,
1144
				observedRawRecord: observedRawRecord,
1145
				observedTime:      test.observedTime,
1146
				clock:             clock.RealClock{},
1147
			}
1148
			if !le.tryAcquireOrRenew(context.Background()) {
1149
				t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", true)
1150
			}
1151

1152
			le.maybeReportTransition()
1153

1154
			// Wait for a response to the leader transition, and add 1 so that we can track the final transition.
1155
			wg.Wait()
1156
			wg.Add(1)
1157

1158
			if test.expectSuccess != le.release() {
1159
				t.Errorf("unexpected result of release: [succeeded=%v]", !test.expectSuccess)
1160
			}
1161

1162
			le.observedRecord.AcquireTime = metav1.Time{}
1163
			le.observedRecord.RenewTime = metav1.Time{}
1164
			if le.observedRecord.HolderIdentity != test.outHolder {
1165
				t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity)
1166
			}
1167
			if len(test.reactors) != len(c.Actions()) {
1168
				t.Errorf("wrong number of api interactions")
1169
			}
1170
			if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 {
1171
				t.Errorf("leader should have transitioned but did not")
1172
			}
1173
			if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 {
1174
				t.Errorf("leader should not have transitioned but did")
1175
			}
1176
			le.maybeReportTransition()
1177
			wg.Wait()
1178
			if reportedLeader != test.outHolder {
1179
				t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader)
1180
			}
1181
		})
1182
	}
1183
}
1184

1185
// Will test leader election using endpoints as the resource
1186
func TestReleaseLeaseEndpoints(t *testing.T) {
1187
	testReleaseLease(t, "endpoints")
1188
}
1189

1190
// Will test leader election using endpoints as the resource
1191
func TestReleaseLeaseConfigMaps(t *testing.T) {
1192
	testReleaseLease(t, "configmaps")
1193
}
1194

1195
// Will test leader election using endpoints as the resource
1196
func TestReleaseLeaseLeases(t *testing.T) {
1197
	testReleaseLease(t, "leases")
1198
}
1199

1200
func TestReleaseOnCancellation_Endpoints(t *testing.T) {
1201
	testReleaseOnCancellation(t, "endpoints")
1202
}
1203

1204
func TestReleaseOnCancellation_ConfigMaps(t *testing.T) {
1205
	testReleaseOnCancellation(t, "configmaps")
1206
}
1207

1208
func TestReleaseOnCancellation_Leases(t *testing.T) {
1209
	testReleaseOnCancellation(t, "leases")
1210
}
1211

1212
func testReleaseOnCancellation(t *testing.T, objectType string) {
1213
	var (
1214
		onNewLeader   = make(chan struct{})
1215
		onRenewCalled = make(chan struct{})
1216
		onRenewResume = make(chan struct{})
1217
		onRelease     = make(chan struct{})
1218

1219
		lockObj runtime.Object
1220
		updates int
1221
	)
1222

1223
	resourceLockConfig := rl.ResourceLockConfig{
1224
		Identity:      "baz",
1225
		EventRecorder: &record.FakeRecorder{},
1226
	}
1227
	c := &fake.Clientset{}
1228

1229
	c.AddReactor("get", objectType, func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
1230
		if lockObj != nil {
1231
			return true, lockObj, nil
1232
		}
1233
		return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
1234
	})
1235

1236
	// create lock
1237
	c.AddReactor("create", objectType, func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
1238
		lockObj = action.(fakeclient.CreateAction).GetObject()
1239
		return true, lockObj, nil
1240
	})
1241

1242
	c.AddReactor("update", objectType, func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
1243
		updates++
1244

1245
		// Second update (first renew) should return our canceled error
1246
		// FakeClient doesn't do anything with the context so we're doing this ourselves
1247
		if updates == 2 {
1248
			close(onRenewCalled)
1249
			<-onRenewResume
1250
			return true, nil, context.Canceled
1251
		} else if updates == 3 {
1252
			close(onRelease)
1253
		}
1254

1255
		lockObj = action.(fakeclient.UpdateAction).GetObject()
1256
		return true, lockObj, nil
1257
	})
1258

1259
	c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
1260
		t.Errorf("unreachable action. testclient called too many times: %+v", action)
1261
		return true, nil, fmt.Errorf("unreachable action")
1262
	})
1263

1264
	lock, err := rl.New(objectType, "foo", "bar", c.CoreV1(), c.CoordinationV1(), resourceLockConfig)
1265
	if err != nil {
1266
		t.Fatal("resourcelock.New() = ", err)
1267
	}
1268

1269
	lec := LeaderElectionConfig{
1270
		Lock:          lock,
1271
		LeaseDuration: 40 * time.Millisecond,
1272
		RenewDeadline: 20 * time.Millisecond,
1273
		RetryPeriod:   10 * time.Millisecond,
1274

1275
		// This is what we're testing
1276
		ReleaseOnCancel: true,
1277

1278
		Callbacks: LeaderCallbacks{
1279
			OnNewLeader:      func(identity string) {},
1280
			OnStoppedLeading: func() {},
1281
			OnStartedLeading: func(context.Context) {
1282
				close(onNewLeader)
1283
			},
1284
		},
1285
	}
1286

1287
	elector, err := NewLeaderElector(lec)
1288
	if err != nil {
1289
		t.Fatal("Failed to create leader elector: ", err)
1290
	}
1291

1292
	ctx, cancel := context.WithCancel(context.Background())
1293

1294
	go elector.Run(ctx)
1295

1296
	// Wait for us to become the leader
1297
	select {
1298
	case <-onNewLeader:
1299
	case <-time.After(1 * time.Second):
1300
		t.Fatal("failed to become the leader")
1301
	}
1302

1303
	// Wait for renew (update) to be invoked
1304
	select {
1305
	case <-onRenewCalled:
1306
	case <-time.After(1 * time.Second):
1307
		t.Fatal("the elector failed to renew the lock")
1308
	}
1309

1310
	// Cancel the context - stopping the elector while
1311
	// it's running
1312
	cancel()
1313

1314
	// Resume the update call to return the cancellation
1315
	// which should trigger the release flow
1316
	close(onRenewResume)
1317

1318
	select {
1319
	case <-onRelease:
1320
	case <-time.After(1 * time.Second):
1321
		t.Fatal("the lock was not released")
1322
	}
1323
}
1324

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.