podman

Форк
0
1155 строк · 31.8 Кб
1
// Copyright 2013 go-dockerclient authors. All rights reserved.
2
// Use of this source code is governed by a BSD-style
3
// license that can be found in the LICENSE file.
4

5
// Package docker provides a client for the Docker remote API.
6
//
7
// See https://goo.gl/o2v3rk for more details on the remote API.
8
package docker
9

10
import (
11
	"bufio"
12
	"bytes"
13
	"context"
14
	"crypto/tls"
15
	"crypto/x509"
16
	"encoding/json"
17
	"errors"
18
	"fmt"
19
	"io"
20
	"net"
21
	"net/http"
22
	"net/http/httputil"
23
	"net/url"
24
	"os"
25
	"path/filepath"
26
	"reflect"
27
	"runtime"
28
	"strconv"
29
	"strings"
30
	"sync/atomic"
31
	"time"
32

33
	"github.com/docker/docker/pkg/homedir"
34
	"github.com/docker/docker/pkg/jsonmessage"
35
	"github.com/docker/docker/pkg/stdcopy"
36
)
37

38
// Client identification and the endpoint schemes that are not plain TCP.
const (
	// userAgent is the User-Agent header value set by do and streamURL.
	userAgent = "go-dockerclient"

	// unixProtocol is the endpoint URL scheme for UNIX domain sockets.
	unixProtocol = "unix"
	// namedPipeProtocol is the endpoint URL scheme for named pipes.
	namedPipeProtocol = "npipe"
)
44

45
var (
46
	// ErrInvalidEndpoint is returned when the endpoint is not a valid HTTP URL.
47
	ErrInvalidEndpoint = errors.New("invalid endpoint")
48

49
	// ErrConnectionRefused is returned when the client cannot connect to the given endpoint.
50
	ErrConnectionRefused = errors.New("cannot connect to Docker endpoint")
51

52
	// ErrInactivityTimeout is returned when a streamable call has been inactive for some time.
53
	ErrInactivityTimeout = errors.New("inactivity time exceeded timeout")
54

55
	apiVersion112, _ = NewAPIVersion("1.12")
56
	apiVersion118, _ = NewAPIVersion("1.18")
57
	apiVersion119, _ = NewAPIVersion("1.19")
58
	apiVersion121, _ = NewAPIVersion("1.21")
59
	apiVersion124, _ = NewAPIVersion("1.24")
60
	apiVersion125, _ = NewAPIVersion("1.25")
61
	apiVersion135, _ = NewAPIVersion("1.35")
62
)
63

64
// APIVersion holds the numeric components of a Remote API version, most
// significant first (for example "1.24" is APIVersion{1, 24}).
type APIVersion []int

// NewAPIVersion parses input into an APIVersion.
//
// input must contain at least one dot. Everything from the first "-" onwards
// is discarded, and what remains is split on "."; each piece must be an
// integer. Any violation yields a nil APIVersion and an error.
func NewAPIVersion(input string) (APIVersion, error) {
	if strings.IndexByte(input, '.') < 0 {
		return nil, fmt.Errorf("unable to parse version %q", input)
	}
	numeric, _, _ := strings.Cut(input, "-")
	fields := strings.Split(numeric, ".")
	parsed := make(APIVersion, 0, len(fields))
	for _, field := range fields {
		n, convErr := strconv.Atoi(field)
		if convErr != nil {
			return nil, fmt.Errorf("unable to parse version %q: %q is not an integer", input, field)
		}
		parsed = append(parsed, n)
	}
	return parsed, nil
}

// String renders the version as its components joined by dots.
func (v APIVersion) String() string {
	var sb strings.Builder
	for i, n := range v {
		if i > 0 {
			sb.WriteByte('.')
		}
		sb.WriteString(strconv.Itoa(n))
	}
	return sb.String()
}

// LessThan reports whether v orders strictly before other.
func (v APIVersion) LessThan(other APIVersion) bool {
	return v.compare(other) < 0
}

// LessThanOrEqualTo reports whether v orders before or equal to other.
func (v APIVersion) LessThanOrEqualTo(other APIVersion) bool {
	return v.compare(other) <= 0
}

// GreaterThan reports whether v orders strictly after other.
func (v APIVersion) GreaterThan(other APIVersion) bool {
	return v.compare(other) > 0
}

// GreaterThanOrEqualTo reports whether v orders after or equal to other.
func (v APIVersion) GreaterThanOrEqualTo(other APIVersion) bool {
	return v.compare(other) >= 0
}

// compare returns -1, 0 or 1 when v orders before, equal to or after other.
//
// Components are compared pairwise from the left; when every shared component
// is equal, the version with more components is the greater one.
func (v APIVersion) compare(other APIVersion) int {
	for i := 0; i < len(v) && i < len(other); i++ {
		switch {
		case v[i] < other[i]:
			return -1
		case v[i] > other[i]:
			return 1
		}
	}
	switch {
	case len(v) > len(other):
		return 1
	case len(v) < len(other):
		return -1
	default:
		return 0
	}
}
136

137
// Client is the basic type of this package. It provides methods for
138
// interaction with the API.
139
type Client struct {
140
	SkipServerVersionCheck bool
141
	HTTPClient             *http.Client
142
	TLSConfig              *tls.Config
143
	Dialer                 Dialer
144

145
	endpoint            string
146
	endpointURL         *url.URL
147
	eventMonitor        *eventMonitoringState
148
	requestedAPIVersion APIVersion
149
	serverAPIVersion    APIVersion
150
	expectedAPIVersion  APIVersion
151
}
152

153
// Dialer abstracts how connections to the endpoint are opened. *net.Dialer
// satisfies it for network connections; named pipes can be supported with a
// shim (for example one built on winio.DialPipe).
type Dialer interface {
	// Dial connects to address on the named network.
	Dial(network, address string) (net.Conn, error)
}
159

