cubefs

Форк
0
1596 строк · 49.8 Кб
1
/*
2
 *
3
 * Copyright 2014 gRPC authors.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 */
18

19
package grpc
20

21
import (
22
	"context"
23
	"errors"
24
	"fmt"
25
	"math"
26
	"reflect"
27
	"strings"
28
	"sync"
29
	"sync/atomic"
30
	"time"
31

32
	"google.golang.org/grpc/balancer"
33
	"google.golang.org/grpc/balancer/base"
34
	"google.golang.org/grpc/codes"
35
	"google.golang.org/grpc/connectivity"
36
	"google.golang.org/grpc/credentials"
37
	"google.golang.org/grpc/internal/backoff"
38
	"google.golang.org/grpc/internal/channelz"
39
	"google.golang.org/grpc/internal/grpcsync"
40
	"google.golang.org/grpc/internal/grpcutil"
41
	iresolver "google.golang.org/grpc/internal/resolver"
42
	"google.golang.org/grpc/internal/transport"
43
	"google.golang.org/grpc/keepalive"
44
	"google.golang.org/grpc/resolver"
45
	"google.golang.org/grpc/serviceconfig"
46
	"google.golang.org/grpc/status"
47

48
	_ "google.golang.org/grpc/balancer/roundrobin"           // To register roundrobin.
49
	_ "google.golang.org/grpc/internal/resolver/dns"         // To register dns resolver.
50
	_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
51
	_ "google.golang.org/grpc/internal/resolver/unix"        // To register unix resolver.
52
)
53

54
const (
	// minConnectTimeout is the minimum amount of time a connection attempt
	// is given to complete.
	minConnectTimeout = 20 * time.Second

	// grpclbName is the registered name of the grpclb balancer. It must stay
	// in sync with grpclbName in grpclb/grpclb.go.
	grpclbName = "grpclb"
)
60

61
var (
62
	// ErrClientConnClosing indicates that the operation is illegal because
63
	// the ClientConn is closing.
64
	//
65
	// Deprecated: this error should not be relied upon by users; use the status
66
	// code of Canceled instead.
67
	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
68
	// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
69
	errConnDrain = errors.New("grpc: the connection is drained")
70
	// errConnClosing indicates that the connection is closing.
71
	errConnClosing = errors.New("grpc: the connection is closing")
72
	// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
73
	// service config.
74
	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
75
)
76

77
// Errors returned from Dial and DialContext when the credential-related dial
// options are missing or contradict each other.
var (
	// errNoTransportSecurity: neither transport credentials nor a credentials
	// bundle was configured, and WithInsecure was not used to opt out of
	// security explicitly.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")

	// errTransportCredsAndBundle: a credentials.Bundle was supplied together
	// with individual TransportCredentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")

	// errTransportCredentialsMissing: a per-RPC credential demands transport
	// security (e.g. an OAuth2 token), but the connection is insecure.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")

	// errCredentialsConflict: both grpc.WithTransportCredentials() and
	// grpc.WithInsecure() were used for the same connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)
94

95
const (
	// Default client-side message size limits, in bytes: 4 MiB for received
	// messages and math.MaxInt32 for sent messages. NOTE(review): the code
	// that consumes these lives outside this section of the file.
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultClientMaxSendMessageSize    = math.MaxInt32

	// Default sizes, in bytes, of the write (frame-sending) and read
	// buffers. Both default to 32 KiB.
	defaultWriteBufSize = 32 * 1024
	defaultReadBufSize  = 32 * 1024
)
102

103
// Dial creates a client connection to the given target.
104
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
105
	return DialContext(context.Background(), target, opts...)
106
}
107

108
type defaultConfigSelector struct {
109
	sc *ServiceConfig
110
}
111

112
func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
113
	return &iresolver.RPCConfig{
114
		Context:      rpcInfo.Context,
115
		MethodConfig: getMethodConfig(dcs.sc, rpcInfo.Method),
116
	}, nil
117
}
118

119
// DialContext creates a client connection to the given target. By default, it's
120
// a non-blocking dial (the function won't wait for connections to be
121
// established, and connecting happens in the background). To make it a blocking
122
// dial, use WithBlock() dial option.
123
//
124
// In the non-blocking case, the ctx does not act against the connection. It
125
// only controls the setup steps.
126
//
127
// In the blocking case, ctx can be used to cancel or expire the pending
128
// connection. Once this function returns, the cancellation and expiration of
129
// ctx will be noop. Users should call ClientConn.Close to terminate all the
130
// pending operations after this function returns.
131
//
132
// The target name syntax is defined in
133
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
134
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
135
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
136
	cc := &ClientConn{
137
		target:            target,
138
		csMgr:             &connectivityStateManager{},
139
		conns:             make(map[*addrConn]struct{}),
140
		dopts:             defaultDialOptions(),
141
		blockingpicker:    newPickerWrapper(),
142
		czData:            new(channelzData),
143
		firstResolveEvent: grpcsync.NewEvent(),
144
	}
145
	cc.retryThrottler.Store((*retryThrottler)(nil))
146
	cc.ctx, cc.cancel = context.WithCancel(context.Background())
147

148
	for _, opt := range opts {
149
		opt.apply(&cc.dopts)
150
	}
151

152
	chainUnaryClientInterceptors(cc)
153
	chainStreamClientInterceptors(cc)
154

155
	defer func() {
156
		if err != nil {
157
			cc.Close()
158
		}
159
	}()
160

161
	if channelz.IsOn() {
162
		if cc.dopts.channelzParentID != 0 {
163
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
164
			channelz.AddTraceEvent(logger, cc.channelzID, 0, &channelz.TraceEventDesc{
165
				Desc:     "Channel Created",
166
				Severity: channelz.CtInfo,
167
				Parent: &channelz.TraceEventDesc{
168
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
169
					Severity: channelz.CtInfo,
170
				},
171
			})
172
		} else {
173
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
174
			channelz.Info(logger, cc.channelzID, "Channel Created")
175
		}
176
		cc.csMgr.channelzID = cc.channelzID
177
	}
178

179
	if !cc.dopts.insecure {
180
		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
181
			return nil, errNoTransportSecurity
182
		}
183
		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
184
			return nil, errTransportCredsAndBundle
185
		}
186
	} else {
187
		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
188
			return nil, errCredentialsConflict
189
		}
190
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
191
			if cd.RequireTransportSecurity() {
192
				return nil, errTransportCredentialsMissing
193
			}
194
		}
195
	}
196

197
	if cc.dopts.defaultServiceConfigRawJSON != nil {
198
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
199
		if scpr.Err != nil {
200
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
201
		}
202
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
203
	}
204
	cc.mkp = cc.dopts.copts.KeepaliveParams
205

