talos

Форк
0
431 строка · 11.4 Кб
1
// This Source Code Form is subject to the terms of the Mozilla Public
2
// License, v. 2.0. If a copy of the MPL was not distributed with this
3
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4

5
package main
6

7
import (
8
	"bytes"
9
	"context"
10
	_ "embed"
11
	"errors"
12
	"fmt"
13
	"io"
14
	"log"
15
	"os"
16
	"strings"
17

18
	"github.com/Azure/azure-sdk-for-go/profiles/latest/resources/mgmt/resources"
19
	"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
20
	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
21
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
22
	"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v5"
23
	"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage"
24
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
25
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/pageblob"
26
	"github.com/Azure/go-autorest/autorest"
27
	"github.com/Azure/go-autorest/autorest/azure/auth"
28
	"github.com/blang/semver/v4"
29
	"github.com/klauspost/compress/zstd"
30
	"github.com/siderolabs/gen/channel"
31
	"github.com/siderolabs/gen/xslices"
32
	"golang.org/x/sync/errgroup"
33
)
34

35
// Fixed Azure coordinates used when publishing gallery images.
const (
	// resourceGroupName is the resource group passed to the gallery image
	// version calls and used in the image source resource ID.
	resourceGroupName = "SideroGallery"

	// defaultRegion is the Location set on created gallery image versions.
	defaultRegion = "eastus"

	// storageAccount is the storage account named in the image source resource ID.
	storageAccount = "siderogallery"
)
40

41
// azureArchitectures translates the CPU architecture names used by the
// uploader options into the suffix used in Azure gallery image names
// ("talos-<suffix>").
var azureArchitectures = map[string]string{
	"arm64": "arm64",
	"amd64": "x64",
}
46

47
// AzureUploader represents an object that has the capability to upload to Azure.
48
type AzureUploader struct {
49
	Options Options
50

51
	preRelease bool
52

53
	helper azureHelper
54
}
55

56
// extractVersion extracts the version number in the format of int.int.int for Azure and assigns to the Options.AzureTag value.
57
func (azu *AzureUploader) setVersion() error {
58
	v, err := semver.ParseTolerant(azu.Options.Tag)
59
	if err != nil {
60
		return err
61
	}
62

63
	versionCore := fmt.Sprintf("%d.%d.%d", v.Major, v.Minor, v.Patch)
64

65
	azu.helper.version = versionCore
66
	azu.Options.AzureGalleryName = "SideroLabs"
67

68
	if fmt.Sprintf("v%s", versionCore) != azu.Options.Tag {
69
		azu.preRelease = true
70
		azu.Options.AzureGalleryName = "SideroGalleryTest"
71
	}
72

73
	log.Println("azure: using Azure Gallery:", azu.Options.AzureGalleryName)
74

75
	return nil
76
}
77

78
// AzureGalleryUpload uploads the image to Azure.
79
func (azu *AzureUploader) AzureGalleryUpload(ctx context.Context) error {
80
	var err error
81

82
	var g *errgroup.Group
83
	g, ctx = errgroup.WithContext(ctx)
84

85
	err = azu.setVersion()
86
	if err != nil {
87
		return fmt.Errorf("azure: error setting version: %w", err)
88
	}
89

90
	log.Printf("azure: setting default creds")
91

92
	err = azu.helper.setDefaultAzureCreds()
93
	if err != nil {
94
		return fmt.Errorf("error setting default Azure credentials: %w", err)
95
	}
96

97
	if len(azu.Options.AzureRegions) == 0 {
98
		regions, err := azu.helper.getAzureLocations(ctx)
99
		if err != nil {
100
			return fmt.Errorf("azure: error setting default Azure credentials: %w", err)
101
		}
102

103
		azu.Options.AzureRegions = regions
104
	}
105

106
	// Upload blob
107
	log.Printf("azure: uploading blobs for architectures: %+v\n", azu.Options.Architectures)
108

109
	for _, arch := range azu.Options.Architectures {
110
		g.Go(func() error {
111
			log.Printf("azure: starting upload blob for %s\n", arch)
112

113
			err = azu.uploadAzureBlob(ctx, arch)
114
			if err != nil {
115
				return fmt.Errorf("azure: error uploading page blob for %s: %w", arch, err)
116
			}
117

118
			log.Printf("azure: starting image version creation for %s\n", arch)
119

120
			err = azu.createAzureImageVersion(ctx, arch)
121
			if err != nil {
122
				return fmt.Errorf("azure: error creating image version: %w", err)
123
			}
124

125
			return err
126
		})
127
	}
128

129
	return g.Wait()
130
}
131

