cubefs

Форк
0
378 строк · 14.5 Кб
1
/*
2
 *
3
 * Copyright 2017 gRPC authors.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 */
18

19
// Package balancer defines APIs for load balancing in gRPC.
20
// All APIs in this package are experimental.
21
package balancer
22

23
import (
24
	"context"
25
	"encoding/json"
26
	"errors"
27
	"net"
28
	"strings"
29

30
	"google.golang.org/grpc/connectivity"
31
	"google.golang.org/grpc/credentials"
32
	"google.golang.org/grpc/internal"
33
	"google.golang.org/grpc/metadata"
34
	"google.golang.org/grpc/resolver"
35
	"google.golang.org/grpc/serviceconfig"
36
)
37

38
var (
39
	// m is a map from name to balancer builder.
40
	m = make(map[string]Builder)
41
)
42

43
// Register registers the balancer builder to the balancer map. b.Name
44
// (lowercased) will be used as the name registered with this builder.  If the
45
// Builder implements ConfigParser, ParseConfig will be called when new service
46
// configs are received by the resolver, and the result will be provided to the
47
// Balancer in UpdateClientConnState.
48
//
49
// NOTE: this function must only be called during initialization time (i.e. in
50
// an init() function), and is not thread-safe. If multiple Balancers are
51
// registered with the same name, the one registered last will take effect.
52
func Register(b Builder) {
53
	m[strings.ToLower(b.Name())] = b
54
}
55

56
// unregisterForTesting deletes the balancer with the given name from the
57
// balancer map.
58
//
59
// This function is not thread-safe.
60
func unregisterForTesting(name string) {
61
	delete(m, name)
62
}
63

64
func init() {
65
	internal.BalancerUnregister = unregisterForTesting
66
}
67

68
// Get returns the resolver builder registered with the given name.
69
// Note that the compare is done in a case-insensitive fashion.
70
// If no builder is register with the name, nil will be returned.
71
func Get(name string) Builder {
72
	if b, ok := m[strings.ToLower(name)]; ok {
73
		return b
74
	}
75
	return nil
76
}
77

78
// SubConn represents a gRPC sub connection.
79
// Each sub connection contains a list of addresses. gRPC will
80
// try to connect to them (in sequence), and stop trying the
81
// remainder once one connection is successful.
82
//
83
// The reconnect backoff will be applied on the list, not a single address.
84
// For example, try_on_all_addresses -> backoff -> try_on_all_addresses.
85
//
86
// All SubConns start in IDLE, and will not try to connect. To trigger
87
// the connecting, Balancers must call Connect.
88
// When the connection encounters an error, it will reconnect immediately.
89
// When the connection becomes IDLE, it will not reconnect unless Connect is
90
// called.
91
//
92
// This interface is to be implemented by gRPC. Users should not need a
93
// brand new implementation of this interface. For the situations like
94
// testing, the new implementation should embed this interface. This allows
95
// gRPC to add new methods to this interface.
96
type SubConn interface {
97
	// UpdateAddresses updates the addresses used in this SubConn.
98
	// gRPC checks if currently-connected address is still in the new list.
99
	// If it's in the list, the connection will be kept.
100
	// If it's not in the list, the connection will gracefully closed, and
101
	// a new connection will be created.
102
	//
103
	// This will trigger a state transition for the SubConn.
104
	UpdateAddresses([]resolver.Address)
105
	// Connect starts the connecting for this SubConn.
106
	Connect()
107
}
108

109
// NewSubConnOptions contains options to create new SubConn.
110
type NewSubConnOptions struct {
111
	// CredsBundle is the credentials bundle that will be used in the created
112
	// SubConn. If it's nil, the original creds from grpc DialOptions will be
113
	// used.
114
	//
115
	// Deprecated: Use the Attributes field in resolver.Address to pass
116
	// arbitrary data to the credential handshaker.
117
	CredsBundle credentials.Bundle
118
	// HealthCheckEnabled indicates whether health check service should be
119
	// enabled on this SubConn
120
	HealthCheckEnabled bool
121
}
122

123
// State contains the balancer's state relevant to the gRPC ClientConn.
124
type State struct {
125
	// State contains the connectivity state of the balancer, which is used to
126
	// determine the state of the ClientConn.
127
	ConnectivityState connectivity.State
128
	// Picker is used to choose connections (SubConns) for RPCs.
129
	Picker Picker
130
}
131

132
// ClientConn represents a gRPC ClientConn.
133
//
134
// This interface is to be implemented by gRPC. Users should not need a
135
// brand new implementation of this interface. For the situations like
136
// testing, the new implementation should embed this interface. This allows
137
// gRPC to add new methods to this interface.
138
type ClientConn interface {
139
	// NewSubConn is called by balancer to create a new SubConn.
140
	// It doesn't block and wait for the connections to be established.
141
	// Behaviors of the SubConn can be controlled by options.
142
	NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error)
143
	// RemoveSubConn removes the SubConn from ClientConn.
144
	// The SubConn will be shutdown.