206
	if cc.dopts.copts.UserAgent != "" {
207
		cc.dopts.copts.UserAgent += " " + grpcUA
208
	} else {
209
		cc.dopts.copts.UserAgent = grpcUA
210
	}
211

212
	if cc.dopts.timeout > 0 {
213
		var cancel context.CancelFunc
214
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
215
		defer cancel()
216
	}
217
	defer func() {
218
		select {
219
		case <-ctx.Done():
220
			switch {
221
			case ctx.Err() == err:
222
				conn = nil
223
			case err == nil || !cc.dopts.returnLastError:
224
				conn, err = nil, ctx.Err()
225
			default:
226
				conn, err = nil, fmt.Errorf("%v: %v", ctx.Err(), err)
227
			}
228
		default:
229
		}
230
	}()
231

232
	scSet := false
233
	if cc.dopts.scChan != nil {
234
		// Try to get an initial service config.
235
		select {
236
		case sc, ok := <-cc.dopts.scChan:
237
			if ok {
238
				cc.sc = &sc
239
				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
240
				scSet = true
241
			}
242
		default:
243
		}
244
	}
245
	if cc.dopts.bs == nil {
246
		cc.dopts.bs = backoff.DefaultExponential
247
	}
248

249
	// Determine the resolver to use.
250
	cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
251
	channelz.Infof(logger, cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
252
	resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
253
	if resolverBuilder == nil {
254
		// If resolver builder is still nil, the parsed target's scheme is
255
		// not registered. Fallback to default resolver and set Endpoint to
256
		// the original target.
257
		channelz.Infof(logger, cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
258
		cc.parsedTarget = resolver.Target{
259
			Scheme:   resolver.GetDefaultScheme(),
260
			Endpoint: target,
261
		}
262
		resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
263
		if resolverBuilder == nil {
264
			return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
265
		}
266
	}
267

268
	creds := cc.dopts.copts.TransportCredentials
269
	if creds != nil && creds.Info().ServerName != "" {
270
		cc.authority = creds.Info().ServerName
271
	} else if cc.dopts.insecure && cc.dopts.authority != "" {
272
		cc.authority = cc.dopts.authority
273
	} else if strings.HasPrefix(cc.target, "unix:") || strings.HasPrefix(cc.target, "unix-abstract:") {
274
		cc.authority = "localhost"
275
	} else if strings.HasPrefix(cc.parsedTarget.Endpoint, ":") {
276
		cc.authority = "localhost" + cc.parsedTarget.Endpoint
277
	} else {
278
		// Use endpoint from "scheme://authority/endpoint" as the default
279
		// authority for ClientConn.
280
		cc.authority = cc.parsedTarget.Endpoint
281
	}
282

283
	if cc.dopts.scChan != nil && !scSet {
284
		// Blocking wait for the initial service config.
285
		select {
286
		case sc, ok := <-cc.dopts.scChan:
287
			if ok {
288
				cc.sc = &sc
289
				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
290
			}
291
		case <-ctx.Done():
292
			return nil, ctx.Err()
293
		}
294
	}
295
	if cc.dopts.scChan != nil {
296
		go cc.scWatcher()
297
	}
298

299
	var credsClone credentials.TransportCredentials
300
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
301
		credsClone = creds.Clone()
302
	}
303
	cc.balancerBuildOpts = balancer.BuildOptions{
304
		DialCreds:        credsClone,
305
		CredsBundle:      cc.dopts.copts.CredsBundle,
306
		Dialer:           cc.dopts.copts.Dialer,
307
		CustomUserAgent:  cc.dopts.copts.UserAgent,
308
		ChannelzParentID: cc.channelzID,
309
		Target:           cc.parsedTarget,
310
	}
311

312
	// Build the resolver.
313
	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
314
	if err != nil {
315
		return nil, fmt.Errorf("failed to build resolver: %v", err)
316
	}
317
	cc.mu.Lock()
318
	cc.resolverWrapper = rWrapper
319
	cc.mu.Unlock()
320

321
	// A blocking dial blocks until the clientConn is ready.
322
	if cc.dopts.block {
323
		for {
324
			s := cc.GetState()
325
			if s == connectivity.Ready {
326
				break
327
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
328
				if err = cc.connectionError(); err != nil {
329
					terr, ok := err.(interface {
330
						Temporary() bool
331
					})
332
					if ok && !terr.Temporary() {
333
						return nil, err
334
					}
335
				}
336
			}
337
			if !cc.WaitForStateChange(ctx, s) {
338
				// ctx got timeout or canceled.
339
				if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
340
					return nil, err
341
				}
342
				return nil, ctx.Err()
343
			}
344
		}
345
	}
346

347
	return cc, nil
348
}
349

350
// chainUnaryClientInterceptors chains all unary client interceptors into one.
351
func chainUnaryClientInterceptors(cc *ClientConn) {
352
	interceptors := cc.dopts.chainUnaryInts
353
	// Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
354
	// be executed before any other chained interceptors.
355
	if cc.dopts.unaryInt != nil {
356
		interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
357
	}
358
	var chainedInt UnaryClientInterceptor
359
	if len(interceptors) == 0 {
360
		chainedInt = nil
361
	} else if len(interceptors) == 1 {
362
		chainedInt = interceptors[0]
363
	} else {
364
		chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
365
			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
366
		}
367
	}
368
	cc.dopts.unaryInt = chainedInt
369
}
370

371
// getChainUnaryInvoker recursively generate the chained unary invoker.
372
func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
373
	if curr == len(interceptors)-1 {
374
		return finalInvoker
375
	}
376
	return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
377
		return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
378
	}
379
}
380

381
// chainStreamClientInterceptors chains all stream client interceptors into one.
382
func chainStreamClientInterceptors(cc *ClientConn) {
383
	interceptors := cc.dopts.chainStreamInts
384
	// Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
385
	// be executed before any other chained interceptors.
386
	if cc.dopts.streamInt != nil {
387
		interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
388
	}
389
	var chainedInt StreamClientInterceptor
390
	if len(interceptors) == 0 {
391
		chainedInt = nil
392
	} else if len(interceptors) == 1 {
393
		chainedInt = interceptors[0]
394
	} else {
395
		chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
396
			return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
397
		}
398
	}
399
	cc.dopts.streamInt = chainedInt
400
}
401

402
// getChainStreamer recursively generate the chained client stream constructor.
403
func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
404
	if curr == len(interceptors)-1 {
405
		return finalStreamer
406
	}
407
	return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
408
		return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
409
	}
410
}
411

412
// connectivityStateManager keeps the connectivity.State of ClientConn.
413
// This struct will eventually be exported so the balancers can access it.
414
type connectivityStateManager struct {
415
	mu         sync.Mutex
416
	state      connectivity.State
417
	notifyChan chan struct{}
418
	channelzID int64
419
}
420