160
// NewClient returns a Client instance ready for communication with the given
161
// server endpoint. It will use the latest remote API version available in the
162
// server.
163
func NewClient(endpoint string) (*Client, error) {
164
	client, err := NewVersionedClient(endpoint, "")
165
	if err != nil {
166
		return nil, err
167
	}
168
	client.SkipServerVersionCheck = true
169
	return client, nil
170
}
171

172
// NewTLSClient returns a Client instance ready for TLS communications with the givens
173
// server endpoint, key and certificates . It will use the latest remote API version
174
// available in the server.
175
func NewTLSClient(endpoint string, cert, key, ca string) (*Client, error) {
176
	client, err := NewVersionedTLSClient(endpoint, cert, key, ca, "")
177
	if err != nil {
178
		return nil, err
179
	}
180
	client.SkipServerVersionCheck = true
181
	return client, nil
182
}
183

184
// NewTLSClientFromBytes returns a Client instance ready for TLS communications with the givens
185
// server endpoint, key and certificates (passed inline to the function as opposed to being
186
// read from a local file). It will use the latest remote API version available in the server.
187
func NewTLSClientFromBytes(endpoint string, certPEMBlock, keyPEMBlock, caPEMCert []byte) (*Client, error) {
188
	client, err := NewVersionedTLSClientFromBytes(endpoint, certPEMBlock, keyPEMBlock, caPEMCert, "")
189
	if err != nil {
190
		return nil, err
191
	}
192
	client.SkipServerVersionCheck = true
193
	return client, nil
194
}
195

196
// NewVersionedClient returns a Client instance ready for communication with
197
// the given server endpoint, using a specific remote API version.
198
func NewVersionedClient(endpoint string, apiVersionString string) (*Client, error) {
199
	u, err := parseEndpoint(endpoint, false)
200
	if err != nil {
201
		return nil, err
202
	}
203
	var requestedAPIVersion APIVersion
204
	if strings.Contains(apiVersionString, ".") {
205
		requestedAPIVersion, err = NewAPIVersion(apiVersionString)
206
		if err != nil {
207
			return nil, err
208
		}
209
	}
210
	c := &Client{
211
		HTTPClient:          defaultClient(),
212
		Dialer:              &net.Dialer{},
213
		endpoint:            endpoint,
214
		endpointURL:         u,
215
		eventMonitor:        new(eventMonitoringState),
216
		requestedAPIVersion: requestedAPIVersion,
217
	}
218
	c.initializeNativeClient(defaultTransport)
219
	return c, nil
220
}
221

222
// WithTransport replaces underlying HTTP client of Docker Client by accepting
223
// a function that returns pointer to a transport object.
224
func (c *Client) WithTransport(trFunc func() *http.Transport) {
225
	c.initializeNativeClient(trFunc)
226
}
227

228
// NewVersionnedTLSClient is like NewVersionedClient, but with ann extra n.
229
//
230
// Deprecated: Use NewVersionedTLSClient instead.
231
func NewVersionnedTLSClient(endpoint string, cert, key, ca, apiVersionString string) (*Client, error) {
232
	return NewVersionedTLSClient(endpoint, cert, key, ca, apiVersionString)
233
}
234

235
// NewVersionedTLSClient returns a Client instance ready for TLS communications with the givens
236
// server endpoint, key and certificates, using a specific remote API version.
237
func NewVersionedTLSClient(endpoint string, cert, key, ca, apiVersionString string) (*Client, error) {
238
	var certPEMBlock []byte
239
	var keyPEMBlock []byte
240
	var caPEMCert []byte
241
	if _, err := os.Stat(cert); !os.IsNotExist(err) {
242
		certPEMBlock, err = os.ReadFile(cert)
243
		if err != nil {
244
			return nil, err
245
		}
246
	}
247
	if _, err := os.Stat(key); !os.IsNotExist(err) {
248
		keyPEMBlock, err = os.ReadFile(key)
249
		if err != nil {
250
			return nil, err
251
		}
252
	}
253
	if _, err := os.Stat(ca); !os.IsNotExist(err) {
254
		caPEMCert, err = os.ReadFile(ca)
255
		if err != nil {
256
			return nil, err
257
		}
258
	}
259
	return NewVersionedTLSClientFromBytes(endpoint, certPEMBlock, keyPEMBlock, caPEMCert, apiVersionString)
260
}
261

262
// NewClientFromEnv returns a Client instance ready for communication created from
263
// Docker's default logic for the environment variables DOCKER_HOST, DOCKER_TLS_VERIFY, DOCKER_CERT_PATH,
264
// and DOCKER_API_VERSION.
265
//
266
// See https://github.com/docker/docker/blob/1f963af697e8df3a78217f6fdbf67b8123a7db94/docker/docker.go#L68.
267
// See https://github.com/docker/compose/blob/81707ef1ad94403789166d2fe042c8a718a4c748/compose/cli/docker_client.py#L7.
268
// See https://github.com/moby/moby/blob/28d7dba41d0c0d9c7f0dafcc79d3c59f2b3f5dc3/client/options.go#L51
269
func NewClientFromEnv() (*Client, error) {
270
	apiVersionString := os.Getenv("DOCKER_API_VERSION")
271
	client, err := NewVersionedClientFromEnv(apiVersionString)
272
	if err != nil {
273
		return nil, err
274
	}
275
	client.SkipServerVersionCheck = apiVersionString == ""
276
	return client, nil
277
}
278