145
	RemoveSubConn(SubConn)
146

147
	// UpdateState notifies gRPC that the balancer's internal state has
148
	// changed.
149
	//
150
	// gRPC will update the connectivity state of the ClientConn, and will call
151
	// Pick on the new Picker to pick new SubConns.
152
	UpdateState(State)
153

154
	// ResolveNow is called by balancer to notify gRPC to do a name resolving.
155
	ResolveNow(resolver.ResolveNowOptions)
156

157
	// Target returns the dial target for this ClientConn.
158
	//
159
	// Deprecated: Use the Target field in the BuildOptions instead.
160
	Target() string
161
}
162

163
// BuildOptions contains additional information for Build.
164
type BuildOptions struct {
165
	// DialCreds is the transport credential the Balancer implementation can
166
	// use to dial to a remote load balancer server. The Balancer implementations
167
	// can ignore this if it does not need to talk to another party securely.
168
	DialCreds credentials.TransportCredentials
169
	// CredsBundle is the credentials bundle that the Balancer can use.
170
	CredsBundle credentials.Bundle
171
	// Dialer is the custom dialer the Balancer implementation can use to dial
172
	// to a remote load balancer server. The Balancer implementations
173
	// can ignore this if it doesn't need to talk to remote balancer.
174
	Dialer func(context.Context, string) (net.Conn, error)
175
	// ChannelzParentID is the entity parent's channelz unique identification number.
176
	ChannelzParentID int64
177
	// CustomUserAgent is the custom user agent set on the parent ClientConn.
178
	// The balancer should set the same custom user agent if it creates a
179
	// ClientConn.
180
	CustomUserAgent string
181
	// Target contains the parsed address info of the dial target. It is the same resolver.Target as
182
	// passed to the resolver.
183
	// See the documentation for the resolver.Target type for details about what it contains.
184
	Target resolver.Target
185
}
186

187
// Builder creates a balancer.
188
type Builder interface {
189
	// Build creates a new balancer with the ClientConn.
190
	Build(cc ClientConn, opts BuildOptions) Balancer
191
	// Name returns the name of balancers built by this builder.
192
	// It will be used to pick balancers (for example in service config).
193
	Name() string
194
}
195

196
// ConfigParser parses load balancer configs.
197
type ConfigParser interface {
198
	// ParseConfig parses the JSON load balancer config provided into an
199
	// internal form or returns an error if the config is invalid.  For future
200
	// compatibility reasons, unknown fields in the config should be ignored.
201
	ParseConfig(LoadBalancingConfigJSON json.RawMessage) (serviceconfig.LoadBalancingConfig, error)
202
}
203

204
// PickInfo contains additional information for the Pick operation.
type PickInfo struct {
	// FullMethodName is the method name that NewClientStream() is called
	// with. The canonical format is /service/Method.
	FullMethodName string
	// Ctx is the RPC's context, and may contain relevant RPC-level information
	// like the outgoing header metadata.
	Ctx context.Context
}
213

214
// DoneInfo contains additional information for done.
215
type DoneInfo struct {
216
	// Err is the rpc error the RPC finished with. It could be nil.
217
	Err error
218
	// Trailer contains the metadata from the RPC's trailer, if present.
219
	Trailer metadata.MD
220
	// BytesSent indicates if any bytes have been sent to the server.
221
	BytesSent bool
222
	// BytesReceived indicates if any byte has been received from the server.
223
	BytesReceived bool
224
	// ServerLoad is the load received from server. It's usually sent as part of
225
	// trailing metadata.
226
	//
227
	// The only supported type now is *orca_v1.LoadReport.
228
	ServerLoad interface{}
229
}
230

231
var (
	// ErrNoSubConnAvailable indicates no SubConn is available for pick().
	// gRPC will block the RPC until a new picker is available via UpdateState().
	ErrNoSubConnAvailable = errors.New("no SubConn is available")
	// ErrTransientFailure indicates all SubConns are in TransientFailure.
	// WaitForReady RPCs will block, non-WaitForReady RPCs will fail.
	//
	// Deprecated: return an appropriate error based on the last resolution or
	// connection attempt instead.  The behavior is the same for any non-gRPC
	// status error.
	ErrTransientFailure = errors.New("all SubConns are in TransientFailure")
)
243

244
// PickResult contains information related to a connection chosen for an RPC.
245
type PickResult struct {
246
	// SubConn is the connection to use for this pick, if its state is Ready.
247
	// If the state is not Ready, gRPC will block the RPC until a new Picker is
248
	// provided by the balancer (using ClientConn.UpdateState).  The SubConn
249
	// must be one returned by ClientConn.NewSubConn.
250
	SubConn SubConn
251

252
	// Done is called when the RPC is completed.  If the SubConn is not ready,
253
	// this will be called with a nil parameter.  If the SubConn is not a valid
254
	// type, Done may not be called.  May be nil if the balancer does not wish
255
	// to be notified when the RPC completes.
256
	Done func(DoneInfo)
257
}
258

