1// Copyright Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package leaderelection
16
17import (
18"context"
19"fmt"
20"testing"
21"time"
22
23"go.uber.org/atomic"
24metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25"k8s.io/apimachinery/pkg/runtime"
26"k8s.io/client-go/kubernetes"
27"k8s.io/client-go/kubernetes/fake"
28k8stesting "k8s.io/client-go/testing"
29
30"istio.io/istio/pkg/revisions"
31"istio.io/istio/pkg/test/util/retry"
32)
33
// testLock is the base election ID (lock resource name) shared by all tests.
const testLock = "test-lock"
35
// createElection starts a standard (local, non-per-revision) leader election
// for the given pod name and revision against client, and fails the test
// unless the election's leadership state converges to expectLeader. Optional
// fns are registered as additional run functions. It returns the election
// and the stop channel that terminates it when closed.
func createElection(t *testing.T,
	name string, revision string,
	watcher revisions.DefaultWatcher,
	expectLeader bool,
	client kubernetes.Interface, fns ...func(stop <-chan struct{}),
) (*LeaderElection, chan struct{}) {
	t.Helper()
	return createElectionMulticluster(t, name, revision, false, false, watcher, expectLeader, client, fns...)
}
45
// createPerRevisionElection starts a per-revision leader election (the lock
// name is suffixed with the revision) and fails the test unless leadership
// converges to expectLeader. It returns the election and its stop channel.
func createPerRevisionElection(t *testing.T,
	name string, revision string,
	watcher revisions.DefaultWatcher,
	expectLeader bool,
	client kubernetes.Interface,
) (*LeaderElection, chan struct{}) {
	t.Helper()
	return createElectionMulticluster(t, name, revision, false, true, watcher, expectLeader, client)
}
55
56func createElectionMulticluster(t *testing.T,
57name, revision string,
58remote, perRevision bool,
59watcher revisions.DefaultWatcher,
60expectLeader bool,
61client kubernetes.Interface, fns ...func(stop <-chan struct{}),
62) (*LeaderElection, chan struct{}) {
63t.Helper()
64lockName := testLock
65if perRevision {
66lockName += "-" + revision
67}
68l := &LeaderElection{
69namespace: "ns",
70name: name,
71electionID: lockName,
72client: client,
73revision: revision,
74remote: remote,
75defaultWatcher: watcher,
76perRevision: perRevision,
77ttl: time.Second,
78cycle: atomic.NewInt32(0),
79enabled: true,
80}
81gotLeader := make(chan struct{})
82l.AddRunFunction(func(stop <-chan struct{}) {
83gotLeader <- struct{}{}
84})
85for _, fn := range fns {
86l.AddRunFunction(fn)
87}
88stop := make(chan struct{})
89go l.Run(stop)
90
91retry.UntilOrFail(t, func() bool {
92return l.isLeader() == expectLeader
93}, retry.Converge(5), retry.Delay(time.Millisecond*100), retry.Timeout(time.Second*10))
94return l, stop
95}
96
// fakeDefaultWatcher is a stub revisions.DefaultWatcher that always reports
// a fixed default revision.
type fakeDefaultWatcher struct {
	// defaultRevision is returned verbatim by GetDefault.
	defaultRevision string
}
100
// Run satisfies revisions.DefaultWatcher; the fake has no background work,
// so it is a no-op.
func (w *fakeDefaultWatcher) Run(stop <-chan struct{}) {
}
103
// HasSynced always reports true: the fake is fully synced from construction.
func (w *fakeDefaultWatcher) HasSynced() bool {
	return true
}
107
// GetDefault returns the fixed default revision the fake was built with.
func (w *fakeDefaultWatcher) GetDefault() string {
	return w.defaultRevision
}
111
// AddHandler is not exercised by these tests and panics if called.
func (w *fakeDefaultWatcher) AddHandler(handler revisions.DefaultHandler) {
	panic("unimplemented")
}
115
116func TestLeaderElection(t *testing.T) {
117client := fake.NewSimpleClientset()
118watcher := &fakeDefaultWatcher{}
119// First pod becomes the leader
120_, stop := createElection(t, "pod1", "", watcher, true, client)
121// A new pod is not the leader
122_, stop2 := createElection(t, "pod2", "", watcher, false, client)
123close(stop2)
124close(stop)
125}
126
127func TestPerRevisionElection(t *testing.T) {
128client := fake.NewSimpleClientset()
129watcher := &fakeDefaultWatcher{"foo"}
130// First pod becomes the leader
131_, stop := createPerRevisionElection(t, "pod1", "foo", watcher, true, client)
132// A new pod is not the leader
133_, stop2 := createPerRevisionElection(t, "pod2", "foo", watcher, false, client)
134close(stop2)
135close(stop)
136t.Log("drop")
137// After leader is lost, we can take over
138_, stop3 := createPerRevisionElection(t, "pod2", "foo", watcher, true, client)
139// Other revisions are independent
140_, stop4 := createPerRevisionElection(t, "pod4", "not-foo", watcher, true, client)
141close(stop3)
142close(stop4)
143}
144
145func TestPrioritizedLeaderElection(t *testing.T) {
146client := fake.NewSimpleClientset()
147watcher := &fakeDefaultWatcher{defaultRevision: "red"}
148
149// First pod, revision "green" becomes the leader, but is not the default revision
150_, stop := createElection(t, "pod1", "green", watcher, true, client)
151// Second pod, revision "red", steals the leader lock from "green" since it is the default revision
152_, stop2 := createElection(t, "pod2", "red", watcher, true, client)
153// Third pod with revision "red" comes in and cannot take the lock since another revision with "red" has it
154_, stop3 := createElection(t, "pod3", "red", watcher, false, client)
155// Fourth pod with revision "green" cannot take the lock since a revision with "red" has it.
156_, stop4 := createElection(t, "pod4", "green", watcher, false, client)
157close(stop2)
158close(stop3)
159close(stop4)
160// Now that revision "green" has stopped acting as leader, revision "red" should be able to claim lock.
161_, stop5 := createElection(t, "pod2", "red", watcher, true, client)
162close(stop5)
163close(stop)
164// Revision "green" can reclaim once "red" releases.
165_, stop6 := createElection(t, "pod4", "green", watcher, true, client)
166close(stop6)
167}
168
169func TestMulticlusterLeaderElection(t *testing.T) {
170client := fake.NewSimpleClientset()
171watcher := &fakeDefaultWatcher{}
172// First remote pod becomes the leader
173_, stop := createElectionMulticluster(t, "pod1", "", true, false, watcher, true, client)
174// A new local pod should become leader
175_, stop2 := createElectionMulticluster(t, "pod2", "", false, false, watcher, true, client)
176// A new remote pod cannot become leader
177_, stop3 := createElectionMulticluster(t, "pod3", "", true, false, watcher, false, client)
178close(stop3)
179close(stop2)
180close(stop)
181}
182
183func TestPrioritizedMulticlusterLeaderElection(t *testing.T) {
184client := fake.NewSimpleClientset()
185watcher := &fakeDefaultWatcher{defaultRevision: "red"}
186
187// First pod, revision "green" becomes the remote leader
188_, stop := createElectionMulticluster(t, "pod1", "green", true, false, watcher, true, client)
189// Second pod, revision "red", steals the leader lock from "green" since it is the default revision
190_, stop2 := createElectionMulticluster(t, "pod2", "red", true, false, watcher, true, client)
191// Third pod with revision "red" comes in and can take the lock since it is a local revision "red"
192_, stop3 := createElectionMulticluster(t, "pod3", "red", false, false, watcher, true, client)
193// Fourth pod with revision "red" cannot take the lock since it is remote
194_, stop4 := createElectionMulticluster(t, "pod4", "red", true, false, watcher, false, client)
195close(stop4)
196close(stop3)
197close(stop2)
198close(stop)
199}
200
201func SimpleRevisionComparison(currentLeaderRevision string, l *LeaderElection) bool {
202// Old key comparison impl for interoperablilty testing
203defaultRevision := l.defaultWatcher.GetDefault()
204return l.revision != currentLeaderRevision &&
205// empty default revision indicates that there is no default set
206defaultRevision != "" && defaultRevision == l.revision
207}
208
// LeaderComparison reports whether the candidate election (second argument)
// should steal leadership from the current leader identified by the given
// key (first argument).
type LeaderComparison func(string, *LeaderElection) bool