279
// NewVersionedClientFromEnv returns a Client instance ready for TLS communications created from
280
// Docker's default logic for the environment variables DOCKER_HOST, DOCKER_TLS_VERIFY, and DOCKER_CERT_PATH,
281
// and using a specific remote API version.
282
//
283
// See https://github.com/docker/docker/blob/1f963af697e8df3a78217f6fdbf67b8123a7db94/docker/docker.go#L68.
284
// See https://github.com/docker/compose/blob/81707ef1ad94403789166d2fe042c8a718a4c748/compose/cli/docker_client.py#L7.
285
func NewVersionedClientFromEnv(apiVersionString string) (*Client, error) {
286
	dockerEnv, err := getDockerEnv()
287
	if err != nil {
288
		return nil, err
289
	}
290
	dockerHost := dockerEnv.dockerHost
291
	if dockerEnv.dockerTLSVerify {
292
		parts := strings.SplitN(dockerEnv.dockerHost, "://", 2)
293
		if len(parts) != 2 {
294
			return nil, fmt.Errorf("could not split %s into two parts by ://", dockerHost)
295
		}
296
		cert := filepath.Join(dockerEnv.dockerCertPath, "cert.pem")
297
		key := filepath.Join(dockerEnv.dockerCertPath, "key.pem")
298
		ca := filepath.Join(dockerEnv.dockerCertPath, "ca.pem")
299
		return NewVersionedTLSClient(dockerEnv.dockerHost, cert, key, ca, apiVersionString)
300
	}
301
	return NewVersionedClient(dockerEnv.dockerHost, apiVersionString)
302
}
303

304
// NewVersionedTLSClientFromBytes returns a Client instance ready for TLS communications with the givens
305
// server endpoint, key and certificates (passed inline to the function as opposed to being
306
// read from a local file), using a specific remote API version.
307
func NewVersionedTLSClientFromBytes(endpoint string, certPEMBlock, keyPEMBlock, caPEMCert []byte, apiVersionString string) (*Client, error) {
308
	u, err := parseEndpoint(endpoint, true)
309
	if err != nil {
310
		return nil, err
311
	}
312
	var requestedAPIVersion APIVersion
313
	if strings.Contains(apiVersionString, ".") {
314
		requestedAPIVersion, err = NewAPIVersion(apiVersionString)
315
		if err != nil {
316
			return nil, err
317
		}
318
	}
319
	tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12}
320
	if certPEMBlock != nil && keyPEMBlock != nil {
321
		tlsCert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock)
322
		if err != nil {
323
			return nil, err
324
		}
325
		tlsConfig.Certificates = []tls.Certificate{tlsCert}
326
	}
327
	if caPEMCert == nil {
328
		tlsConfig.InsecureSkipVerify = true
329
	} else {
330
		caPool := x509.NewCertPool()
331
		if !caPool.AppendCertsFromPEM(caPEMCert) {
332
			return nil, errors.New("could not add RootCA pem")
333
		}
334
		tlsConfig.RootCAs = caPool
335
	}
336
	tr := defaultTransport()
337
	tr.TLSClientConfig = tlsConfig
338
	if err != nil {
339
		return nil, err
340
	}
341
	c := &Client{
342
		HTTPClient:          &http.Client{Transport: tr},
343
		TLSConfig:           tlsConfig,
344
		Dialer:              &net.Dialer{},
345
		endpoint:            endpoint,
346
		endpointURL:         u,
347
		eventMonitor:        new(eventMonitoringState),
348
		requestedAPIVersion: requestedAPIVersion,
349
	}
350
	c.initializeNativeClient(defaultTransport)
351
	return c, nil
352
}
353

354
// SetTimeout takes a timeout and applies it to the HTTPClient. It should not
355
// be called concurrently with any other Client methods.
356
func (c *Client) SetTimeout(t time.Duration) {
357
	if c.HTTPClient != nil {
358
		c.HTTPClient.Timeout = t
359
	}
360
}
361

362
// checkAPIVersion asks the server for its API version, stores it in
// serverAPIVersion and sets expectedAPIVersion to the requested version when
// one was pinned, or to the server version otherwise.
func (c *Client) checkAPIVersion() error {
	reported, err := c.getServerAPIVersionString()
	if err != nil {
		return err
	}
	if c.serverAPIVersion, err = NewAPIVersion(reported); err != nil {
		return err
	}
	if c.requestedAPIVersion != nil {
		c.expectedAPIVersion = c.requestedAPIVersion
		return nil
	}
	c.expectedAPIVersion = c.serverAPIVersion
	return nil
}
378

379
// Endpoint returns the current endpoint. It's useful for getting the endpoint
380
// when using functions that get this data from the environment (like
381
// NewClientFromEnv.
382
func (c *Client) Endpoint() string {
383
	return c.endpoint
384
}
385

386
// Ping pings the docker server
387
//
388
// See https://goo.gl/wYfgY1 for more details.
389
func (c *Client) Ping() error {
390
	return c.PingWithContext(context.TODO())
391
}
392

393
// PingWithContext pings the docker server
394
// The context object can be used to cancel the ping request.
395
//
396
// See https://goo.gl/wYfgY1 for more details.
397
func (c *Client) PingWithContext(ctx context.Context) error {
398
	path := "/_ping"
399
	resp, err := c.do(http.MethodGet, path, doOptions{context: ctx})
400
	if err != nil {
401
		return err
402
	}
403
	if resp.StatusCode != http.StatusOK {
404
		return newError(resp)
405
	}
406
	resp.Body.Close()
407
	return nil
408
}
409

410
// getServerAPIVersionString fetches /version and returns the "ApiVersion"
// entry of the JSON response.
//
// It returns an empty string and a nil error when the entry is missing or is
// not a string, and an error when the request fails, the status is not 200 or
// the body is not valid JSON.
func (c *Client) getServerAPIVersionString() (version string, err error) {
	resp, err := c.do(http.MethodGet, "/version", doOptions{})
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if code := resp.StatusCode; code != http.StatusOK {
		return "", fmt.Errorf("received unexpected status %d while trying to retrieve the server version", code)
	}
	var payload map[string]any
	if decodeErr := json.NewDecoder(resp.Body).Decode(&payload); decodeErr != nil {
		return "", decodeErr
	}
	// A failed assertion leaves the zero value "", matching the missing case.
	apiVersion, _ := payload["ApiVersion"].(string)
	return apiVersion, nil
}
428

429
// doOptions carries the optional settings of a request made with Client.do.
type doOptions struct {
	// data, when non-nil, is JSON-encoded and sent as the request body.
	data any
	// forceJSON makes do JSON-encode data as the body even when data is nil.
	forceJSON bool
	// headers are set on the request after the default headers.
	headers map[string]string
	// context is attached to the request; nil means context.Background().
	context context.Context
}
435