421
// updateState updates the connectivity.State of ClientConn.
422
// If there's a change it notifies goroutines waiting on state change to
423
// happen.
424
func (csm *connectivityStateManager) updateState(state connectivity.State) {
425
	csm.mu.Lock()
426
	defer csm.mu.Unlock()
427
	if csm.state == connectivity.Shutdown {
428
		return
429
	}
430
	if csm.state == state {
431
		return
432
	}
433
	csm.state = state
434
	channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state)
435
	if csm.notifyChan != nil {
436
		// There are other goroutines waiting on this channel.
437
		close(csm.notifyChan)
438
		csm.notifyChan = nil
439
	}
440
}
441

442
func (csm *connectivityStateManager) getState() connectivity.State {
443
	csm.mu.Lock()
444
	defer csm.mu.Unlock()
445
	return csm.state
446
}
447

448
func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
449
	csm.mu.Lock()
450
	defer csm.mu.Unlock()
451
	if csm.notifyChan == nil {
452
		csm.notifyChan = make(chan struct{})
453
	}
454
	return csm.notifyChan
455
}
456

457
// ClientConnInterface defines the functions clients need to perform unary and
458
// streaming RPCs.  It is implemented by *ClientConn, and is only intended to
459
// be referenced by generated code.
460
type ClientConnInterface interface {
461
	// Invoke performs a unary RPC and returns after the response is received
462
	// into reply.
463
	Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
464
	// NewStream begins a streaming RPC.
465
	NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
466
}
467

468
// Assert *ClientConn implements ClientConnInterface.
469
var _ ClientConnInterface = (*ClientConn)(nil)
470

471
// ClientConn represents a virtual connection to a conceptual endpoint, to
472
// perform RPCs.
473
//
474
// A ClientConn is free to have zero or more actual connections to the endpoint
475
// based on configuration, load, etc. It is also free to determine which actual
476
// endpoints to use and may change it every RPC, permitting client-side load
477
// balancing.
478
//
479
// A ClientConn encapsulates a range of functionality including name
480
// resolution, TCP connection establishment (with retries and backoff) and TLS
481
// handshakes. It also handles errors on established connections by
482
// re-resolving the name and reconnecting.
483
type ClientConn struct {
484
	ctx    context.Context
485
	cancel context.CancelFunc
486

487
	target       string
488
	parsedTarget resolver.Target
489
	authority    string
490
	dopts        dialOptions
491
	csMgr        *connectivityStateManager
492

493
	balancerBuildOpts balancer.BuildOptions
494
	blockingpicker    *pickerWrapper
495

496
	safeConfigSelector iresolver.SafeConfigSelector
497

498
	mu              sync.RWMutex
499
	resolverWrapper *ccResolverWrapper
500
	sc              *ServiceConfig
501
	conns           map[*addrConn]struct{}
502
	// Keepalive parameter can be updated if a GoAway is received.
503
	mkp             keepalive.ClientParameters
504
	curBalancerName string
505
	balancerWrapper *ccBalancerWrapper
506
	retryThrottler  atomic.Value
507

508
	firstResolveEvent *grpcsync.Event
509

510
	channelzID int64 // channelz unique identification number
511
	czData     *channelzData
512

513
	lceMu               sync.Mutex // protects lastConnectionError
514
	lastConnectionError error
515
}
516

517
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
518
// ctx expires. A true value is returned in former case and false in latter.
519
//
520
// Experimental
521
//
522
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
523
// later release.
524
func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
525
	ch := cc.csMgr.getNotifyChan()
526
	if cc.csMgr.getState() != sourceState {
527
		return true
528
	}
529
	select {
530
	case <-ctx.Done():
531
		return false
532
	case <-ch:
533
		return true
534
	}
535
}
536

537
// GetState returns the connectivity.State of ClientConn.
538
//
539
// Experimental
540
//
541
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
542
// later release.
543
func (cc *ClientConn) GetState() connectivity.State {
544
	return cc.csMgr.getState()
545
}
546

547
func (cc *ClientConn) scWatcher() {
548
	for {
549
		select {
550
		case sc, ok := <-cc.dopts.scChan:
551
			if !ok {
552
				return
553
			}
554
			cc.mu.Lock()
555
			// TODO: load balance policy runtime change is ignored.
556
			// We may revisit this decision in the future.
557
			cc.sc = &sc
558
			cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
559
			cc.mu.Unlock()
560
		case <-cc.ctx.Done():
561
			return
562
		}
563
	}
564
}
565

566
// waitForResolvedAddrs blocks until the resolver has provided addresses or the
567
// context expires.  Returns nil unless the context expires first; otherwise
568
// returns a status error based on the context.
569
func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
570
	// This is on the RPC path, so we use a fast path to avoid the
571
	// more-expensive "select" below after the resolver has returned once.
572
	if cc.firstResolveEvent.HasFired() {
573
		return nil
574
	}
575
	select {
576
	case <-cc.firstResolveEvent.Done():
577
		return nil
578
	case <-ctx.Done():
579
		return status.FromContextError(ctx.Err()).Err()
580
	case <-cc.ctx.Done():
581
		return ErrClientConnClosing
582
	}
583
}
584

585
var emptyServiceConfig *ServiceConfig
586

587
func init() {
588
	cfg := parseServiceConfig("{}")
589
	if cfg.Err != nil {
590
		panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
591
	}
592
	emptyServiceConfig = cfg.Config.(*ServiceConfig)
593
}
594

595
// maybeApplyDefaultServiceConfig applies, together with addrs, the first
// available of:
//   - the current cc.sc (without replacing the config selector),
//   - the default service config from the dial options,
//   - emptyServiceConfig.
//
// In the latter two cases a defaultConfigSelector for that config is
// installed as well. NOTE(review): callers seen in this file hold cc.mu.
func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
	switch {
	case cc.sc != nil:
		cc.applyServiceConfigAndBalancer(cc.sc, nil, addrs)
	case cc.dopts.defaultServiceConfig != nil:
		dsc := cc.dopts.defaultServiceConfig
		cc.applyServiceConfigAndBalancer(dsc, &defaultConfigSelector{dsc}, addrs)
	default:
		cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig}, addrs)
	}
}
606

