talos

Форк
0
297 строк · 9.1 Кб
1
// This Source Code Form is subject to the terms of the Mozilla Public
2
// License, v. 2.0. If a copy of the MPL was not distributed with this
3
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4

5
package backend
6

7
import (
8
	"context"
9
	"crypto/tls"
10
	"fmt"
11
	"sync"
12
	"time"
13

14
	"github.com/siderolabs/grpc-proxy/proxy"
15
	"github.com/siderolabs/net"
16
	"google.golang.org/grpc"
17
	"google.golang.org/grpc/backoff"
18
	"google.golang.org/grpc/connectivity"
19
	"google.golang.org/grpc/credentials"
20
	"google.golang.org/grpc/metadata"
21
	"google.golang.org/grpc/status"
22
	"google.golang.org/protobuf/encoding/protowire"
23

24
	"github.com/siderolabs/talos/pkg/grpc/middleware/authz"
25
	"github.com/siderolabs/talos/pkg/machinery/api/common"
26
	"github.com/siderolabs/talos/pkg/machinery/constants"
27
	"github.com/siderolabs/talos/pkg/machinery/proto"
28
)
29

30
// GracefulShutdownTimeout is the timeout for graceful shutdown of the backend connection.
31
//
32
// Talos has a few long-running API calls, so we need to give the backend some time to finish them.
33
//
34
// The connection will enter IDLE time after GracefulShutdownTimeout/2, if no RPC is running.
35
const GracefulShutdownTimeout = 30 * time.Minute
36

37
var _ proxy.Backend = (*APID)(nil)
38

39
// APID backend performs proxying to another apid instance.
40
//
41
// Backend authenticates itself using given grpc credentials.
42
type APID struct {
43
	target string
44

45
	tlsConfigProvider func() (*tls.Config, error)
46

47
	mu   sync.Mutex
48
	conn *grpc.ClientConn
49
}
50

51
// NewAPID creates new instance of APID backend.
52
func NewAPID(target string, tlsConfigProvider func() (*tls.Config, error)) (*APID, error) {
53
	// perform very basic validation on target, trying to weed out empty addresses or addresses with the port appended
54
	if target == "" || net.AddressContainsPort(target) {
55
		return nil, fmt.Errorf("invalid target %q", target)
56
	}
57

58
	return &APID{
59
		target:            target,
60
		tlsConfigProvider: tlsConfigProvider,
61
	}, nil
62
}
63

64
func (a *APID) String() string {
65
	return a.target
66
}
67

68
// GetConnection returns a grpc connection to the backend.
69
func (a *APID) GetConnection(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
70
	md, _ := metadata.FromIncomingContext(ctx)
71
	md = md.Copy()
72

73
	authz.SetMetadata(md, authz.GetRoles(ctx))
74

75
	if authority := md[":authority"]; len(authority) > 0 {
76
		md.Set("proxyfrom", authority...)
77
	} else {
78
		md.Set("proxyfrom", "unknown")
79
	}
80

81
	delete(md, ":authority")
82
	delete(md, "nodes")
83
	delete(md, "node")
84

85
	outCtx := metadata.NewOutgoingContext(ctx, md)
86

87
	a.mu.Lock()
88
	defer a.mu.Unlock()
89

90
	if a.conn != nil {
91
		return outCtx, a.conn, nil
92
	}
93

94
	tlsConfig, err := a.tlsConfigProvider()
95
	if err != nil {
96
		return outCtx, nil, err
97
	}
98

99
	// override  max delay to avoid excessive backoff when the another node is unavailable (e.g. rebooted),
100
	// and apid used as an endpoint considers another node to be down for longer than expected.
101
	//
102
	// default max delay is 2 minutes, which is too long for our use case.
103
	backoffConfig := backoff.DefaultConfig
104
	backoffConfig.MaxDelay = 15 * time.Second
105

106
	a.conn, err = grpc.NewClient(
107
		fmt.Sprintf("%s:%d", net.FormatAddress(a.target), constants.ApidPort),
108
		grpc.WithInitialWindowSize(65535*32),
109
		grpc.WithInitialConnWindowSize(65535*16),
110
		grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)),
111
		grpc.WithIdleTimeout(GracefulShutdownTimeout/2), // use half of the shutdown timeout as idle timeout
112
		grpc.WithConnectParams(grpc.ConnectParams{
113
			Backoff: backoffConfig,
114
			// not published as a constant in gRPC library
115
			// see: https://github.com/grpc/grpc-go/blob/d5dee5fdbdeb52f6ea10b37b2cc7ce37814642d7/clientconn.go#L55-L56
116
			MinConnectTimeout: 20 * time.Second,
117
		}),
118
		grpc.WithDefaultCallOptions(
119
			grpc.MaxCallRecvMsgSize(constants.GRPCMaxMessageSize),
120
		),
121
		grpc.WithCodec(proxy.Codec()), //nolint:staticcheck
122
		grpc.WithSharedWriteBuffer(true),
123
	)
124

125
	return outCtx, a.conn, err
126
}
127

