podman

Форк
0
/
farm.go 
480 строк · 14.0 Кб
1
package farm
2

3
import (
4
	"bufio"
5
	"context"
6
	"errors"
7
	"fmt"
8
	"io"
9
	"os"
10
	"sort"
11
	"strings"
12
	"sync"
13

14
	"github.com/containers/buildah/define"
15
	lplatform "github.com/containers/common/libimage/platform"
16
	"github.com/containers/common/pkg/config"
17
	"github.com/containers/podman/v5/pkg/domain/entities"
18
	"github.com/containers/podman/v5/pkg/domain/infra"
19
	"github.com/hashicorp/go-multierror"
20
	"github.com/sirupsen/logrus"
21
)
22

23
// Farm represents a group of connections to builders.
24
type Farm struct {
25
	name        string
26
	localEngine entities.ImageEngine            // not nil -> use local engine, too
27
	builders    map[string]entities.ImageEngine // name -> builder
28
}
29

30
// Schedule is a description of where and how we'll do builds.
31
type Schedule struct {
32
	platformBuilders map[string]string // target->connection
33
}
34

35
func newFarmWithBuilders(_ context.Context, name string, cons []config.Connection, localEngine entities.ImageEngine, buildLocal bool) (*Farm, error) {
36
	farm := &Farm{
37
		builders:    make(map[string]entities.ImageEngine),
38
		localEngine: localEngine,
39
		name:        name,
40
	}
41
	var (
42
		builderMutex sync.Mutex
43
		builderGroup multierror.Group
44
	)
45
	// Set up the remote connections to handle the builds
46
	for _, con := range cons {
47
		con := con
48
		builderGroup.Go(func() error {
49
			fmt.Printf("Connecting to %q\n", con.Name)
50
			engine, err := infra.NewImageEngine(&entities.PodmanConfig{
51
				EngineMode:   entities.TunnelMode,
52
				URI:          con.URI,
53
				Identity:     con.Identity,
54
				MachineMode:  con.IsMachine,
55
				FarmNodeName: con.Name,
56
			})
57
			if err != nil {
58
				return fmt.Errorf("initializing image engine at %q: %w", con.URI, err)
59
			}
60

61
			defer fmt.Printf("Builder %q ready\n", con.Name)
62
			builderMutex.Lock()
63
			defer builderMutex.Unlock()
64
			farm.builders[name] = engine
65
			return nil
66
		})
67
	}
68
	// If local=true then use the local machine for builds as well
69
	if buildLocal {
70
		builderGroup.Go(func() error {
71
			fmt.Println("Setting up local builder")
72
			defer fmt.Println("Local builder ready")
73
			builderMutex.Lock()
74
			defer builderMutex.Unlock()
75
			farm.builders[entities.LocalFarmImageBuilderName] = localEngine
76
			return nil
77
		})
78
	}
79
	if builderError := builderGroup.Wait(); builderError != nil {
80
		if err := builderError.ErrorOrNil(); err != nil {
81
			return nil, err
82
		}
83
	}
84
	if len(farm.builders) > 0 {
85
		defer fmt.Printf("Farm %q ready\n", farm.name)
86
		return farm, nil
87
	}
88
	return nil, errors.New("no builders configured")
89
}
90

91
func NewFarm(ctx context.Context, name string, localEngine entities.ImageEngine, buildLocal bool) (*Farm, error) {
92
	// Get the destinations of the connections specified in the farm
93
	name, destinations, err := getFarmDestinations(name)
94
	if err != nil {
95
		return nil, err
96
	}
97

98
	return newFarmWithBuilders(ctx, name, destinations, localEngine, buildLocal)
99
}
100

101
// Done performs any necessary end-of-process cleanup for the farm's members.
102
func (f *Farm) Done(ctx context.Context) error {
103
	return f.forEach(ctx, func(ctx context.Context, name string, engine entities.ImageEngine) (bool, error) {
104
		engine.Shutdown(ctx)
105
		return false, nil
106
	})
107
}
108

109
// Status polls the connections in the farm and returns a map of their
110
// individual status, along with an error if any are down or otherwise unreachable.
111
func (f *Farm) Status(ctx context.Context) (map[string]error, error) {
112
	status := make(map[string]error)
113
	var (
114
		statusMutex sync.Mutex
115
		statusGroup multierror.Group
116
	)
117
	for _, engine := range f.builders {
118
		engine := engine
119
		statusGroup.Go(func() error {
120
			logrus.Debugf("getting status of %q", engine.FarmNodeName(ctx))
121
			defer logrus.Debugf("got status of %q", engine.FarmNodeName(ctx))
122
			_, err := engine.Config(ctx)
123
			statusMutex.Lock()
124
			defer statusMutex.Unlock()
125
			status[engine.FarmNodeName(ctx)] = err
126
			return err
127
		})
128
	}
129
	statusError := statusGroup.Wait()
130

131
	return status, statusError.ErrorOrNil()
132
}
133

