talos

Форк
0
/
connection.go 
224 строки · 6.0 Кб
1
// This Source Code Form is subject to the terms of the Mozilla Public
2
// License, v. 2.0. If a copy of the MPL was not distributed with this
3
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4

5
package client
6

7
import (
8
	"crypto/tls"
9
	"crypto/x509"
10
	"encoding/base64"
11
	"errors"
12
	"fmt"
13
	"net"
14
	"net/url"
15
	"strings"
16

17
	"github.com/siderolabs/gen/xslices"
18
	"github.com/siderolabs/go-api-signature/pkg/client/interceptor"
19
	"github.com/siderolabs/go-api-signature/pkg/pgp/client"
20
	"google.golang.org/grpc"
21
	"google.golang.org/grpc/credentials"
22

23
	clientconfig "github.com/siderolabs/talos/pkg/machinery/client/config"
24
	"github.com/siderolabs/talos/pkg/machinery/client/resolver"
25
	"github.com/siderolabs/talos/pkg/machinery/constants"
26
)
27

28
// Conn returns underlying client connection.
29
func (c *Client) Conn() *grpc.ClientConn {
30
	return c.conn.ClientConn
31
}
32

33
// getConn creates new gRPC connection.
34
func (c *Client) getConn(opts ...grpc.DialOption) (*grpcConnectionWrapper, error) {
35
	endpoints := c.GetEndpoints()
36

37
	target := c.getTarget(
38
		resolver.EnsureEndpointsHavePorts(
39
			reduceURLsToAddresses(endpoints),
40
			constants.ApidPort),
41
	)
42

43
	dialOpts := []grpc.DialOption{
44
		grpc.WithDefaultCallOptions( // enable compression by default
45
			// TODO: enable compression for Talos 1.7+
46
			// grpc.UseCompressor(gzip.Name),
47
			grpc.MaxCallRecvMsgSize(constants.GRPCMaxMessageSize),
48
		),
49
		grpc.WithSharedWriteBuffer(true),
50
	}
51
	dialOpts = append(dialOpts, c.options.grpcDialOptions...)
52
	dialOpts = append(dialOpts, opts...)
53

54
	if c.options.unixSocketPath != "" {
55
		conn, err := grpc.NewClient(target, dialOpts...)
56

57
		return newGRPCConnectionWrapper(c.GetClusterName(), conn), err
58
	}
59

60
	tlsConfig := c.options.tlsConfig
61

62
	if tlsConfig != nil {
63
		return c.makeConnection(target, credentials.NewTLS(tlsConfig), dialOpts)
64
	}
65

66
	if err := c.resolveConfigContext(); err != nil {
67
		return nil, fmt.Errorf("failed to resolve configuration context: %w", err)
68
	}
69

70
	basicAuth := c.options.configContext.Auth.Basic
71
	if basicAuth != nil {
72
		dialOpts = append(dialOpts, WithGRPCBasicAuth(basicAuth.Username, basicAuth.Password))
73
	}
74

75
	sideroV1 := c.options.configContext.Auth.SideroV1
76
	if sideroV1 != nil {
77
		var contextName string
78

79
		if c.options.config != nil {
80
			contextName = c.options.config.Context
81
		}
82

83
		if c.options.contextOverrideSet {
84
			contextName = c.options.contextOverride
85
		}
86

87
		authInterceptor := interceptor.New(interceptor.Options{
88
			UserKeyProvider: client.NewKeyProvider("talos/keys"),
89
			ContextName:     contextName,
90
			Identity:        sideroV1.Identity,
91
			ClientName:      "Talos",
92
		})
93

94
		dialOpts = append(dialOpts,
95
			grpc.WithUnaryInterceptor(authInterceptor.Unary()),
96
			grpc.WithStreamInterceptor(authInterceptor.Stream()),
97
		)
98
	}
99

100
	creds, err := buildCredentials(c.options.configContext, endpoints)
101
	if err != nil {
102
		return nil, err
103
	}
104

105
	return c.makeConnection(target, creds, dialOpts)
106
}
107

