// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

//go:build integration_api

package base

import (
	"bufio"
	"context"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"math/rand/v2"
	"path/filepath"
	"slices"
	"strings"
	"time"

	"github.com/cosi-project/runtime/pkg/safe"
	"github.com/cosi-project/runtime/pkg/state"
	"github.com/siderolabs/gen/xslices"
	"github.com/siderolabs/go-retry/retry"
	"github.com/stretchr/testify/suite"
	"google.golang.org/grpc/backoff"
	"google.golang.org/grpc/codes"
	"gopkg.in/yaml.v3"

	"github.com/siderolabs/talos/cmd/talosctl/pkg/talos/helpers"
	"github.com/siderolabs/talos/internal/app/machined/pkg/runtime"
	"github.com/siderolabs/talos/pkg/cluster"
	"github.com/siderolabs/talos/pkg/cluster/check"
	"github.com/siderolabs/talos/pkg/machinery/api/common"
	machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
	"github.com/siderolabs/talos/pkg/machinery/api/storage"
	"github.com/siderolabs/talos/pkg/machinery/client"
	clientconfig "github.com/siderolabs/talos/pkg/machinery/client/config"
	"github.com/siderolabs/talos/pkg/machinery/config"
	configconfig "github.com/siderolabs/talos/pkg/machinery/config/config"
	"github.com/siderolabs/talos/pkg/machinery/config/configpatcher"
	"github.com/siderolabs/talos/pkg/machinery/config/container"
	"github.com/siderolabs/talos/pkg/machinery/config/machine"
	"github.com/siderolabs/talos/pkg/machinery/config/types/v1alpha1"
	"github.com/siderolabs/talos/pkg/machinery/constants"
	configres "github.com/siderolabs/talos/pkg/machinery/resources/config"
	runtimeres "github.com/siderolabs/talos/pkg/machinery/resources/runtime"
	"github.com/siderolabs/talos/pkg/provision"
	"github.com/siderolabs/talos/pkg/provision/access"
)

// APISuite is a base suite for API tests.
type APISuite struct {
	suite.Suite
	TalosSuite

	Client      *client.Client
	Talosconfig *clientconfig.Config
}

// SetupSuite initializes the Talos API client.
func (apiSuite *APISuite) SetupSuite() {
	var err error

	apiSuite.Talosconfig, err = clientconfig.Open(apiSuite.TalosConfig)
	apiSuite.Require().NoError(err)

	if apiSuite.Endpoint != "" {
		apiSuite.Client = apiSuite.GetClientWithEndpoints(apiSuite.Endpoint)
	} else {
		apiSuite.Client = apiSuite.GetClientWithEndpoints()
	}

	// clear any connection refused errors left after the previous tests
	nodes := apiSuite.DiscoverNodeInternalIPs(context.TODO())

	if len(nodes) > 0 {
		// gRPC might trigger backoff on reconnect attempts, so make sure we clear them
		apiSuite.ClearConnectionRefused(context.Background(), nodes...)
	}
}

// GetClientWithEndpoints returns a Talos API client with the provided endpoints.
func (apiSuite *APISuite) GetClientWithEndpoints(endpoints ...string) *client.Client {
	opts := []client.OptionFunc{
		client.WithConfig(apiSuite.Talosconfig),
		client.WithEndpoints(endpoints...),
	}

	cli, err := client.New(context.TODO(), opts...)
	apiSuite.Require().NoError(err)

	return cli
}
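
// A minimal usage sketch (hypothetical test body; the address is
// illustrative): create a client that talks to a single node directly.
//
//	nodeClient := apiSuite.GetClientWithEndpoints("172.20.0.2")
//	defer nodeClient.Close() //nolint:errcheck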

// DiscoverNodes provides a list of Talos nodes in the cluster.
//
// As there's no way to provide this functionality via the Talos API, it works the following way:
// 1. If there's a provided cluster info, it's used.
// 2. If the integration test was compiled with k8s support, k8s is used.
//
// The passed ctx is additionally limited to one minute.
func (apiSuite *APISuite) DiscoverNodes(ctx context.Context) cluster.Info {
	discoveredNodes := apiSuite.TalosSuite.DiscoverNodes(ctx)
	if discoveredNodes != nil {
		return discoveredNodes
	}

	var err error

	var ctxCancel context.CancelFunc
	ctx, ctxCancel = context.WithTimeout(ctx, time.Minute)

	defer ctxCancel()

	apiSuite.discoveredNodes, err = discoverNodesK8s(ctx, apiSuite.Client, &apiSuite.TalosSuite)
	apiSuite.Require().NoError(err, "k8s discovery failed")

	if apiSuite.discoveredNodes == nil {
		// still no nodes, skip the test
		apiSuite.T().Skip("no nodes were discovered")
	}

	return apiSuite.discoveredNodes
}

// DiscoverNodeInternalIPs provides a list of the internal IPs of Talos nodes in the cluster.
func (apiSuite *APISuite) DiscoverNodeInternalIPs(ctx context.Context) []string {
	nodes := apiSuite.DiscoverNodes(ctx).Nodes()

	return mapNodeInfosToInternalIPs(nodes)
}

// DiscoverNodeInternalIPsByType provides a list of the internal IPs of Talos nodes of the given machine type.
func (apiSuite *APISuite) DiscoverNodeInternalIPsByType(ctx context.Context, machineType machine.Type) []string {
	nodesByType := apiSuite.DiscoverNodes(ctx).NodesByType(machineType)

	return mapNodeInfosToInternalIPs(nodesByType)
}

// RandomDiscoveredNodeInternalIP returns the internal IP of a random node of the specified type (or of any type if no types are specified).
func (apiSuite *APISuite) RandomDiscoveredNodeInternalIP(types ...machine.Type) string {
	nodeInfo := apiSuite.DiscoverNodes(context.TODO())

	var nodes []cluster.NodeInfo

	if len(types) == 0 {
		nodes = nodeInfo.Nodes()
	} else {
		for _, t := range types {
			nodes = append(nodes, nodeInfo.NodesByType(t)...)
		}
	}

	apiSuite.Require().NotEmpty(nodes)

	return nodes[rand.IntN(len(nodes))].InternalIP.String()
}
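
// A minimal usage sketch (hypothetical test body): pick a random
// control-plane node and scope the client context to it.
//
//	node := apiSuite.RandomDiscoveredNodeInternalIP(machine.TypeControlPlane)
//	nodeCtx := client.WithNode(ctx, node)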

// Capabilities describes the actions allowed on the current cluster.
type Capabilities struct {
	RunsTalosKernel bool
	SupportsReboot  bool
	SupportsRecover bool
	SupportsVolumes bool
	SecureBooted    bool
}

// Capabilities returns a set of capabilities used to skip tests in different environments.
func (apiSuite *APISuite) Capabilities() Capabilities {
	v, err := apiSuite.Client.Version(context.Background())
	apiSuite.Require().NoError(err)

	caps := Capabilities{}

	if v.Messages[0].Platform != nil {
		switch v.Messages[0].Platform.Mode {
		case runtime.ModeContainer.String():
		default:
			caps.RunsTalosKernel = true
			caps.SupportsReboot = true
			caps.SupportsRecover = true
			caps.SupportsVolumes = true
		}
	}

	ctx := context.Background()
	ctx, ctxCancel := context.WithTimeout(ctx, 2*time.Minute)

	defer ctxCancel()

	securityResource, err := safe.StateWatchFor[*runtimeres.SecurityState](
		ctx,
		apiSuite.Client.COSI,
		runtimeres.NewSecurityStateSpec(runtimeres.NamespaceName).Metadata(),
		state.WithEventTypes(state.Created, state.Updated),
	)
	apiSuite.Require().NoError(err)

	caps.SecureBooted = securityResource.TypedSpec().SecureBoot

	return caps
}
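
// A minimal usage sketch (hypothetical test body): gate a test on the
// environment's capabilities.
//
//	if !apiSuite.Capabilities().SupportsReboot {
//		apiSuite.T().Skip("cluster doesn't support reboots")
//	}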

// AssertClusterHealthy verifies that the cluster is healthy using provisioning checks.
func (apiSuite *APISuite) AssertClusterHealthy(ctx context.Context) {
	if apiSuite.Cluster == nil {
		// can't assert anything if the cluster state wasn't provided
		apiSuite.T().Skip("cluster health can't be verified when cluster state is not provided")
	}

	clusterAccess := access.NewAdapter(apiSuite.Cluster, provision.WithTalosClient(apiSuite.Client))
	defer clusterAccess.Close() //nolint:errcheck

	apiSuite.Require().NoError(check.Wait(ctx, clusterAccess, append(check.DefaultClusterChecks(), check.ExtraClusterChecks()...), check.StderrReporter()))
}

// ReadBootID reads the node's boot_id.
//
// The context provided might have a specific node attached for the API call.
func (apiSuite *APISuite) ReadBootID(ctx context.Context) (string, error) {
	// set up a short timeout around boot_id read calls to work around
	// cases when a rebooted node doesn't answer requests for a long time
	reqCtx, reqCtxCancel := context.WithTimeout(ctx, 10*time.Second)
	defer reqCtxCancel()

	reader, err := apiSuite.Client.Read(reqCtx, "/proc/sys/kernel/random/boot_id")
	if err != nil {
		return "", err
	}

	defer reader.Close() //nolint:errcheck

	body, err := io.ReadAll(reader)
	if err != nil {
		return "", err
	}

	bootID := strings.TrimSpace(string(body))

	_, err = io.Copy(io.Discard, reader)
	if err != nil {
		return "", err
	}

	return bootID, reader.Close()
}

// ReadBootIDWithRetry reads the node's boot_id, retrying until the timeout expires.
//
// The context provided might have a specific node attached for the API call.
func (apiSuite *APISuite) ReadBootIDWithRetry(ctx context.Context, timeout time.Duration) string {
	var bootID string

	apiSuite.Require().NoError(retry.Constant(timeout, retry.WithUnits(time.Second)).Retry(
		func() error {
			var err error

			bootID, err = apiSuite.ReadBootID(ctx)
			if err != nil {
				return retry.ExpectedError(err)
			}

			if bootID == "" {
				return retry.ExpectedErrorf("boot id is empty")
			}

			return nil
		},
	))

	return bootID
}

// AssertRebooted verifies that the node was rebooted as a result of running some API call.
//
// Verification happens via reading the boot_id of the node.
func (apiSuite *APISuite) AssertRebooted(ctx context.Context, node string, rebootFunc func(nodeCtx context.Context) error, timeout time.Duration) {
	apiSuite.AssertRebootedNoChecks(ctx, node, rebootFunc, timeout)

	apiSuite.WaitForBootDone(ctx)

	if apiSuite.Cluster != nil {
		// without cluster state we can't do deep checks, but the basic reboot test still works
		// NB: using `ctx` here to have the client talking to the init node by default
		apiSuite.AssertClusterHealthy(ctx)
	}
}
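
// A minimal usage sketch (hypothetical test body, assuming the machinery
// client's Reboot helper): assert that a reboot API call actually reboots
// the node.
//
//	apiSuite.AssertRebooted(ctx, node, func(nodeCtx context.Context) error {
//		return apiSuite.Client.Reboot(nodeCtx)
//	}, 10*time.Minute)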

// AssertRebootedNoChecks waits for the node to be rebooted without waiting for the cluster to become healthy afterwards.
func (apiSuite *APISuite) AssertRebootedNoChecks(ctx context.Context, node string, rebootFunc func(nodeCtx context.Context) error, timeout time.Duration) {
	// timeout for a single node reboot
	ctx, ctxCancel := context.WithTimeout(ctx, timeout)
	defer ctxCancel()

	nodeCtx := client.WithNodes(ctx, node)

	var (
		bootIDBefore string
		err          error
	)

	err = retry.Constant(time.Minute * 5).Retry(func() error {
		// read boot_id before reboot
		bootIDBefore, err = apiSuite.ReadBootID(nodeCtx)
		if err != nil {
			return retry.ExpectedError(err)
		}

		return nil
	})

	apiSuite.Require().NoError(err)

	apiSuite.Assert().NoError(rebootFunc(nodeCtx))

	apiSuite.AssertBootIDChanged(nodeCtx, bootIDBefore, node, timeout)
}

// AssertBootIDChanged waits until the node's boot_id changes.
func (apiSuite *APISuite) AssertBootIDChanged(nodeCtx context.Context, bootIDBefore, node string, timeout time.Duration) {
	apiSuite.Assert().NotEmpty(bootIDBefore)

	apiSuite.Require().NoError(retry.Constant(timeout).Retry(func() error {
		requestCtx, requestCtxCancel := context.WithTimeout(nodeCtx, time.Second)
		defer requestCtxCancel()

		bootIDAfter, err := apiSuite.ReadBootID(requestCtx)
		if err != nil {
			// API might be unresponsive during reboot
			return retry.ExpectedError(err)
		}

		if bootIDAfter == bootIDBefore {
			// bootID should be different after reboot
			return retry.ExpectedErrorf("bootID didn't change for node %q: before %s, after %s", node, bootIDBefore, bootIDAfter)
		}

		return nil
	}))
}

// WaitForBootDone waits for the boot sequence done event on all discovered nodes.
func (apiSuite *APISuite) WaitForBootDone(ctx context.Context) {
	apiSuite.WaitForSequenceDone(
		ctx,
		runtime.SequenceBoot,
		apiSuite.DiscoverNodeInternalIPs(ctx)...,
	)
}

// WaitForSequenceDone waits for the sequence done event on the given nodes.
func (apiSuite *APISuite) WaitForSequenceDone(ctx context.Context, sequence runtime.Sequence, nodes ...string) {
	nodesNotDone := make(map[string]struct{})

	for _, node := range nodes {
		nodesNotDone[node] = struct{}{}
	}

	apiSuite.Require().NoError(retry.Constant(5*time.Minute, retry.WithUnits(time.Second*10)).Retry(func() error {
		eventsCtx, cancel := context.WithTimeout(client.WithNodes(ctx, nodes...), 5*time.Second)
		defer cancel()

		err := apiSuite.Client.EventsWatch(eventsCtx, func(ch <-chan client.Event) {
			defer cancel()

			for event := range ch {
				if msg, ok := event.Payload.(*machineapi.SequenceEvent); ok {
					if msg.GetAction() == machineapi.SequenceEvent_STOP && msg.GetSequence() == sequence.String() {
						delete(nodesNotDone, event.Node)

						if len(nodesNotDone) == 0 {
							return
						}
					}
				}
			}
		}, client.WithTailEvents(-1))
		if err != nil {
			return retry.ExpectedError(err)
		}

		if len(nodesNotDone) > 0 {
			return retry.ExpectedErrorf("sequence %s is not completed on nodes %#v", sequence.String(), nodesNotDone)
		}

		return nil
	}))
}
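
// A minimal usage sketch (hypothetical test body): after applying a change
// that reboots nodes, wait for all discovered nodes to finish booting.
//
//	apiSuite.WaitForBootDone(ctx)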

// ClearConnectionRefused clears cached connection refused errors which might be left after a node reboot.
func (apiSuite *APISuite) ClearConnectionRefused(ctx context.Context, nodes ...string) {
	ctx, cancel := context.WithTimeout(ctx, backoff.DefaultConfig.MaxDelay)
	defer cancel()

	controlPlaneNodes := apiSuite.DiscoverNodes(ctx).NodesByType(machine.TypeControlPlane)
	initNodes := apiSuite.DiscoverNodes(ctx).NodesByType(machine.TypeInit)

	numMasterNodes := len(controlPlaneNodes) + len(initNodes)
	if numMasterNodes == 0 {
		numMasterNodes = 3
	}

	// issue one request per control plane node, so that each load-balanced
	// backend connection gets exercised and any cached error is cleared
	apiSuite.Require().NoError(retry.Constant(backoff.DefaultConfig.MaxDelay, retry.WithUnits(time.Second)).Retry(func() error {
		for range numMasterNodes {
			_, err := apiSuite.Client.Version(client.WithNodes(ctx, nodes...))
			if err == nil {
				continue
			}

			if client.StatusCode(err) == codes.Unavailable || client.StatusCode(err) == codes.Canceled {
				return retry.ExpectedError(err)
			}

			if strings.Contains(err.Error(), "connection refused") || strings.Contains(err.Error(), "connection reset by peer") {
				return retry.ExpectedError(err)
			}

			return err
		}

		return nil
	}))
}

// HashKubeletCert returns the hash of the kubelet certificate file on the node.
//
// This function can be used to verify that the node's ephemeral partition was wiped.
func (apiSuite *APISuite) HashKubeletCert(ctx context.Context, node string) (string, error) {
	reqCtx, reqCtxCancel := context.WithTimeout(ctx, 10*time.Second)
	defer reqCtxCancel()

	reqCtx = client.WithNodes(reqCtx, node)

	reader, err := apiSuite.Client.Read(reqCtx, "/var/lib/kubelet/pki/kubelet-client-current.pem")
	if err != nil {
		return "", err
	}

	defer reader.Close() //nolint:errcheck

	hash := sha256.New()

	_, err = io.Copy(hash, reader)
	if err != nil {
		return "", err
	}

	return hex.EncodeToString(hash.Sum(nil)), reader.Close()
}
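
// A minimal usage sketch (hypothetical test body): compare the hash before
// and after a wipe to prove the ephemeral partition was reset.
//
//	before, err := apiSuite.HashKubeletCert(ctx, node)
//	apiSuite.Require().NoError(err)
//
//	// ... reset the node here ...
//
//	after, err := apiSuite.HashKubeletCert(ctx, node)
//	apiSuite.Require().NoError(err)
//	apiSuite.Assert().NotEqual(before, after)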

// ReadConfigFromNode reads machine configuration from the node.
func (apiSuite *APISuite) ReadConfigFromNode(nodeCtx context.Context) (config.Provider, error) {
	cfg, err := safe.StateGetByID[*configres.MachineConfig](nodeCtx, apiSuite.Client.COSI, configres.V1Alpha1ID)
	if err != nil {
		return nil, fmt.Errorf("error fetching machine config resource: %w", err)
	}

	return cfg.Provider(), nil
}

// UserDisks returns a list of user disks with size greater than sizeGreaterThanGB GB and without any partitions present.
//
//nolint:gocyclo
func (apiSuite *APISuite) UserDisks(ctx context.Context, node string, sizeGreaterThanGB int) ([]string, error) {
	nodeCtx := client.WithNodes(ctx, node)

	resp, err := apiSuite.Client.Disks(nodeCtx)
	if err != nil {
		return nil, err
	}

	var disks []string

	blockDeviceInUse := func(deviceName string) (bool, error) {
		devicePart := strings.Split(deviceName, "/dev/")[1]

		// https://unix.stackexchange.com/questions/111779/how-to-find-out-easily-whether-a-block-device-or-a-part-of-it-is-mounted-someh
		// this was the only easy way I could find to check if the block device is already in use by something like raid
		stream, err := apiSuite.Client.LS(nodeCtx, &machineapi.ListRequest{
			Root: fmt.Sprintf("/sys/block/%s/holders", devicePart),
		})
		if err != nil {
			return false, err
		}

		counter := 0

		if err = helpers.ReadGRPCStream(stream, func(info *machineapi.FileInfo, node string, multipleNodes bool) error {
			counter++

			return nil
		}); err != nil {
			return false, err
		}

		// the listing includes the directory itself, so more than one
		// entry means the device has at least one holder
		if counter > 1 {
			return true, nil
		}

		return false, nil
	}

	for _, msg := range resp.Messages {
		for _, disk := range msg.Disks {
			if disk.SystemDisk || disk.Readonly || disk.Type == storage.Disk_CD {
				continue
			}

			if disk.BusPath == "/virtual" {
				continue
			}

			blockDeviceUsed, err := blockDeviceInUse(disk.DeviceName)
			if err != nil {
				return nil, err
			}

			if disk.Size > uint64(sizeGreaterThanGB)*1024*1024*1024 && !blockDeviceUsed {
				disks = append(disks, disk.DeviceName)
			}
		}
	}

	return disks, nil
}
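
// A minimal usage sketch (hypothetical test body): find a free disk bigger
// than 4 GB to run volume tests against.
//
//	userDisks, err := apiSuite.UserDisks(ctx, node, 4)
//	apiSuite.Require().NoError(err)
//	apiSuite.Require().NotEmpty(userDisks, "no user disks available")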

// AssertServicesRunning verifies that services are in the expected state on the node.
func (apiSuite *APISuite) AssertServicesRunning(ctx context.Context, node string, serviceStatus map[string]string) {
	nodeCtx := client.WithNode(ctx, node)

	for svc, expectedState := range serviceStatus {
		resp, err := apiSuite.Client.ServiceInfo(nodeCtx, svc)
		apiSuite.Require().NoError(err)
		apiSuite.Require().NotNil(resp, "expected service %s to be registered", svc)

		for _, svcInfo := range resp {
			apiSuite.Require().Equal(expectedState, svcInfo.Service.State, "expected service %s to have state %s", svc, expectedState)
		}
	}
}
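
// A minimal usage sketch (hypothetical test body; the service names and the
// "Running" state string are illustrative):
//
//	apiSuite.AssertServicesRunning(ctx, node, map[string]string{
//		"apid":    "Running",
//		"kubelet": "Running",
//	})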

// AssertExpectedModules verifies that the expected kernel modules are loaded on the node.
func (apiSuite *APISuite) AssertExpectedModules(ctx context.Context, node string, expectedModules map[string]string) {
	nodeCtx := client.WithNode(ctx, node)

	fileReader, err := apiSuite.Client.Read(nodeCtx, "/proc/modules")
	apiSuite.Require().NoError(err)

	// capture the reader by value so each deferred Close targets the right reader
	// (fileReader is reassigned below)
	defer func(r io.ReadCloser) {
		apiSuite.Require().NoError(r.Close())
	}(fileReader)

	scanner := bufio.NewScanner(fileReader)

	var loadedModules []string

	for scanner.Scan() {
		loadedModules = append(loadedModules, strings.Split(scanner.Text(), " ")[0])
	}
	apiSuite.Require().NoError(scanner.Err())

	fileReader, err = apiSuite.Client.Read(nodeCtx, fmt.Sprintf("/lib/modules/%s/modules.dep", constants.DefaultKernelVersion))
	apiSuite.Require().NoError(err)

	defer func(r io.ReadCloser) {
		apiSuite.Require().NoError(r.Close())
	}(fileReader)

	scanner = bufio.NewScanner(fileReader)

	var modulesDep []string

	for scanner.Scan() {
		modulesDep = append(modulesDep, filepath.Base(strings.Split(scanner.Text(), ":")[0]))
	}
	apiSuite.Require().NoError(scanner.Err())

	for module, moduleDep := range expectedModules {
		apiSuite.Require().Contains(loadedModules, module, "expected %s to be loaded", module)
		apiSuite.Require().Contains(modulesDep, moduleDep, "expected %s to be in modules.dep", moduleDep)
	}
}

// UpdateMachineConfig fetches machine configuration, patches it and applies the changes.
func (apiSuite *APISuite) UpdateMachineConfig(nodeCtx context.Context, patch func(config.Provider) (config.Provider, error)) {
	cfg, err := apiSuite.ReadConfigFromNode(nodeCtx)
	apiSuite.Require().NoError(err)

	patchedCfg, err := patch(cfg)
	apiSuite.Require().NoError(err)

	bytes, err := patchedCfg.Bytes()
	apiSuite.Require().NoError(err)

	resp, err := apiSuite.Client.ApplyConfiguration(nodeCtx, &machineapi.ApplyConfigurationRequest{
		Data: bytes,
		Mode: machineapi.ApplyConfigurationRequest_AUTO,
	})
	apiSuite.Require().NoError(err)

	apiSuite.T().Logf("patched machine config: %s", resp.Messages[0].ModeDetails)
}

// PatchMachineConfig patches machine configuration on the node.
func (apiSuite *APISuite) PatchMachineConfig(nodeCtx context.Context, patches ...any) {
	configPatches := make([]configpatcher.Patch, 0, len(patches))

	for _, patch := range patches {
		marshaled, err := yaml.Marshal(patch)
		apiSuite.Require().NoError(err)

		configPatch, err := configpatcher.LoadPatch(marshaled)
		apiSuite.Require().NoError(err)

		configPatches = append(configPatches, configPatch)
	}

	apiSuite.UpdateMachineConfig(nodeCtx, func(cfg config.Provider) (config.Provider, error) {
		out, err := configpatcher.Apply(configpatcher.WithConfig(cfg), configPatches)
		if err != nil {
			return nil, err
		}

		return out.Config()
	})
}
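
// A minimal usage sketch (hypothetical test body; the patch content is
// illustrative): apply a patch expressed as a plain map, which gets
// marshaled to YAML and loaded as a config patch.
//
//	apiSuite.PatchMachineConfig(nodeCtx, map[string]any{
//		"machine": map[string]any{
//			"sysctls": map[string]string{
//				"net.ipv4.ip_forward": "1",
//			},
//		},
//	})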

// RemoveMachineConfigDocuments removes machine configuration documents of the specified types from the node.
func (apiSuite *APISuite) RemoveMachineConfigDocuments(nodeCtx context.Context, docTypes ...string) {
	apiSuite.UpdateMachineConfig(nodeCtx, func(cfg config.Provider) (config.Provider, error) {
		return container.New(xslices.Filter(cfg.Documents(), func(doc configconfig.Document) bool {
			return !slices.Contains(docTypes, doc.Kind())
		})...)
	})
}

// PatchV1Alpha1Config patches the v1alpha1 config in the config provider and returns the patched config bytes.
func (apiSuite *APISuite) PatchV1Alpha1Config(provider config.Provider, patch func(*v1alpha1.Config)) []byte {
	ctr, err := provider.PatchV1Alpha1(func(c *v1alpha1.Config) error {
		patch(c)

		return nil
	})
	apiSuite.Require().NoError(err)

	bytes, err := ctr.Bytes()
	apiSuite.Require().NoError(err)

	return bytes
}
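
// A minimal usage sketch (hypothetical test body; the field names and the
// sysctl value are illustrative): mutate the v1alpha1 document and get the
// serialized config back.
//
//	cfg, err := apiSuite.ReadConfigFromNode(nodeCtx)
//	apiSuite.Require().NoError(err)
//
//	bytes := apiSuite.PatchV1Alpha1Config(cfg, func(c *v1alpha1.Config) {
//		c.MachineConfig.MachineSysctls = map[string]string{"vm.swappiness": "10"}
//	})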

// ResetNode wraps the reset node sequence with checks, waiting for the reset to finish and verifying the result.
//
//nolint:gocyclo
func (apiSuite *APISuite) ResetNode(ctx context.Context, node string, resetSpec *machineapi.ResetRequest, runHealthChecks bool) {
	apiSuite.T().Logf("resetting node %q with graceful %v mode %s, system %v, user %v", node, resetSpec.Graceful, resetSpec.Mode, resetSpec.SystemPartitionsToWipe, resetSpec.UserDisksToWipe)

	nodeCtx := client.WithNode(ctx, node)

	nodeClient := apiSuite.GetClientWithEndpoints(node)
	defer nodeClient.Close() //nolint:errcheck

	// any reset should lead to a reboot, so read boot_id before reboot
	bootIDBefore, err := apiSuite.ReadBootID(nodeCtx)
	apiSuite.Require().NoError(err)

	// figure out if EPHEMERAL is going to be reset
	ephemeralIsGoingToBeReset := false

	if len(resetSpec.SystemPartitionsToWipe) == 0 && len(resetSpec.UserDisksToWipe) == 0 {
		ephemeralIsGoingToBeReset = true
	} else {
		for _, part := range resetSpec.SystemPartitionsToWipe {
			if part.Label == constants.EphemeralPartitionLabel {
				ephemeralIsGoingToBeReset = true

				break
			}
		}
	}

	preReset, err := apiSuite.HashKubeletCert(ctx, node)
	apiSuite.Require().NoError(err)

	resp, err := nodeClient.ResetGenericWithResponse(nodeCtx, resetSpec)
	apiSuite.Require().NoError(err)

	actorID := resp.Messages[0].ActorId

	eventCh := make(chan client.EventResult)

	// watch for events
	apiSuite.Require().NoError(nodeClient.EventsWatchV2(nodeCtx, eventCh, client.WithActorID(actorID), client.WithTailEvents(-1)))

	waitTimer := time.NewTimer(5 * time.Minute)
	defer waitTimer.Stop()

waitLoop:
	for {
		select {
		case ev := <-eventCh:
			apiSuite.Require().NoError(ev.Error)

			switch msg := ev.Event.Payload.(type) {
			case *machineapi.SequenceEvent:
				if msg.Error != nil {
					apiSuite.FailNow("reset failed", "%s: %s", msg.Error.Message, msg.Error.Code)
				}
			case *machineapi.PhaseEvent:
				if msg.Action == machineapi.PhaseEvent_START && msg.Phase == "unmountSystem" {
					// about to be reset, break waitLoop
					break waitLoop
				}

				if msg.Action == machineapi.PhaseEvent_STOP {
					apiSuite.T().Logf("reset phase %q finished", msg.Phase)
				}
			}
		case <-waitTimer.C:
			apiSuite.FailNow("timeout waiting for reset to finish")
		case <-ctx.Done():
			apiSuite.FailNow("context canceled")
		}
	}

	// wait for the apid to be shut down
	time.Sleep(10 * time.Second)

	apiSuite.AssertBootIDChanged(nodeCtx, bootIDBefore, node, 3*time.Minute)

	apiSuite.ClearConnectionRefused(ctx, node)

	if runHealthChecks {
		if apiSuite.Cluster != nil {
			// without cluster state we can't do deep checks, but basic reboot test still works
			// NB: using `ctx` here to have client talking to init node by default
			apiSuite.AssertClusterHealthy(ctx)
		}

		postReset, err := apiSuite.HashKubeletCert(ctx, node)
		apiSuite.Require().NoError(err)

		if ephemeralIsGoingToBeReset {
			apiSuite.Assert().NotEqual(preReset, postReset, "reset should lead to new kubelet cert being generated")
		} else {
			apiSuite.Assert().Equal(preReset, postReset, "ephemeral partition was not reset")
		}
	}
}
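
// A minimal usage sketch (hypothetical test body; assumes the Graceful and
// Reboot fields of machineapi.ResetRequest): wipe everything gracefully and
// run the health checks afterwards.
//
//	apiSuite.ResetNode(ctx, node, &machineapi.ResetRequest{
//		Graceful: true,
//		Reboot:   true,
//	}, true)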

// DumpLogs dumps a set of logs from the node.
func (apiSuite *APISuite) DumpLogs(ctx context.Context, node string, service, pattern string) {
	nodeCtx := client.WithNode(ctx, node)

	logsStream, err := apiSuite.Client.Logs(
		nodeCtx,
		constants.SystemContainerdNamespace,
		common.ContainerDriver_CONTAINERD,
		service,
		false,
		-1,
	)
	apiSuite.Require().NoError(err)

	logReader, err := client.ReadStream(logsStream)
	apiSuite.Require().NoError(err)

	defer logReader.Close() //nolint:errcheck

	scanner := bufio.NewScanner(logReader)

	for scanner.Scan() {
		if pattern == "" || strings.Contains(scanner.Text(), pattern) {
			apiSuite.T().Logf("%s (%s): %s", node, service, scanner.Text())
		}
	}
}

// TearDownSuite closes Talos API client.
func (apiSuite *APISuite) TearDownSuite() {
	if apiSuite.Client != nil {
		apiSuite.Assert().NoError(apiSuite.Client.Close())
	}
}

func mapNodeInfosToInternalIPs(nodes []cluster.NodeInfo) []string {
	ips := make([]string, len(nodes))

	for i, node := range nodes {
		ips[i] = node.InternalIP.String()
	}

	return ips
}