134
// forEach runs the called function once for every node in the farm and
135
// collects their results, continuing until it finishes visiting every node or
136
// a function call returns true as its first return value.
137
func (f *Farm) forEach(ctx context.Context, fn func(context.Context, string, entities.ImageEngine) (bool, error)) error {
138
	var merr *multierror.Error
139
	for name, engine := range f.builders {
140
		stop, err := fn(ctx, name, engine)
141
		if err != nil {
142
			merr = multierror.Append(merr, fmt.Errorf("%s: %w", engine.FarmNodeName(ctx), err))
143
		}
144
		if stop {
145
			break
146
		}
147
	}
148

149
	return merr.ErrorOrNil()
150
}
151

152
// NativePlatforms returns a list of the set of platforms for which the farm
153
// can build images natively.
154
func (f *Farm) NativePlatforms(ctx context.Context) ([]string, error) {
155
	nativeMap := make(map[string]struct{})
156
	platforms := []string{}
157
	var (
158
		nativeMutex sync.Mutex
159
		nativeGroup multierror.Group
160
	)
161
	for _, engine := range f.builders {
162
		engine := engine
163
		nativeGroup.Go(func() error {
164
			logrus.Debugf("getting native platform of %q\n", engine.FarmNodeName(ctx))
165
			defer logrus.Debugf("got native platform of %q", engine.FarmNodeName(ctx))
166
			inspect, err := engine.FarmNodeInspect(ctx)
167
			if err != nil {
168
				return err
169
			}
170
			nativeMutex.Lock()
171
			defer nativeMutex.Unlock()
172
			for _, platform := range inspect.NativePlatforms {
173
				nativeMap[platform] = struct{}{}
174
			}
175
			return nil
176
		})
177
	}
178
	merr := nativeGroup.Wait()
179
	if merr != nil {
180
		if err := merr.ErrorOrNil(); err != nil {
181
			return nil, err
182
		}
183
	}
184

185
	for platform := range nativeMap {
186
		platforms = append(platforms, platform)
187
	}
188
	sort.Strings(platforms)
189
	return platforms, nil
190
}
191

192
// EmulatedPlatforms returns a list of the set of platforms for which the farm
193
// can build images with the help of emulation.
194
func (f *Farm) EmulatedPlatforms(ctx context.Context) ([]string, error) {
195
	emulatedMap := make(map[string]struct{})
196
	platforms := []string{}
197
	var (
198
		emulatedMutex sync.Mutex
199
		emulatedGroup multierror.Group
200
	)
201
	for _, engine := range f.builders {
202
		engine := engine
203
		emulatedGroup.Go(func() error {
204
			logrus.Debugf("getting emulated platforms of %q", engine.FarmNodeName(ctx))
205
			defer logrus.Debugf("got emulated platforms of %q", engine.FarmNodeName(ctx))
206
			inspect, err := engine.FarmNodeInspect(ctx)
207
			if err != nil {
208
				return err
209
			}
210
			emulatedMutex.Lock()
211
			defer emulatedMutex.Unlock()
212
			for _, platform := range inspect.EmulatedPlatforms {
213
				emulatedMap[platform] = struct{}{}
214
			}
215
			return nil
216
		})
217
	}
218
	merr := emulatedGroup.Wait()
219
	if merr != nil {
220
		if err := merr.ErrorOrNil(); err != nil {
221
			return nil, err
222
		}
223
	}
224

225
	for platform := range emulatedMap {
226
		platforms = append(platforms, platform)
227
	}
228
	sort.Strings(platforms)
229
	return platforms, nil
230
}
231

