talos

Форк
0
288 строк · 6.6 Кб
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4

5
package kubernetes
6

7
import (
8
	"context"
9
	"errors"
10
	"fmt"
11
	"slices"
12
	"strings"
13
	"time"
14

15
	"github.com/cosi-project/runtime/pkg/resource"
16
	"github.com/cosi-project/runtime/pkg/safe"
17
	"github.com/cosi-project/runtime/pkg/state"
18
	"github.com/siderolabs/go-retry/retry"
19
	v1 "k8s.io/api/core/v1"
20
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
21

22
	"github.com/siderolabs/talos/pkg/kubernetes"
23
	"github.com/siderolabs/talos/pkg/machinery/client"
24
	v1alpha1config "github.com/siderolabs/talos/pkg/machinery/config/types/v1alpha1"
25
	"github.com/siderolabs/talos/pkg/machinery/resources/k8s"
26
	"github.com/siderolabs/talos/pkg/machinery/resources/v1alpha1"
27
)
28

29
// kubelet is the resource ID used in this file to watch the kubelet Service
// resource and to fetch the KubeletSpec resource; it is also printed as the
// component name in the update log message.
const kubelet = "kubelet"
30

31
// upgradeKubelet updates the kubelet on every node of the cluster, one node at
// a time: control plane nodes first, worker nodes afterwards.
//
// When options.UpgradeKubelet is false the step is logged as skipped and nil is
// returned. The first node that fails to update aborts the run; its error is
// wrapped with the node name.
func upgradeKubelet(ctx context.Context, cluster UpgradeProvider, options UpgradeOptions) error {
	if !options.UpgradeKubelet {
		options.Log("skipped updating kubelet")

		return nil
	}

	options.Log("updating kubelet to version %q", options.Path.ToVersion())

	// Build a fresh slice so that neither of the option slices is modified.
	targets := make([]string, 0, len(options.controlPlaneNodes)+len(options.workerNodes))
	targets = append(targets, options.controlPlaneNodes...)
	targets = append(targets, options.workerNodes...)

	for _, target := range targets {
		err := upgradeKubeletOnNode(ctx, cluster, options, target)
		if err != nil {
			return fmt.Errorf("error updating node %q: %w", target, err)
		}
	}

	return nil
}
48

49
//nolint:gocyclo,cyclop
50
func upgradeKubeletOnNode(ctx context.Context, cluster UpgradeProvider, options UpgradeOptions, node string) error {
51
	ctx, cancel := context.WithCancel(ctx)
52
	defer cancel()
53

54
	c, err := cluster.Client()
55
	if err != nil {
56
		return fmt.Errorf("error building Talos API client: %w", err)
57
	}
58

59
	ctx = client.WithNode(ctx, node)
60

61
	options.Log(" > %q: starting update", node)
62

63
	watchCh := make(chan safe.WrappedStateEvent[*v1alpha1.Service])
64

65
	if err = safe.StateWatch(ctx, c.COSI, resource.NewMetadata(v1alpha1.NamespaceName, v1alpha1.ServiceType, kubelet, resource.VersionUndefined), watchCh); err != nil {
66
		return fmt.Errorf("error watching service: %w", err)
67
	}
68

69
	var ev safe.WrappedStateEvent[*v1alpha1.Service]
70

71
	select {
72
	case ev = <-watchCh:
73
	case <-ctx.Done():
74
		return ctx.Err()
75
	}
76

77
	if ev.Type() != state.Created {
78
		return fmt.Errorf("unexpected event type: %s", ev.Type())
79
	}
80

81
	initialService, err := ev.Resource()
82
	if err != nil {
83
		return fmt.Errorf("error inspecting service: %w", err)
84
	}
85

86
	if !initialService.TypedSpec().Running || !initialService.TypedSpec().Healthy {
87
		return errors.New("kubelet is not healthy")
88
	}
89

90
	// find out current kubelet version, as the machine config might have a missing image field,
91
	// look it up from the kubelet spec
92

93
	kubeletSpec, err := safe.StateGet[*k8s.KubeletSpec](ctx, c.COSI, resource.NewMetadata(k8s.NamespaceName, k8s.KubeletSpecType, kubelet, resource.VersionUndefined))
94
	if err != nil {
95
		return fmt.Errorf("error fetching kubelet spec: %w", err)
96
	}
97

98
	skipWait := false
99

100
	err = patchNodeConfig(ctx, cluster, node, options.EncoderOpt, upgradeKubeletPatcher(options, kubeletSpec))
101
	if err != nil {
102
		if errors.Is(err, errUpdateSkipped) {
103
			skipWait = true
104
		} else {
105
			return fmt.Errorf("error patching node config: %w", err)
106
		}
107
	}
108

109
	if options.DryRun {
110
		return nil
111
	}
112

113
	options.Log(" > %q: machine configuration patched", node)
114

115
	if !skipWait {
116
		options.Log(" > %q: waiting for kubelet restart", node)
117

118
		// first, wait for kubelet to go down
119
		for {
120
			select {
121
			case ev = <-watchCh:
122
			case <-ctx.Done():
123
				return ctx.Err()
124
			}
125

126
			if ev.Type() == state.Destroyed {
127
				break
128
			}
129
		}
130

131
		// now wait for kubelet to go up & healthy
132
		for {
133
			select {
134
			case ev = <-watchCh:
135
			case <-ctx.Done():
136
				return ctx.Err()
137
			}
138

139
			if ev.Type() == state.Created || ev.Type() == state.Updated {
140
				var service *v1alpha1.Service
141

142
				service, err = ev.Resource()
143
				if err != nil {
144
					return fmt.Errorf("error inspecting service: %w", err)
145
				}
146

147
				if service.TypedSpec().Running && service.TypedSpec().Healthy {
148
					break
149
				}
150
			}
151
		}
152
	}
153

154
	options.Log(" > %q: waiting for node update", node)
155

156
	if err = retry.Constant(3*time.Minute, retry.WithUnits(10*time.Second)).Retry(
157
		func() error {
158
			return checkNodeKubeletVersion(ctx, cluster, node, "v"+options.Path.ToVersion())
159
		},
160
	); err != nil {
161
		return err
162
	}
163

164
	options.Log(" < %q: successfully updated", node)
165

166
	return nil
167
}
168