607
// updateResolverState applies a resolver update to the ClientConn and fires
// firstResolveEvent on every return path.
//
// Return values by case:
//   - ClientConn already closed (cc.conns == nil): nil, with no other effect.
//   - err non-nil: maybeApplyDefaultServiceConfig(nil) runs, err is passed to
//     the current balancer (if any), and balancer.ErrBadResolverState is
//     returned.
//   - Otherwise the service config from s (or a default one) is applied and
//     the addresses are pushed to the balancer.
//
// balancer.ErrBadResolverState is also returned when s carries an invalid
// service config. Otherwise the balancer's updateClientConnState error is
// returned.
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
	defer cc.firstResolveEvent.Fire()
	cc.mu.Lock()
	// Check if the ClientConn is already closed. Some fields (e.g.
	// balancerWrapper) are set to nil when closing the ClientConn, and could
	// cause nil pointer panic if we don't have this check.
	if cc.conns == nil {
		cc.mu.Unlock()
		return nil
	}

	if err != nil {
		// May need to apply the initial service config in case the resolver
		// doesn't support service configs, or doesn't provide a service config
		// with the new addresses.
		cc.maybeApplyDefaultServiceConfig(nil)

		if cc.balancerWrapper != nil {
			cc.balancerWrapper.resolverError(err)
		}

		// No addresses are valid with err set; return early.
		cc.mu.Unlock()
		return balancer.ErrBadResolverState
	}

	var ret error
	if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
		cc.maybeApplyDefaultServiceConfig(s.Addresses)
		// TODO: do we need to apply a failing LB policy if there is no
		// default, per the error handling design?
	} else {
		if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
			configSelector := iresolver.GetConfigSelector(s)
			if configSelector != nil {
				if len(sc.Methods) != 0 {
					channelz.Infof(logger, cc.channelzID, "method configs in service config will be ignored due to presence of config selector")
				}
			} else {
				configSelector = &defaultConfigSelector{sc}
			}
			cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
		} else {
			ret = balancer.ErrBadResolverState
			if cc.balancerWrapper == nil {
				var err error
				if s.ServiceConfig.Err != nil {
					err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
				} else {
					err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
				}
				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{cc.sc})
				cc.blockingpicker.updatePicker(base.NewErrPicker(err))
				cc.csMgr.updateState(connectivity.TransientFailure)
				cc.mu.Unlock()
				return ret
			}
		}
	}

	var balCfg serviceconfig.LoadBalancingConfig
	if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
		balCfg = cc.sc.lbConfig.cfg
	}

	cbn := cc.curBalancerName
	bw := cc.balancerWrapper
	cc.mu.Unlock()
	if cbn != grpclbName && s.Addresses != nil {
		// Filter any grpclb addresses since we don't have the grpclb balancer.
		// Filter into a fresh slice: s.Addresses shares its backing array with
		// the caller's resolver.State. Compacting it in place would overwrite
		// and shift addresses the caller may still be using.
		filtered := make([]resolver.Address, 0, len(s.Addresses))
		for _, addr := range s.Addresses {
			if addr.Type != resolver.GRPCLB {
				filtered = append(filtered, addr)
			}
		}
		s.Addresses = filtered
	}
	uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
	if ret == nil {
		ret = uccsErr // prefer ErrBadResolver state since any other error is
		// currently meaningless to the caller.
	}
	return ret
}
693

694
// switchBalancer starts the switching from current balancer to the balancer
695
// with the given name.
696
//
697
// It will NOT send the current address list to the new balancer. If needed,
698
// caller of this function should send address list to the new balancer after
699
// this function returns.
700
//
701
// Caller must hold cc.mu.
702
func (cc *ClientConn) switchBalancer(name string) {
703
	if strings.EqualFold(cc.curBalancerName, name) {
704
		return
705
	}
706

707
	channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)
708
	if cc.dopts.balancerBuilder != nil {
709
		channelz.Info(logger, cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")
710
		return
711
	}
712
	if cc.balancerWrapper != nil {
713
		cc.balancerWrapper.close()
714
	}
715

716
	builder := balancer.Get(name)
717
	if builder == nil {
718
		channelz.Warningf(logger, cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
719
		channelz.Infof(logger, cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)
720
		builder = newPickfirstBuilder()
721
	} else {
722
		channelz.Infof(logger, cc.channelzID, "Channel switches to new LB policy %q", name)
723
	}
724

725
	cc.curBalancerName = builder.Name()
726
	cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
727
}
728

729
// handleSubConnStateChange forwards a SubConn state change to the current
// balancer wrapper while holding cc.mu. It is ignored once the ClientConn
// is closed.
func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
	cc.mu.Lock()
	if cc.conns != nil {
		// TODO(bar switching) send updates to all balancer wrappers when balancer
		// gracefully switching is supported.
		cc.balancerWrapper.handleSubConnStateChange(sc, s, err)
	}
	cc.mu.Unlock()
}
740

741
// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
742
//
743
// Caller needs to make sure len(addrs) > 0.
744
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
745
	ac := &addrConn{
746
		state:        connectivity.Idle,
747
		cc:           cc,
748
		addrs:        addrs,
749
		scopts:       opts,
750
		dopts:        cc.dopts,
751
		czData:       new(channelzData),
752
		resetBackoff: make(chan struct{}),
753
	}
754
	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
755
	// Track ac in cc. This needs to be done before any getTransport(...) is called.
756
	cc.mu.Lock()
757
	if cc.conns == nil {
758
		cc.mu.Unlock()
759
		return nil, ErrClientConnClosing
760
	}
761
	if channelz.IsOn() {
762
		ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
763
		channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
764
			Desc:     "Subchannel Created",
765
			Severity: channelz.CtInfo,
766
			Parent: &channelz.TraceEventDesc{
767
				Desc:     fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
768
				Severity: channelz.CtInfo,
769
			},
770
		})
771
	}
772
	cc.conns[ac] = struct{}{}
773
	cc.mu.Unlock()
774
	return ac, nil
775
}
776

777
// removeAddrConn removes the addrConn in the subConn from clientConn.
778
// It also tears down the ac with the given error.
779
func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
780
	cc.mu.Lock()
781
	if cc.conns == nil {
782
		cc.mu.Unlock()
783
		return
784
	}
785
	delete(cc.conns, ac)
786
	cc.mu.Unlock()
787
	ac.tearDown(err)
788
}
789

790
func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
791
	return &channelz.ChannelInternalMetric{
792
		State:                    cc.GetState(),
793
		Target:                   cc.target,
794
		CallsStarted:             atomic.LoadInt64(&cc.czData.callsStarted),
795
		CallsSucceeded:           atomic.LoadInt64(&cc.czData.callsSucceeded),
796
		CallsFailed:              atomic.LoadInt64(&cc.czData.callsFailed),
797
		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
798
	}
799
}
800

801
// Target returns the target string of the ClientConn.
802
//
803
// Experimental
804
//
805
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
806
// later release.
807
func (cc *ClientConn) Target() string {
808
	return cc.target
809
}
810

811
func (cc *ClientConn) incrCallsStarted() {
812
	atomic.AddInt64(&cc.czData.callsStarted, 1)
813
	atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
814
}
815

816
func (cc *ClientConn) incrCallsSucceeded() {
817
	atomic.AddInt64(&cc.czData.callsSucceeded, 1)
818
}
819

