talos
431 строка · 11.4 Кб
1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
5package main6
7import (8"bytes"9"context"10_ "embed"11"errors"12"fmt"13"io"14"log"15"os"16"strings"17
18"github.com/Azure/azure-sdk-for-go/profiles/latest/resources/mgmt/resources"19"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"20"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"21"github.com/Azure/azure-sdk-for-go/sdk/azidentity"22"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v5"23"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage"24"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"25"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/pageblob"26"github.com/Azure/go-autorest/autorest"27"github.com/Azure/go-autorest/autorest/azure/auth"28"github.com/blang/semver/v4"29"github.com/klauspost/compress/zstd"30"github.com/siderolabs/gen/channel"31"github.com/siderolabs/gen/xslices"32"golang.org/x/sync/errgroup"33)
34
// Fixed Azure coordinates used by the uploader: the resource group passed to
// the gallery image version calls, the location set on newly created image
// versions, and the storage account referenced as the source of the VHD.
const (
	resourceGroupName = "SideroGallery"
	defaultRegion     = "eastus"
	storageAccount    = "siderogallery"
)
40
// azureArchitectures translates the CPU architecture names used by the
// uploader options ("amd64", "arm64") into the architecture suffix used in
// the Azure gallery image names ("talos-<suffix>").
var azureArchitectures = map[string]string{
	"arm64": "arm64",
	"amd64": "x64",
}
46
47// AzureUploader represents an object that has the capability to upload to Azure.
48type AzureUploader struct {49Options Options
50
51preRelease bool52
53helper azureHelper
54}
55
56// extractVersion extracts the version number in the format of int.int.int for Azure and assigns to the Options.AzureTag value.
57func (azu *AzureUploader) setVersion() error {58v, err := semver.ParseTolerant(azu.Options.Tag)59if err != nil {60return err61}62
63versionCore := fmt.Sprintf("%d.%d.%d", v.Major, v.Minor, v.Patch)64
65azu.helper.version = versionCore66azu.Options.AzureGalleryName = "SideroLabs"67
68if fmt.Sprintf("v%s", versionCore) != azu.Options.Tag {69azu.preRelease = true70azu.Options.AzureGalleryName = "SideroGalleryTest"71}72
73log.Println("azure: using Azure Gallery:", azu.Options.AzureGalleryName)74
75return nil76}
77
78// AzureGalleryUpload uploads the image to Azure.
79func (azu *AzureUploader) AzureGalleryUpload(ctx context.Context) error {80var err error81
82var g *errgroup.Group83g, ctx = errgroup.WithContext(ctx)84
85err = azu.setVersion()86if err != nil {87return fmt.Errorf("azure: error setting version: %w", err)88}89
90log.Printf("azure: setting default creds")91
92err = azu.helper.setDefaultAzureCreds()93if err != nil {94return fmt.Errorf("error setting default Azure credentials: %w", err)95}96
97if len(azu.Options.AzureRegions) == 0 {98regions, err := azu.helper.getAzureLocations(ctx)99if err != nil {100return fmt.Errorf("azure: error setting default Azure credentials: %w", err)101}102
103azu.Options.AzureRegions = regions104}105
106// Upload blob107log.Printf("azure: uploading blobs for architectures: %+v\n", azu.Options.Architectures)108
109for _, arch := range azu.Options.Architectures {110g.Go(func() error {111log.Printf("azure: starting upload blob for %s\n", arch)112
113err = azu.uploadAzureBlob(ctx, arch)114if err != nil {115return fmt.Errorf("azure: error uploading page blob for %s: %w", arch, err)116}117
118log.Printf("azure: starting image version creation for %s\n", arch)119
120err = azu.createAzureImageVersion(ctx, arch)121if err != nil {122return fmt.Errorf("azure: error creating image version: %w", err)123}124
125return err126})127}128
129return g.Wait()130}
131
132//nolint:gocyclo
133func (azu *AzureUploader) uploadAzureBlob(ctx context.Context, arch string) error {134blobURL := fmt.Sprintf("https://siderogallery.blob.core.windows.net/images/talos/talos-%s-%s.vhd", arch, azu.Options.Tag)135
136pageBlobClient, err := pageblob.NewClient(blobURL, azu.helper.cred, nil)137if err != nil {138return fmt.Errorf("azure: error creating pageblob client: %w", err)139}140
141source, err := os.Open(azu.Options.AzureImage(arch))142if err != nil {143return err144}145
146defer source.Close() //nolint:errcheck147
148// calculate totalSize149file, err := zstd.NewReader(source)150if err != nil {151return fmt.Errorf("azure: error extracting file from xz: %w", err)152}153
154totalSize, err := io.Copy(io.Discard, file)155if err != nil {156return fmt.Errorf("azure: error calculating totalSize: %w", err)157}158
159// second pass: read chunks and upload160// seek back to the beginning of the source file161_, err = source.Seek(0, io.SeekStart)162if err != nil {163return fmt.Errorf("azure: error seeking back: %w", err)164}165
166file, err = zstd.NewReader(source)167if err != nil {168return fmt.Errorf("azure: error extracting file from xz: %w", err)169}170
171// Check if the file size is a multiple of 512 bytes172if totalSize%pageblob.PageBytes != 0 {173return errors.New("azure: error: the file size must be a multiple of 512 bytes")174}175
176_, err = pageBlobClient.Create(ctx, totalSize, nil)177if err != nil {178return fmt.Errorf("azure: error creating vhd: %w", err)179}180
181type work struct {182chunk []byte183offset int64184}185
186const (187concurrency = 8188chunkSize = 4 * 1024 * 1024189)190
191workCh := make(chan work)192
193var g *errgroup.Group194g, ctx = errgroup.WithContext(ctx)195
196for range concurrency {197g.Go(func() error {198for w := range workCh {199_, err = pageBlobClient.UploadPages(200ctx,201streaming.NopCloser(bytes.NewReader(w.chunk)),202blob.HTTPRange{Offset: w.offset, Count: int64(len(w.chunk))},203nil)204if err != nil {205return fmt.Errorf("azure: error uploading chunk at offset %d: %w", w.offset, err)206}207}208
209return nil210})211}212
213var offset int64214
215uploadLoop:216for {217buf := make([]byte, chunkSize)218
219var n int220
221n, err = io.ReadFull(file, buf)222switch {223case err == io.ErrUnexpectedEOF:224// this is the last (incomplete) chunk225case err == io.EOF:226// end of file, stop227break uploadLoop228case err != nil:229return fmt.Errorf("azure: error reading chunk: %w", err)230}231
232if !channel.SendWithContext(ctx, workCh, work{chunk: buf[:n], offset: offset}) {233break uploadLoop234}235
236offset += int64(n)237
238if offset%(chunkSize*10) == 0 {239log.Printf("azure: uploaded %d bytes\n", offset)240}241}242
243close(workCh)244
245if err = g.Wait(); err != nil {246return fmt.Errorf("azure: error uploading chunks: %w", err)247}248
249return nil250}
251
252func (azu *AzureUploader) createAzureImageVersion(ctx context.Context, arch string) error {253var targetRegions []*armcompute.TargetRegion254
255if !azu.preRelease {256targetRegions = xslices.Map(azu.Options.AzureRegions, func(region string) *armcompute.TargetRegion {257return &armcompute.TargetRegion{258Name: to.Ptr(region),259ExcludeFromLatest: to.Ptr(false),260RegionalReplicaCount: to.Ptr[int32](1),261StorageAccountType: to.Ptr(armcompute.StorageAccountTypeStandardLRS),262}263})264}265
266pager := azu.helper.clientFactory.NewGalleryImageVersionsClient().NewListByGalleryImagePager(resourceGroupName, azu.Options.AzureGalleryName, fmt.Sprintf("talos-%s", azureArchitectures[arch]), nil)267
268for pager.More() {269page, err := pager.NextPage(ctx)270if err != nil {271return fmt.Errorf("azure: failed to list image versions: %w", err)272}273
274for _, v := range page.Value {275if *v.Name == azu.helper.version {276log.Printf("azure: image version exists for %s\n", *v.Name)277
278err = azu.deleteImageVersion(ctx, arch)279if err != nil {280return err281}282}283}284}285
286log.Printf("azure: creating %s image version", arch)287
288poller, err := azu.helper.clientFactory.NewGalleryImageVersionsClient().BeginCreateOrUpdate(289ctx,290resourceGroupName,291azu.Options.AzureGalleryName,292fmt.Sprintf("talos-%s", azureArchitectures[arch]),293azu.helper.version,294armcompute.GalleryImageVersion{295Location: to.Ptr(defaultRegion),296Properties: &armcompute.GalleryImageVersionProperties{297PublishingProfile: &armcompute.GalleryImageVersionPublishingProfile{298TargetRegions: targetRegions,299},300SafetyProfile: &armcompute.GalleryImageVersionSafetyProfile{301AllowDeletionOfReplicatedLocations: to.Ptr(true),302},303StorageProfile: &armcompute.GalleryImageVersionStorageProfile{304OSDiskImage: &armcompute.GalleryOSDiskImage{305HostCaching: to.Ptr(armcompute.HostCachingReadOnly),306Source: &armcompute.GalleryDiskImageSource{307ID: to.Ptr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Storage/storageAccounts/%s", azu.helper.subscriptionID, resourceGroupName, storageAccount)),308URI: to.Ptr(fmt.Sprintf("https://siderogallery.blob.core.windows.net/images/talos/talos-%s-%s.vhd", arch, azu.Options.Tag)),309},310},311},312},313},314nil)315if err != nil {316return fmt.Errorf("azure: failed to create image version: %w", err)317}318
319res, err := poller.PollUntilDone(ctx, nil)320if err != nil {321return fmt.Errorf("azure: failed to pull the result for image version creation: %w", err)322}323
324for _, region := range azu.Options.AzureRegions {325pushResult(CloudImage{326Cloud: "azure",327Tag: azu.Options.Tag,328Region: region,329Arch: arch,330Type: "vhd",331ID: *res.ID,332})333}334
335return nil336}
337
338func (azu *AzureUploader) deleteImageVersion(ctx context.Context, arch string) error {339log.Println("azure: removing old image version")340
341poller, err := azu.helper.clientFactory.NewGalleryImageVersionsClient().BeginDelete(342ctx,343resourceGroupName,344azu.Options.AzureGalleryName,345fmt.Sprintf("talos-%s", azureArchitectures[arch]),346azu.helper.version,347nil)348if err != nil {349return fmt.Errorf("azure: failed to delete image: %w", err)350}351
352_, err = poller.PollUntilDone(ctx, nil)353if err != nil {354return fmt.Errorf("azure: failed to pull the result for image deletion: %w", err)355}356
357return nil358}
359
360type azureHelper struct {361version string362subscriptionID string363clientFactory *armcompute.ClientFactory364cred *azidentity.DefaultAzureCredential365authorizer autorest.Authorizer366providersClient resources.ProvidersClient367}
368
369func (helper *azureHelper) setDefaultAzureCreds() error {370helper.subscriptionID = os.Getenv("AZURE_SUBSCRIPTION_ID")371if len(helper.subscriptionID) == 0 {372return errors.New("azure: AZURE_SUBSCRIPTION_ID is not set")373}374
375authFromEnvironment, err := auth.NewAuthorizerFromEnvironment()376if err != nil {377return err378}379
380helper.authorizer = authFromEnvironment381
382// Create a new instance of the DefaultAzureCredential383helper.cred, err = azidentity.NewDefaultAzureCredential(nil)384if err != nil {385return err386}387
388helper.clientFactory, err = armcompute.NewClientFactory(helper.subscriptionID, helper.cred, nil)389if err != nil {390return fmt.Errorf("azure: failed to create client: %w", err)391}392
393// Initialize the Storage Accounts Client394var storageClientFactory *armstorage.ClientFactory395
396storageClientFactory, err = armstorage.NewClientFactory(helper.subscriptionID, helper.cred, nil)397if err != nil {398return err399}400
401_ = storageClientFactory.NewAccountsClient()402helper.providersClient = resources.NewProvidersClient(helper.subscriptionID)403helper.providersClient.Authorizer = helper.authorizer404
405return nil406}
407
408func (helper *azureHelper) getAzureLocations(ctx context.Context) ([]string, error) {409var regions []string410
411result, err := helper.providersClient.Get(ctx, "Microsoft.Compute", "")412if err != nil {413return nil, fmt.Errorf("azure: error getting Microsoft.Compute: %w", err)414}415
416if result.ResourceTypes != nil {417for _, rt := range *result.ResourceTypes {418if rt.ResourceType != nil && *rt.ResourceType == "virtualMachines" {419if rt.Locations != nil {420regions = xslices.Map(*rt.Locations, func(s string) string {421return strings.ToLower(strings.ReplaceAll(s, " ", ""))422})423}424
425break426}427}428}429
430return regions, nil431}
432