cilium

Форк
0
/
deletion_queue.go 
179 строк · 6.0 Кб
1
// SPDX-License-Identifier: Apache-2.0
2
// Copyright Authors of Cilium
3

4
package lib
5

6
import (
7
	"context"
8
	"crypto/sha256"
9
	"errors"
10
	"fmt"
11
	"os"
12
	"path/filepath"
13
	"time"
14

15
	"github.com/sirupsen/logrus"
16

17
	"github.com/cilium/cilium/api/v1/models"
18
	"github.com/cilium/cilium/pkg/client"
19
	"github.com/cilium/cilium/pkg/defaults"
20
	"github.com/cilium/cilium/pkg/lock/lockfile"
21
	"github.com/cilium/cilium/pkg/logging/logfields"
22
)
23

24
type DeletionFallbackClient struct {
25
	logger *logrus.Entry
26
	cli    *client.Client
27

28
	lockfile *lockfile.Lockfile
29
}
30

31
const (
	// timeoutSeconds is the timeout, in seconds, for connecting to the agent
	// API and for obtaining the deletion queue lock. The default of 30 seconds
	// is too long; kubelet will time us out before then.
	timeoutSeconds = 10

	// maxDeletionFiles is the maximum number of queued deletions allowed,
	// guarding against a misbehaving kubelet flooding the queue directory.
	maxDeletionFiles = 256
)
37

38
// NewDeletionFallbackClient creates a client that will either issue an EndpointDelete
39
// request via the api, *or* queue one in a temporary directory.
40
// To prevent race conditions, the logic is:
41
// 1. Try and connect to the socket. if that succeeds, done
42
// 2. Otherwise, take a shared lock on the delete queue directory
43
// 3. Once we get the lock, check to see if the socket now exists
44
// 4. If it exists, drop the lock and use the api
45
func NewDeletionFallbackClient(logger *logrus.Entry) (*DeletionFallbackClient, error) {
46
	dc := &DeletionFallbackClient{
47
		logger: logger,
48
	}
49

50
	// Try and connect (the usual case)
51
	err := dc.tryConnect()
52
	if err == nil {
53
		return dc, nil
54
	}
55
	dc.logger.WithError(err).Warnf("Failed to connect to agent socket at %s.", client.DefaultSockPath())
56

57
	// We failed to connect: get the queue lock
58
	if err := dc.tryQueueLock(); err != nil {
59
		return nil, fmt.Errorf("failed to acquire deletion queue: %w", err)
60
	}
61

62
	// We have the queue lock; try and connect again
63
	// just in case the agent finished starting up while we were waiting
64
	if err := dc.tryConnect(); err == nil {
65
		dc.logger.Info("Successfully connected to API on second try.")
66
		// hey, it's back up!
67
		dc.lockfile.Unlock()
68
		dc.lockfile = nil
69
		return dc, nil
70
	}
71

72
	// We have the lockfile, but no valid client
73
	dc.logger.Info("Agent is down, falling back to deletion queue directory")
74
	return dc, nil
75
}
76

77
// tryConnect attempts to build an agent API client, bounded by
// timeoutSeconds. Only on success is the client stored in dc.cli; on failure
// dc.cli is left as it was and the error is returned.
func (dc *DeletionFallbackClient) tryConnect() error {
	apiClient, connErr := client.NewDefaultClientWithTimeout(timeoutSeconds * time.Second)
	if connErr == nil {
		dc.cli = apiClient
	}
	return connErr
}
85