232
// Schedule takes a list of platforms and returns a list of connections which
233
// can be used to build for those platforms.  It always prefers native builders
234
// over emulated builders, but will assign a builder which can use emulation
235
// for a platform if no suitable native builder is available.
236
//
237
// If platforms is an empty list, all available native platforms will be
238
// scheduled.
239
//
240
// TODO: add (Priority,Weight *int) a la RFC 2782 to destinations that we know
241
// of, and factor those in when assigning builds to nodes in here.
242
func (f *Farm) Schedule(ctx context.Context, platforms []string) (Schedule, error) {
243
	var (
244
		err       error
245
		infoGroup multierror.Group
246
		infoMutex sync.Mutex
247
	)
248
	// If we weren't given a list of target platforms, generate one.
249
	if len(platforms) == 0 {
250
		platforms, err = f.NativePlatforms(ctx)
251
		if err != nil {
252
			return Schedule{}, fmt.Errorf("reading list of available native platforms: %w", err)
253
		}
254
	}
255

256
	platformBuilders := make(map[string]string)
257
	native := make(map[string]string)
258
	emulated := make(map[string]string)
259
	var localPlatform string
260
	// Make notes of which platforms we can build for natively, and which
261
	// ones we can build for using emulation.
262
	for name, engine := range f.builders {
263
		name, engine := name, engine
264
		infoGroup.Go(func() error {
265
			inspect, err := engine.FarmNodeInspect(ctx)
266
			if err != nil {
267
				return err
268
			}
269
			infoMutex.Lock()
270
			defer infoMutex.Unlock()
271
			for _, n := range inspect.NativePlatforms {
272
				if _, assigned := native[n]; !assigned {
273
					native[n] = name
274
				}
275
				if name == entities.LocalFarmImageBuilderName {
276
					localPlatform = n
277
				}
278
			}
279
			for _, e := range inspect.EmulatedPlatforms {
280
				if _, assigned := emulated[e]; !assigned {
281
					emulated[e] = name
282
				}
283
			}
284
			return nil
285
		})
286
	}
287
	merr := infoGroup.Wait()
288
	if merr != nil {
289
		if err := merr.ErrorOrNil(); err != nil {
290
			return Schedule{}, err
291
		}
292
	}
293
	// Assign a build to the first node that could build it natively, and
294
	// if there isn't one, the first one that can build it with the help of
295
	// emulation, and if there aren't any, error out.
296
	for _, platform := range platforms {
297
		if builder, ok := native[platform]; ok {
298
			platformBuilders[platform] = builder
299
		} else if builder, ok := emulated[platform]; ok {
300
			platformBuilders[platform] = builder
301
		} else {
302
			return Schedule{}, fmt.Errorf("no builder capable of building for platform %q available", platform)
303
		}
304
	}
305
	// If local is set, prioritize building on local
306
	if localPlatform != "" {
307
		platformBuilders[localPlatform] = entities.LocalFarmImageBuilderName
308
	}
309
	schedule := Schedule{
310
		platformBuilders: platformBuilders,
311
	}
312
	return schedule, nil
313
}
314

