talos
800 строк · 23.9 Кб
1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
5//go:build integration_api
6
7package base
8
9import (
10"bufio"
11"context"
12"crypto/sha256"
13"encoding/hex"
14"fmt"
15"io"
16"math/rand/v2"
17"path/filepath"
18"slices"
19"strings"
20"time"
21
22"github.com/cosi-project/runtime/pkg/safe"
23"github.com/cosi-project/runtime/pkg/state"
24"github.com/siderolabs/gen/xslices"
25"github.com/siderolabs/go-retry/retry"
26"github.com/stretchr/testify/suite"
27"google.golang.org/grpc/backoff"
28"google.golang.org/grpc/codes"
29"gopkg.in/yaml.v3"
30
31"github.com/siderolabs/talos/cmd/talosctl/pkg/talos/helpers"
32"github.com/siderolabs/talos/internal/app/machined/pkg/runtime"
33"github.com/siderolabs/talos/pkg/cluster"
34"github.com/siderolabs/talos/pkg/cluster/check"
35"github.com/siderolabs/talos/pkg/machinery/api/common"
36machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
37"github.com/siderolabs/talos/pkg/machinery/api/storage"
38"github.com/siderolabs/talos/pkg/machinery/client"
39clientconfig "github.com/siderolabs/talos/pkg/machinery/client/config"
40"github.com/siderolabs/talos/pkg/machinery/config"
41configconfig "github.com/siderolabs/talos/pkg/machinery/config/config"
42"github.com/siderolabs/talos/pkg/machinery/config/configpatcher"
43"github.com/siderolabs/talos/pkg/machinery/config/container"
44"github.com/siderolabs/talos/pkg/machinery/config/machine"
45"github.com/siderolabs/talos/pkg/machinery/config/types/v1alpha1"
46"github.com/siderolabs/talos/pkg/machinery/constants"
47configres "github.com/siderolabs/talos/pkg/machinery/resources/config"
48runtimeres "github.com/siderolabs/talos/pkg/machinery/resources/runtime"
49"github.com/siderolabs/talos/pkg/provision"
50"github.com/siderolabs/talos/pkg/provision/access"
51)
52
// APISuite is a base suite for API tests.
type APISuite struct {
	suite.Suite
	TalosSuite

	// Client is the Talos API client, initialized in SetupSuite and closed in TearDownSuite.
	Client *client.Client
	// Talosconfig is the client configuration loaded from the TalosConfig path.
	Talosconfig *clientconfig.Config
}
61
// SetupSuite initializes Talos API client.
func (apiSuite *APISuite) SetupSuite() {
	var err error

	apiSuite.Talosconfig, err = clientconfig.Open(apiSuite.TalosConfig)
	apiSuite.Require().NoError(err)

	// prefer the explicitly provided endpoint, falling back to endpoints from the talosconfig
	if apiSuite.Endpoint != "" {
		apiSuite.Client = apiSuite.GetClientWithEndpoints(apiSuite.Endpoint)
	} else {
		apiSuite.Client = apiSuite.GetClientWithEndpoints()
	}

	// clear any connection refused errors left after the previous tests
	nodes := apiSuite.DiscoverNodeInternalIPs(context.TODO())

	if len(nodes) > 0 {
		// grpc might trigger backoff on reconnect attempts, so make sure we clear them
		apiSuite.ClearConnectionRefused(context.Background(), nodes...)
	}
}
83
84// GetClientWithEndpoints returns Talos API client with provided endpoints.
85func (apiSuite *APISuite) GetClientWithEndpoints(endpoints ...string) *client.Client {
86opts := []client.OptionFunc{
87client.WithConfig(apiSuite.Talosconfig),
88client.WithEndpoints(endpoints...),
89}
90
91cli, err := client.New(context.TODO(), opts...)
92apiSuite.Require().NoError(err)
93
94return cli
95}
96
// DiscoverNodes provides list of Talos nodes in the cluster.
//
// As there's no way to provide this functionality via Talos API, it works the following way:
// 1. If there's a provided cluster info, it's used.
// 2. If integration test was compiled with k8s support, k8s is used.
//
// The passed ctx is additionally limited to one minute.
func (apiSuite *APISuite) DiscoverNodes(ctx context.Context) cluster.Info {
	discoveredNodes := apiSuite.TalosSuite.DiscoverNodes(ctx)
	if discoveredNodes != nil {
		return discoveredNodes
	}

	var err error

	var ctxCancel context.CancelFunc
	ctx, ctxCancel = context.WithTimeout(ctx, time.Minute)

	defer ctxCancel()

	// fall back to k8s-based discovery; the result is cached on the suite
	apiSuite.discoveredNodes, err = discoverNodesK8s(ctx, apiSuite.Client, &apiSuite.TalosSuite)
	apiSuite.Require().NoError(err, "k8s discovery failed")

	if apiSuite.discoveredNodes == nil {
		// still no nodes, skip the test
		apiSuite.T().Skip("no nodes were discovered")
	}

	return apiSuite.discoveredNodes
}
127
128// DiscoverNodeInternalIPs provides list of Talos node internal IPs in the cluster.
129func (apiSuite *APISuite) DiscoverNodeInternalIPs(ctx context.Context) []string {
130nodes := apiSuite.DiscoverNodes(ctx).Nodes()
131
132return mapNodeInfosToInternalIPs(nodes)
133}
134
135// DiscoverNodeInternalIPsByType provides list of Talos node internal IPs in the cluster for given machine type.
136func (apiSuite *APISuite) DiscoverNodeInternalIPsByType(ctx context.Context, machineType machine.Type) []string {
137nodesByType := apiSuite.DiscoverNodes(ctx).NodesByType(machineType)
138
139return mapNodeInfosToInternalIPs(nodesByType)
140}
141
142// RandomDiscoveredNodeInternalIP returns the internal IP a random node of the specified type (or any type if no types are specified).
143func (apiSuite *APISuite) RandomDiscoveredNodeInternalIP(types ...machine.Type) string {
144nodeInfo := apiSuite.DiscoverNodes(context.TODO())
145
146var nodes []cluster.NodeInfo
147
148if len(types) == 0 {
149nodeInfos := nodeInfo.Nodes()
150
151nodes = nodeInfos
152} else {
153for _, t := range types {
154nodeInfosByType := nodeInfo.NodesByType(t)
155
156nodes = append(nodes, nodeInfosByType...)
157}
158}
159
160apiSuite.Require().NotEmpty(nodes)
161
162return nodes[rand.IntN(len(nodes))].InternalIP.String()
163}
164
// Capabilities describes current cluster allowed actions.
type Capabilities struct {
	// RunsTalosKernel is true when the platform mode is not container (see Capabilities method).
	RunsTalosKernel bool
	// SupportsReboot is true when nodes can be rebooted.
	SupportsReboot bool
	// SupportsRecover is true when recovery operations are supported.
	SupportsRecover bool
	// SupportsVolumes is true when volume operations are supported.
	SupportsVolumes bool
	// SecureBooted is true when the node's SecurityState resource reports SecureBoot enabled.
	SecureBooted bool
}
173
// Capabilities returns a set of capabilities to skip tests for different environments.
func (apiSuite *APISuite) Capabilities() Capabilities {
	v, err := apiSuite.Client.Version(context.Background())
	apiSuite.Require().NoError(err)

	caps := Capabilities{}

	// container mode doesn't run the Talos kernel, so reboot/recover/volume capabilities stay false
	if v.Messages[0].Platform != nil {
		switch v.Messages[0].Platform.Mode {
		case runtime.ModeContainer.String():
		default:
			caps.RunsTalosKernel = true
			caps.SupportsReboot = true
			caps.SupportsRecover = true
			caps.SupportsVolumes = true
		}
	}

	ctx := context.Background()
	ctx, ctxCancel := context.WithTimeout(ctx, 2*time.Minute)

	defer ctxCancel()

	// wait (up to 2 minutes) for the SecurityState resource to determine the SecureBoot status
	securityResource, err := safe.StateWatchFor[*runtimeres.SecurityState](
		ctx,
		apiSuite.Client.COSI,
		runtimeres.NewSecurityStateSpec(runtimeres.NamespaceName).Metadata(),
		state.WithEventTypes(state.Created, state.Updated),
	)
	apiSuite.Require().NoError(err)

	caps.SecureBooted = securityResource.TypedSpec().SecureBoot

	return caps
}
209
// AssertClusterHealthy verifies that cluster is healthy using provisioning checks.
func (apiSuite *APISuite) AssertClusterHealthy(ctx context.Context) {
	if apiSuite.Cluster == nil {
		// can't assert if cluster state was not provided
		apiSuite.T().Skip("cluster health can't be verified when cluster state is not provided")
	}

	clusterAccess := access.NewAdapter(apiSuite.Cluster, provision.WithTalosClient(apiSuite.Client))
	defer clusterAccess.Close() //nolint:errcheck

	// run default cluster checks plus any extra checks, reporting progress to stderr
	apiSuite.Require().NoError(check.Wait(ctx, clusterAccess, append(check.DefaultClusterChecks(), check.ExtraClusterChecks()...), check.StderrReporter()))
}
222
223// ReadBootID reads node boot_id.
224//
225// Context provided might have specific node attached for API call.
226func (apiSuite *APISuite) ReadBootID(ctx context.Context) (string, error) {
227// set up a short timeout around boot_id read calls to work around
228// cases when rebooted node doesn't answer for a long time on requests
229reqCtx, reqCtxCancel := context.WithTimeout(ctx, 10*time.Second)
230defer reqCtxCancel()
231
232reader, err := apiSuite.Client.Read(reqCtx, "/proc/sys/kernel/random/boot_id")
233if err != nil {
234return "", err
235}
236
237defer reader.Close() //nolint:errcheck
238
239body, err := io.ReadAll(reader)
240if err != nil {
241return "", err
242}
243
244bootID := strings.TrimSpace(string(body))
245
246_, err = io.Copy(io.Discard, reader)
247if err != nil {
248return "", err
249}
250
251return bootID, reader.Close()
252}
253
// ReadBootIDWithRetry reads node boot_id.
//
// The read is retried with one-second intervals until a non-empty boot_id is
// returned or the timeout expires (in which case the suite fails).
//
// Context provided might have specific node attached for API call.
func (apiSuite *APISuite) ReadBootIDWithRetry(ctx context.Context, timeout time.Duration) string {
	var bootID string

	apiSuite.Require().NoError(retry.Constant(timeout, retry.WithUnits(time.Millisecond*1000)).Retry(
		func() error {
			var err error

			bootID, err = apiSuite.ReadBootID(ctx)
			if err != nil {
				// API errors are expected while the node is rebooting, so keep retrying
				return retry.ExpectedError(err)
			}

			if bootID == "" {
				return retry.ExpectedErrorf("boot id is empty")
			}

			return nil
		},
	))

	return bootID
}
279
// AssertRebooted verifies that node got rebooted as result of running some API call.
//
// Verification happens via reading boot_id of the node.
func (apiSuite *APISuite) AssertRebooted(ctx context.Context, node string, rebootFunc func(nodeCtx context.Context) error, timeout time.Duration) {
	apiSuite.AssertRebootedNoChecks(ctx, node, rebootFunc, timeout)

	// wait for the boot sequence to finish before running health checks
	apiSuite.WaitForBootDone(ctx)

	if apiSuite.Cluster != nil {
		// without cluster state we can't do deep checks, but basic reboot test still works
		// NB: using `ctx` here to have client talking to init node by default
		apiSuite.AssertClusterHealthy(ctx)
	}
}
294
// AssertRebootedNoChecks waits for node to be rebooted without waiting for cluster to become healthy afterwards.
func (apiSuite *APISuite) AssertRebootedNoChecks(ctx context.Context, node string, rebootFunc func(nodeCtx context.Context) error, timeout time.Duration) {
	// timeout for single node Reset
	ctx, ctxCancel := context.WithTimeout(ctx, timeout)
	defer ctxCancel()

	nodeCtx := client.WithNodes(ctx, node)

	var (
		bootIDBefore string
		err          error
	)

	// retry reading the boot_id, as the node might be temporarily unresponsive
	err = retry.Constant(time.Minute * 5).Retry(func() error {
		// read boot_id before reboot
		bootIDBefore, err = apiSuite.ReadBootID(nodeCtx)
		if err != nil {
			return retry.ExpectedError(err)
		}

		return nil
	})

	apiSuite.Require().NoError(err)

	// trigger the reboot; the actual reboot verification is the boot_id change below
	apiSuite.Assert().NoError(rebootFunc(nodeCtx))

	apiSuite.AssertBootIDChanged(nodeCtx, bootIDBefore, node, timeout)
}
324
// AssertBootIDChanged waits until node boot id changes.
func (apiSuite *APISuite) AssertBootIDChanged(nodeCtx context.Context, bootIDBefore, node string, timeout time.Duration) {
	apiSuite.Assert().NotEmpty(bootIDBefore)

	apiSuite.Require().NoError(retry.Constant(timeout).Retry(func() error {
		// short per-request timeout, as the node might be mid-reboot and unresponsive
		requestCtx, requestCtxCancel := context.WithTimeout(nodeCtx, time.Second)
		defer requestCtxCancel()

		bootIDAfter, err := apiSuite.ReadBootID(requestCtx)
		if err != nil {
			// API might be unresponsive during reboot
			return retry.ExpectedError(err)
		}

		if bootIDAfter == bootIDBefore {
			// bootID should be different after reboot
			return retry.ExpectedErrorf("bootID didn't change for node %q: before %s, after %s", node, bootIDBefore, bootIDAfter)
		}

		return nil
	}))
}
347
348// WaitForBootDone waits for boot phase done event.
349func (apiSuite *APISuite) WaitForBootDone(ctx context.Context) {
350apiSuite.WaitForSequenceDone(
351ctx,
352runtime.SequenceBoot,
353apiSuite.DiscoverNodeInternalIPs(ctx)...,
354)
355}
356
// WaitForSequenceDone waits for sequence done event.
func (apiSuite *APISuite) WaitForSequenceDone(ctx context.Context, sequence runtime.Sequence, nodes ...string) {
	// set of nodes which haven't reported the sequence STOP event yet
	nodesNotDone := make(map[string]struct{})

	for _, node := range nodes {
		nodesNotDone[node] = struct{}{}
	}

	apiSuite.Require().NoError(retry.Constant(5*time.Minute, retry.WithUnits(time.Second*10)).Retry(func() error {
		eventsCtx, cancel := context.WithTimeout(client.WithNodes(ctx, nodes...), 5*time.Second)
		defer cancel()

		// replay the full event history (tail -1) and mark each node done as its STOP event is seen
		err := apiSuite.Client.EventsWatch(eventsCtx, func(ch <-chan client.Event) {
			// canceling the context stops the watch once all nodes are done (or the callback exits)
			defer cancel()

			for event := range ch {
				if msg, ok := event.Payload.(*machineapi.SequenceEvent); ok {
					if msg.GetAction() == machineapi.SequenceEvent_STOP && msg.GetSequence() == sequence.String() {
						delete(nodesNotDone, event.Node)

						if len(nodesNotDone) == 0 {
							// all nodes reported the sequence as done
							return
						}
					}
				}
			}
		}, client.WithTailEvents(-1))
		if err != nil {
			return retry.ExpectedError(err)
		}

		if len(nodesNotDone) > 0 {
			return retry.ExpectedErrorf("nodes %#v sequence %s is not completed", nodesNotDone, sequence.String())
		}

		return nil
	}))
}
395
// ClearConnectionRefused clears cached connection refused errors which might be left after node reboot.
func (apiSuite *APISuite) ClearConnectionRefused(ctx context.Context, nodes ...string) {
	// bound the whole operation by the maximum gRPC backoff delay
	ctx, cancel := context.WithTimeout(ctx, backoff.DefaultConfig.MaxDelay)
	defer cancel()

	controlPlaneNodes := apiSuite.DiscoverNodes(ctx).NodesByType(machine.TypeControlPlane)
	initNodes := apiSuite.DiscoverNodes(ctx).NodesByType(machine.TypeInit)

	numMasterNodes := len(controlPlaneNodes) + len(initNodes)
	if numMasterNodes == 0 {
		// discovery returned no control plane nodes; fall back to a default of 3
		numMasterNodes = 3
	}

	apiSuite.Require().NoError(retry.Constant(backoff.DefaultConfig.MaxDelay, retry.WithUnits(time.Second)).Retry(func() error {
		// issue one Version request per control plane node — presumably so each
		// endpoint gets exercised; TODO confirm against client load-balancing behavior
		for range numMasterNodes {
			_, err := apiSuite.Client.Version(client.WithNodes(ctx, nodes...))
			if err == nil {
				continue
			}

			// transient gRPC status codes: retry
			if client.StatusCode(err) == codes.Unavailable || client.StatusCode(err) == codes.Canceled {
				return retry.ExpectedError(err)
			}

			// transient network errors: retry
			if strings.Contains(err.Error(), "connection refused") || strings.Contains(err.Error(), "connection reset by peer") {
				return retry.ExpectedError(err)
			}

			// anything else is a hard failure
			return err
		}

		return nil
	}))
}
430
// HashKubeletCert returns hash of the kubelet certificate file.
//
// This function can be used to verify that the node ephemeral partition got wiped.
func (apiSuite *APISuite) HashKubeletCert(ctx context.Context, node string) (string, error) {
	// short timeout guards against a slow/unresponsive node
	reqCtx, reqCtxCancel := context.WithTimeout(ctx, 10*time.Second)
	defer reqCtxCancel()

	reqCtx = client.WithNodes(reqCtx, node)

	reader, err := apiSuite.Client.Read(reqCtx, "/var/lib/kubelet/pki/kubelet-client-current.pem")
	if err != nil {
		return "", err
	}

	defer reader.Close() //nolint:errcheck

	// stream the file contents directly into the hash, no buffering of the whole file
	hash := sha256.New()

	_, err = io.Copy(hash, reader)
	if err != nil {
		return "", err
	}

	// return the hex-encoded digest along with any error from closing the stream
	return hex.EncodeToString(hash.Sum(nil)), reader.Close()
}
456
457// ReadConfigFromNode reads machine configuration from the node.
458func (apiSuite *APISuite) ReadConfigFromNode(nodeCtx context.Context) (config.Provider, error) {
459cfg, err := safe.StateGetByID[*configres.MachineConfig](nodeCtx, apiSuite.Client.COSI, configres.V1Alpha1ID)
460if err != nil {
461return nil, fmt.Errorf("error fetching machine config resource: %w", err)
462}
463
464return cfg.Provider(), nil
465}
466
467// UserDisks returns list of user disks on with size greater than sizeGreaterThanGB and not having any partitions present.
468//
469//nolint:gocyclo
470func (apiSuite *APISuite) UserDisks(ctx context.Context, node string, sizeGreaterThanGB int) ([]string, error) {
471nodeCtx := client.WithNodes(ctx, node)
472
473resp, err := apiSuite.Client.Disks(nodeCtx)
474if err != nil {
475return nil, err
476}
477
478var disks []string
479
480blockDeviceInUse := func(deviceName string) (bool, error) {
481devicePart := strings.Split(deviceName, "/dev/")[1]
482
483// https://unix.stackexchange.com/questions/111779/how-to-find-out-easily-whether-a-block-device-or-a-part-of-it-is-mounted-someh
484// this was the only easy way I could find to check if the block device is already in use by something like raid
485stream, err := apiSuite.Client.LS(nodeCtx, &machineapi.ListRequest{
486Root: fmt.Sprintf("/sys/block/%s/holders", devicePart),
487})
488if err != nil {
489return false, err
490}
491
492counter := 0
493
494if err = helpers.ReadGRPCStream(stream, func(info *machineapi.FileInfo, node string, multipleNodes bool) error {
495counter++
496
497return nil
498}); err != nil {
499return false, err
500}
501
502if counter > 1 {
503return true, nil
504}
505
506return false, nil
507}
508
509for _, msg := range resp.Messages {
510for _, disk := range msg.Disks {
511if disk.SystemDisk || disk.Readonly || disk.Type == storage.Disk_CD {
512continue
513}
514
515if disk.BusPath == "/virtual" {
516continue
517}
518
519blockDeviceUsed, err := blockDeviceInUse(disk.DeviceName)
520if err != nil {
521return nil, err
522}
523
524if disk.Size > uint64(sizeGreaterThanGB)*1024*1024*1024 && !blockDeviceUsed {
525disks = append(disks, disk.DeviceName)
526}
527}
528}
529
530return disks, nil
531}
532
533// AssertServicesRunning verifies that services are running on the node.
534func (apiSuite *APISuite) AssertServicesRunning(ctx context.Context, node string, serviceStatus map[string]string) {
535nodeCtx := client.WithNode(ctx, node)
536
537for svc, state := range serviceStatus {
538resp, err := apiSuite.Client.ServiceInfo(nodeCtx, svc)
539apiSuite.Require().NoError(err)
540apiSuite.Require().NotNil(resp, "expected service %s to be registered", svc)
541
542for _, svcInfo := range resp {
543apiSuite.Require().Equal(state, svcInfo.Service.State, "expected service %s to have state %s", svc, state)
544}
545}
546}
547
548// AssertExpectedModules verifies that expected kernel modules are loaded on the node.
549func (apiSuite *APISuite) AssertExpectedModules(ctx context.Context, node string, expectedModules map[string]string) {
550nodeCtx := client.WithNode(ctx, node)
551
552fileReader, err := apiSuite.Client.Read(nodeCtx, "/proc/modules")
553apiSuite.Require().NoError(err)
554
555defer func() {
556apiSuite.Require().NoError(fileReader.Close())
557}()
558
559scanner := bufio.NewScanner(fileReader)
560
561var loadedModules []string
562
563for scanner.Scan() {
564loadedModules = append(loadedModules, strings.Split(scanner.Text(), " ")[0])
565}
566apiSuite.Require().NoError(scanner.Err())
567
568fileReader, err = apiSuite.Client.Read(nodeCtx, fmt.Sprintf("/lib/modules/%s/modules.dep", constants.DefaultKernelVersion))
569apiSuite.Require().NoError(err)
570
571defer func() {
572apiSuite.Require().NoError(fileReader.Close())
573}()
574
575scanner = bufio.NewScanner(fileReader)
576
577var modulesDep []string
578
579for scanner.Scan() {
580modulesDep = append(modulesDep, filepath.Base(strings.Split(scanner.Text(), ":")[0]))
581}
582apiSuite.Require().NoError(scanner.Err())
583
584for module, moduleDep := range expectedModules {
585apiSuite.Require().Contains(loadedModules, module, "expected %s to be loaded", module)
586apiSuite.Require().Contains(modulesDep, moduleDep, "expected %s to be in modules.dep", moduleDep)
587}
588}
589
590// UpdateMachineConfig fetches machine configuration, patches it and applies the changes.
591func (apiSuite *APISuite) UpdateMachineConfig(nodeCtx context.Context, patch func(config.Provider) (config.Provider, error)) {
592cfg, err := apiSuite.ReadConfigFromNode(nodeCtx)
593apiSuite.Require().NoError(err)
594
595patchedCfg, err := patch(cfg)
596apiSuite.Require().NoError(err)
597
598bytes, err := patchedCfg.Bytes()
599apiSuite.Require().NoError(err)
600
601resp, err := apiSuite.Client.ApplyConfiguration(nodeCtx, &machineapi.ApplyConfigurationRequest{
602Data: bytes,
603Mode: machineapi.ApplyConfigurationRequest_AUTO,
604})
605apiSuite.Require().NoError(err)
606
607apiSuite.T().Logf("patched machine config: %s", resp.Messages[0].ModeDetails)
608}
609
// PatchMachineConfig patches machine configuration on the node.
//
// Each patch value is marshaled to YAML and loaded as a config patch before
// being applied to the node's current configuration.
func (apiSuite *APISuite) PatchMachineConfig(nodeCtx context.Context, patches ...any) {
	configPatches := make([]configpatcher.Patch, 0, len(patches))

	for _, patch := range patches {
		marshaled, err := yaml.Marshal(patch)
		apiSuite.Require().NoError(err)

		configPatch, err := configpatcher.LoadPatch(marshaled)
		apiSuite.Require().NoError(err)

		configPatches = append(configPatches, configPatch)
	}

	// apply all patches in one fetch-patch-apply cycle
	apiSuite.UpdateMachineConfig(nodeCtx, func(cfg config.Provider) (config.Provider, error) {
		out, err := configpatcher.Apply(configpatcher.WithConfig(cfg), configPatches)
		if err != nil {
			return nil, err
		}

		return out.Config()
	})
}
633
634// RemoveMachineConfigDocuments removes machine configuration documents of specified type from the node.
635func (apiSuite *APISuite) RemoveMachineConfigDocuments(nodeCtx context.Context, docTypes ...string) {
636apiSuite.UpdateMachineConfig(nodeCtx, func(cfg config.Provider) (config.Provider, error) {
637return container.New(xslices.Filter(cfg.Documents(), func(doc configconfig.Document) bool {
638return slices.Index(docTypes, doc.Kind()) == -1
639})...)
640})
641}
642
643// PatchV1Alpha1Config patches v1alpha1 config in the config provider.
644func (apiSuite *APISuite) PatchV1Alpha1Config(provider config.Provider, patch func(*v1alpha1.Config)) []byte {
645ctr, err := provider.PatchV1Alpha1(func(c *v1alpha1.Config) error {
646patch(c)
647
648return nil
649})
650apiSuite.Require().NoError(err)
651
652bytes, err := ctr.Bytes()
653apiSuite.Require().NoError(err)
654
655return bytes
656}
657
// ResetNode wraps the reset node sequence with checks, waiting for the reset to finish and verifying the result.
//
//nolint:gocyclo
func (apiSuite *APISuite) ResetNode(ctx context.Context, node string, resetSpec *machineapi.ResetRequest, runHealthChecks bool) {
	apiSuite.T().Logf("resetting node %q with graceful %v mode %s, system %v, user %v", node, resetSpec.Graceful, resetSpec.Mode, resetSpec.SystemPartitionsToWipe, resetSpec.UserDisksToWipe)

	nodeCtx := client.WithNode(ctx, node)

	// dedicated client with the node itself as the endpoint
	// (presumably to keep API calls working while other nodes reboot — confirm)
	nodeClient := apiSuite.GetClientWithEndpoints(node)
	defer nodeClient.Close() //nolint:errcheck

	// any reset should lead to a reboot, so read boot_id before reboot
	bootIDBefore, err := apiSuite.ReadBootID(nodeCtx)
	apiSuite.Require().NoError(err)

	// figure out if EPHEMERAL is going to be reset
	ephemeralIsGoingToBeReset := false

	// an empty wipe spec wipes everything, including EPHEMERAL
	if len(resetSpec.SystemPartitionsToWipe) == 0 && len(resetSpec.UserDisksToWipe) == 0 {
		ephemeralIsGoingToBeReset = true
	} else {
		for _, part := range resetSpec.SystemPartitionsToWipe {
			if part.Label == constants.EphemeralPartitionLabel {
				ephemeralIsGoingToBeReset = true

				break
			}
		}
	}

	// hash of the kubelet client cert is used below as a proxy for "EPHEMERAL was wiped"
	preReset, err := apiSuite.HashKubeletCert(ctx, node)
	apiSuite.Require().NoError(err)

	resp, err := nodeClient.ResetGenericWithResponse(nodeCtx, resetSpec)
	apiSuite.Require().NoError(err)

	actorID := resp.Messages[0].ActorId

	eventCh := make(chan client.EventResult)

	// watch for events of this particular reset sequence, matched by actor ID
	apiSuite.Require().NoError(nodeClient.EventsWatchV2(nodeCtx, eventCh, client.WithActorID(actorID), client.WithTailEvents(-1)))

	waitTimer := time.NewTimer(5 * time.Minute)
	defer waitTimer.Stop()

waitLoop:
	for {
		select {
		case ev := <-eventCh:
			apiSuite.Require().NoError(ev.Error)

			switch msg := ev.Event.Payload.(type) {
			case *machineapi.SequenceEvent:
				// a sequence error means the reset itself failed
				if msg.Error != nil {
					apiSuite.FailNow("reset failed", "%s: %s", msg.Error.Message, msg.Error.Code)
				}
			case *machineapi.PhaseEvent:
				if msg.Action == machineapi.PhaseEvent_START && msg.Phase == "unmountSystem" {
					// about to be reset, break waitLoop
					break waitLoop
				}

				if msg.Action == machineapi.PhaseEvent_STOP {
					apiSuite.T().Logf("reset phase %q finished", msg.Phase)
				}
			}
		case <-waitTimer.C:
			apiSuite.FailNow("timeout waiting for reset to finish")
		case <-ctx.Done():
			apiSuite.FailNow("context canceled")
		}
	}

	// wait for the apid to be shut down
	time.Sleep(10 * time.Second)

	// the actual reboot check: boot_id must change
	apiSuite.AssertBootIDChanged(nodeCtx, bootIDBefore, node, 3*time.Minute)

	// clear gRPC connection-refused state accumulated while the node was down
	apiSuite.ClearConnectionRefused(ctx, node)

	if runHealthChecks {
		if apiSuite.Cluster != nil {
			// without cluster state we can't do deep checks, but basic reboot test still works
			// NB: using `ctx` here to have client talking to init node by default
			apiSuite.AssertClusterHealthy(ctx)
		}

		postReset, err := apiSuite.HashKubeletCert(ctx, node)
		apiSuite.Require().NoError(err)

		if ephemeralIsGoingToBeReset {
			apiSuite.Assert().NotEqual(preReset, postReset, "reset should lead to new kubelet cert being generated")
		} else {
			apiSuite.Assert().Equal(preReset, postReset, "ephemeral partition was not reset")
		}
	}
}
756
// DumpLogs dumps a set of logs from the node.
//
// Logs for the given service are read from the system containerd namespace and
// written to the test log; when pattern is non-empty, only lines containing the
// pattern are dumped.
func (apiSuite *APISuite) DumpLogs(ctx context.Context, node string, service, pattern string) {
	nodeCtx := client.WithNode(ctx, node)

	logsStream, err := apiSuite.Client.Logs(
		nodeCtx,
		constants.SystemContainerdNamespace,
		common.ContainerDriver_CONTAINERD,
		service,
		false,
		-1,
	)
	apiSuite.Require().NoError(err)

	logReader, err := client.ReadStream(logsStream)
	apiSuite.Require().NoError(err)

	defer logReader.Close() //nolint:errcheck

	scanner := bufio.NewScanner(logReader)

	for scanner.Scan() {
		if pattern == "" || strings.Contains(scanner.Text(), pattern) {
			apiSuite.T().Logf("%s (%s): %s", node, service, scanner.Text())
		}
	}
}
784
785// TearDownSuite closes Talos API client.
786func (apiSuite *APISuite) TearDownSuite() {
787if apiSuite.Client != nil {
788apiSuite.Assert().NoError(apiSuite.Client.Close())
789}
790}
791
792func mapNodeInfosToInternalIPs(nodes []cluster.NodeInfo) []string {
793ips := make([]string, len(nodes))
794
795for i, node := range nodes {
796ips[i] = node.InternalIP.String()
797}
798
799return ips
800}
801