86
// tryQueueLock makes sure the deletion queue directory exists, then takes a
// shared lock on the deletion queue lockfile, waiting at most timeoutSeconds.
// On success the held lock is stored in dc.lockfile. On failure dc.lockfile is
// left untouched, and any lockfile handle opened here is closed again.
func (dc *DeletionFallbackClient) tryQueueLock() error {
	dc.logger.Debugf("attempting to acquire deletion queue lock at %s", defaults.DeleteQueueLockfile)

	// Ensure deletion queue directory exists, obtain shared lock
	if err := os.MkdirAll(defaults.DeleteQueueDir, 0755); err != nil {
		return fmt.Errorf("failed to create deletion queue directory %s: %w", defaults.DeleteQueueDir, err)
	}

	lf, err := lockfile.NewLockfile(defaults.DeleteQueueLockfile)
	if err != nil {
		return fmt.Errorf("failed to open lockfile %s: %w", defaults.DeleteQueueLockfile, err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), timeoutSeconds*time.Second)
	defer cancel()

	// exclusive=false: take the shared lock. Presumably this lets several
	// plugin invocations enqueue at once while the agent takes an exclusive
	// lock to drain the queue — confirm against the agent side.
	if err := lf.Lock(ctx, false); err != nil {
		// Don't leak the handle opened by NewLockfile (e.g. when the timeout
		// expires). Best-effort: the lock error is the one worth reporting.
		_ = lf.Close()
		return fmt.Errorf("failed to acquire lock: %w", err)
	}
	dc.lockfile = lf
	return nil
}
110

111
// EndpointDelete deletes an endpoint given by an endpoint id, either
112
// by directly accessing the API or dropping in a queued-deletion file.
113
// endpoint-id is a qualified endpoint reference, e.g. "container-id:XXXXXXX"
114
func (dc *DeletionFallbackClient) EndpointDelete(id string) error {
115
	if dc.cli != nil {
116
		return dc.cli.EndpointDelete(id)
117
	}
118

119
	// fall-back mode
120
	if dc.lockfile != nil {
121
		dc.logger.WithField(logfields.EndpointID, id).Info("Queueing deletion request for endpoint")
122
		return dc.enqueueDeletionRequestLocked(id)
123
	}
124

125
	return errors.New("attempt to delete with no valid connection")
126
}
127

128
// EndpointDeleteMany deletes multiple endpoints based on the endpoint deletion request,
129
// either by directly accessing the API or dropping in a queued-deletion file.
130
func (dc *DeletionFallbackClient) EndpointDeleteMany(req *models.EndpointBatchDeleteRequest) error {
131
	if dc.cli != nil {
132
		return dc.cli.EndpointDeleteMany(req)
133
	}
134

135
	// fall-back mode
136
	if dc.lockfile != nil {
137
		dc.logger.WithField(logfields.Request, req).Info("Queueing endpoint batch deletion request")
138
		b, err := req.MarshalBinary()
139
		if err != nil {
140
			return fmt.Errorf("failed to marshal endpoint delete request: %w", err)
141
		}
142
		return dc.enqueueDeletionRequestLocked(string(b))
143
	}
144

145
	return errors.New("attempt to delete with no valid connection")
146
}
147

148
// enqueueDeletionRequestLocked enqueues the encoded endpoint deletion request into the
149
// endpoint deletion queue. Requires the caller to hold the deletion queue lock.
150
func (dc *DeletionFallbackClient) enqueueDeletionRequestLocked(contents string) error {
151
	// sanity check: if there are too many queued deletes, just return error
152
	// back up to the kubelet. If we get here, it's either because something
153
	// has gone wrong with the kubelet, or the agent has been down for a very
154
	// long time. To guard aganst long agent startup times (when it empties the
155
	// queue), limit us to 256 queued deletions. If this does, indeed, overflow,
156
	// then the kubelet will get the failure and eventually retry deletion.
157
	files, err := os.ReadDir(defaults.DeleteQueueDir)
158
	if err != nil {
159
		dc.logger.WithField(logfields.Path, defaults.DeleteQueueDir).WithError(err).Error("failed to list deletion queue directory")
160
		return err
161
	}
162
	if len(files) > maxDeletionFiles {
163
		return fmt.Errorf("deletion queue directory %s has too many entries; aborting", defaults.DeleteQueueDir)
164
	}
165

166
	// hash endpoint id for a random filename
167
	h := sha256.New()
168
	h.Write([]byte(contents))
169
	filename := fmt.Sprintf("%x.delete", h.Sum(nil))
170
	path := filepath.Join(defaults.DeleteQueueDir, filename)
171

172
	err = os.WriteFile(path, []byte(contents), 0644)
173
	if err != nil {
174
		dc.logger.WithField(logfields.Path, path).WithError(err).Error("failed to write deletion file")
175
		return fmt.Errorf("failed to write deletion file %s: %w", path, err)
176
	}
177
	dc.logger.Info("wrote queued deletion file")
178
	return nil
179
}
180

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.