talos
288 lines · 6.6 KB
1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
5package kubernetes
6
7import (
8"context"
9"errors"
10"fmt"
11"slices"
12"strings"
13"time"
14
15"github.com/cosi-project/runtime/pkg/resource"
16"github.com/cosi-project/runtime/pkg/safe"
17"github.com/cosi-project/runtime/pkg/state"
18"github.com/siderolabs/go-retry/retry"
19v1 "k8s.io/api/core/v1"
20metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
21
22"github.com/siderolabs/talos/pkg/kubernetes"
23"github.com/siderolabs/talos/pkg/machinery/client"
24v1alpha1config "github.com/siderolabs/talos/pkg/machinery/config/types/v1alpha1"
25"github.com/siderolabs/talos/pkg/machinery/resources/k8s"
26"github.com/siderolabs/talos/pkg/machinery/resources/v1alpha1"
27)
28
29const kubelet = "kubelet"
30
31func upgradeKubelet(ctx context.Context, cluster UpgradeProvider, options UpgradeOptions) error {
32if !options.UpgradeKubelet {
33options.Log("skipped updating kubelet")
34
35return nil
36}
37
38options.Log("updating kubelet to version %q", options.Path.ToVersion())
39
40for _, node := range append(slices.Clone(options.controlPlaneNodes), options.workerNodes...) {
41if err := upgradeKubeletOnNode(ctx, cluster, options, node); err != nil {
42return fmt.Errorf("error updating node %q: %w", node, err)
43}
44}
45
46return nil
47}
48
// upgradeKubeletOnNode updates the kubelet on a single node: it subscribes to
// the kubelet service state, patches the machine configuration with the new
// kubelet image, waits for the service to restart and report healthy, and
// finally confirms the Kubernetes node object advertises the target version.
//nolint:gocyclo,cyclop
func upgradeKubeletOnNode(ctx context.Context, cluster UpgradeProvider, options UpgradeOptions, node string) error {
	// Cancel the watch (and any in-flight calls) as soon as this node is done.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	c, err := cluster.Client()
	if err != nil {
		return fmt.Errorf("error building Talos API client: %w", err)
	}

	// Route all subsequent Talos API calls to the node being upgraded.
	ctx = client.WithNode(ctx, node)

	options.Log(" > %q: starting update", node)

	// Subscribe to kubelet service events BEFORE patching, so that the restart
	// transition cannot be missed between the patch and the wait loops below.
	watchCh := make(chan safe.WrappedStateEvent[*v1alpha1.Service])

	if err = safe.StateWatch(ctx, c.COSI, resource.NewMetadata(v1alpha1.NamespaceName, v1alpha1.ServiceType, kubelet, resource.VersionUndefined), watchCh); err != nil {
		return fmt.Errorf("error watching service: %w", err)
	}

	var ev safe.WrappedStateEvent[*v1alpha1.Service]

	// The first event from the watch delivers the current state of the service.
	select {
	case ev = <-watchCh:
	case <-ctx.Done():
		return ctx.Err()
	}

	if ev.Type() != state.Created {
		return fmt.Errorf("unexpected event type: %s", ev.Type())
	}

	initialService, err := ev.Resource()
	if err != nil {
		return fmt.Errorf("error inspecting service: %w", err)
	}

	// Refuse to upgrade a kubelet that is not currently running and healthy.
	if !initialService.TypedSpec().Running || !initialService.TypedSpec().Healthy {
		return errors.New("kubelet is not healthy")
	}

	// find out current kubelet version, as the machine config might have a missing image field,
	// look it up from the kubelet spec

	kubeletSpec, err := safe.StateGet[*k8s.KubeletSpec](ctx, c.COSI, resource.NewMetadata(k8s.NamespaceName, k8s.KubeletSpecType, kubelet, resource.VersionUndefined))
	if err != nil {
		return fmt.Errorf("error fetching kubelet spec: %w", err)
	}

	skipWait := false

	err = patchNodeConfig(ctx, cluster, node, options.EncoderOpt, upgradeKubeletPatcher(options, kubeletSpec))
	if err != nil {
		// errUpdateSkipped means no config change was applied (image already at
		// the target version, or dry-run): no restart will follow, so the
		// restart wait below must be skipped.
		if errors.Is(err, errUpdateSkipped) {
			skipWait = true
		} else {
			return fmt.Errorf("error patching node config: %w", err)
		}
	}

	if options.DryRun {
		return nil
	}

	options.Log(" > %q: machine configuration patched", node)

	if !skipWait {
		options.Log(" > %q: waiting for kubelet restart", node)

		// first, wait for kubelet to go down
		for {
			select {
			case ev = <-watchCh:
			case <-ctx.Done():
				return ctx.Err()
			}

			if ev.Type() == state.Destroyed {
				break
			}
		}

		// now wait for kubelet to go up & healthy
		for {
			select {
			case ev = <-watchCh:
			case <-ctx.Done():
				return ctx.Err()
			}

			if ev.Type() == state.Created || ev.Type() == state.Updated {
				var service *v1alpha1.Service

				service, err = ev.Resource()
				if err != nil {
					return fmt.Errorf("error inspecting service: %w", err)
				}

				if service.TypedSpec().Running && service.TypedSpec().Healthy {
					break
				}
			}
		}
	}

	options.Log(" > %q: waiting for node update", node)

	// Poll the Kubernetes API (every 10s, up to 3m) until the node reports the
	// new kubelet version and the Ready condition.
	if err = retry.Constant(3*time.Minute, retry.WithUnits(10*time.Second)).Retry(
		func() error {
			return checkNodeKubeletVersion(ctx, cluster, node, "v"+options.Path.ToVersion())
		},
	); err != nil {
		return err
	}

	options.Log(" < %q: successfully updated", node)

	return nil
}
168
169func upgradeKubeletPatcher(
170options UpgradeOptions,
171kubeletSpec *k8s.KubeletSpec,
172) func(config *v1alpha1config.Config) error {
173return func(config *v1alpha1config.Config) error {
174if config.MachineConfig == nil {
175config.MachineConfig = &v1alpha1config.MachineConfig{}
176}
177
178if config.MachineConfig.MachineKubelet == nil {
179config.MachineConfig.MachineKubelet = &v1alpha1config.KubeletConfig{}
180}
181
182oldImage := kubeletSpec.TypedSpec().Image
183
184logUpdate := func(oldImage string) {
185parts := strings.Split(oldImage, ":")
186version := options.Path.FromVersion()
187
188if len(parts) > 1 {
189version = parts[1]
190}
191
192version = strings.TrimLeft(version, "v")
193
194options.Log(" > update %s: %s -> %s", kubelet, version, options.Path.ToVersion())
195
196if options.DryRun {
197options.Log(" > skipped in dry-run")
198}
199}
200
201image := fmt.Sprintf("%s:v%s", options.KubeletImage, options.Path.ToVersion())
202
203if oldImage == image {
204return errUpdateSkipped
205}
206
207logUpdate(oldImage)
208
209if options.DryRun {
210return errUpdateSkipped
211}
212
213config.MachineConfig.MachineKubelet.KubeletImage = image
214
215return nil
216}
217}
218
219//nolint:gocyclo
220func checkNodeKubeletVersion(ctx context.Context, cluster UpgradeProvider, nodeToCheck, version string) error {
221k8sClient, err := cluster.K8sHelper(ctx)
222if err != nil {
223return fmt.Errorf("error building kubernetes client: %w", err)
224}
225
226nodes, err := k8sClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
227if err != nil {
228if kubernetes.IsRetryableError(err) {
229return retry.ExpectedError(err)
230}
231
232return err
233}
234
235nodeFound := false
236
237for _, node := range nodes.Items {
238matchingNode := false
239
240for _, address := range node.Status.Addresses {
241if address.Address == nodeToCheck {
242matchingNode = true
243
244break
245}
246}
247
248if !matchingNode {
249continue
250}
251
252nodeFound = true
253
254if node.Status.NodeInfo.KubeletVersion != version {
255return retry.ExpectedErrorf(
256"node version mismatch: got %q, expected %q",
257node.Status.NodeInfo.KubeletVersion,
258version,
259)
260}
261
262ready := false
263
264for _, condition := range node.Status.Conditions {
265if condition.Type != v1.NodeReady {
266continue
267}
268
269if condition.Status == v1.ConditionTrue {
270ready = true
271
272break
273}
274}
275
276if !ready {
277return retry.ExpectedErrorf("node is not ready")
278}
279
280break
281}
282
283if !nodeFound {
284return retry.ExpectedErrorf("node %q not found", nodeToCheck)
285}
286
287return nil
288}
289