128
// AppendInfo is called to enhance response from the backend with additional data.
129
//
130
// AppendInfo enhances upstream response with node metadata (target).
131
//
132
// This method depends on grpc protobuf response structure, each response should
133
// look like:
134
//
135
//	  message SomeResponse {
136
//	    repeated SomeReply messages = 1; // please note field ID == 1
137
//	  }
138
//
139
//	  message SomeReply {
140
//		   common.Metadata metadata = 1;
141
//	    <other fields go here ...>
142
//	  }
143
//
144
// As 'SomeReply' is repeated in 'SomeResponse', if we concatenate protobuf representation
145
// of several 'SomeResponse' messages, we still get valid 'SomeResponse' representation but with more
146
// entries (feature of protobuf binary representation).
147
//
148
// If we look at binary representation of any unary 'SomeResponse' message, it will always contain one
149
// protobuf field with field ID 1 (see above) and type 2 (embedded message SomeReply is encoded
150
// as string with length). So if we want to add fields to 'SomeReply', we can simply read field
151
// header, adjust length for new 'SomeReply' representation, and prepend new field header.
152
//
153
// At the same time, we can add 'common.Metadata' structure to 'SomeReply' by simply
154
// appending or prepending 'common.Metadata' as a single field. This requires 'metadata'
155
// field to be not defined in original response. (This is due to the fact that protobuf message
156
// representation is concatenation of each field representation).
157
//
158
// To build only single field (Metadata) we use helper message which contains exactly this
159
// field with same field ID as in every other 'SomeReply':
160
//
161
//	  message Empty {
162
//	    common.Metadata metadata = 1;
163
//		}
164
//
165
// As streaming replies are not wrapped into 'SomeResponse' with 'repeated', handling is simpler: we just
166
// need to append Empty with details.
167
//
168
// So AppendInfo does the following: validates that response contains field ID 1 encoded as string,
169
// cuts field header, rest is representation of some reply. Marshal 'Empty' as protobuf,
170
// which builds 'common.Metadata' field, append it to original response message, build new header
171
// for new length of some response, and add back new field header.
172
func (a *APID) AppendInfo(streaming bool, resp []byte) ([]byte, error) {
173
	payload, err := proto.Marshal(&common.Empty{
174
		Metadata: &common.Metadata{
175
			Hostname: a.target,
176
		},
177
	})
178

179
	if streaming {
180
		return append(resp, payload...), err
181
	}
182

183
	const (
184
		metadataField = 1 // field number in proto definition for repeated response
185
		metadataType  = 2 // "string" for embedded messages
186
	)
187

188
	// decode protobuf embedded header
189

190
	typ, n1 := protowire.ConsumeVarint(resp)
191
	if n1 < 0 {
192
		return nil, protowire.ParseError(n1)
193
	}
194

195
	_, n2 := protowire.ConsumeVarint(resp[n1:]) // length
196
	if n2 < 0 {
197
		return nil, protowire.ParseError(n2)
198
	}
199

200
	if typ != (metadataField<<3)|metadataType {
201
		return nil, fmt.Errorf("unexpected message format: %d", typ)
202
	}
203

204
	if n1+n2 > len(resp) {
205
		return nil, fmt.Errorf("unexpected message size: %d", len(resp))
206
	}
207

208
	// cut off embedded message header
209
	resp = resp[n1+n2:]
210
	// build new embedded message header
211
	prefix := protowire.AppendVarint(
212
		protowire.AppendVarint(nil, (metadataField<<3)|metadataType),
213
		uint64(len(resp)+len(payload)),
214
	)
215
	resp = append(prefix, resp...)
216

217
	return append(resp, payload...), err
218
}
219

220
// BuildError is called to convert error from upstream into response field.
221
//
222
// BuildError converts upstream error into message from upstream, so that multiple
223
// successful and failure responses might be returned.
224
//
225
// This simply relies on the fact that any response contains 'Empty' message.
226
// So if 'Empty' is unmarshalled into any other reply message, all the fields
227
// are undefined but 'Metadata':
228
//
229
//	  message Empty {
230
//	   common.Metadata metadata = 1;
231
//		}
232
//
233
//	 message EmptyResponse {
234
//	   repeated Empty messages = 1;
235
//	}
236
//
237
// Streaming responses are not wrapped into Empty, so we simply marshall EmptyResponse
238
// message.
239
func (a *APID) BuildError(streaming bool, err error) ([]byte, error) {
240
	var resp proto.Message = &common.Empty{
241
		Metadata: &common.Metadata{
242
			Hostname: a.target,
243
			Error:    err.Error(),
244
			Status:   status.Convert(err).Proto(),
245
		},
246
	}
247

248
	if !streaming {
249
		resp = &common.EmptyResponse{
250
			Messages: []*common.Empty{
251
				resp.(*common.Empty),
252
			},
253
		}
254
	}
255

256
	return proto.Marshal(resp)
257
}
258

259
// Close connection.
260
func (a *APID) Close() {
261
	a.mu.Lock()
262
	defer a.mu.Unlock()
263

264
	if a.conn != nil {
265
		gracefulGRPCClose(a.conn, GracefulShutdownTimeout)
266
		a.conn = nil
267
	}
268
}
269

270
func gracefulGRPCClose(conn *grpc.ClientConn, timeout time.Duration) {
271
	// close the client connection in the background, tries to avoid closing the connection
272
	// if the connection is in the middle of a call (e.g. streaming API)
273
	//
274
	// see https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md for details on connection states
275
	go func() {
276
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
277
		defer cancel()
278

279
		for ctx.Err() == nil {
280
			switch state := conn.GetState(); state { //nolint:exhaustive
281
			case connectivity.Idle,
282
				connectivity.Shutdown,
283
				connectivity.TransientFailure:
284
				// close immediately, connection is not used
285
				conn.Close() //nolint:errcheck
286

287
				return
288
			default:
289
				// wait for state change of the connection
290
				conn.WaitForStateChange(ctx, state)
291
			}
292
		}
293

294
		// close anyways on timeout
295
		conn.Close() //nolint:errcheck
296
	}()
297
}
298

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.