820
func (cc *ClientConn) incrCallsFailed() {
821
	atomic.AddInt64(&cc.czData.callsFailed, 1)
822
}
823

824
// connect starts creating a transport.
825
// It does nothing if the ac is not IDLE.
826
// TODO(bar) Move this to the addrConn section.
827
func (ac *addrConn) connect() error {
828
	ac.mu.Lock()
829
	if ac.state == connectivity.Shutdown {
830
		ac.mu.Unlock()
831
		return errConnClosing
832
	}
833
	if ac.state != connectivity.Idle {
834
		ac.mu.Unlock()
835
		return nil
836
	}
837
	// Update connectivity state within the lock to prevent subsequent or
838
	// concurrent calls from resetting the transport more than once.
839
	ac.updateConnectivityState(connectivity.Connecting, nil)
840
	ac.mu.Unlock()
841

842
	// Start a goroutine connecting to the server asynchronously.
843
	go ac.resetTransport()
844
	return nil
845
}
846

847
// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
848
//
849
// If ac is Connecting, it returns false. The caller should tear down the ac and
850
// create a new one. Note that the backoff will be reset when this happens.
851
//
852
// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
853
// addresses will be picked up by retry in the next iteration after backoff.
854
//
855
// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
856
//
857
// If ac is Ready, it checks whether current connected address of ac is in the
858
// new addrs list.
859
//  - If true, it updates ac.addrs and returns true. The ac will keep using
860
//    the existing connection.
861
//  - If false, it does nothing and returns false.
862
func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
863
	ac.mu.Lock()
864
	defer ac.mu.Unlock()
865
	channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
866
	if ac.state == connectivity.Shutdown ||
867
		ac.state == connectivity.TransientFailure ||
868
		ac.state == connectivity.Idle {
869
		ac.addrs = addrs
870
		return true
871
	}
872

873
	if ac.state == connectivity.Connecting {
874
		return false
875
	}
876

877
	// ac.state is Ready, try to find the connected address.
878
	var curAddrFound bool
879
	for _, a := range addrs {
880
		if reflect.DeepEqual(ac.curAddr, a) {
881
			curAddrFound = true
882
			break
883
		}
884
	}
885
	channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
886
	if curAddrFound {
887
		ac.addrs = addrs
888
	}
889

890
	return curAddrFound
891
}
892

893
func getMethodConfig(sc *ServiceConfig, method string) MethodConfig {
894
	if sc == nil {
895
		return MethodConfig{}
896
	}
897
	if m, ok := sc.Methods[method]; ok {
898
		return m
899
	}
900
	i := strings.LastIndex(method, "/")
901
	if m, ok := sc.Methods[method[:i+1]]; ok {
902
		return m
903
	}
904
	return sc.Methods[""]
905
}
906

907
// GetMethodConfig gets the method config of the input method.
908
// If there's an exact match for input method (i.e. /service/method), we return
909
// the corresponding MethodConfig.
910
// If there isn't an exact match for the input method, we look for the service's default
911
// config under the service (i.e /service/) and then for the default for all services (empty string).
912
//
913
// If there is a default MethodConfig for the service, we return it.
914
// Otherwise, we return an empty MethodConfig.
915
func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
916
	// TODO: Avoid the locking here.
917
	cc.mu.RLock()
918
	defer cc.mu.RUnlock()
919
	return getMethodConfig(cc.sc, method)
920
}
921

922
func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
923
	cc.mu.RLock()
924
	defer cc.mu.RUnlock()
925
	if cc.sc == nil {
926
		return nil
927
	}
928
	return cc.sc.healthCheckConfig
929
}
930

931
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
932
	t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
933
		Ctx:            ctx,
934
		FullMethodName: method,
935
	})
936
	if err != nil {
937
		return nil, nil, toRPCErr(err)
938
	}
939
	return t, done, nil
940
}
941

942
func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {
943
	if sc == nil {
944
		// should never reach here.
945
		return
946
	}
947
	cc.sc = sc
948
	if configSelector != nil {
949
		cc.safeConfigSelector.UpdateConfigSelector(configSelector)
950
	}
951

952
	if cc.sc.retryThrottling != nil {
953
		newThrottler := &retryThrottler{
954
			tokens: cc.sc.retryThrottling.MaxTokens,
955
			max:    cc.sc.retryThrottling.MaxTokens,
956
			thresh: cc.sc.retryThrottling.MaxTokens / 2,
957
			ratio:  cc.sc.retryThrottling.TokenRatio,
958
		}
959
		cc.retryThrottler.Store(newThrottler)
960
	} else {
961
		cc.retryThrottler.Store((*retryThrottler)(nil))
962
	}
963

964
	if cc.dopts.balancerBuilder == nil {
965
		// Only look at balancer types and switch balancer if balancer dial
966
		// option is not set.
967
		var newBalancerName string
968
		if cc.sc != nil && cc.sc.lbConfig != nil {
969
			newBalancerName = cc.sc.lbConfig.name
970
		} else {
971
			var isGRPCLB bool
972
			for _, a := range addrs {
973
				if a.Type == resolver.GRPCLB {
974
					isGRPCLB = true
975
					break
976
				}
977
			}
978
			if isGRPCLB {
979
				newBalancerName = grpclbName
980
			} else if cc.sc != nil && cc.sc.LB != nil {
981
				newBalancerName = *cc.sc.LB
982
			} else {
983
				newBalancerName = PickFirstBalancerName
984
			}
985
		}
986
		cc.switchBalancer(newBalancerName)
987
	} else if cc.balancerWrapper == nil {
988
		// Balancer dial option was set, and this is the first time handling
989
		// resolved addresses. Build a balancer with dopts.balancerBuilder.
990
		cc.curBalancerName = cc.dopts.balancerBuilder.Name()
991
		cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
992
	}
993
}
994

995
func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
996
	cc.mu.RLock()
997
	r := cc.resolverWrapper
998
	cc.mu.RUnlock()
999
	if r == nil {
1000
		return
1001
	}
1002
	go r.resolveNow(o)
1003
}
1004

1005
// ResetConnectBackoff wakes up all subchannels in transient failure and causes
1006
// them to attempt another connection immediately.  It also resets the backoff
1007
// times used for subsequent attempts regardless of the current state.
1008
//
1009
// In general, this function should not be used.  Typical service or network
1010
// outages result in a reasonable client reconnection strategy by default.
1011
// However, if a previously unavailable network becomes available, this may be
1012
// used to trigger an immediate reconnect.
1013
//
1014
// Experimental
1015
//
1016
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1017
// later release.
1018
func (cc *ClientConn) ResetConnectBackoff() {
1019
	cc.mu.Lock()
1020
	conns := cc.conns
1021
	cc.mu.Unlock()
1022
	for ac := range conns {
1023
		ac.resetConnectBackoff()
1024
	}
1025
}
1026