436
func (c *Client) do(method, path string, doOptions doOptions) (*http.Response, error) {
437
	var params io.Reader
438
	if doOptions.data != nil || doOptions.forceJSON {
439
		buf, err := json.Marshal(doOptions.data)
440
		if err != nil {
441
			return nil, err
442
		}
443
		params = bytes.NewBuffer(buf)
444
	}
445
	if path != "/version" && !c.SkipServerVersionCheck && c.expectedAPIVersion == nil {
446
		err := c.checkAPIVersion()
447
		if err != nil {
448
			return nil, err
449
		}
450
	}
451
	protocol := c.endpointURL.Scheme
452
	var u string
453
	switch protocol {
454
	case unixProtocol, namedPipeProtocol:
455
		u = c.getFakeNativeURL(path)
456
	default:
457
		u = c.getURL(path)
458
	}
459

460
	req, err := http.NewRequest(method, u, params)
461
	if err != nil {
462
		return nil, err
463
	}
464
	req.Header.Set("User-Agent", userAgent)
465
	if doOptions.data != nil {
466
		req.Header.Set("Content-Type", "application/json")
467
	} else if method == http.MethodPost {
468
		req.Header.Set("Content-Type", "plain/text")
469
	}
470

471
	for k, v := range doOptions.headers {
472
		req.Header.Set(k, v)
473
	}
474

475
	ctx := doOptions.context
476
	if ctx == nil {
477
		ctx = context.Background()
478
	}
479

480
	resp, err := c.HTTPClient.Do(req.WithContext(ctx))
481
	if err != nil {
482
		if strings.Contains(err.Error(), "connection refused") {
483
			return nil, ErrConnectionRefused
484
		}
485

486
		return nil, chooseError(ctx, err)
487
	}
488
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusBadRequest {
489
		return nil, newError(resp)
490
	}
491
	return resp, nil
492
}
493

494
// streamOptions carries the settings of a streaming request made with
// Client.stream or Client.streamURL.
type streamOptions struct {
	// setRawTerminal copies a non-JSON response body verbatim to stdout
	// instead of demultiplexing it into stdout and stderr.
	setRawTerminal bool
	// rawJSONStream copies a JSON response to stdout without decoding it.
	rawJSONStream bool
	// useJSONDecoder treats the response as JSON regardless of Content-Type.
	useJSONDecoder bool
	// headers are set on the request after the default headers.
	headers map[string]string
	// in is the request body.
	in io.Reader
	// stdout and stderr receive the response; nil means io.Discard.
	stdout io.Writer
	stderr io.Writer
	// reqSent, when non-nil, is closed once the request has been sent.
	reqSent chan struct{}
	// timeout is the initial connection timeout
	timeout time.Duration
	// inactivityTimeout is the maximum time without data being received;
	// it is reset every time new data arrives.
	inactivityTimeout time.Duration
	// context is attached to the request; nil means context.Background().
	context context.Context
}
510

511
// chooseError picks the error to report for a failed request: the cause of
// ctx when ctx is already done, and err itself otherwise.
func chooseError(ctx context.Context, err error) error {
	if ctx.Err() != nil {
		return context.Cause(ctx)
	}
	return err
}
519

520
// stream performs a streaming request for path, resolved against the endpoint
// with getURL, and delegates the transfer to streamURL.
//
// POST and PUT requests without a body get an empty one, and the server API
// version is checked first when required (never for "/version").
func (c *Client) stream(method, path string, streamOptions streamOptions) error {
	sendsBody := method == http.MethodPost || method == http.MethodPut
	if sendsBody && streamOptions.in == nil {
		streamOptions.in = bytes.NewReader(nil)
	}
	if path != "/version" && !c.SkipServerVersionCheck && c.expectedAPIVersion == nil {
		if err := c.checkAPIVersion(); err != nil {
			return err
		}
	}
	target := c.getURL(path)
	return c.streamURL(method, target, streamOptions)
}
532