315
// Build runs a build using the specified targetplatform:service map.  If all
316
// builds succeed, it copies the resulting images from the remote hosts to the
317
// local service and builds a manifest list with the specified reference name.
318
func (f *Farm) Build(ctx context.Context, schedule Schedule, options entities.BuildOptions, reference string, localEngine entities.ImageEngine) error {
319
	switch options.OutputFormat {
320
	default:
321
		return fmt.Errorf("unknown output format %q requested", options.OutputFormat)
322
	case "", define.OCIv1ImageManifest:
323
		options.OutputFormat = define.OCIv1ImageManifest
324
	case define.Dockerv2ImageManifest:
325
	}
326

327
	// Build the list of jobs.
328
	var jobs sync.Map
329
	type job struct {
330
		platform string
331
		os       string
332
		arch     string
333
		variant  string
334
		builder  entities.ImageEngine
335
	}
336
	for platform, builderName := range schedule.platformBuilders { // prepare to build
337
		builder, ok := f.builders[builderName]
338
		if !ok {
339
			return fmt.Errorf("unknown builder %q", builderName)
340
		}
341
		var rawOS, rawArch, rawVariant string
342
		p := strings.Split(platform, "/")
343
		if len(p) > 0 && p[0] != "" {
344
			rawOS = p[0]
345
		}
346
		if len(p) > 1 {
347
			rawArch = p[1]
348
		}
349
		if len(p) > 2 {
350
			rawVariant = p[2]
351
		}
352
		os, arch, variant := lplatform.Normalize(rawOS, rawArch, rawVariant)
353
		jobs.Store(builderName, job{
354
			platform: platform,
355
			os:       os,
356
			arch:     arch,
357
			variant:  variant,
358
			builder:  builder,
359
		})
360
	}
361

362
	listBuilderOptions := listBuilderOptions{
363
		cleanup:       options.Cleanup,
364
		iidFile:       options.IIDFile,
365
		authfile:      options.Authfile,
366
		skipTLSVerify: options.SkipTLSVerify,
367
	}
368
	manifestListBuilder := newManifestListBuilder(reference, f.localEngine, listBuilderOptions)
369

370
	// Start builds in parallel and wait for them all to finish.
371
	var (
372
		buildResults sync.Map
373
		buildGroup   multierror.Group
374
	)
375
	type buildResult struct {
376
		report  entities.BuildReport
377
		builder entities.ImageEngine
378
	}
379
	for platform, builder := range schedule.platformBuilders {
380
		platform, builder := platform, builder
381
		outReader, outWriter := io.Pipe()
382
		errReader, errWriter := io.Pipe()
383
		go func() {
384
			defer outReader.Close()
385
			reader := bufio.NewReader(outReader)
386
			writer := options.Out
387
			if writer == nil {
388
				writer = os.Stdout
389
			}
390
			line, err := reader.ReadString('\n')
391
			for err == nil {
392
				line = strings.TrimSuffix(line, "\n")
393
				fmt.Fprintf(writer, "[%s@%s] %s\n", platform, builder, line)
394
				line, err = reader.ReadString('\n')
395
			}
396
		}()
397
		go func() {
398
			defer errReader.Close()
399
			reader := bufio.NewReader(errReader)
400
			writer := options.Err
401
			if writer == nil {
402
				writer = os.Stderr
403
			}
404
			line, err := reader.ReadString('\n')
405
			for err == nil {
406
				line = strings.TrimSuffix(line, "\n")
407
				fmt.Fprintf(writer, "[%s@%s] %s\n", platform, builder, line)
408
				line, err = reader.ReadString('\n')
409
			}
410
		}()
411
		buildGroup.Go(func() error {
412
			var j job
413
			defer outWriter.Close()
414
			defer errWriter.Close()
415
			c, ok := jobs.Load(builder)
416
			if !ok {
417
				return fmt.Errorf("unknown connection for %q (shouldn't happen)", builder)
418
			}
419
			if j, ok = c.(job); !ok {
420
				return fmt.Errorf("unexpected connection type for %q (shouldn't happen)", builder)
421
			}
422
			buildOptions := options
423
			buildOptions.Platforms = []struct{ OS, Arch, Variant string }{{j.os, j.arch, j.variant}}
424
			buildOptions.Out = outWriter
425
			buildOptions.Err = errWriter
426
			fmt.Printf("Starting build for %v at %q\n", buildOptions.Platforms, builder)
427
			buildReport, err := j.builder.Build(ctx, options.ContainerFiles, buildOptions)
428
			if err != nil {
429
				return fmt.Errorf("building for %q on %q: %w", j.platform, builder, err)
430
			}
431
			fmt.Printf("finished build for %v at %q: built %s\n", buildOptions.Platforms, builder, buildReport.ID)
432
			buildResults.Store(platform, buildResult{
433
				report:  *buildReport,
434
				builder: j.builder,
435
			})
436
			return nil
437
		})
438
	}
439
	buildErrors := buildGroup.Wait()
440
	if err := buildErrors.ErrorOrNil(); err != nil {
441
		return fmt.Errorf("building: %w", err)
442
	}
443

444
	// Assemble the final result.
445
	perArchBuilds := make(map[entities.BuildReport]entities.ImageEngine)
446
	buildResults.Range(func(k, v any) bool {
447
		result, ok := v.(buildResult)
448
		if !ok {
449
			fmt.Fprintf(os.Stderr, "report %v not a build result?", v)
450
			return false
451
		}
452
		perArchBuilds[result.report] = result.builder
453
		return true
454
	})
455
	location, err := manifestListBuilder.build(ctx, perArchBuilds)
456
	if err != nil {
457
		return err
458
	}
459
	fmt.Printf("Saved list to %q\n", location)
460
	return nil
461
}
462

463
func getFarmDestinations(name string) (string, []config.Connection, error) {
464
	cfg, err := config.Default()
465
	if err != nil {
466
		return "", nil, err
467
	}
468

469
	if name == "" {
470
		if name, cons, err := cfg.GetDefaultFarmConnections(); err == nil {
471
			// Use default farm if is there is one
472
			return name, cons, nil
473
		}
474
		// If no farm name is given, then grab all the service destinations available
475
		cons, err := cfg.GetAllConnections()
476
		return name, cons, err
477
	}
478
	cons, err := cfg.GetFarmConnections(name)
479
	return name, cons, err
480
}
481

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.