259
// TransientFailureError returns e unchanged.  It exists for backward
// compatibility and will be deleted soon.
//
// Deprecated: no longer necessary, picker errors are treated this way by
// default.
func TransientFailureError(e error) error { return e }
265

266
// Picker is used by gRPC to pick a SubConn to send an RPC.
267
// Balancer is expected to generate a new picker from its snapshot every time its
268
// internal state has changed.
269
//
270
// The pickers used by gRPC can be updated by ClientConn.UpdateState().
271
type Picker interface {
272
	// Pick returns the connection to use for this RPC and related information.
273
	//
274
	// Pick should not block.  If the balancer needs to do I/O or any blocking
275
	// or time-consuming work to service this call, it should return
276
	// ErrNoSubConnAvailable, and the Pick call will be repeated by gRPC when
277
	// the Picker is updated (using ClientConn.UpdateState).
278
	//
279
	// If an error is returned:
280
	//
281
	// - If the error is ErrNoSubConnAvailable, gRPC will block until a new
282
	//   Picker is provided by the balancer (using ClientConn.UpdateState).
283
	//
284
	// - If the error is a status error (implemented by the grpc/status
285
	//   package), gRPC will terminate the RPC with the code and message
286
	//   provided.
287
	//
288
	// - For all other errors, wait for ready RPCs will wait, but non-wait for
289
	//   ready RPCs will be terminated with this error's Error() string and
290
	//   status code Unavailable.
291
	Pick(info PickInfo) (PickResult, error)
292
}
293

294
// Balancer takes input from gRPC, manages SubConns, and collects and aggregates
295
// the connectivity states.
296
//
297
// It also generates and updates the Picker used by gRPC to pick SubConns for RPCs.
298
//
299
// UpdateClientConnState, ResolverError, UpdateSubConnState, and Close are
300
// guaranteed to be called synchronously from the same goroutine.  There's no
301
// guarantee on picker.Pick, it may be called anytime.
302
type Balancer interface {
303
	// UpdateClientConnState is called by gRPC when the state of the ClientConn
304
	// changes.  If the error returned is ErrBadResolverState, the ClientConn
305
	// will begin calling ResolveNow on the active name resolver with
306
	// exponential backoff until a subsequent call to UpdateClientConnState
307
	// returns a nil error.  Any other errors are currently ignored.
308
	UpdateClientConnState(ClientConnState) error
309
	// ResolverError is called by gRPC when the name resolver reports an error.
310
	ResolverError(error)
311
	// UpdateSubConnState is called by gRPC when the state of a SubConn
312
	// changes.
313
	UpdateSubConnState(SubConn, SubConnState)
314
	// Close closes the balancer. The balancer is not required to call
315
	// ClientConn.RemoveSubConn for its existing SubConns.
316
	Close()
317
}
318

319
// SubConnState describes the state of a SubConn.
320
type SubConnState struct {
321
	// ConnectivityState is the connectivity state of the SubConn.
322
	ConnectivityState connectivity.State
323
	// ConnectionError is set if the ConnectivityState is TransientFailure,
324
	// describing the reason the SubConn failed.  Otherwise, it is nil.
325
	ConnectionError error
326
}
327

328
// ClientConnState describes the state of a ClientConn relevant to the
329
// balancer.
330
type ClientConnState struct {
331
	ResolverState resolver.State
332
	// The parsed load balancing configuration returned by the builder's
333
	// ParseConfig method, if implemented.
334
	BalancerConfig serviceconfig.LoadBalancingConfig
335
}
336

337
// ErrBadResolverState may be returned by UpdateClientConnState to indicate a
// problem with the provided name resolver data.
var ErrBadResolverState = errors.New("bad resolver state")
340

341
// ConnectivityStateEvaluator takes the connectivity states of multiple SubConns
// and returns one aggregated connectivity state.
//
// The zero value is ready to use, with both counters at zero.
//
// It's not thread safe.
type ConnectivityStateEvaluator struct {
	numReady      uint64 // Number of addrConns in ready state.
	numConnecting uint64 // Number of addrConns in connecting state.
}
349

350
// RecordTransition records state change happening in subConn and based on that
351
// it evaluates what aggregated state should be.
352
//
353
//  - If at least one SubConn in Ready, the aggregated state is Ready;
354
//  - Else if at least one SubConn in Connecting, the aggregated state is Connecting;
355
//  - Else the aggregated state is TransientFailure.
356
//
357
// Idle and Shutdown are not considered.
358
func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState connectivity.State) connectivity.State {
359
	// Update counters.
360
	for idx, state := range []connectivity.State{oldState, newState} {
361
		updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
362
		switch state {
363
		case connectivity.Ready:
364
			cse.numReady += updateVal
365
		case connectivity.Connecting:
366
			cse.numConnecting += updateVal
367
		}
368
	}
369

370
	// Evaluate.
371
	if cse.numReady > 0 {
372
		return connectivity.Ready
373
	}
374
	if cse.numConnecting > 0 {
375
		return connectivity.Connecting
376
	}
377
	return connectivity.TransientFailure
378
}
379

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.