533
// streamURL sends a request to url and streams the response into the writers
// configured in streamOptions (nil writers are replaced by io.Discard).
//
// For unix socket and named pipe endpoints the request is written on a
// connection opened with c.Dialer; every other endpoint goes through
// c.HTTPClient. streamOptions.timeout bounds only the wait for the response
// head on the dialed connection. streamOptions.inactivityTimeout, when
// positive, aborts the transfer with ErrInactivityTimeout once no read has
// happened for that long. A "connection refused" failure is reported as
// ErrConnectionRefused and a status below 200 or at/above 400 is converted
// with newError.
func (c *Client) streamURL(method, url string, streamOptions streamOptions) error {
	if (method == http.MethodPost || method == http.MethodPut) && streamOptions.in == nil {
		streamOptions.in = bytes.NewReader(nil)
	}
	if !c.SkipServerVersionCheck && c.expectedAPIVersion == nil {
		err := c.checkAPIVersion()
		if err != nil {
			return err
		}
	}

	// make a sub-context so that our active cancellation does not affect parent
	ctx := streamOptions.context
	if ctx == nil {
		ctx = context.Background()
	}
	subCtx, cancelRequest := context.WithCancel(ctx)
	defer cancelRequest()

	req, err := http.NewRequestWithContext(ctx, method, url, streamOptions.in)
	if err != nil {
		return err
	}
	req.Header.Set("User-Agent", userAgent)
	if method == http.MethodPost {
		req.Header.Set("Content-Type", "plain/text")
	}
	for key, val := range streamOptions.headers {
		req.Header.Set(key, val)
	}
	var resp *http.Response
	protocol := c.endpointURL.Scheme
	address := c.endpointURL.Path
	if streamOptions.stdout == nil {
		streamOptions.stdout = io.Discard
	}
	if streamOptions.stderr == nil {
		streamOptions.stderr = io.Discard
	}

	if protocol == unixProtocol || protocol == namedPipeProtocol {
		var dial net.Conn
		dial, err = c.Dialer.Dial(protocol, address)
		if err != nil {
			return err
		}
		// Closing the connection is what interrupts blocked reads and writes
		// when the request is cancelled; the deferred cancelRequest also
		// ends this goroutine on every return path.
		go func() {
			<-subCtx.Done()
			dial.Close()
		}()
		breader := bufio.NewReader(dial)
		err = req.Write(dial)
		if err != nil {
			return chooseError(subCtx, err)
		}

		// ReadResponse may hang if the server does not reply.
		if streamOptions.timeout > 0 {
			dial.SetDeadline(time.Now().Add(streamOptions.timeout))
		}

		if streamOptions.reqSent != nil {
			close(streamOptions.reqSent)
		}
		resp, err = http.ReadResponse(breader, req)
		// Cancel the timeout for future I/O operations. This must happen on
		// success too, otherwise the deadline stays armed and cuts off the
		// body stream once the initial timeout elapses.
		if streamOptions.timeout > 0 {
			dial.SetDeadline(time.Time{})
		}
		if err != nil {
			if strings.Contains(err.Error(), "connection refused") {
				return ErrConnectionRefused
			}

			return chooseError(subCtx, err)
		}
		defer resp.Body.Close()
	} else {
		if resp, err = c.HTTPClient.Do(req.WithContext(subCtx)); err != nil {
			if strings.Contains(err.Error(), "connection refused") {
				return ErrConnectionRefused
			}
			return chooseError(subCtx, err)
		}
		defer resp.Body.Close()
		if streamOptions.reqSent != nil {
			close(streamOptions.reqSent)
		}
	}
	if resp.StatusCode < 200 || resp.StatusCode >= 400 {
		return newError(resp)
	}
	// canceled is set by the inactivity watcher right before it cancels the
	// request, so a copy error can be attributed to the inactivity timeout.
	var canceled uint32
	if streamOptions.inactivityTimeout > 0 {
		var ch chan<- struct{}
		resp.Body, ch = handleInactivityTimeout(resp.Body, streamOptions.inactivityTimeout, cancelRequest, &canceled)
		defer close(ch)
	}
	err = handleStreamResponse(resp, &streamOptions)
	if err != nil {
		if atomic.LoadUint32(&canceled) != 0 {
			return ErrInactivityTimeout
		}
		return chooseError(subCtx, err)
	}
	return nil
}
639

640
// handleStreamResponse copies resp.Body to the writers in streamOptions.
//
// A response is treated as JSON when useJSONDecoder is set or its
// Content-Type is exactly "application/json". Non-JSON bodies are copied
// verbatim to stdout (setRawTerminal) or demultiplexed into stdout and stderr
// with stdcopy.StdCopy. JSON bodies are copied verbatim when rawJSONStream is
// set and are otherwise rendered through the jsonmessage package.
func handleStreamResponse(resp *http.Response, streamOptions *streamOptions) error {
	out := streamOptions.stdout
	isJSON := streamOptions.useJSONDecoder || resp.Header.Get("Content-Type") == "application/json"

	switch {
	case !isJSON && streamOptions.setRawTerminal:
		_, err := io.Copy(out, resp.Body)
		return err
	case !isJSON:
		_, err := stdcopy.StdCopy(out, streamOptions.stderr, resp.Body)
		return err
	case streamOptions.rawJSONStream:
		// if we want to get raw json stream, just copy it back to output
		// without decoding it
		_, err := io.Copy(out, resp.Body)
		return err
	}

	if st, ok := out.(stream); ok {
		return jsonmessage.DisplayJSONMessagesToStream(resp.Body, st, nil)
	}
	return jsonmessage.DisplayJSONMessagesStream(resp.Body, out, 0, false, nil)
}
663

664
// stream is a writer that also exposes a file descriptor and whether it is a
// terminal. handleStreamResponse passes stdout to
// jsonmessage.DisplayJSONMessagesToStream when stdout satisfies it.
type stream interface {
	io.Writer
	// FD returns the file descriptor of the stream.
	FD() uintptr
	// IsTerminal reports whether the stream is a terminal.
	IsTerminal() bool
}
669

670
// proxyReader wraps an io.ReadCloser and counts how many times Read has been
// called, so that another goroutine can detect read activity.
type proxyReader struct {
	io.ReadCloser
	// calls is the number of Read calls so far; accessed atomically.
	calls uint64
}

// callCount returns the number of Read calls made so far.
func (pr *proxyReader) callCount() uint64 {
	return atomic.LoadUint64(&pr.calls)
}

// Read records the call and then forwards it to the wrapped reader.
func (pr *proxyReader) Read(data []byte) (int, error) {
	atomic.AddUint64(&pr.calls, 1)
	n, err := pr.ReadCloser.Read(data)
	return n, err
}
683

684
func handleInactivityTimeout(reader io.ReadCloser, timeout time.Duration, cancelRequest func(), canceled *uint32) (io.ReadCloser, chan<- struct{}) {
685
	done := make(chan struct{})
686
	proxyReader := &proxyReader{ReadCloser: reader}
687
	go func() {
688
		var lastCallCount uint64
689
		for {
690
			select {
691
			case <-time.After(timeout):
692
			case <-done:
693
				return
694
			}
695
			curCallCount := proxyReader.callCount()
696
			if curCallCount == lastCallCount {
697
				atomic.AddUint32(canceled, 1)
698
				cancelRequest()
699
				return
700
			}
701
			lastCallCount = curCallCount
702
		}
703
	}()
704
	return proxyReader, done
705
}
706

707
// hijackOptions carries the settings of a hijacked connection opened with
// Client.hijack.
type hijackOptions struct {
	// success, when non-nil, receives one value after the request has been
	// sent; hijack then waits for a value to be sent back before hijacking.
	success chan struct{}
	// setRawTerminal copies the connection output verbatim to stdout instead
	// of demultiplexing it into stdout and stderr.
	setRawTerminal bool
	// in, when non-nil, is copied to the connection.
	in io.Reader
	// stdout and stderr receive the connection output; output is only copied
	// when at least one of them is set.
	stdout io.Writer
	stderr io.Writer
	// data, when non-nil, is JSON-encoded and sent as the request body.
	data any
}
715