// instance describes one contender configuration used when searching for
// leadership-steal cycles.
type instance struct {
	// revision is the istiod revision this contender runs.
	revision string
	// remote marks the contender as a remote (non-local) istiod.
	remote bool
	// comp selects the comparison strategy: "location" or "simple".
	comp string
}
216
217func (i instance) GetComp() (LeaderComparison, string) {
218key := i.revision
219switch i.comp {
220case "location":
221if i.remote {
222key = remoteIstiodPrefix + key
223}
224return LocationPrioritizedComparison, key
225case "simple":
226return SimpleRevisionComparison, key
227default:
228panic("unknown comparison type")
229}
230}
231
232// TestPrioritizationCycles
233func TestPrioritizationCycles(t *testing.T) {
234cases := []instance{}
235for _, rev := range []string{"", "default", "not-default"} {
236for _, loc := range []bool{false, true} {
237for _, comp := range []string{"location", "simple"} {
238cases = append(cases, instance{
239revision: rev,
240remote: loc,
241comp: comp,
242})
243}
244}
245}
246
247for _, start := range cases {
248t.Run(fmt.Sprint(start), func(t *testing.T) {
249checkCycles(t, start, cases, nil)
250})
251}
252}
253
254func alreadyHit(cur instance, chain []instance) bool {
255for _, cc := range chain {
256if cur == cc {
257return true
258}
259}
260return false
261}
262
263func checkCycles(t *testing.T, start instance, cases []instance, chain []instance) {
264if alreadyHit(start, chain) {
265t.Fatalf("cycle on leader election: cur %v, chain %v", start, chain)
266}
267for _, nextHop := range cases {
268next := LeaderElection{
269remote: nextHop.remote,
270defaultWatcher: &fakeDefaultWatcher{defaultRevision: "default"},
271revision: nextHop.revision,
272}
273cmpFunc, key := start.GetComp()
274if cmpFunc(key, &next) {
275nc := append([]instance{}, chain...)
276nc = append(nc, start)
277checkCycles(t, nextHop, cases, nc)
278}
279}
280}
281
282func TestLeaderElectionConfigMapRemoved(t *testing.T) {
283client := fake.NewSimpleClientset()
284watcher := &fakeDefaultWatcher{}
285_, stop := createElection(t, "pod1", "", watcher, true, client)
286if err := client.CoreV1().ConfigMaps("ns").Delete(context.TODO(), testLock, metav1.DeleteOptions{}); err != nil {
287t.Fatal(err)
288}
289retry.UntilSuccessOrFail(t, func() error {
290l, err := client.CoreV1().ConfigMaps("ns").List(context.TODO(), metav1.ListOptions{})
291if err != nil {
292return err
293}
294if len(l.Items) != 1 {
295return fmt.Errorf("got unexpected config map entry: %v", l.Items)
296}
297return nil
298})
299close(stop)
300}
301
302func TestLeaderElectionNoPermission(t *testing.T) {
303client := fake.NewSimpleClientset()
304watcher := &fakeDefaultWatcher{}
305allowRbac := atomic.NewBool(true)
306client.Fake.PrependReactor("update", "*", func(action k8stesting.Action) (bool, runtime.Object, error) {
307if allowRbac.Load() {
308return false, nil, nil
309}
310return true, nil, fmt.Errorf("nope, out of luck")
311})
312
313completions := atomic.NewInt32(0)
314l, stop := createElection(t, "pod1", "", watcher, true, client, func(stop <-chan struct{}) {
315completions.Add(1)
316})
317// Expect to run once
318expectInt(t, completions.Load, 1)
319
320// drop RBAC permissions to update the configmap
321// This simulates loosing an active lease
322allowRbac.Store(false)
323
324// We should start a new cycle at this point
325expectInt(t, l.cycle.Load, 2)
326
327// Add configmap permission back
328allowRbac.Store(true)
329
330// We should get the leader lock back
331expectInt(t, completions.Load, 2)
332
333close(stop)
334}
335
336func expectInt(t *testing.T, f func() int32, expected int32) {
337t.Helper()
338retry.UntilSuccessOrFail(t, func() error {
339got := f()
340if got != expected {
341return fmt.Errorf("unexpected count: %v, want %v", got, expected)
342}
343return nil
344}, retry.Timeout(time.Second))
345}
346
347func TestLeaderElectionDisabled(t *testing.T) {
348client := fake.NewSimpleClientset()
349watcher := &fakeDefaultWatcher{}
350// Prevent LeaderElection from creating a lease, so that the runFn only runs
351// if leader election is disabled.
352client.Fake.PrependReactor("*", "*", func(action k8stesting.Action) (bool, runtime.Object, error) {
353return true, nil, fmt.Errorf("nope, out of luck")
354})
355
356l := &LeaderElection{
357namespace: "ns",
358name: "disabled",
359enabled: false,
360electionID: testLock,
361client: client,
362revision: "",
363defaultWatcher: watcher,
364ttl: time.Second,
365cycle: atomic.NewInt32(0),
366}
367gotLeader := atomic.NewBool(false)
368l.AddRunFunction(func(stop <-chan struct{}) {
369gotLeader.Store(true)
370})
371stop := make(chan struct{})
372go l.Run(stop)
373t.Cleanup(func() {
374close(stop)
375})
376
377// Need to retry until Run() starts to execute in the goroutine.
378retry.UntilOrFail(t, gotLeader.Load, retry.Converge(5), retry.Delay(time.Millisecond*100), retry.Timeout(time.Second*10))
379if !l.isLeader() {
380t.Errorf("isLeader()=false, want true")
381}
382}
383