1027
// Close tears down the ClientConn and all underlying connections.
1028
func (cc *ClientConn) Close() error {
1029
	defer cc.cancel()
1030

1031
	cc.mu.Lock()
1032
	if cc.conns == nil {
1033
		cc.mu.Unlock()
1034
		return ErrClientConnClosing
1035
	}
1036
	conns := cc.conns
1037
	cc.conns = nil
1038
	cc.csMgr.updateState(connectivity.Shutdown)
1039

1040
	rWrapper := cc.resolverWrapper
1041
	cc.resolverWrapper = nil
1042
	bWrapper := cc.balancerWrapper
1043
	cc.balancerWrapper = nil
1044
	cc.mu.Unlock()
1045

1046
	cc.blockingpicker.close()
1047

1048
	if rWrapper != nil {
1049
		rWrapper.close()
1050
	}
1051
	if bWrapper != nil {
1052
		bWrapper.close()
1053
	}
1054

1055
	for ac := range conns {
1056
		ac.tearDown(ErrClientConnClosing)
1057
	}
1058
	if channelz.IsOn() {
1059
		ted := &channelz.TraceEventDesc{
1060
			Desc:     "Channel Deleted",
1061
			Severity: channelz.CtInfo,
1062
		}
1063
		if cc.dopts.channelzParentID != 0 {
1064
			ted.Parent = &channelz.TraceEventDesc{
1065
				Desc:     fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
1066
				Severity: channelz.CtInfo,
1067
			}
1068
		}
1069
		channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
1070
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
1071
		// the entity being deleted, and thus prevent it from being deleted right away.
1072
		channelz.RemoveEntry(cc.channelzID)
1073
	}
1074
	return nil
1075
}
1076

1077
// addrConn is a network connection to a given address.
1078
type addrConn struct {
1079
	ctx    context.Context
1080
	cancel context.CancelFunc
1081

1082
	cc     *ClientConn
1083
	dopts  dialOptions
1084
	acbw   balancer.SubConn
1085
	scopts balancer.NewSubConnOptions
1086

1087
	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
1088
	// health checking may require server to report healthy to set ac to READY), and is reset
1089
	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
1090
	// is received, transport is closed, ac has been torn down).
1091
	transport transport.ClientTransport // The current transport.
1092

1093
	mu      sync.Mutex
1094
	curAddr resolver.Address   // The current address.
1095
	addrs   []resolver.Address // All addresses that the resolver resolved to.
1096

1097
	// Use updateConnectivityState for updating addrConn's connectivity state.
1098
	state connectivity.State
1099

1100
	backoffIdx   int // Needs to be stateful for resetConnectBackoff.
1101
	resetBackoff chan struct{}
1102

1103
	channelzID int64 // channelz unique identification number.
1104
	czData     *channelzData
1105
}
1106

1107
// Note: this requires a lock on ac.mu.
1108
func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
1109
	if ac.state == s {
1110
		return
1111
	}
1112
	ac.state = s
1113
	channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v", s)
1114
	ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
1115
}
1116

1117
// adjustParams updates parameters used to create transports upon
1118
// receiving a GoAway.
1119
func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
1120
	switch r {
1121
	case transport.GoAwayTooManyPings:
1122
		v := 2 * ac.dopts.copts.KeepaliveParams.Time
1123
		ac.cc.mu.Lock()
1124
		if v > ac.cc.mkp.Time {
1125
			ac.cc.mkp.Time = v
1126
		}
1127
		ac.cc.mu.Unlock()
1128
	}
1129
}
1130

1131
func (ac *addrConn) resetTransport() {
1132
	for i := 0; ; i++ {
1133
		if i > 0 {
1134
			ac.cc.resolveNow(resolver.ResolveNowOptions{})
1135
		}
1136

1137
		ac.mu.Lock()
1138
		if ac.state == connectivity.Shutdown {
1139
			ac.mu.Unlock()
1140
			return
1141
		}
1142

1143
		addrs := ac.addrs
1144
		backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
1145
		// This will be the duration that dial gets to finish.
1146
		dialDuration := minConnectTimeout
1147
		if ac.dopts.minConnectTimeout != nil {
1148
			dialDuration = ac.dopts.minConnectTimeout()
1149
		}
1150

1151
		if dialDuration < backoffFor {
1152
			// Give dial more time as we keep failing to connect.
1153
			dialDuration = backoffFor
1154
		}
1155
		// We can potentially spend all the time trying the first address, and
1156
		// if the server accepts the connection and then hangs, the following
1157
		// addresses will never be tried.
1158
		//
1159
		// The spec doesn't mention what should be done for multiple addresses.
1160
		// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
1161
		connectDeadline := time.Now().Add(dialDuration)
1162

1163
		ac.updateConnectivityState(connectivity.Connecting, nil)
1164
		ac.transport = nil
1165
		ac.mu.Unlock()
1166

1167
		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
1168
		if err != nil {
1169
			// After exhausting all addresses, the addrConn enters
1170
			// TRANSIENT_FAILURE.
1171
			ac.mu.Lock()
1172
			if ac.state == connectivity.Shutdown {
1173
				ac.mu.Unlock()
1174
				return
1175
			}
1176
			ac.updateConnectivityState(connectivity.TransientFailure, err)
1177

1178
			// Backoff.
1179
			b := ac.resetBackoff
1180
			ac.mu.Unlock()
1181

1182
			timer := time.NewTimer(backoffFor)
1183
			select {
1184
			case <-timer.C:
1185
				ac.mu.Lock()
1186
				ac.backoffIdx++
1187
				ac.mu.Unlock()
1188
			case <-b:
1189
				timer.Stop()
1190
			case <-ac.ctx.Done():
1191
				timer.Stop()
1192
				return
1193
			}
1194
			continue
1195
		}
1196

1197
		ac.mu.Lock()
1198
		if ac.state == connectivity.Shutdown {
1199
			ac.mu.Unlock()
1200
			newTr.Close()
1201
			return
1202
		}
1203
		ac.curAddr = addr
1204
		ac.transport = newTr
1205
		ac.backoffIdx = 0
1206

1207
		hctx, hcancel := context.WithCancel(ac.ctx)
1208
		ac.startHealthCheck(hctx)
1209
		ac.mu.Unlock()
1210

1211
		// Block until the created transport is down. And when this happens,
1212
		// we restart from the top of the addr list.
1213
		<-reconnect.Done()
1214
		hcancel()
1215
		// restart connecting - the top of the loop will set state to
1216
		// CONNECTING.  This is against the current connectivity semantics doc,
1217
		// however it allows for graceful behavior for RPCs not yet dispatched
1218
		// - unfortunate timing would otherwise lead to the RPC failing even
1219
		// though the TRANSIENT_FAILURE state (called for by the doc) would be
1220
		// instantaneous.
1221
		//
1222
		// Ideally we should transition to Idle here and block until there is
1223
		// RPC activity that leads to the balancer requesting a reconnect of
1224
		// the associated SubConn.
1225
	}
1226
}
1227