716
// CloseWaiter combines closing an underlying resource with waiting for it to
// finish processing.
type CloseWaiter interface {
	io.Closer
	// Wait blocks until processing has finished and returns its error.
	Wait() error
}
722

723
// waiterFunc adapts a plain function to the Wait method of CloseWaiter.
type waiterFunc func() error

// Wait calls the underlying function and returns its result.
func (fn waiterFunc) Wait() error {
	return fn()
}
726

727
// closerFunc adapts a plain function to the Close method of io.Closer.
type closerFunc func() error

// Close calls the underlying function and returns its result.
func (fn closerFunc) Close() error {
	return fn()
}
730

731
func (c *Client) hijack(method, path string, hijackOptions hijackOptions) (CloseWaiter, error) {
732
	if path != "/version" && !c.SkipServerVersionCheck && c.expectedAPIVersion == nil {
733
		err := c.checkAPIVersion()
734
		if err != nil {
735
			return nil, err
736
		}
737
	}
738
	var params io.Reader
739
	if hijackOptions.data != nil {
740
		buf, err := json.Marshal(hijackOptions.data)
741
		if err != nil {
742
			return nil, err
743
		}
744
		params = bytes.NewBuffer(buf)
745
	}
746
	req, err := http.NewRequest(method, c.getURL(path), params)
747
	if err != nil {
748
		return nil, err
749
	}
750
	req.Header.Set("Content-Type", "application/json")
751
	req.Header.Set("Connection", "Upgrade")
752
	req.Header.Set("Upgrade", "tcp")
753
	protocol := c.endpointURL.Scheme
754
	address := c.endpointURL.Path
755
	if protocol != unixProtocol && protocol != namedPipeProtocol {
756
		protocol = "tcp"
757
		address = c.endpointURL.Host
758
	}
759
	var dial net.Conn
760
	if c.TLSConfig != nil && protocol != unixProtocol && protocol != namedPipeProtocol {
761
		netDialer, ok := c.Dialer.(*net.Dialer)
762
		if !ok {
763
			return nil, ErrTLSNotSupported
764
		}
765
		dial, err = tlsDialWithDialer(netDialer, protocol, address, c.TLSConfig)
766
		if err != nil {
767
			return nil, err
768
		}
769
	} else {
770
		dial, err = c.Dialer.Dial(protocol, address)
771
		if err != nil {
772
			return nil, err
773
		}
774
	}
775

776
	errs := make(chan error, 1)
777
	quit := make(chan struct{})
778
	go func() {
779
		//lint:ignore SA1019 the alternative doesn't quite work, so keep using the deprecated thing.
780
		clientconn := httputil.NewClientConn(dial, nil)
781
		defer clientconn.Close()
782
		clientconn.Do(req)
783
		if hijackOptions.success != nil {
784
			hijackOptions.success <- struct{}{}
785
			<-hijackOptions.success
786
		}
787
		rwc, br := clientconn.Hijack()
788
		defer rwc.Close()
789

790
		errChanOut := make(chan error, 1)
791
		errChanIn := make(chan error, 2)
792
		if hijackOptions.stdout == nil && hijackOptions.stderr == nil {
793
			close(errChanOut)
794
		} else {
795
			// Only copy if hijackOptions.stdout and/or hijackOptions.stderr is actually set.
796
			// Otherwise, if the only stream you care about is stdin, your attach session
797
			// will "hang" until the container terminates, even though you're not reading
798
			// stdout/stderr
799
			if hijackOptions.stdout == nil {
800
				hijackOptions.stdout = io.Discard
801
			}
802
			if hijackOptions.stderr == nil {
803
				hijackOptions.stderr = io.Discard
804
			}
805

806
			go func() {
807
				defer func() {
808
					if hijackOptions.in != nil {
809
						if closer, ok := hijackOptions.in.(io.Closer); ok {
810
							closer.Close()
811
						}
812
						errChanIn <- nil
813
					}
814
				}()
815

816
				var err error
817
				if hijackOptions.setRawTerminal {
818
					_, err = io.Copy(hijackOptions.stdout, br)
819
				} else {
820
					_, err = stdcopy.StdCopy(hijackOptions.stdout, hijackOptions.stderr, br)
821
				}
822
				errChanOut <- err
823
			}()
824
		}
825

826
		go func() {
827
			var err error
828
			if hijackOptions.in != nil {
829
				_, err = io.Copy(rwc, hijackOptions.in)
830
			}
831
			errChanIn <- err
832
			rwc.(interface {
833
				CloseWrite() error
834
			}).CloseWrite()
835
		}()
836

837
		var errIn error
838
		select {
839
		case errIn = <-errChanIn:
840
		case <-quit:
841
		}
842

843
		var errOut error
844
		select {
845
		case errOut = <-errChanOut:
846
		case <-quit:
847
		}
848

849
		if errIn != nil {
850
			errs <- errIn
851
		} else {
852
			errs <- errOut
853
		}
854
	}()
855

856
	return struct {
857
		closerFunc
858
		waiterFunc
859
	}{
860
		closerFunc(func() error { close(quit); return nil }),
861
		waiterFunc(func() error { return <-errs }),
862
	}, nil
863
}
864

865
// getURL builds the request URL for path.
//
// The base is the endpoint URL without trailing slashes, or empty for unix
// socket and named pipe endpoints. When an API version was requested,
// "/v<version>" is inserted between the base and path.
func (c *Client) getURL(path string) string {
	var base string
	switch c.endpointURL.Scheme {
	case unixProtocol, namedPipeProtocol:
		// No host part: the connection is addressed by the socket path.
	default:
		base = strings.TrimRight(c.endpointURL.String(), "/")
	}
	if c.requestedAPIVersion == nil {
		return base + path
	}
	return fmt.Sprintf("%s/v%s%s", base, c.requestedAPIVersion, path)
}
875