108
func buildTLSConfig(configContext *clientconfig.Context) (*tls.Config, error) {
109
	tlsConfig := &tls.Config{}
110

111
	caBytes, err := getCA(configContext)
112
	if err != nil {
113
		return nil, fmt.Errorf("failed to get CA: %w", err)
114
	}
115

116
	if len(caBytes) > 0 {
117
		tlsConfig.RootCAs = x509.NewCertPool()
118

119
		if ok := tlsConfig.RootCAs.AppendCertsFromPEM(caBytes); !ok {
120
			return nil, errors.New("failed to append CA certificate to RootCAs pool")
121
		}
122
	}
123

124
	crt, err := CertificateFromConfigContext(configContext)
125
	if err != nil {
126
		return nil, fmt.Errorf("failed to acquire credentials: %w", err)
127
	}
128

129
	if crt != nil {
130
		tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
131
		tlsConfig.Certificates = append(tlsConfig.Certificates, *crt)
132
	}
133

134
	return tlsConfig, nil
135
}
136

137
func (c *Client) makeConnection(target string, creds credentials.TransportCredentials, dialOpts []grpc.DialOption) (*grpcConnectionWrapper, error) {
138
	dialOpts = append(dialOpts,
139
		grpc.WithTransportCredentials(creds),
140
		grpc.WithInitialWindowSize(65535*32),
141
		grpc.WithInitialConnWindowSize(65535*16))
142

143
	conn, err := grpc.NewClient(target, dialOpts...)
144

145
	return newGRPCConnectionWrapper(c.GetClusterName(), conn), err
146
}
147

148
func (c *Client) getTarget(endpoints []string) string {
149
	switch {
150
	case c.options.unixSocketPath != "":
151
		return fmt.Sprintf("unix:///%s", c.options.unixSocketPath)
152
	case len(endpoints) > 1:
153
		return fmt.Sprintf("%s:///%s", resolver.RoundRobinResolverScheme, strings.Join(endpoints, ","))
154
	default:
155
		// NB: we use the `dns` scheme here in order to handle fancier situations
156
		// when there is a single endpoint.
157
		// Such possibilities include SRV records, multiple IPs from A and/or AAAA
158
		// records, and descriptive TXT records which include things like load
159
		// balancer specs.
160
		return fmt.Sprintf("dns:///%s", endpoints[0])
161
	}
162
}
163

164
func getCA(context *clientconfig.Context) ([]byte, error) {
165
	if context.CA == "" {
166
		return nil, nil
167
	}
168

169
	caBytes, err := base64.StdEncoding.DecodeString(context.CA)
170
	if err != nil {
171
		return nil, fmt.Errorf("error decoding CA: %w", err)
172
	}
173

174
	return caBytes, err
175
}
176

177
// CertificateFromConfigContext constructs the client Credentials from the given configuration Context.
178
func CertificateFromConfigContext(context *clientconfig.Context) (*tls.Certificate, error) {
179
	if context.Crt == "" && context.Key == "" {
180
		return nil, nil
181
	}
182

183
	crtBytes, err := base64.StdEncoding.DecodeString(context.Crt)
184
	if err != nil {
185
		return nil, fmt.Errorf("error decoding certificate: %w", err)
186
	}
187

188
	keyBytes, err := base64.StdEncoding.DecodeString(context.Key)
189
	if err != nil {
190
		return nil, fmt.Errorf("error decoding key: %w", err)
191
	}
192

193
	crt, err := tls.X509KeyPair(crtBytes, keyBytes)
194
	if err != nil {
195
		return nil, fmt.Errorf("could not load client key pair: %s", err)
196
	}
197

198
	return &crt, nil
199
}
200

201
func reduceURLsToAddresses(endpoints []string) []string {
202
	return xslices.Map(endpoints, func(endpoint string) string {
203
		u, err := url.Parse(endpoint)
204
		if err != nil {
205
			return endpoint
206
		}
207

208
		if u.Scheme == "https" && u.Port() == "" {
209
			return net.JoinHostPort(u.Hostname(), "443")
210
		}
211

212
		if u.Scheme != "" {
213
			if u.Port() != "" {
214
				return net.JoinHostPort(u.Hostname(), u.Port())
215
			}
216

217
			if u.Opaque == "" {
218
				return u.Host
219
			}
220
		}
221

222
		return endpoint
223
	})
224
}
225

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.