1228
// tryAllAddrs tries to creates a connection to the addresses, and stop when at the
1229
// first successful one. It returns the transport, the address and a Event in
1230
// the successful case. The Event fires when the returned transport disconnects.
1231
func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
1232
	var firstConnErr error
1233
	for _, addr := range addrs {
1234
		ac.mu.Lock()
1235
		if ac.state == connectivity.Shutdown {
1236
			ac.mu.Unlock()
1237
			return nil, resolver.Address{}, nil, errConnClosing
1238
		}
1239

1240
		ac.cc.mu.RLock()
1241
		ac.dopts.copts.KeepaliveParams = ac.cc.mkp
1242
		ac.cc.mu.RUnlock()
1243

1244
		copts := ac.dopts.copts
1245
		if ac.scopts.CredsBundle != nil {
1246
			copts.CredsBundle = ac.scopts.CredsBundle
1247
		}
1248
		ac.mu.Unlock()
1249

1250
		channelz.Infof(logger, ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)
1251

1252
		newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
1253
		if err == nil {
1254
			return newTr, addr, reconnect, nil
1255
		}
1256
		if firstConnErr == nil {
1257
			firstConnErr = err
1258
		}
1259
		ac.cc.updateConnectionError(err)
1260
	}
1261

1262
	// Couldn't connect to any address.
1263
	return nil, resolver.Address{}, nil, firstConnErr
1264
}
1265

1266
// createTransport creates a connection to addr. It returns the transport and a
1267
// Event in the successful case. The Event fires when the returned transport
1268
// disconnects.
1269
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
1270
	prefaceReceived := make(chan struct{})
1271
	onCloseCalled := make(chan struct{})
1272
	reconnect := grpcsync.NewEvent()
1273

1274
	// addr.ServerName takes precedent over ClientConn authority, if present.
1275
	if addr.ServerName == "" {
1276
		addr.ServerName = ac.cc.authority
1277
	}
1278

1279
	once := sync.Once{}
1280
	onGoAway := func(r transport.GoAwayReason) {
1281
		ac.mu.Lock()
1282
		ac.adjustParams(r)
1283
		once.Do(func() {
1284
			if ac.state == connectivity.Ready {
1285
				// Prevent this SubConn from being used for new RPCs by setting its
1286
				// state to Connecting.
1287
				//
1288
				// TODO: this should be Idle when grpc-go properly supports it.
1289
				ac.updateConnectivityState(connectivity.Connecting, nil)
1290
			}
1291
		})
1292
		ac.mu.Unlock()
1293
		reconnect.Fire()
1294
	}
1295

1296
	onClose := func() {
1297
		ac.mu.Lock()
1298
		once.Do(func() {
1299
			if ac.state == connectivity.Ready {
1300
				// Prevent this SubConn from being used for new RPCs by setting its
1301
				// state to Connecting.
1302
				//
1303
				// TODO: this should be Idle when grpc-go properly supports it.
1304
				ac.updateConnectivityState(connectivity.Connecting, nil)
1305
			}
1306
		})
1307
		ac.mu.Unlock()
1308
		close(onCloseCalled)
1309
		reconnect.Fire()
1310
	}
1311

1312
	onPrefaceReceipt := func() {
1313
		close(prefaceReceived)
1314
	}
1315

1316
	connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
1317
	defer cancel()
1318
	if channelz.IsOn() {
1319
		copts.ChannelzParentID = ac.channelzID
1320
	}
1321

1322
	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onPrefaceReceipt, onGoAway, onClose)
1323
	if err != nil {
1324
		// newTr is either nil, or closed.
1325
		channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err)
1326
		return nil, nil, err
1327
	}
1328

1329
	select {
1330
	case <-time.After(time.Until(connectDeadline)):
1331
		// We didn't get the preface in time.
1332
		newTr.Close()
1333
		channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
1334
		return nil, nil, errors.New("timed out waiting for server handshake")
1335
	case <-prefaceReceived:
1336
		// We got the preface - huzzah! things are good.
1337
	case <-onCloseCalled:
1338
		// The transport has already closed - noop.
1339
		return nil, nil, errors.New("connection closed")
1340
		// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
1341
	}
1342
	return newTr, reconnect, nil
1343
}
1344

1345
// startHealthCheck starts the health checking stream (RPC) to watch the health
1346
// stats of this connection if health checking is requested and configured.
1347
//
1348
// LB channel health checking is enabled when all requirements below are met:
1349
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
1350
// 2. internal.HealthCheckFunc is set by importing the grpc/health package
1351
// 3. a service config with non-empty healthCheckConfig field is provided
1352
// 4. the load balancer requests it
1353
//
1354
// It sets addrConn to READY if the health checking stream is not started.
1355
//
1356
// Caller must hold ac.mu.
1357
func (ac *addrConn) startHealthCheck(ctx context.Context) {
1358
	var healthcheckManagingState bool
1359
	defer func() {
1360
		if !healthcheckManagingState {
1361
			ac.updateConnectivityState(connectivity.Ready, nil)
1362
		}
1363
	}()
1364

1365
	if ac.cc.dopts.disableHealthCheck {
1366
		return
1367
	}
1368
	healthCheckConfig := ac.cc.healthCheckConfig()
1369
	if healthCheckConfig == nil {
1370
		return
1371
	}
1372
	if !ac.scopts.HealthCheckEnabled {
1373
		return
1374
	}
1375
	healthCheckFunc := ac.cc.dopts.healthCheckFunc
1376
	if healthCheckFunc == nil {
1377
		// The health package is not imported to set health check function.
1378
		//
1379
		// TODO: add a link to the health check doc in the error message.
1380
		channelz.Error(logger, ac.channelzID, "Health check is requested but health check function is not set.")
1381
		return
1382
	}
1383

1384
	healthcheckManagingState = true
1385

1386
	// Set up the health check helper functions.
1387
	currentTr := ac.transport
1388
	newStream := func(method string) (interface{}, error) {
1389
		ac.mu.Lock()
1390
		if ac.transport != currentTr {
1391
			ac.mu.Unlock()
1392
			return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
1393
		}
1394
		ac.mu.Unlock()
1395
		return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
1396
	}
1397
	setConnectivityState := func(s connectivity.State, lastErr error) {
1398
		ac.mu.Lock()
1399
		defer ac.mu.Unlock()
1400
		if ac.transport != currentTr {
1401
			return
1402
		}
1403
		ac.updateConnectivityState(s, lastErr)
1404
	}
1405
	// Start the health checking stream.
1406
	go func() {
1407
		err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
1408
		if err != nil {
1409
			if status.Code(err) == codes.Unimplemented {
1410
				channelz.Error(logger, ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled")
1411
			} else {
1412
				channelz.Errorf(logger, ac.channelzID, "HealthCheckFunc exits with unexpected error %v", err)
1413
			}
1414
		}
1415
	}()
1416
}
1417

1418
func (ac *addrConn) resetConnectBackoff() {
1419
	ac.mu.Lock()
1420
	close(ac.resetBackoff)
1421
	ac.backoffIdx = 0
1422
	ac.resetBackoff = make(chan struct{})
1423
	ac.mu.Unlock()
1424
}
1425

1426
// getReadyTransport returns the transport if ac's state is READY.
1427
// Otherwise it returns nil, false.
1428
// If ac's state is IDLE, it will trigger ac to connect.
1429
func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
1430
	ac.mu.Lock()
1431
	if ac.state == connectivity.Ready && ac.transport != nil {
1432
		t := ac.transport
1433
		ac.mu.Unlock()
1434
		return t, true
1435
	}
1436
	var idle bool
1437
	if ac.state == connectivity.Idle {
1438
		idle = true
1439
	}
1440
	ac.mu.Unlock()
1441
	// Trigger idle ac to connect.
1442
	if idle {
1443
		ac.connect()
1444
	}
1445
	return nil, false
1446
}
1447

