istio

Форк
0
447 строк · 16.3 Кб
1
/*
2
Copyright 2015 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
// Package leaderelection implements leader election of a set of endpoints.
18
// It uses an annotation in the endpoints object to store the record of the
19
// election state. This implementation does not guarantee that only one
20
// client is acting as a leader (a.k.a. fencing).
21
//
22
// A client only acts on timestamps captured locally to infer the state of the
23
// leader election. The client does not consider timestamps in the leader
24
// election record to be accurate because these timestamps may not have been
25
// produced by a local clock. The implementation does not depend on their
26
// accuracy and only uses their change to indicate that another client has
27
// renewed the leader lease. Thus the implementation is tolerant to arbitrary
28
// clock skew, but is not tolerant to arbitrary clock skew rate.
29
//
30
// However the level of tolerance to skew rate can be configured by setting
31
// RenewDeadline and LeaseDuration appropriately. The tolerance expressed as a
32
// maximum tolerated ratio of time passed on the fastest node to time passed on
33
// the slowest node can be approximately achieved with a configuration that sets
34
// the same ratio of LeaseDuration to RenewDeadline. For example if a user wanted
35
// to tolerate some nodes progressing forward in time twice as fast as other nodes,
36
// the user could set LeaseDuration to 60 seconds and RenewDeadline to 30 seconds.
37
//
38
// While not required, some method of clock synchronization between nodes in the
39
// cluster is highly recommended. It's important to keep in mind when configuring
40
// this client that the tolerance to skew rate varies inversely to master
41
// availability.
42
//
43
// Larger clusters often have a more lenient SLA for API latency. This should be
44
// taken into account when configuring the client. The rate of leader transitions
45
// should be monitored and RetryPeriod and LeaseDuration should be increased
46
// until the rate is stable and acceptably low. It's important to keep in mind
47
// when configuring this client that the tolerance to API latency varies inversely
48
// to master availability.
49
//
50
// DISCLAIMER: this is an alpha API. This library will likely change significantly
51
// or even be removed entirely in subsequent releases. Depend on this API at
52
// your own risk.
53
// nolint
54
package k8sleaderelection
55

56
import (
57
	"bytes"
58
	"context"
59
	"fmt"
60
	"sync"
61
	"time"
62

63
	"k8s.io/apimachinery/pkg/api/errors"
64
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
65
	"k8s.io/apimachinery/pkg/util/runtime"
66
	"k8s.io/apimachinery/pkg/util/wait"
67
	"k8s.io/klog/v2"
68
	"k8s.io/utils/clock"
69

70
	"istio.io/istio/pilot/pkg/leaderelection/k8sleaderelection/k8sresourcelock"
71
)
72

73
const (
74
	JitterFactor = 1.2
75
)
76

77
// NewLeaderElector creates a LeaderElector from a LeaderElectionConfig
78
func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
79
	if lec.LeaseDuration <= lec.RenewDeadline {
80
		return nil, fmt.Errorf("leaseDuration must be greater than renewDeadline")
81
	}
82
	if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) {
83
		return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor")
84
	}
85
	if lec.LeaseDuration < 1 {
86
		return nil, fmt.Errorf("leaseDuration must be greater than zero")
87
	}
88
	if lec.RenewDeadline < 1 {
89
		return nil, fmt.Errorf("renewDeadline must be greater than zero")
90
	}
91
	if lec.RetryPeriod < 1 {
92
		return nil, fmt.Errorf("retryPeriod must be greater than zero")
93
	}
94
	if lec.Callbacks.OnStartedLeading == nil {
95
		return nil, fmt.Errorf("callback OnStartedLeading must not be nil")
96
	}
97
	if lec.Callbacks.OnStoppedLeading == nil {
98
		return nil, fmt.Errorf("callback OnStoppedLeading  must not be nil")
99
	}
100

101
	if lec.Lock == nil {
102
		return nil, fmt.Errorf("lock must not be nil")
103
	}
104
	le := LeaderElector{
105
		config:  lec,
106
		clock:   clock.RealClock{},
107
		metrics: globalMetricsFactory.newLeaderMetrics(),
108
	}
109
	le.metrics.leaderOff(le.config.Name)
110
	return &le, nil
111
}
112

113
type KeyComparisonFunc func(existingKey string) bool
114

115
type LeaderElectionConfig struct {
116
	// Lock is the resource that will be used for locking
117
	Lock k8sresourcelock.Interface
118

119
	// LeaseDuration is the duration that non-leader candidates will
120
	// wait to force acquire leadership. This is measured against time of
121
	// last observed ack.
122
	//
123
	// A client needs to wait a full LeaseDuration without observing a change to
124
	// the record before it can attempt to take over. When all clients are
125
	// shutdown and a new set of clients are started with different names against
126
	// the same leader record, they must wait the full LeaseDuration before
127
	// attempting to acquire the lease. Thus LeaseDuration should be as short as
128
	// possible (within your tolerance for clock skew rate) to avoid a possible
129
	// long waits in the scenario.
130
	//
131
	// Core clients default this value to 15 seconds.
132
	LeaseDuration time.Duration
133
	// RenewDeadline is the duration that the acting master will retry
134
	// refreshing leadership before giving up.
135
	//
136
	// Core clients default this value to 10 seconds.
137
	RenewDeadline time.Duration
138
	// RetryPeriod is the duration the LeaderElector clients should wait
139
	// between tries of actions.
140
	//
141
	// Core clients default this value to 2 seconds.
142
	RetryPeriod time.Duration
143

144
	// KeyComparison defines a function to compare the existing leader's key to our own.
145
	// If the function returns true, indicating our key has high precedence, we will take over
146
	// leadership even if their is another un-expired leader.
147
	//
148
	// This can be used to implemented a prioritized leader election. For example, if multiple
149
	// versions of the same application run simultaneously, we can ensure the newest version
150
	// will become the leader.
151
	//
152
	// It is the responsibility of the caller to ensure that all KeyComparison functions are
153
	// logically consistent between all clients participating in the leader election to avoid multiple
154
	// clients claiming to have high precedence and constantly pre-empting the existing leader.
155
	//
156
	// KeyComparison functions should ensure they handle an empty existingKey, as "key" is not a required field.
157
	//
158
	// Warning: when a lock is stolen (from KeyComparison returning true), the old leader may not
159
	// immediately be notified they have lost the leader election.
160
	KeyComparison KeyComparisonFunc
161

162
	// Callbacks are callbacks that are triggered during certain lifecycle
163
	// events of the LeaderElector
164
	Callbacks LeaderCallbacks
165

166
	// WatchDog is the associated health checker
167
	// WatchDog may be null if its not needed/configured.
168
	WatchDog *HealthzAdaptor
169

170
	// ReleaseOnCancel should be set true if the lock should be released
171
	// when the run context is canceled. If you set this to true, you must
172
	// ensure all code guarded by this lease has successfully completed
173
	// prior to canceling the context, or you may have two processes
174
	// simultaneously acting on the critical path.
175
	ReleaseOnCancel bool
176

177
	// Name is the name of the resource lock for debugging
178
	Name string
179
}
180

181
// LeaderCallbacks are callbacks that are triggered during certain
182
// lifecycle events of the LeaderElector. These are invoked asynchronously.
183
//
184
// possible future callbacks:
185
//   - OnChallenge()
186
type LeaderCallbacks struct {
187
	// OnStartedLeading is called when a LeaderElector client starts leading
188
	OnStartedLeading func(context.Context)
189
	// OnStoppedLeading is called when a LeaderElector client stops leading
190
	OnStoppedLeading func()
191
	// OnNewLeader is called when the client observes a leader that is
192
	// not the previously observed leader. This includes the first observed
193
	// leader when the client starts.
194
	OnNewLeader func(identity string)
195
}
196

197
// LeaderElector is a leader election client.
198
type LeaderElector struct {
199
	config LeaderElectionConfig
200
	// internal bookkeeping
201
	observedRecord    k8sresourcelock.LeaderElectionRecord
202
	observedRawRecord []byte
203
	observedTime      time.Time
204
	// used to implement OnNewLeader(), may lag slightly from the
205
	// value observedRecord.HolderIdentity if the transition has
206
	// not yet been reported.
207
	reportedLeader string
208

209
	// clock is wrapper around time to allow for less flaky testing
210
	clock clock.Clock
211

212
	// used to lock the observedRecord
213
	observedRecordLock sync.Mutex
214

215
	metrics leaderMetricsAdapter
216
}
217

218
// Run starts the leader election loop. Run will not return
219
// before leader election loop is stopped by ctx or it has
220
// stopped holding the leader lease
221
func (le *LeaderElector) Run(ctx context.Context) {
222
	defer runtime.HandleCrash()
223
	defer func() {
224
		le.config.Callbacks.OnStoppedLeading()
225
	}()
226

227
	if !le.acquire(ctx) {
228
		return // ctx signaled done
229
	}
230
	ctx, cancel := context.WithCancel(ctx)
231
	defer cancel()
232
	go le.config.Callbacks.OnStartedLeading(ctx)
233
	le.renew(ctx)
234
}
235

236
// RunOrDie starts a client with the provided config or panics if the config
237
// fails to validate. RunOrDie blocks until leader election loop is
238
// stopped by ctx or it has stopped holding the leader lease
239
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
240
	le, err := NewLeaderElector(lec)
241
	if err != nil {
242
		panic(err)
243
	}
244
	if lec.WatchDog != nil {
245
		lec.WatchDog.SetLeaderElection(le)
246
	}
247
	le.Run(ctx)
248
}
249

250
// GetLeader returns the identity of the last observed leader or returns the empty string if
251
// no leader has yet been observed.
252
// This function is for informational purposes. (e.g. monitoring, logs, etc.)
253
func (le *LeaderElector) GetLeader() string {
254
	return le.getObservedRecord().HolderIdentity
255
}
256

257
// IsLeader returns true if the last observed leader was this client else returns false.
258
func (le *LeaderElector) IsLeader() bool {
259
	return le.getObservedRecord().HolderIdentity == le.config.Lock.Identity()
260
}
261

262
// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
263
// Returns false if ctx signals done.
264
func (le *LeaderElector) acquire(ctx context.Context) bool {
265
	ctx, cancel := context.WithCancel(ctx)
266
	defer cancel()
267
	succeeded := false
268
	desc := le.config.Lock.Describe()
269
	klog.Infof("attempting to acquire leader lease %v...", desc)
270
	wait.JitterUntil(func() {
271
		succeeded = le.tryAcquireOrRenew(ctx)
272
		le.maybeReportTransition()
273
		if !succeeded {
274
			klog.V(4).Infof("failed to acquire lease %v", desc)
275
			return
276
		}
277
		le.config.Lock.RecordEvent("became leader")
278
		le.metrics.leaderOn(le.config.Name)
279
		klog.Infof("successfully acquired lease %v", desc)
280
		cancel()
281
	}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
282
	return succeeded
283
}
284

285
// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
286
func (le *LeaderElector) renew(ctx context.Context) {
287
	ctx, cancel := context.WithCancel(ctx)
288
	defer cancel()
289
	wait.Until(func() {
290
		timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
291
		defer timeoutCancel()
292
		err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
293
			return le.tryAcquireOrRenew(timeoutCtx), nil
294
		}, timeoutCtx.Done())
295

296
		le.maybeReportTransition()
297
		desc := le.config.Lock.Describe()
298
		if err == nil {
299
			klog.V(5).Infof("successfully renewed lease %v", desc)
300
			return
301
		}
302
		le.config.Lock.RecordEvent("stopped leading")
303
		le.metrics.leaderOff(le.config.Name)
304
		klog.Infof("failed to renew lease %v: %v", desc, err)
305
		cancel()
306
	}, le.config.RetryPeriod, ctx.Done())
307

308
	// if we hold the lease, give it up
309
	if le.config.ReleaseOnCancel {
310
		le.release()
311
	}
312
}
313

314
// release attempts to release the leader lease if we have acquired it.
315
func (le *LeaderElector) release() bool {
316
	if !le.IsLeader() {
317
		return true
318
	}
319
	now := metav1.Now()
320
	leaderElectionRecord := k8sresourcelock.LeaderElectionRecord{
321
		LeaderTransitions:    le.observedRecord.LeaderTransitions,
322
		LeaseDurationSeconds: 1,
323
		RenewTime:            now,
324
		AcquireTime:          now,
325
	}
326
	if err := le.config.Lock.Update(context.TODO(), leaderElectionRecord); err != nil {
327
		klog.Errorf("Failed to release lock: %v", err)
328
		return false
329
	}
330

331
	le.setObservedRecord(&leaderElectionRecord)
332
	return true
333
}
334

335
// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
336
// else it tries to renew the lease if it has already been acquired. Returns true
337
// on success else returns false.
338
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
339
	now := metav1.Now()
340
	leaderElectionRecord := k8sresourcelock.LeaderElectionRecord{
341
		HolderIdentity:       le.config.Lock.Identity(),
342
		HolderKey:            le.config.Lock.Key(),
343
		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
344
		RenewTime:            now,
345
		AcquireTime:          now,
346
	}
347

348
	// 1. obtain or create the ElectionRecord
349
	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
350
	if err != nil {
351
		if !errors.IsNotFound(err) {
352
			klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
353
			return false
354
		}
355
		if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
356
			klog.Errorf("error initially creating leader election record: %v", err)
357
			return false
358
		}
359

360
		le.setObservedRecord(&leaderElectionRecord)
361

362
		return true
363
	}
364

365
	// 2. Record obtained, check the Identity & Time
366
	if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
367
		le.setObservedRecord(oldLeaderElectionRecord)
368

369
		le.observedRawRecord = oldLeaderElectionRawRecord
370
	}
371
	if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
372
		le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
373
		!le.IsLeader() {
374
		if le.config.KeyComparison != nil && le.config.KeyComparison(oldLeaderElectionRecord.HolderKey) {
375
			// Lock is held and not expired, but our key is higher than the existing one.
376
			// We will pre-empt the existing leader.
377
			// nolint: lll
378
			klog.V(4).Infof("lock is held by %v with key %v, but our key (%v) evicts it", oldLeaderElectionRecord.HolderIdentity, oldLeaderElectionRecord.HolderKey, le.config.Lock.Key())
379
		} else {
380
			klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
381
			return false
382
		}
383
	}
384

385
	// 3. We're going to try to update. The leaderElectionRecord is set to it's default
386
	// here. Let's correct it before updating.
387
	if le.IsLeader() {
388
		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
389
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
390
	} else {
391
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
392
	}
393

394
	// update the lock itself
395
	if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
396
		klog.Errorf("Failed to update lock: %v", err)
397
		return false
398
	}
399

400
	le.setObservedRecord(&leaderElectionRecord)
401
	return true
402
}
403

404
func (le *LeaderElector) maybeReportTransition() {
405
	if le.observedRecord.HolderIdentity == le.reportedLeader {
406
		return
407
	}
408
	le.reportedLeader = le.observedRecord.HolderIdentity
409
	if le.config.Callbacks.OnNewLeader != nil {
410
		go le.config.Callbacks.OnNewLeader(le.reportedLeader)
411
	}
412
}
413

414
// Check will determine if the current lease is expired by more than timeout.
415
func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error {
416
	if !le.IsLeader() {
417
		// Currently not concerned with the case that we are hot standby
418
		return nil
419
	}
420
	// If we are more than timeout seconds after the lease duration that is past the timeout
421
	// on the lease renew. Time to start reporting ourselves as unhealthy. We should have
422
	// died but conditions like deadlock can prevent this. (See #70819)
423
	if le.clock.Since(le.observedTime) > le.config.LeaseDuration+maxTolerableExpiredLease {
424
		return fmt.Errorf("failed election to renew leadership on lease %s", le.config.Name)
425
	}
426

427
	return nil
428
}
429

430
// setObservedRecord will set a new observedRecord and update observedTime to the current time.
431
// Protect critical sections with lock.
432
func (le *LeaderElector) setObservedRecord(observedRecord *k8sresourcelock.LeaderElectionRecord) {
433
	le.observedRecordLock.Lock()
434
	defer le.observedRecordLock.Unlock()
435

436
	le.observedRecord = *observedRecord
437
	le.observedTime = le.clock.Now()
438
}
439

440
// getObservedRecord returns observersRecord.
441
// Protect critical sections with lock.
442
func (le *LeaderElector) getObservedRecord() k8sresourcelock.LeaderElectionRecord {
443
	le.observedRecordLock.Lock()
444
	defer le.observedRecordLock.Unlock()
445

446
	return le.observedRecord
447
}
448

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.