876
// getPath encodes opts as a query string and returns the versioned URL for
// basepath, failing when the requested API version is lower than the version
// the set options require.
func (c *Client) getPath(basepath string, opts any) (string, error) {
	query, required := queryStringVersion(opts)
	return c.pathVersionCheck(basepath, query, required)
}
880

881
func (c *Client) pathVersionCheck(basepath, queryStr string, requiredAPIVersion APIVersion) (string, error) {
882
	urlStr := strings.TrimRight(c.endpointURL.String(), "/")
883
	if c.endpointURL.Scheme == unixProtocol || c.endpointURL.Scheme == namedPipeProtocol {
884
		urlStr = ""
885
	}
886
	if c.requestedAPIVersion != nil {
887
		if c.requestedAPIVersion.GreaterThanOrEqualTo(requiredAPIVersion) {
888
			return fmt.Sprintf("%s/v%s%s?%s", urlStr, c.requestedAPIVersion, basepath, queryStr), nil
889
		}
890
		return "", fmt.Errorf("API %s requires version %s, requested version %s is insufficient",
891
			basepath, requiredAPIVersion, c.requestedAPIVersion)
892
	}
893
	if requiredAPIVersion != nil {
894
		return fmt.Sprintf("%s/v%s%s?%s", urlStr, requiredAPIVersion, basepath, queryStr), nil
895
	}
896
	return fmt.Sprintf("%s%s?%s", urlStr, basepath, queryStr), nil
897
}
898

899
// getFakeNativeURL returns the URL needed to make an HTTP request over a UNIX
900
// domain socket to the given path.
901
func (c *Client) getFakeNativeURL(path string) string {
902
	u := *c.endpointURL // Copy.
903

904
	// Override URL so that net/http will not complain.
905
	u.Scheme = "http"
906
	u.Host = "unix.sock" // Doesn't matter what this is - it's not used.
907
	u.Path = ""
908
	urlStr := strings.TrimRight(u.String(), "/")
909
	if c.requestedAPIVersion != nil {
910
		return fmt.Sprintf("%s/v%s%s", urlStr, c.requestedAPIVersion, path)
911
	}
912
	return fmt.Sprintf("%s%s", urlStr, path)
913
}
914

915
// queryStringVersion encodes the exported fields of opts, a struct or a
// pointer to one, as a URL query string.
//
// The parameter name comes from the field's `qs` tag, defaulting to the
// lower-cased field name; a tag of "-" skips the field. Only values accepted
// by addQueryStringValue are emitted. The second result is the highest
// version found in the `ver` tags of the emitted fields, or nil when none of
// them carries one. Any other kind of opts yields ("", nil).
func queryStringVersion(opts any) (string, APIVersion) {
	if opts == nil {
		return "", nil
	}
	rv := reflect.ValueOf(opts)
	if rv.Kind() == reflect.Ptr {
		rv = rv.Elem()
	}
	if rv.Kind() != reflect.Struct {
		return "", nil
	}

	rt := rv.Type()
	params := url.Values{}
	var required APIVersion
	for i, n := 0, rv.NumField(); i < n; i++ {
		sf := rt.Field(i)
		if sf.PkgPath != "" {
			// A non-empty PkgPath marks an unexported field.
			continue
		}
		name := sf.Tag.Get("qs")
		switch name {
		case "-":
			continue
		case "":
			name = strings.ToLower(sf.Name)
		}
		if !addQueryStringValue(params, name, rv.Field(i)) {
			continue
		}
		verTag := sf.Tag.Get("ver")
		if verTag == "" {
			continue
		}
		// An unparsable tag yields a nil version, which never raises the
		// requirement once one has been recorded.
		ver, _ := NewAPIVersion(verTag)
		if required == nil || ver.GreaterThan(required) {
			required = ver
		}
	}
	return params.Encode(), required
}
953

954
func queryString(opts any) string {
955
	s, _ := queryStringVersion(opts)
956
	return s
957
}
958

959
// addQueryStringValue appends v to items under key and reports whether at
// least one value was added.
//
// Encoding rules by kind:
//   - bool: true becomes "1"; false is omitted.
//   - signed/unsigned integers and floats: emitted in base 10 only when
//     strictly positive.
//   - string: emitted when non-empty.
//   - pointer (non-nil) and map (non-empty): emitted as their JSON encoding;
//     a value that cannot be marshalled is silently omitted.
//   - array and slice: every element is encoded recursively under the same
//     key.
//
// Any other kind is ignored.
func addQueryStringValue(items url.Values, key string, v reflect.Value) bool {
	switch v.Kind() {
	case reflect.Bool:
		if v.Bool() {
			items.Add(key, "1")
			return true
		}
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
		if n := v.Int(); n > 0 {
			items.Add(key, strconv.FormatInt(n, 10))
			return true
		}
	case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
		if n := v.Uint(); n > 0 {
			items.Add(key, strconv.FormatUint(n, 10))
			return true
		}
	case reflect.Float32, reflect.Float64:
		if f := v.Float(); f > 0 {
			// Format with the value's own bit size: a float32 such as 0.1
			// must be rendered as "0.1", not as the digits of its float64
			// widening (0.10000000149011612).
			items.Add(key, strconv.FormatFloat(f, 'f', -1, v.Type().Bits()))
			return true
		}
	case reflect.String:
		if s := v.String(); s != "" {
			items.Add(key, s)
			return true
		}
	case reflect.Ptr:
		if !v.IsNil() {
			if b, err := json.Marshal(v.Interface()); err == nil {
				items.Add(key, string(b))
				return true
			}
		}
	case reflect.Map:
		// Len avoids materialising the key slice just to test for emptiness.
		if v.Len() > 0 {
			if b, err := json.Marshal(v.Interface()); err == nil {
				items.Add(key, string(b))
				return true
			}
		}
	case reflect.Array, reflect.Slice:
		added := false
		for i, n := 0, v.Len(); i < n; i++ {
			// Every element must be visited, so do not short-circuit.
			if addQueryStringValue(items, key, v.Index(i)) {
				added = true
			}
		}
		return added
	}
	return false
}
1014