1448
// tearDown starts to tear down the addrConn.
1449
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
1450
// some edge cases (e.g., the caller opens and closes many addrConn's in a
1451
// tight loop.
1452
// tearDown doesn't remove ac from ac.cc.conns.
1453
func (ac *addrConn) tearDown(err error) {
1454
	ac.mu.Lock()
1455
	if ac.state == connectivity.Shutdown {
1456
		ac.mu.Unlock()
1457
		return
1458
	}
1459
	curTr := ac.transport
1460
	ac.transport = nil
1461
	// We have to set the state to Shutdown before anything else to prevent races
1462
	// between setting the state and logic that waits on context cancellation / etc.
1463
	ac.updateConnectivityState(connectivity.Shutdown, nil)
1464
	ac.cancel()
1465
	ac.curAddr = resolver.Address{}
1466
	if err == errConnDrain && curTr != nil {
1467
		// GracefulClose(...) may be executed multiple times when
1468
		// i) receiving multiple GoAway frames from the server; or
1469
		// ii) there are concurrent name resolver/Balancer triggered
1470
		// address removal and GoAway.
1471
		// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
1472
		ac.mu.Unlock()
1473
		curTr.GracefulClose()
1474
		ac.mu.Lock()
1475
	}
1476
	if channelz.IsOn() {
1477
		channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
1478
			Desc:     "Subchannel Deleted",
1479
			Severity: channelz.CtInfo,
1480
			Parent: &channelz.TraceEventDesc{
1481
				Desc:     fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
1482
				Severity: channelz.CtInfo,
1483
			},
1484
		})
1485
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
1486
		// the entity being deleted, and thus prevent it from being deleted right away.
1487
		channelz.RemoveEntry(ac.channelzID)
1488
	}
1489
	ac.mu.Unlock()
1490
}
1491

1492
func (ac *addrConn) getState() connectivity.State {
1493
	ac.mu.Lock()
1494
	defer ac.mu.Unlock()
1495
	return ac.state
1496
}
1497

1498
func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
1499
	ac.mu.Lock()
1500
	addr := ac.curAddr.Addr
1501
	ac.mu.Unlock()
1502
	return &channelz.ChannelInternalMetric{
1503
		State:                    ac.getState(),
1504
		Target:                   addr,
1505
		CallsStarted:             atomic.LoadInt64(&ac.czData.callsStarted),
1506
		CallsSucceeded:           atomic.LoadInt64(&ac.czData.callsSucceeded),
1507
		CallsFailed:              atomic.LoadInt64(&ac.czData.callsFailed),
1508
		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
1509
	}
1510
}
1511

1512
func (ac *addrConn) incrCallsStarted() {
1513
	atomic.AddInt64(&ac.czData.callsStarted, 1)
1514
	atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
1515
}
1516

1517
func (ac *addrConn) incrCallsSucceeded() {
1518
	atomic.AddInt64(&ac.czData.callsSucceeded, 1)
1519
}
1520

1521
func (ac *addrConn) incrCallsFailed() {
1522
	atomic.AddInt64(&ac.czData.callsFailed, 1)
1523
}
1524

1525
// retryThrottler is a token bucket that decides whether retries may proceed.
// A nil *retryThrottler means "no throttling": throttle always returns false
// and successfulRPC does nothing.
type retryThrottler struct {
	max    float64 // Cap on tokens.
	thresh float64 // Retries are throttled once tokens drop to or below this.
	ratio  float64 // Tokens credited back per successful RPC.

	mu     sync.Mutex
	tokens float64 // Guarded by mu. TODO(dfawley): replace with atomic and remove lock.
}

// throttle subtracts a retry token from the pool and returns whether a retry
// should be throttled (disallowed) based upon the retry throttling policy in
// the service config. The pool never goes below zero.
func (rt *retryThrottler) throttle() bool {
	if rt == nil {
		return false
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	remaining := rt.tokens - 1
	if remaining < 0 {
		remaining = 0
	}
	rt.tokens = remaining
	return remaining <= rt.thresh
}

// successfulRPC credits ratio tokens back to the pool, capped at max.
func (rt *retryThrottler) successfulRPC() {
	if rt == nil {
		return
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	credited := rt.tokens + rt.ratio
	if credited > rt.max {
		credited = rt.max
	}
	rt.tokens = credited
}

type channelzChannel struct {
1563
	cc *ClientConn
1564
}
1565

1566
func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
1567
	return c.cc.channelzMetric()
1568
}
1569

1570
// ErrClientConnTimeout reports that a ClientConn could not establish its
// underlying connections within the specified timeout.
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")

func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
1578
	for _, rb := range cc.dopts.resolvers {
1579
		if scheme == rb.Scheme() {
1580
			return rb
1581
		}
1582
	}
1583
	return resolver.Get(scheme)
1584
}
1585

1586
func (cc *ClientConn) updateConnectionError(err error) {
1587
	cc.lceMu.Lock()
1588
	cc.lastConnectionError = err
1589
	cc.lceMu.Unlock()
1590
}
1591

1592
func (cc *ClientConn) connectionError() error {
1593
	cc.lceMu.Lock()
1594
	defer cc.lceMu.Unlock()
1595
	return cc.lastConnectionError
1596
}
1597

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.