132
//nolint:gocyclo
133
func (azu *AzureUploader) uploadAzureBlob(ctx context.Context, arch string) error {
134
	blobURL := fmt.Sprintf("https://siderogallery.blob.core.windows.net/images/talos/talos-%s-%s.vhd", arch, azu.Options.Tag)
135

136
	pageBlobClient, err := pageblob.NewClient(blobURL, azu.helper.cred, nil)
137
	if err != nil {
138
		return fmt.Errorf("azure: error creating pageblob client: %w", err)
139
	}
140

141
	source, err := os.Open(azu.Options.AzureImage(arch))
142
	if err != nil {
143
		return err
144
	}
145

146
	defer source.Close() //nolint:errcheck
147

148
	// calculate totalSize
149
	file, err := zstd.NewReader(source)
150
	if err != nil {
151
		return fmt.Errorf("azure: error extracting file from xz: %w", err)
152
	}
153

154
	totalSize, err := io.Copy(io.Discard, file)
155
	if err != nil {
156
		return fmt.Errorf("azure: error calculating totalSize: %w", err)
157
	}
158

159
	// second pass: read chunks and upload
160
	// seek back to the beginning of the source file
161
	_, err = source.Seek(0, io.SeekStart)
162
	if err != nil {
163
		return fmt.Errorf("azure: error seeking back: %w", err)
164
	}
165

166
	file, err = zstd.NewReader(source)
167
	if err != nil {
168
		return fmt.Errorf("azure: error extracting file from xz: %w", err)
169
	}
170

171
	// Check if the file size is a multiple of 512 bytes
172
	if totalSize%pageblob.PageBytes != 0 {
173
		return errors.New("azure: error: the file size must be a multiple of 512 bytes")
174
	}
175

176
	_, err = pageBlobClient.Create(ctx, totalSize, nil)
177
	if err != nil {
178
		return fmt.Errorf("azure: error creating vhd: %w", err)
179
	}
180

181
	type work struct {
182
		chunk  []byte
183
		offset int64
184
	}
185

186
	const (
187
		concurrency = 8
188
		chunkSize   = 4 * 1024 * 1024
189
	)
190

191
	workCh := make(chan work)
192

193
	var g *errgroup.Group
194
	g, ctx = errgroup.WithContext(ctx)
195

196
	for range concurrency {
197
		g.Go(func() error {
198
			for w := range workCh {
199
				_, err = pageBlobClient.UploadPages(
200
					ctx,
201
					streaming.NopCloser(bytes.NewReader(w.chunk)),
202
					blob.HTTPRange{Offset: w.offset, Count: int64(len(w.chunk))},
203
					nil)
204
				if err != nil {
205
					return fmt.Errorf("azure: error uploading chunk at offset %d: %w", w.offset, err)
206
				}
207
			}
208

209
			return nil
210
		})
211
	}
212

213
	var offset int64
214

215
uploadLoop:
216
	for {
217
		buf := make([]byte, chunkSize)
218

219
		var n int
220

221
		n, err = io.ReadFull(file, buf)
222
		switch {
223
		case err == io.ErrUnexpectedEOF:
224
			// this is the last (incomplete) chunk
225
		case err == io.EOF:
226
			// end of file, stop
227
			break uploadLoop
228
		case err != nil:
229
			return fmt.Errorf("azure: error reading chunk: %w", err)
230
		}
231

232
		if !channel.SendWithContext(ctx, workCh, work{chunk: buf[:n], offset: offset}) {
233
			break uploadLoop
234
		}
235

236
		offset += int64(n)
237

238
		if offset%(chunkSize*10) == 0 {
239
			log.Printf("azure: uploaded %d bytes\n", offset)
240
		}
241
	}
242

243
	close(workCh)
244

245
	if err = g.Wait(); err != nil {
246
		return fmt.Errorf("azure: error uploading chunks: %w", err)
247
	}
248

249
	return nil
250
}
251