169
func upgradeKubeletPatcher(
170
	options UpgradeOptions,
171
	kubeletSpec *k8s.KubeletSpec,
172
) func(config *v1alpha1config.Config) error {
173
	return func(config *v1alpha1config.Config) error {
174
		if config.MachineConfig == nil {
175
			config.MachineConfig = &v1alpha1config.MachineConfig{}
176
		}
177

178
		if config.MachineConfig.MachineKubelet == nil {
179
			config.MachineConfig.MachineKubelet = &v1alpha1config.KubeletConfig{}
180
		}
181

182
		oldImage := kubeletSpec.TypedSpec().Image
183

184
		logUpdate := func(oldImage string) {
185
			parts := strings.Split(oldImage, ":")
186
			version := options.Path.FromVersion()
187

188
			if len(parts) > 1 {
189
				version = parts[1]
190
			}
191

192
			version = strings.TrimLeft(version, "v")
193

194
			options.Log(" > update %s: %s -> %s", kubelet, version, options.Path.ToVersion())
195

196
			if options.DryRun {
197
				options.Log(" > skipped in dry-run")
198
			}
199
		}
200

201
		image := fmt.Sprintf("%s:v%s", options.KubeletImage, options.Path.ToVersion())
202

203
		if oldImage == image {
204
			return errUpdateSkipped
205
		}
206

207
		logUpdate(oldImage)
208

209
		if options.DryRun {
210
			return errUpdateSkipped
211
		}
212

213
		config.MachineConfig.MachineKubelet.KubeletImage = image
214

215
		return nil
216
	}
217
}
218

219
//nolint:gocyclo
220
func checkNodeKubeletVersion(ctx context.Context, cluster UpgradeProvider, nodeToCheck, version string) error {
221
	k8sClient, err := cluster.K8sHelper(ctx)
222
	if err != nil {
223
		return fmt.Errorf("error building kubernetes client: %w", err)
224
	}
225

226
	nodes, err := k8sClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
227
	if err != nil {
228
		if kubernetes.IsRetryableError(err) {
229
			return retry.ExpectedError(err)
230
		}
231

232
		return err
233
	}
234

235
	nodeFound := false
236

237
	for _, node := range nodes.Items {
238
		matchingNode := false
239

240
		for _, address := range node.Status.Addresses {
241
			if address.Address == nodeToCheck {
242
				matchingNode = true
243

244
				break
245
			}
246
		}
247

248
		if !matchingNode {
249
			continue
250
		}
251

252
		nodeFound = true
253

254
		if node.Status.NodeInfo.KubeletVersion != version {
255
			return retry.ExpectedErrorf(
256
				"node version mismatch: got %q, expected %q",
257
				node.Status.NodeInfo.KubeletVersion,
258
				version,
259
			)
260
		}
261

262
		ready := false
263

264
		for _, condition := range node.Status.Conditions {
265
			if condition.Type != v1.NodeReady {
266
				continue
267
			}
268

269
			if condition.Status == v1.ConditionTrue {
270
				ready = true
271

272
				break
273
			}
274
		}
275

276
		if !ready {
277
			return retry.ExpectedErrorf("node is not ready")
278
		}
279

280
		break
281
	}
282

283
	if !nodeFound {
284
		return retry.ExpectedErrorf("node %q not found", nodeToCheck)
285
	}
286

287
	return nil
288
}
289

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.