istio
447 lines · 16.3 KB
1/*
2Copyright 2015 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17// Package leaderelection implements leader election of a set of endpoints.
18// It uses an annotation in the endpoints object to store the record of the
19// election state. This implementation does not guarantee that only one
20// client is acting as a leader (a.k.a. fencing).
21//
22// A client only acts on timestamps captured locally to infer the state of the
23// leader election. The client does not consider timestamps in the leader
24// election record to be accurate because these timestamps may not have been
25// produced by a local clock. The implementation does not depend on their
26// accuracy and only uses their change to indicate that another client has
27// renewed the leader lease. Thus the implementation is tolerant to arbitrary
28// clock skew, but is not tolerant to arbitrary clock skew rate.
29//
30// However the level of tolerance to skew rate can be configured by setting
31// RenewDeadline and LeaseDuration appropriately. The tolerance expressed as a
32// maximum tolerated ratio of time passed on the fastest node to time passed on
33// the slowest node can be approximately achieved with a configuration that sets
34// the same ratio of LeaseDuration to RenewDeadline. For example if a user wanted
35// to tolerate some nodes progressing forward in time twice as fast as other nodes,
36// the user could set LeaseDuration to 60 seconds and RenewDeadline to 30 seconds.
37//
38// While not required, some method of clock synchronization between nodes in the
39// cluster is highly recommended. It's important to keep in mind when configuring
40// this client that the tolerance to skew rate varies inversely to master
41// availability.
42//
43// Larger clusters often have a more lenient SLA for API latency. This should be
44// taken into account when configuring the client. The rate of leader transitions
45// should be monitored and RetryPeriod and LeaseDuration should be increased
46// until the rate is stable and acceptably low. It's important to keep in mind
47// when configuring this client that the tolerance to API latency varies inversely
48// to master availability.
49//
50// DISCLAIMER: this is an alpha API. This library will likely change significantly
51// or even be removed entirely in subsequent releases. Depend on this API at
52// your own risk.
53// nolint
54package k8sleaderelection
55
56import (
57"bytes"
58"context"
59"fmt"
60"sync"
61"time"
62
63"k8s.io/apimachinery/pkg/api/errors"
64metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
65"k8s.io/apimachinery/pkg/util/runtime"
66"k8s.io/apimachinery/pkg/util/wait"
67"k8s.io/klog/v2"
68"k8s.io/utils/clock"
69
70"istio.io/istio/pilot/pkg/leaderelection/k8sleaderelection/k8sresourcelock"
71)
72
const (
	// JitterFactor is the multiplier applied to RetryPeriod when jittering
	// retry intervals (see acquire), so competing clients do not all poll
	// the lock at the same instant.
	JitterFactor = 1.2
)
76
77// NewLeaderElector creates a LeaderElector from a LeaderElectionConfig
78func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
79if lec.LeaseDuration <= lec.RenewDeadline {
80return nil, fmt.Errorf("leaseDuration must be greater than renewDeadline")
81}
82if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) {
83return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor")
84}
85if lec.LeaseDuration < 1 {
86return nil, fmt.Errorf("leaseDuration must be greater than zero")
87}
88if lec.RenewDeadline < 1 {
89return nil, fmt.Errorf("renewDeadline must be greater than zero")
90}
91if lec.RetryPeriod < 1 {
92return nil, fmt.Errorf("retryPeriod must be greater than zero")
93}
94if lec.Callbacks.OnStartedLeading == nil {
95return nil, fmt.Errorf("callback OnStartedLeading must not be nil")
96}
97if lec.Callbacks.OnStoppedLeading == nil {
98return nil, fmt.Errorf("callback OnStoppedLeading must not be nil")
99}
100
101if lec.Lock == nil {
102return nil, fmt.Errorf("lock must not be nil")
103}
104le := LeaderElector{
105config: lec,
106clock: clock.RealClock{},
107metrics: globalMetricsFactory.newLeaderMetrics(),
108}
109le.metrics.leaderOff(le.config.Name)
110return &le, nil
111}
112
// KeyComparisonFunc compares the existing leader's key against our own.
// Returning true means our key takes precedence and we may pre-empt the
// current, un-expired leader.
type KeyComparisonFunc func(existingKey string) bool

// LeaderElectionConfig holds the settings for a LeaderElector.
type LeaderElectionConfig struct {
	// Lock is the resource that will be used for locking
	Lock k8sresourcelock.Interface

	// LeaseDuration is the duration that non-leader candidates will
	// wait to force acquire leadership. This is measured against time of
	// last observed ack.
	//
	// A client needs to wait a full LeaseDuration without observing a change to
	// the record before it can attempt to take over. When all clients are
	// shutdown and a new set of clients are started with different names against
	// the same leader record, they must wait the full LeaseDuration before
	// attempting to acquire the lease. Thus LeaseDuration should be as short as
	// possible (within your tolerance for clock skew rate) to avoid possible
	// long waits in this scenario.
	//
	// Core clients default this value to 15 seconds.
	LeaseDuration time.Duration
	// RenewDeadline is the duration that the acting master will retry
	// refreshing leadership before giving up.
	//
	// Core clients default this value to 10 seconds.
	RenewDeadline time.Duration
	// RetryPeriod is the duration the LeaderElector clients should wait
	// between tries of actions.
	//
	// Core clients default this value to 2 seconds.
	RetryPeriod time.Duration

	// KeyComparison defines a function to compare the existing leader's key to our own.
	// If the function returns true, indicating our key has high precedence, we will take over
	// leadership even if there is another un-expired leader.
	//
	// This can be used to implement a prioritized leader election. For example, if multiple
	// versions of the same application run simultaneously, we can ensure the newest version
	// will become the leader.
	//
	// It is the responsibility of the caller to ensure that all KeyComparison functions are
	// logically consistent between all clients participating in the leader election to avoid multiple
	// clients claiming to have high precedence and constantly pre-empting the existing leader.
	//
	// KeyComparison functions should ensure they handle an empty existingKey, as "key" is not a required field.
	//
	// Warning: when a lock is stolen (from KeyComparison returning true), the old leader may not
	// immediately be notified they have lost the leader election.
	KeyComparison KeyComparisonFunc

	// Callbacks are callbacks that are triggered during certain lifecycle
	// events of the LeaderElector
	Callbacks LeaderCallbacks

	// WatchDog is the associated health checker
	// WatchDog may be null if it's not needed/configured.
	WatchDog *HealthzAdaptor

	// ReleaseOnCancel should be set true if the lock should be released
	// when the run context is canceled. If you set this to true, you must
	// ensure all code guarded by this lease has successfully completed
	// prior to canceling the context, or you may have two processes
	// simultaneously acting on the critical path.
	ReleaseOnCancel bool

	// Name is the name of the resource lock for debugging
	Name string
}
180
// LeaderCallbacks are callbacks that are triggered during certain
// lifecycle events of the LeaderElector. These are invoked asynchronously.
//
// possible future callbacks:
//   - OnChallenge()
type LeaderCallbacks struct {
	// OnStartedLeading is called when a LeaderElector client starts leading.
	OnStartedLeading func(context.Context)
	// OnStoppedLeading is called when a LeaderElector client stops leading.
	OnStoppedLeading func()
	// OnNewLeader is called when the client observes a leader that is
	// not the previously observed leader. This includes the first observed
	// leader when the client starts.
	OnNewLeader func(identity string)
}
196
// LeaderElector is a leader election client.
type LeaderElector struct {
	config LeaderElectionConfig
	// internal bookkeeping:
	// observedRecord is the most recently seen lock record; guarded by
	// observedRecordLock — access it via setObservedRecord/getObservedRecord.
	observedRecord k8sresourcelock.LeaderElectionRecord
	// observedRawRecord is the raw bytes of observedRecord, used to detect
	// any change to the record in tryAcquireOrRenew.
	observedRawRecord []byte
	// observedTime is the local time the record was last seen to change;
	// lease expiry is measured against this, not remote timestamps.
	observedTime time.Time
	// used to implement OnNewLeader(), may lag slightly from the
	// value observedRecord.HolderIdentity if the transition has
	// not yet been reported.
	reportedLeader string

	// clock is wrapper around time to allow for less flaky testing
	clock clock.Clock

	// used to lock the observedRecord
	observedRecordLock sync.Mutex

	metrics leaderMetricsAdapter
}
217
218// Run starts the leader election loop. Run will not return
219// before leader election loop is stopped by ctx or it has
220// stopped holding the leader lease
221func (le *LeaderElector) Run(ctx context.Context) {
222defer runtime.HandleCrash()
223defer func() {
224le.config.Callbacks.OnStoppedLeading()
225}()
226
227if !le.acquire(ctx) {
228return // ctx signaled done
229}
230ctx, cancel := context.WithCancel(ctx)
231defer cancel()
232go le.config.Callbacks.OnStartedLeading(ctx)
233le.renew(ctx)
234}
235
236// RunOrDie starts a client with the provided config or panics if the config
237// fails to validate. RunOrDie blocks until leader election loop is
238// stopped by ctx or it has stopped holding the leader lease
239func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
240le, err := NewLeaderElector(lec)
241if err != nil {
242panic(err)
243}
244if lec.WatchDog != nil {
245lec.WatchDog.SetLeaderElection(le)
246}
247le.Run(ctx)
248}
249
250// GetLeader returns the identity of the last observed leader or returns the empty string if
251// no leader has yet been observed.
252// This function is for informational purposes. (e.g. monitoring, logs, etc.)
253func (le *LeaderElector) GetLeader() string {
254return le.getObservedRecord().HolderIdentity
255}
256
257// IsLeader returns true if the last observed leader was this client else returns false.
258func (le *LeaderElector) IsLeader() bool {
259return le.getObservedRecord().HolderIdentity == le.config.Lock.Identity()
260}
261
262// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
263// Returns false if ctx signals done.
264func (le *LeaderElector) acquire(ctx context.Context) bool {
265ctx, cancel := context.WithCancel(ctx)
266defer cancel()
267succeeded := false
268desc := le.config.Lock.Describe()
269klog.Infof("attempting to acquire leader lease %v...", desc)
270wait.JitterUntil(func() {
271succeeded = le.tryAcquireOrRenew(ctx)
272le.maybeReportTransition()
273if !succeeded {
274klog.V(4).Infof("failed to acquire lease %v", desc)
275return
276}
277le.config.Lock.RecordEvent("became leader")
278le.metrics.leaderOn(le.config.Name)
279klog.Infof("successfully acquired lease %v", desc)
280cancel()
281}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
282return succeeded
283}
284
// renew loops calling tryAcquireOrRenew and returns immediately when
// tryAcquireOrRenew fails or ctx signals done.
func (le *LeaderElector) renew(ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	wait.Until(func() {
		// Each renewal attempt gets at most RenewDeadline to succeed;
		// within that window the attempt is retried every RetryPeriod.
		timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
		defer timeoutCancel()
		err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
			return le.tryAcquireOrRenew(timeoutCtx), nil
		}, timeoutCtx.Done())

		le.maybeReportTransition()
		desc := le.config.Lock.Describe()
		if err == nil {
			klog.V(5).Infof("successfully renewed lease %v", desc)
			return
		}
		le.config.Lock.RecordEvent("stopped leading")
		le.metrics.leaderOff(le.config.Name)
		klog.Infof("failed to renew lease %v: %v", desc, err)
		// Renewal failed within RenewDeadline: cancel the outer loop so
		// this function returns and Run can report stopped leading.
		cancel()
	}, le.config.RetryPeriod, ctx.Done())

	// if we hold the lease, give it up
	if le.config.ReleaseOnCancel {
		le.release()
	}
}
313
314// release attempts to release the leader lease if we have acquired it.
315func (le *LeaderElector) release() bool {
316if !le.IsLeader() {
317return true
318}
319now := metav1.Now()
320leaderElectionRecord := k8sresourcelock.LeaderElectionRecord{
321LeaderTransitions: le.observedRecord.LeaderTransitions,
322LeaseDurationSeconds: 1,
323RenewTime: now,
324AcquireTime: now,
325}
326if err := le.config.Lock.Update(context.TODO(), leaderElectionRecord); err != nil {
327klog.Errorf("Failed to release lock: %v", err)
328return false
329}
330
331le.setObservedRecord(&leaderElectionRecord)
332return true
333}
334
// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
// else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false.
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
	now := metav1.Now()
	// The record we will write if we acquire or renew the lease.
	leaderElectionRecord := k8sresourcelock.LeaderElectionRecord{
		HolderIdentity:       le.config.Lock.Identity(),
		HolderKey:            le.config.Lock.Key(),
		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
		RenewTime:            now,
		AcquireTime:          now,
	}

	// 1. obtain or create the ElectionRecord
	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
	if err != nil {
		if !errors.IsNotFound(err) {
			klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
			return false
		}
		// No record exists yet: create one with ourselves as the holder.
		if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
			klog.Errorf("error initially creating leader election record: %v", err)
			return false
		}

		le.setObservedRecord(&leaderElectionRecord)

		return true
	}

	// 2. Record obtained, check the Identity & Time
	// observedTime is only refreshed when the raw record bytes change, so
	// lease expiry is measured from the last locally observed change.
	if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
		le.setObservedRecord(oldLeaderElectionRecord)

		le.observedRawRecord = oldLeaderElectionRawRecord
	}
	if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
		le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
		!le.IsLeader() {
		// Another client holds an unexpired lease.
		if le.config.KeyComparison != nil && le.config.KeyComparison(oldLeaderElectionRecord.HolderKey) {
			// Lock is held and not expired, but our key is higher than the existing one.
			// We will pre-empt the existing leader.
			// nolint: lll
			klog.V(4).Infof("lock is held by %v with key %v, but our key (%v) evicts it", oldLeaderElectionRecord.HolderIdentity, oldLeaderElectionRecord.HolderKey, le.config.Lock.Key())
		} else {
			klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
			return false
		}
	}

	// 3. We're going to try to update. The leaderElectionRecord is set to its default
	// here. Let's correct it before updating.
	if le.IsLeader() {
		// Renewing: keep the original acquire time and transition count.
		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
	} else {
		// Taking over from another holder: count a leadership transition.
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
	}

	// update the lock itself
	if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
		klog.Errorf("Failed to update lock: %v", err)
		return false
	}

	le.setObservedRecord(&leaderElectionRecord)
	return true
}
403
404func (le *LeaderElector) maybeReportTransition() {
405if le.observedRecord.HolderIdentity == le.reportedLeader {
406return
407}
408le.reportedLeader = le.observedRecord.HolderIdentity
409if le.config.Callbacks.OnNewLeader != nil {
410go le.config.Callbacks.OnNewLeader(le.reportedLeader)
411}
412}
413
414// Check will determine if the current lease is expired by more than timeout.
415func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error {
416if !le.IsLeader() {
417// Currently not concerned with the case that we are hot standby
418return nil
419}
420// If we are more than timeout seconds after the lease duration that is past the timeout
421// on the lease renew. Time to start reporting ourselves as unhealthy. We should have
422// died but conditions like deadlock can prevent this. (See #70819)
423if le.clock.Since(le.observedTime) > le.config.LeaseDuration+maxTolerableExpiredLease {
424return fmt.Errorf("failed election to renew leadership on lease %s", le.config.Name)
425}
426
427return nil
428}
429
430// setObservedRecord will set a new observedRecord and update observedTime to the current time.
431// Protect critical sections with lock.
432func (le *LeaderElector) setObservedRecord(observedRecord *k8sresourcelock.LeaderElectionRecord) {
433le.observedRecordLock.Lock()
434defer le.observedRecordLock.Unlock()
435
436le.observedRecord = *observedRecord
437le.observedTime = le.clock.Now()
438}
439
440// getObservedRecord returns observersRecord.
441// Protect critical sections with lock.
442func (le *LeaderElector) getObservedRecord() k8sresourcelock.LeaderElectionRecord {
443le.observedRecordLock.Lock()
444defer le.observedRecordLock.Unlock()
445
446return le.observedRecord
447}
448