252
func (azu *AzureUploader) createAzureImageVersion(ctx context.Context, arch string) error {
253
	var targetRegions []*armcompute.TargetRegion
254

255
	if !azu.preRelease {
256
		targetRegions = xslices.Map(azu.Options.AzureRegions, func(region string) *armcompute.TargetRegion {
257
			return &armcompute.TargetRegion{
258
				Name:                 to.Ptr(region),
259
				ExcludeFromLatest:    to.Ptr(false),
260
				RegionalReplicaCount: to.Ptr[int32](1),
261
				StorageAccountType:   to.Ptr(armcompute.StorageAccountTypeStandardLRS),
262
			}
263
		})
264
	}
265

266
	pager := azu.helper.clientFactory.NewGalleryImageVersionsClient().NewListByGalleryImagePager(resourceGroupName, azu.Options.AzureGalleryName, fmt.Sprintf("talos-%s", azureArchitectures[arch]), nil)
267

268
	for pager.More() {
269
		page, err := pager.NextPage(ctx)
270
		if err != nil {
271
			return fmt.Errorf("azure: failed to list image versions: %w", err)
272
		}
273

274
		for _, v := range page.Value {
275
			if *v.Name == azu.helper.version {
276
				log.Printf("azure: image version exists for %s\n", *v.Name)
277

278
				err = azu.deleteImageVersion(ctx, arch)
279
				if err != nil {
280
					return err
281
				}
282
			}
283
		}
284
	}
285

286
	log.Printf("azure: creating %s image version", arch)
287

288
	poller, err := azu.helper.clientFactory.NewGalleryImageVersionsClient().BeginCreateOrUpdate(
289
		ctx,
290
		resourceGroupName,
291
		azu.Options.AzureGalleryName,
292
		fmt.Sprintf("talos-%s", azureArchitectures[arch]),
293
		azu.helper.version,
294
		armcompute.GalleryImageVersion{
295
			Location: to.Ptr(defaultRegion),
296
			Properties: &armcompute.GalleryImageVersionProperties{
297
				PublishingProfile: &armcompute.GalleryImageVersionPublishingProfile{
298
					TargetRegions: targetRegions,
299
				},
300
				SafetyProfile: &armcompute.GalleryImageVersionSafetyProfile{
301
					AllowDeletionOfReplicatedLocations: to.Ptr(true),
302
				},
303
				StorageProfile: &armcompute.GalleryImageVersionStorageProfile{
304
					OSDiskImage: &armcompute.GalleryOSDiskImage{
305
						HostCaching: to.Ptr(armcompute.HostCachingReadOnly),
306
						Source: &armcompute.GalleryDiskImageSource{
307
							ID:  to.Ptr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Storage/storageAccounts/%s", azu.helper.subscriptionID, resourceGroupName, storageAccount)),
308
							URI: to.Ptr(fmt.Sprintf("https://siderogallery.blob.core.windows.net/images/talos/talos-%s-%s.vhd", arch, azu.Options.Tag)),
309
						},
310
					},
311
				},
312
			},
313
		},
314
		nil)
315
	if err != nil {
316
		return fmt.Errorf("azure: failed to create image version: %w", err)
317
	}
318

319
	res, err := poller.PollUntilDone(ctx, nil)
320
	if err != nil {
321
		return fmt.Errorf("azure: failed to pull the result for image version creation: %w", err)
322
	}
323

324
	for _, region := range azu.Options.AzureRegions {
325
		pushResult(CloudImage{
326
			Cloud:  "azure",
327
			Tag:    azu.Options.Tag,
328
			Region: region,
329
			Arch:   arch,
330
			Type:   "vhd",
331
			ID:     *res.ID,
332
		})
333
	}
334

335
	return nil
336
}
337