1015
// Error represents a failure reported by the remote API.
type Error struct {
	// Status is the HTTP status code of the failed response.
	Status int
	// Message is the error text extracted from the response body.
	Message string
}

// newError builds an *Error from resp, consuming and closing its body.
//
// The body is expected to be a JSON object with a "message" field. When the
// body is not valid JSON, or decodes without a message, the raw body is used
// as the message so that the server's text is never silently dropped. If the
// body cannot be read at all, the message describes the read failure.
func newError(resp *http.Response) *Error {
	type errMsg struct {
		Message string `json:"message"`
	}
	defer resp.Body.Close()

	data, err := io.ReadAll(resp.Body)
	if err != nil {
		return &Error{Status: resp.StatusCode, Message: fmt.Sprintf("cannot read body, err: %v", err)}
	}

	var emsg errMsg
	if err := json.Unmarshal(data, &emsg); err != nil || emsg.Message == "" {
		// Either not JSON or JSON of an unexpected shape: keep the raw body.
		return &Error{Status: resp.StatusCode, Message: string(data)}
	}
	return &Error{Status: resp.StatusCode, Message: emsg.Message}
}

// Error implements the error interface, rendering the status code and
// message as "API error (<status>): <message>".
func (e *Error) Error() string {
	return fmt.Sprintf("API error (%d): %s", e.Status, e.Message)
}
1041

1042
func parseEndpoint(endpoint string, tls bool) (*url.URL, error) {
1043
	if endpoint != "" && !strings.Contains(endpoint, "://") {
1044
		endpoint = "tcp://" + endpoint
1045
	}
1046
	u, err := url.Parse(endpoint)
1047
	if err != nil {
1048
		return nil, ErrInvalidEndpoint
1049
	}
1050
	if tls && u.Scheme != "unix" {
1051
		u.Scheme = "https"
1052
	}
1053
	switch u.Scheme {
1054
	case unixProtocol, namedPipeProtocol:
1055
		return u, nil
1056
	case "http", "https", "tcp":
1057
		_, port, err := net.SplitHostPort(u.Host)
1058
		if err != nil {
1059
			var e *net.AddrError
1060
			if errors.As(err, &e) {
1061
				if e.Err == "missing port in address" {
1062
					return u, nil
1063
				}
1064
			}
1065
			return nil, ErrInvalidEndpoint
1066
		}
1067
		number, err := strconv.ParseInt(port, 10, 64)
1068
		if err == nil && number > 0 && number < 65536 {
1069
			if u.Scheme == "tcp" {
1070
				if tls {
1071
					u.Scheme = "https"
1072
				} else {
1073
					u.Scheme = "http"
1074
				}
1075
			}
1076
			return u, nil
1077
		}
1078
		return nil, ErrInvalidEndpoint
1079
	default:
1080
		return nil, ErrInvalidEndpoint
1081
	}
1082
}
1083

1084
// dockerEnv holds the client settings that getDockerEnv derives from the
// process environment.
type dockerEnv struct {
	// dockerHost is the daemon endpoint.
	dockerHost string
	// dockerTLSVerify reports whether TLS verification was requested.
	dockerTLSVerify bool
	// dockerCertPath is the certificate directory; it is only populated when
	// dockerTLSVerify is set.
	dockerCertPath string
}
1089

1090
func getDockerEnv() (*dockerEnv, error) {
1091
	dockerHost := os.Getenv("DOCKER_HOST")
1092
	var err error
1093
	if dockerHost == "" {
1094
		dockerHost = defaultHost
1095
	}
1096
	dockerTLSVerify := os.Getenv("DOCKER_TLS_VERIFY") != ""
1097
	var dockerCertPath string
1098
	if dockerTLSVerify {
1099
		dockerCertPath = os.Getenv("DOCKER_CERT_PATH")
1100
		if dockerCertPath == "" {
1101
			home := homedir.Get()
1102
			if home == "" {
1103
				return nil, errors.New("environment variable HOME must be set if DOCKER_CERT_PATH is not set")
1104
			}
1105
			dockerCertPath = filepath.Join(home, ".docker")
1106
			dockerCertPath, err = filepath.Abs(dockerCertPath)
1107
			if err != nil {
1108
				return nil, err
1109
			}
1110
		}
1111
	}
1112
	return &dockerEnv{
1113
		dockerHost:      dockerHost,
1114
		dockerTLSVerify: dockerTLSVerify,
1115
		dockerCertPath:  dockerCertPath,
1116
	}, nil
1117
}
1118

1119
// defaultTransport returns a new http.Transport with similar default values to
1120
// http.DefaultTransport, but with idle connections and keepalives disabled.
1121
func defaultTransport() *http.Transport {
1122
	transport := defaultPooledTransport()
1123
	transport.DisableKeepAlives = true
1124
	transport.MaxIdleConnsPerHost = -1
1125
	return transport
1126
}
1127

1128
// defaultPooledTransport returns a fresh http.Transport whose settings are
// close to those of http.DefaultTransport and which keeps idle connections.
//
// Do not use it for transient transports, as that can leak file descriptors
// over time; it is meant for transports that are re-used for the same
// host(s).
func defaultPooledTransport() *http.Transport {
	dialer := &net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
	}
	return &http.Transport{
		Proxy:                 http.ProxyFromEnvironment,
		DialContext:           dialer.DialContext,
		MaxIdleConns:          100,
		IdleConnTimeout:       90 * time.Second,
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
		// One idle connection per usable CPU, plus one spare.
		MaxIdleConnsPerHost: runtime.GOMAXPROCS(0) + 1,
	}
}
1147

1148
// defaultClient returns a new http.Client with similar default values to
1149
// http.Client, but with a non-shared Transport, idle connections disabled, and
1150
// keepalives disabled.
1151
func defaultClient() *http.Client {
1152
	return &http.Client{
1153
		Transport: defaultTransport(),
1154
	}
1155
}
1156

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.