cilium
179 строк · 6.0 Кб
1// SPDX-License-Identifier: Apache-2.0
2// Copyright Authors of Cilium
3
4package lib
5
6import (
7"context"
8"crypto/sha256"
9"errors"
10"fmt"
11"os"
12"path/filepath"
13"time"
14
15"github.com/sirupsen/logrus"
16
17"github.com/cilium/cilium/api/v1/models"
18"github.com/cilium/cilium/pkg/client"
19"github.com/cilium/cilium/pkg/defaults"
20"github.com/cilium/cilium/pkg/lock/lockfile"
21"github.com/cilium/cilium/pkg/logging/logfields"
22)
23
24type DeletionFallbackClient struct {
25logger *logrus.Entry
26cli *client.Client
27
28lockfile *lockfile.Lockfile
29}
30
const (
	// timeoutSeconds is the timeout, in seconds, for connecting to the agent
	// and for obtaining the queue lock. The default of 30 seconds is too
	// long; kubelet will time us out before then.
	timeoutSeconds = 10

	// maxDeletionFiles is the maximum number of queued deletions allowed, to
	// protect against kubelet insanity.
	maxDeletionFiles = 256
)
37
38// NewDeletionFallbackClient creates a client that will either issue an EndpointDelete
39// request via the api, *or* queue one in a temporary directory.
40// To prevent race conditions, the logic is:
41// 1. Try and connect to the socket. if that succeeds, done
42// 2. Otherwise, take a shared lock on the delete queue directory
43// 3. Once we get the lock, check to see if the socket now exists
44// 4. If it exists, drop the lock and use the api
45func NewDeletionFallbackClient(logger *logrus.Entry) (*DeletionFallbackClient, error) {
46dc := &DeletionFallbackClient{
47logger: logger,
48}
49
50// Try and connect (the usual case)
51err := dc.tryConnect()
52if err == nil {
53return dc, nil
54}
55dc.logger.WithError(err).Warnf("Failed to connect to agent socket at %s.", client.DefaultSockPath())
56
57// We failed to connect: get the queue lock
58if err := dc.tryQueueLock(); err != nil {
59return nil, fmt.Errorf("failed to acquire deletion queue: %w", err)
60}
61
62// We have the queue lock; try and connect again
63// just in case the agent finished starting up while we were waiting
64if err := dc.tryConnect(); err == nil {
65dc.logger.Info("Successfully connected to API on second try.")
66// hey, it's back up!
67dc.lockfile.Unlock()
68dc.lockfile = nil
69return dc, nil
70}
71
72// We have the lockfile, but no valid client
73dc.logger.Info("Agent is down, falling back to deletion queue directory")
74return dc, nil
75}
76
77func (dc *DeletionFallbackClient) tryConnect() error {
78c, err := client.NewDefaultClientWithTimeout(timeoutSeconds * time.Second)
79if err != nil {
80return err
81}
82dc.cli = c
83return nil
84}
85
86func (dc *DeletionFallbackClient) tryQueueLock() error {
87dc.logger.Debugf("attempting to acquire deletion queue lock at %s", defaults.DeleteQueueLockfile)
88
89// Ensure deletion queue directory exists, obtain shared lock
90err := os.MkdirAll(defaults.DeleteQueueDir, 0755)
91if err != nil {
92return fmt.Errorf("failed to create deletion queue directory %s: %w", defaults.DeleteQueueDir, err)
93}
94
95lf, err := lockfile.NewLockfile(defaults.DeleteQueueLockfile)
96if err != nil {
97return fmt.Errorf("failed to open lockfile %s: %w", defaults.DeleteQueueLockfile, err)
98}
99
100ctx, cancel := context.WithTimeout(context.Background(), timeoutSeconds*time.Second)
101defer cancel()
102
103err = lf.Lock(ctx, false) // get the shared lock
104if err != nil {
105return fmt.Errorf("failed to acquire lock: %w", err)
106}
107dc.lockfile = lf
108return nil
109}
110
111// EndpointDelete deletes an endpoint given by an endpoint id, either
112// by directly accessing the API or dropping in a queued-deletion file.
113// endpoint-id is a qualified endpoint reference, e.g. "container-id:XXXXXXX"
114func (dc *DeletionFallbackClient) EndpointDelete(id string) error {
115if dc.cli != nil {
116return dc.cli.EndpointDelete(id)
117}
118
119// fall-back mode
120if dc.lockfile != nil {
121dc.logger.WithField(logfields.EndpointID, id).Info("Queueing deletion request for endpoint")
122return dc.enqueueDeletionRequestLocked(id)
123}
124
125return errors.New("attempt to delete with no valid connection")
126}
127
128// EndpointDeleteMany deletes multiple endpoints based on the endpoint deletion request,
129// either by directly accessing the API or dropping in a queued-deletion file.
130func (dc *DeletionFallbackClient) EndpointDeleteMany(req *models.EndpointBatchDeleteRequest) error {
131if dc.cli != nil {
132return dc.cli.EndpointDeleteMany(req)
133}
134
135// fall-back mode
136if dc.lockfile != nil {
137dc.logger.WithField(logfields.Request, req).Info("Queueing endpoint batch deletion request")
138b, err := req.MarshalBinary()
139if err != nil {
140return fmt.Errorf("failed to marshal endpoint delete request: %w", err)
141}
142return dc.enqueueDeletionRequestLocked(string(b))
143}
144
145return errors.New("attempt to delete with no valid connection")
146}
147
148// enqueueDeletionRequestLocked enqueues the encoded endpoint deletion request into the
149// endpoint deletion queue. Requires the caller to hold the deletion queue lock.
150func (dc *DeletionFallbackClient) enqueueDeletionRequestLocked(contents string) error {
151// sanity check: if there are too many queued deletes, just return error
152// back up to the kubelet. If we get here, it's either because something
153// has gone wrong with the kubelet, or the agent has been down for a very
154// long time. To guard aganst long agent startup times (when it empties the
155// queue), limit us to 256 queued deletions. If this does, indeed, overflow,
156// then the kubelet will get the failure and eventually retry deletion.
157files, err := os.ReadDir(defaults.DeleteQueueDir)
158if err != nil {
159dc.logger.WithField(logfields.Path, defaults.DeleteQueueDir).WithError(err).Error("failed to list deletion queue directory")
160return err
161}
162if len(files) > maxDeletionFiles {
163return fmt.Errorf("deletion queue directory %s has too many entries; aborting", defaults.DeleteQueueDir)
164}
165
166// hash endpoint id for a random filename
167h := sha256.New()
168h.Write([]byte(contents))
169filename := fmt.Sprintf("%x.delete", h.Sum(nil))
170path := filepath.Join(defaults.DeleteQueueDir, filename)
171
172err = os.WriteFile(path, []byte(contents), 0644)
173if err != nil {
174dc.logger.WithField(logfields.Path, path).WithError(err).Error("failed to write deletion file")
175return fmt.Errorf("failed to write deletion file %s: %w", path, err)
176}
177dc.logger.Info("wrote queued deletion file")
178return nil
179}
180