338
func (azu *AzureUploader) deleteImageVersion(ctx context.Context, arch string) error {
339
	log.Println("azure: removing old image version")
340

341
	poller, err := azu.helper.clientFactory.NewGalleryImageVersionsClient().BeginDelete(
342
		ctx,
343
		resourceGroupName,
344
		azu.Options.AzureGalleryName,
345
		fmt.Sprintf("talos-%s", azureArchitectures[arch]),
346
		azu.helper.version,
347
		nil)
348
	if err != nil {
349
		return fmt.Errorf("azure: failed to delete image: %w", err)
350
	}
351

352
	_, err = poller.PollUntilDone(ctx, nil)
353
	if err != nil {
354
		return fmt.Errorf("azure: failed to pull the result for image deletion: %w", err)
355
	}
356

357
	return nil
358
}
359

360
type azureHelper struct {
361
	version         string
362
	subscriptionID  string
363
	clientFactory   *armcompute.ClientFactory
364
	cred            *azidentity.DefaultAzureCredential
365
	authorizer      autorest.Authorizer
366
	providersClient resources.ProvidersClient
367
}
368

369
// setDefaultAzureCreds initialises the helper's subscription ID, credentials
// and API clients from the environment.
//
// It returns an error when AZURE_SUBSCRIPTION_ID is empty or when any of the
// authorizer, credential or client factory constructors fails.
func (helper *azureHelper) setDefaultAzureCreds() error {
	subscriptionID := os.Getenv("AZURE_SUBSCRIPTION_ID")
	helper.subscriptionID = subscriptionID

	if subscriptionID == "" {
		return errors.New("azure: AZURE_SUBSCRIPTION_ID is not set")
	}

	envAuthorizer, err := auth.NewAuthorizerFromEnvironment()
	if err != nil {
		return err
	}

	helper.authorizer = envAuthorizer

	// Create a new instance of the DefaultAzureCredential
	if helper.cred, err = azidentity.NewDefaultAzureCredential(nil); err != nil {
		return err
	}

	if helper.clientFactory, err = armcompute.NewClientFactory(helper.subscriptionID, helper.cred, nil); err != nil {
		return fmt.Errorf("azure: failed to create client: %w", err)
	}

	// Initialize the Storage Accounts Client
	storageClientFactory, err := armstorage.NewClientFactory(helper.subscriptionID, helper.cred, nil)
	if err != nil {
		return err
	}

	// NOTE(review): the accounts client is created and immediately discarded;
	// nothing in this function keeps a reference to it.
	_ = storageClientFactory.NewAccountsClient()

	providers := resources.NewProvidersClient(helper.subscriptionID)
	providers.Authorizer = helper.authorizer
	helper.providersClient = providers

	return nil
}
407

408
// getAzureLocations returns the locations listed for the "virtualMachines"
// resource type of the Microsoft.Compute provider.
//
// Each location is normalised by removing spaces and lower-casing it. The
// result is nil when the provider reports no resource types, no
// "virtualMachines" entry, or no locations for that entry.
func (helper *azureHelper) getAzureLocations(ctx context.Context) ([]string, error) {
	provider, err := helper.providersClient.Get(ctx, "Microsoft.Compute", "")
	if err != nil {
		return nil, fmt.Errorf("azure: error getting Microsoft.Compute: %w", err)
	}

	var locations []string

	if provider.ResourceTypes == nil {
		return locations, nil
	}

	for _, resourceType := range *provider.ResourceTypes {
		if resourceType.ResourceType == nil || *resourceType.ResourceType != "virtualMachines" {
			continue
		}

		// Only the first "virtualMachines" entry is considered.
		if resourceType.Locations != nil {
			locations = xslices.Map(*resourceType.Locations, func(name string) string {
				return strings.ToLower(strings.ReplaceAll(name, " ", ""))
			})
		}

		break
	}

	return locations, nil
}
432

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.