// clientconn.go — excerpted from google.golang.org/grpc (see license header below).
1/*
2*
3* Copyright 2014 gRPC authors.
4*
5* Licensed under the Apache License, Version 2.0 (the "License");
6* you may not use this file except in compliance with the License.
7* You may obtain a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS,
13* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14* See the License for the specific language governing permissions and
15* limitations under the License.
16*
17*/
18
package grpc
21import (22"context"23"errors"24"fmt"25"math"26"reflect"27"strings"28"sync"29"sync/atomic"30"time"31
32"google.golang.org/grpc/balancer"33"google.golang.org/grpc/balancer/base"34"google.golang.org/grpc/codes"35"google.golang.org/grpc/connectivity"36"google.golang.org/grpc/credentials"37"google.golang.org/grpc/internal/backoff"38"google.golang.org/grpc/internal/channelz"39"google.golang.org/grpc/internal/grpcsync"40"google.golang.org/grpc/internal/grpcutil"41iresolver "google.golang.org/grpc/internal/resolver"42"google.golang.org/grpc/internal/transport"43"google.golang.org/grpc/keepalive"44"google.golang.org/grpc/resolver"45"google.golang.org/grpc/serviceconfig"46"google.golang.org/grpc/status"47
48_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.49_ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver.50_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.51_ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver.52)
53
54const (55// minimum time to give a connection to complete56minConnectTimeout = 20 * time.Second57// must match grpclbName in grpclb/grpclb.go58grpclbName = "grpclb"59)
60
61var (62// ErrClientConnClosing indicates that the operation is illegal because63// the ClientConn is closing.64//65// Deprecated: this error should not be relied upon by users; use the status66// code of Canceled instead.67ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")68// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.69errConnDrain = errors.New("grpc: the connection is drained")70// errConnClosing indicates that the connection is closing.71errConnClosing = errors.New("grpc: the connection is closing")72// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default73// service config.74invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"75)
76
// The following errors are returned from Dial and DialContext.
var (
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// call WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredsAndBundle indicates that creds bundle is used together
	// with other individual Transport Credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errTransportCredentialsMissing indicates that users want to transmit security
	// information (e.g., OAuth2 token) which requires secure connection on an insecure
	// connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)
94
95const (96defaultClientMaxReceiveMessageSize = 1024 * 1024 * 497defaultClientMaxSendMessageSize = math.MaxInt3298// http2IOBufSize specifies the buffer size for sending frames.99defaultWriteBufSize = 32 * 1024100defaultReadBufSize = 32 * 1024101)
102
// Dial creates a client connection to the given target.
//
// It is equivalent to DialContext with context.Background; see DialContext
// for the target syntax and blocking semantics.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
	return DialContext(context.Background(), target, opts...)
}
107
// defaultConfigSelector is the config selector used when none is provided by
// the resolver: it simply looks the method up in a fixed *ServiceConfig.
type defaultConfigSelector struct {
	sc *ServiceConfig // may be nil, in which case an empty MethodConfig is selected
}

// SelectConfig returns the RPC config for rpcInfo.Method from the wrapped
// service config, preserving the RPC's context. It never returns an error.
func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
	return &iresolver.RPCConfig{
		Context:      rpcInfo.Context,
		MethodConfig: getMethodConfig(dcs.sc, rpcInfo.Method),
	}, nil
}
118
// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	cc.retryThrottler.Store((*retryThrottler)(nil))
	// The ClientConn's own lifetime is independent of the dial ctx; it ends
	// only when Close is called.
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	// Fold any chained interceptors into the single unaryInt/streamInt slots.
	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	// On any error below, release everything allocated so far.
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	if channelz.IsOn() {
		if cc.dopts.channelzParentID != 0 {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
			channelz.AddTraceEvent(logger, cc.channelzID, 0, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtInfo,
				Parent: &channelz.TraceEventDesc{
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
					Severity: channelz.CtInfo,
				},
			})
		} else {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
			channelz.Info(logger, cc.channelzID, "Channel Created")
		}
		cc.csMgr.channelzID = cc.channelzID
	}

	// Validate the credentials configuration: secure dials need exactly one
	// of TransportCredentials / CredsBundle; insecure dials may have neither,
	// and may not carry per-RPC credentials that require security.
	if !cc.dopts.insecure {
		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
			return nil, errNoTransportSecurity
		}
		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
			return nil, errTransportCredsAndBundle
		}
	} else {
		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
			return nil, errCredentialsConflict
		}
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}

	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// If the (possibly timeout-wrapped) ctx expired during setup, translate
	// the result into a ctx error, optionally wrapping the last dial error
	// when returnLastError is set.
	defer func() {
		select {
		case <-ctx.Done():
			switch {
			case ctx.Err() == err:
				conn = nil
			case err == nil || !cc.dopts.returnLastError:
				conn, err = nil, ctx.Err()
			default:
				conn, err = nil, fmt.Errorf("%v: %v", ctx.Err(), err)
			}
		default:
		}
	}()

	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config without blocking.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
				scSet = true
			}
		default:
		}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.DefaultExponential
	}

	// Determine the resolver to use.
	cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
	channelz.Infof(logger, cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
	resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
	if resolverBuilder == nil {
		// If resolver builder is still nil, the parsed target's scheme is
		// not registered. Fallback to default resolver and set Endpoint to
		// the original target.
		channelz.Infof(logger, cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
		cc.parsedTarget = resolver.Target{
			Scheme:   resolver.GetDefaultScheme(),
			Endpoint: target,
		}
		resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
		if resolverBuilder == nil {
			return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
		}
	}

	// Pick the :authority for the connection, in decreasing precedence:
	// creds-provided ServerName, WithAuthority (insecure only), "localhost"
	// for unix targets, then the parsed endpoint itself.
	creds := cc.dopts.copts.TransportCredentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else if cc.dopts.insecure && cc.dopts.authority != "" {
		cc.authority = cc.dopts.authority
	} else if strings.HasPrefix(cc.target, "unix:") || strings.HasPrefix(cc.target, "unix-abstract:") {
		cc.authority = "localhost"
	} else if strings.HasPrefix(cc.parsedTarget.Endpoint, ":") {
		cc.authority = "localhost" + cc.parsedTarget.Endpoint
	} else {
		// Use endpoint from "scheme://authority/endpoint" as the default
		// authority for ClientConn.
		cc.authority = cc.parsedTarget.Endpoint
	}

	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	if cc.dopts.scChan != nil {
		go cc.scWatcher()
	}

	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		CustomUserAgent:  cc.dopts.copts.UserAgent,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}
	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()

	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				if err = cc.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
					return nil, err
				}
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}
349
350// chainUnaryClientInterceptors chains all unary client interceptors into one.
351func chainUnaryClientInterceptors(cc *ClientConn) {352interceptors := cc.dopts.chainUnaryInts353// Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will354// be executed before any other chained interceptors.355if cc.dopts.unaryInt != nil {356interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)357}358var chainedInt UnaryClientInterceptor359if len(interceptors) == 0 {360chainedInt = nil361} else if len(interceptors) == 1 {362chainedInt = interceptors[0]363} else {364chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {365return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)366}367}368cc.dopts.unaryInt = chainedInt369}
370
371// getChainUnaryInvoker recursively generate the chained unary invoker.
372func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {373if curr == len(interceptors)-1 {374return finalInvoker375}376return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {377return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)378}379}
380
381// chainStreamClientInterceptors chains all stream client interceptors into one.
382func chainStreamClientInterceptors(cc *ClientConn) {383interceptors := cc.dopts.chainStreamInts384// Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will385// be executed before any other chained interceptors.386if cc.dopts.streamInt != nil {387interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)388}389var chainedInt StreamClientInterceptor390if len(interceptors) == 0 {391chainedInt = nil392} else if len(interceptors) == 1 {393chainedInt = interceptors[0]394} else {395chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {396return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)397}398}399cc.dopts.streamInt = chainedInt400}
401
402// getChainStreamer recursively generate the chained client stream constructor.
403func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {404if curr == len(interceptors)-1 {405return finalStreamer406}407return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {408return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)409}410}
411
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
	mu         sync.Mutex
	state      connectivity.State
	// notifyChan, when non-nil, is closed on the next state change to wake
	// waiters; it is lazily created by getNotifyChan.
	notifyChan chan struct{}
	channelzID int64
}
420
421// updateState updates the connectivity.State of ClientConn.
422// If there's a change it notifies goroutines waiting on state change to
423// happen.
424func (csm *connectivityStateManager) updateState(state connectivity.State) {425csm.mu.Lock()426defer csm.mu.Unlock()427if csm.state == connectivity.Shutdown {428return429}430if csm.state == state {431return432}433csm.state = state434channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state)435if csm.notifyChan != nil {436// There are other goroutines waiting on this channel.437close(csm.notifyChan)438csm.notifyChan = nil439}440}
441
442func (csm *connectivityStateManager) getState() connectivity.State {443csm.mu.Lock()444defer csm.mu.Unlock()445return csm.state446}
447
448func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {449csm.mu.Lock()450defer csm.mu.Unlock()451if csm.notifyChan == nil {452csm.notifyChan = make(chan struct{})453}454return csm.notifyChan455}
456
// ClientConnInterface defines the functions clients need to perform unary and
// streaming RPCs. It is implemented by *ClientConn, and is only intended to
// be referenced by generated code.
type ClientConnInterface interface {
	// Invoke performs a unary RPC and returns after the response is received
	// into reply.
	Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
	// NewStream begins a streaming RPC.
	NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}
467
// Assert *ClientConn implements ClientConnInterface (compile-time check).
var _ ClientConnInterface = (*ClientConn)(nil)
// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {
	// ctx/cancel govern the ClientConn's lifetime; cancel fires on Close.
	ctx    context.Context
	cancel context.CancelFunc

	target       string
	parsedTarget resolver.Target
	authority    string
	dopts        dialOptions
	csMgr        *connectivityStateManager

	balancerBuildOpts balancer.BuildOptions
	blockingpicker    *pickerWrapper

	safeConfigSelector iresolver.SafeConfigSelector

	// mu guards the fields below it.
	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper
	sc              *ServiceConfig
	// conns is set to nil on Close; methods use that as the "closed" check.
	conns map[*addrConn]struct{}
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper
	retryThrottler  atomic.Value

	// firstResolveEvent fires once the resolver has produced its first result.
	firstResolveEvent *grpcsync.Event

	channelzID int64 // channelz unique identification number
	czData     *channelzData

	lceMu               sync.Mutex // protects lastConnectionError
	lastConnectionError error
}
516
517// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
518// ctx expires. A true value is returned in former case and false in latter.
519//
520// Experimental
521//
522// Notice: This API is EXPERIMENTAL and may be changed or removed in a
523// later release.
524func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {525ch := cc.csMgr.getNotifyChan()526if cc.csMgr.getState() != sourceState {527return true528}529select {530case <-ctx.Done():531return false532case <-ch:533return true534}535}
536
// GetState returns the connectivity.State of ClientConn.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func (cc *ClientConn) GetState() connectivity.State {
	return cc.csMgr.getState()
}
546
547func (cc *ClientConn) scWatcher() {548for {549select {550case sc, ok := <-cc.dopts.scChan:551if !ok {552return553}554cc.mu.Lock()555// TODO: load balance policy runtime change is ignored.556// We may revisit this decision in the future.557cc.sc = &sc558cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})559cc.mu.Unlock()560case <-cc.ctx.Done():561return562}563}564}
565
566// waitForResolvedAddrs blocks until the resolver has provided addresses or the
567// context expires. Returns nil unless the context expires first; otherwise
568// returns a status error based on the context.
569func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {570// This is on the RPC path, so we use a fast path to avoid the571// more-expensive "select" below after the resolver has returned once.572if cc.firstResolveEvent.HasFired() {573return nil574}575select {576case <-cc.firstResolveEvent.Done():577return nil578case <-ctx.Done():579return status.FromContextError(ctx.Err()).Err()580case <-cc.ctx.Done():581return ErrClientConnClosing582}583}
584
// emptyServiceConfig is the parsed form of "{}", used when neither the
// resolver nor the dial options supply a service config.
var emptyServiceConfig *ServiceConfig

func init() {
	cfg := parseServiceConfig("{}")
	if cfg.Err != nil {
		// "{}" must always parse; failure here is a programmer error.
		panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
	}
	emptyServiceConfig = cfg.Config.(*ServiceConfig)
}
594
595func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {596if cc.sc != nil {597cc.applyServiceConfigAndBalancer(cc.sc, nil, addrs)598return599}600if cc.dopts.defaultServiceConfig != nil {601cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig}, addrs)602} else {603cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig}, addrs)604}605}
606
// updateResolverState is invoked by the resolver wrapper with either a new
// resolver state (addresses and/or service config) or a resolver error. It
// applies the appropriate service config, filters grpclb addresses when the
// grpclb policy is not active, and forwards the state to the balancer.
// Returns balancer.ErrBadResolverState on a resolver error or a bad config.
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
	defer cc.firstResolveEvent.Fire()
	cc.mu.Lock()
	// Check if the ClientConn is already closed. Some fields (e.g.
	// balancerWrapper) are set to nil when closing the ClientConn, and could
	// cause nil pointer panic if we don't have this check.
	if cc.conns == nil {
		cc.mu.Unlock()
		return nil
	}

	if err != nil {
		// May need to apply the initial service config in case the resolver
		// doesn't support service configs, or doesn't provide a service config
		// with the new addresses.
		cc.maybeApplyDefaultServiceConfig(nil)

		if cc.balancerWrapper != nil {
			cc.balancerWrapper.resolverError(err)
		}

		// No addresses are valid with err set; return early.
		cc.mu.Unlock()
		return balancer.ErrBadResolverState
	}

	var ret error
	if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
		cc.maybeApplyDefaultServiceConfig(s.Addresses)
		// TODO: do we need to apply a failing LB policy if there is no
		// default, per the error handling design?
	} else {
		if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
			configSelector := iresolver.GetConfigSelector(s)
			if configSelector != nil {
				if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
					channelz.Infof(logger, cc.channelzID, "method configs in service config will be ignored due to presence of config selector")
				}
			} else {
				configSelector = &defaultConfigSelector{sc}
			}
			cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
		} else {
			ret = balancer.ErrBadResolverState
			if cc.balancerWrapper == nil {
				// The config is bad and there is no balancer yet to surface
				// errors on RPCs, so install an error picker directly.
				var err error
				if s.ServiceConfig.Err != nil {
					err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
				} else {
					err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
				}
				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{cc.sc})
				cc.blockingpicker.updatePicker(base.NewErrPicker(err))
				cc.csMgr.updateState(connectivity.TransientFailure)
				cc.mu.Unlock()
				return ret
			}
		}
	}

	var balCfg serviceconfig.LoadBalancingConfig
	if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
		balCfg = cc.sc.lbConfig.cfg
	}

	cbn := cc.curBalancerName
	bw := cc.balancerWrapper
	cc.mu.Unlock()
	if cbn != grpclbName {
		// Filter any grpclb addresses since we don't have the grpclb balancer.
		for i := 0; i < len(s.Addresses); {
			if s.Addresses[i].Type == resolver.GRPCLB {
				copy(s.Addresses[i:], s.Addresses[i+1:])
				s.Addresses = s.Addresses[:len(s.Addresses)-1]
				continue
			}
			i++
		}
	}
	uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
	if ret == nil {
		ret = uccsErr // prefer ErrBadResolverState since any other error is
		// currently meaningless to the caller.
	}
	return ret
}
693
// switchBalancer starts the switching from current balancer to the balancer
// with the given name.
//
// It will NOT send the current address list to the new balancer. If needed,
// caller of this function should send address list to the new balancer after
// this function returns.
//
// Caller must hold cc.mu.
func (cc *ClientConn) switchBalancer(name string) {
	// Balancer names are compared case-insensitively; no-op if unchanged.
	if strings.EqualFold(cc.curBalancerName, name) {
		return
	}

	channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)
	if cc.dopts.balancerBuilder != nil {
		// An explicit Balancer DialOption pins the balancer for the
		// connection's lifetime.
		channelz.Info(logger, cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")
		return
	}
	if cc.balancerWrapper != nil {
		cc.balancerWrapper.close()
	}

	builder := balancer.Get(name)
	if builder == nil {
		// Unknown balancer name: fall back to pick_first.
		channelz.Warningf(logger, cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
		channelz.Infof(logger, cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)
		builder = newPickfirstBuilder()
	} else {
		channelz.Infof(logger, cc.channelzID, "Channel switches to new LB policy %q", name)
	}

	cc.curBalancerName = builder.Name()
	cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
}
728
729func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {730cc.mu.Lock()731if cc.conns == nil {732cc.mu.Unlock()733return734}735// TODO(bar switching) send updates to all balancer wrappers when balancer736// gracefully switching is supported.737cc.balancerWrapper.handleSubConnStateChange(sc, s, err)738cc.mu.Unlock()739}
740
// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
//
// Caller needs to make sure len(addrs) > 0.
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
	ac := &addrConn{
		state:        connectivity.Idle,
		cc:           cc,
		addrs:        addrs,
		scopts:       opts,
		dopts:        cc.dopts,
		czData:       new(channelzData),
		resetBackoff: make(chan struct{}),
	}
	// The addrConn's lifetime is bounded by the ClientConn's context.
	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
	// Track ac in cc. This needs to be done before any getTransport(...) is called.
	cc.mu.Lock()
	if cc.conns == nil {
		cc.mu.Unlock()
		return nil, ErrClientConnClosing
	}
	if channelz.IsOn() {
		ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
		channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
			Desc:     "Subchannel Created",
			Severity: channelz.CtInfo,
			Parent: &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
				Severity: channelz.CtInfo,
			},
		})
	}
	cc.conns[ac] = struct{}{}
	cc.mu.Unlock()
	return ac, nil
}
776
777// removeAddrConn removes the addrConn in the subConn from clientConn.
778// It also tears down the ac with the given error.
779func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {780cc.mu.Lock()781if cc.conns == nil {782cc.mu.Unlock()783return784}785delete(cc.conns, ac)786cc.mu.Unlock()787ac.tearDown(err)788}
789
790func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {791return &channelz.ChannelInternalMetric{792State: cc.GetState(),793Target: cc.target,794CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted),795CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded),796CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed),797LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),798}799}
800
// Target returns the target string of the ClientConn.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func (cc *ClientConn) Target() string {
	return cc.target
}
810
811func (cc *ClientConn) incrCallsStarted() {812atomic.AddInt64(&cc.czData.callsStarted, 1)813atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())814}
815
816func (cc *ClientConn) incrCallsSucceeded() {817atomic.AddInt64(&cc.czData.callsSucceeded, 1)818}
819
820func (cc *ClientConn) incrCallsFailed() {821atomic.AddInt64(&cc.czData.callsFailed, 1)822}
823
// connect starts creating a transport.
// It does nothing if the ac is not IDLE.
// TODO(bar) Move this to the addrConn section.
func (ac *addrConn) connect() error {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return errConnClosing
	}
	if ac.state != connectivity.Idle {
		ac.mu.Unlock()
		return nil
	}
	// Update connectivity state within the lock to prevent subsequent or
	// concurrent calls from resetting the transport more than once.
	ac.updateConnectivityState(connectivity.Connecting, nil)
	ac.mu.Unlock()

	// Start a goroutine connecting to the server asynchronously.
	go ac.resetTransport()
	return nil
}
846
// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
//
// If ac is Connecting, it returns false. The caller should tear down the ac and
// create a new one. Note that the backoff will be reset when this happens.
//
// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
// addresses will be picked up by retry in the next iteration after backoff.
//
// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
//
// If ac is Ready, it checks whether current connected address of ac is in the
// new addrs list.
//   - If true, it updates ac.addrs and returns true. The ac will keep using
//     the existing connection.
//   - If false, it does nothing and returns false.
func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
	ac.mu.Lock()
	defer ac.mu.Unlock()
	channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
	if ac.state == connectivity.Shutdown ||
		ac.state == connectivity.TransientFailure ||
		ac.state == connectivity.Idle {
		ac.addrs = addrs
		return true
	}

	if ac.state == connectivity.Connecting {
		return false
	}

	// ac.state is Ready, try to find the connected address.
	var curAddrFound bool
	for _, a := range addrs {
		if reflect.DeepEqual(ac.curAddr, a) {
			curAddrFound = true
			break
		}
	}
	channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
	if curAddrFound {
		ac.addrs = addrs
	}

	return curAddrFound
}
892
893func getMethodConfig(sc *ServiceConfig, method string) MethodConfig {894if sc == nil {895return MethodConfig{}896}897if m, ok := sc.Methods[method]; ok {898return m899}900i := strings.LastIndex(method, "/")901if m, ok := sc.Methods[method[:i+1]]; ok {902return m903}904return sc.Methods[""]905}
906
// GetMethodConfig gets the method config of the input method.
// If there's an exact match for input method (i.e. /service/method), we return
// the corresponding MethodConfig.
// If there isn't an exact match for the input method, we look for the service's default
// config under the service (i.e /service/) and then for the default for all services (empty string).
//
// If there is a default MethodConfig for the service, we return it.
// Otherwise, we return an empty MethodConfig.
func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
	// TODO: Avoid the locking here.
	cc.mu.RLock()
	defer cc.mu.RUnlock()
	return getMethodConfig(cc.sc, method)
}
921
922func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {923cc.mu.RLock()924defer cc.mu.RUnlock()925if cc.sc == nil {926return nil927}928return cc.sc.healthCheckConfig929}
930
931func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {932t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{933Ctx: ctx,934FullMethodName: method,935})936if err != nil {937return nil, nil, toRPCErr(err)938}939return t, done, nil940}
941
// applyServiceConfigAndBalancer installs sc (and optionally configSelector)
// on cc, reconfigures retry throttling from the config, and selects the load
// balancing policy: the config's LB policy, grpclb when grpclb addresses are
// present, the config's legacy LB field, or pick_first — unless a Balancer
// DialOption pinned the balancer, in which case it is built once here.
func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {
	if sc == nil {
		// should never reach here.
		return
	}
	cc.sc = sc
	if configSelector != nil {
		cc.safeConfigSelector.UpdateConfigSelector(configSelector)
	}

	if cc.sc.retryThrottling != nil {
		newThrottler := &retryThrottler{
			tokens: cc.sc.retryThrottling.MaxTokens,
			max:    cc.sc.retryThrottling.MaxTokens,
			thresh: cc.sc.retryThrottling.MaxTokens / 2,
			ratio:  cc.sc.retryThrottling.TokenRatio,
		}
		cc.retryThrottler.Store(newThrottler)
	} else {
		// Clear any previous throttler; a typed nil keeps Load() type-stable.
		cc.retryThrottler.Store((*retryThrottler)(nil))
	}

	if cc.dopts.balancerBuilder == nil {
		// Only look at balancer types and switch balancer if balancer dial
		// option is not set.
		var newBalancerName string
		if cc.sc != nil && cc.sc.lbConfig != nil {
			newBalancerName = cc.sc.lbConfig.name
		} else {
			var isGRPCLB bool
			for _, a := range addrs {
				if a.Type == resolver.GRPCLB {
					isGRPCLB = true
					break
				}
			}
			if isGRPCLB {
				newBalancerName = grpclbName
			} else if cc.sc != nil && cc.sc.LB != nil {
				newBalancerName = *cc.sc.LB
			} else {
				newBalancerName = PickFirstBalancerName
			}
		}
		cc.switchBalancer(newBalancerName)
	} else if cc.balancerWrapper == nil {
		// Balancer dial option was set, and this is the first time handling
		// resolved addresses. Build a balancer with dopts.balancerBuilder.
		cc.curBalancerName = cc.dopts.balancerBuilder.Name()
		cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
	}
}
994
995func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {996cc.mu.RLock()997r := cc.resolverWrapper998cc.mu.RUnlock()999if r == nil {1000return1001}1002go r.resolveNow(o)1003}
1004
1005// ResetConnectBackoff wakes up all subchannels in transient failure and causes
1006// them to attempt another connection immediately. It also resets the backoff
1007// times used for subsequent attempts regardless of the current state.
1008//
1009// In general, this function should not be used. Typical service or network
1010// outages result in a reasonable client reconnection strategy by default.
1011// However, if a previously unavailable network becomes available, this may be
1012// used to trigger an immediate reconnect.
1013//
1014// Experimental
1015//
1016// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1017// later release.
1018func (cc *ClientConn) ResetConnectBackoff() {1019cc.mu.Lock()1020conns := cc.conns1021cc.mu.Unlock()1022for ac := range conns {1023ac.resetConnectBackoff()1024}1025}
1026
// Close tears down the ClientConn and all underlying connections.
// A second call returns ErrClientConnClosing (cc.conns is nil after the first).
func (cc *ClientConn) Close() error {
	// Cancel cc's context last so everything derived from it (transports,
	// resolver, subchannels) observes shutdown after teardown completes.
	defer cc.cancel()

	cc.mu.Lock()
	if cc.conns == nil {
		// Already closed.
		cc.mu.Unlock()
		return ErrClientConnClosing
	}
	// Snapshot and clear the subchannel map so concurrent operations see the
	// channel as closed, and mark the channel SHUTDOWN.
	conns := cc.conns
	cc.conns = nil
	cc.csMgr.updateState(connectivity.Shutdown)

	// Detach the resolver/balancer wrappers under the lock; they are closed
	// below, outside the lock.
	rWrapper := cc.resolverWrapper
	cc.resolverWrapper = nil
	bWrapper := cc.balancerWrapper
	cc.balancerWrapper = nil
	cc.mu.Unlock()

	// Fail pending picks before shutting down resolver/balancer.
	cc.blockingpicker.close()

	if rWrapper != nil {
		rWrapper.close()
	}
	if bWrapper != nil {
		bWrapper.close()
	}

	// Tear down every subchannel that existed at the time of Close.
	for ac := range conns {
		ac.tearDown(ErrClientConnClosing)
	}
	if channelz.IsOn() {
		ted := &channelz.TraceEventDesc{
			Desc:     "Channel Deleted",
			Severity: channelz.CtInfo,
		}
		if cc.dopts.channelzParentID != 0 {
			ted.Parent = &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
				Severity: channelz.CtInfo,
			}
		}
		channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
		channelz.RemoveEntry(cc.channelzID)
	}
	return nil
}
1076
// addrConn is a network connection to a given address.
type addrConn struct {
	ctx    context.Context    // lifetime of this subchannel; canceled in tearDown
	cancel context.CancelFunc // cancels ctx

	cc     *ClientConn // owning channel
	dopts  dialOptions
	acbw   balancer.SubConn // balancer's view of this subchannel
	scopts balancer.NewSubConnOptions

	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
	// health checking may require server to report healthy to set ac to READY), and is reset
	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
	// is received, transport is closed, ac has been torn down).
	transport transport.ClientTransport // The current transport.

	mu      sync.Mutex         // guards the fields below
	curAddr resolver.Address   // The current address.
	addrs   []resolver.Address // All addresses that the resolver resolved to.

	// Use updateConnectivityState for updating addrConn's connectivity state.
	state connectivity.State

	backoffIdx   int           // Needs to be stateful for resetConnectBackoff.
	resetBackoff chan struct{} // closed+replaced by resetConnectBackoff to abort an in-flight backoff sleep

	channelzID int64 // channelz unique identification number.
	czData     *channelzData // atomic per-subchannel call counters
}
1106
1107// Note: this requires a lock on ac.mu.
1108func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {1109if ac.state == s {1110return1111}1112ac.state = s1113channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v", s)1114ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)1115}
1116
1117// adjustParams updates parameters used to create transports upon
1118// receiving a GoAway.
1119func (ac *addrConn) adjustParams(r transport.GoAwayReason) {1120switch r {1121case transport.GoAwayTooManyPings:1122v := 2 * ac.dopts.copts.KeepaliveParams.Time1123ac.cc.mu.Lock()1124if v > ac.cc.mkp.Time {1125ac.cc.mkp.Time = v1126}1127ac.cc.mu.Unlock()1128}1129}
1130
// resetTransport is the subchannel's connection-management loop: it repeatedly
// tries all resolved addresses, backs off on total failure, and — once a
// transport is up — blocks until that transport goes down, then starts over.
// It returns only when the addrConn is shut down or its context is done.
func (ac *addrConn) resetTransport() {
	for i := 0; ; i++ {
		// On every retry after the first, nudge the resolver in case the
		// failure was caused by stale addresses.
		if i > 0 {
			ac.cc.resolveNow(resolver.ResolveNowOptions{})
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}

		addrs := ac.addrs
		backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
		// This will be the duration that dial gets to finish.
		dialDuration := minConnectTimeout
		if ac.dopts.minConnectTimeout != nil {
			dialDuration = ac.dopts.minConnectTimeout()
		}

		if dialDuration < backoffFor {
			// Give dial more time as we keep failing to connect.
			dialDuration = backoffFor
		}
		// We can potentially spend all the time trying the first address, and
		// if the server accepts the connection and then hangs, the following
		// addresses will never be tried.
		//
		// The spec doesn't mention what should be done for multiple addresses.
		// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
		connectDeadline := time.Now().Add(dialDuration)

		ac.updateConnectivityState(connectivity.Connecting, nil)
		ac.transport = nil
		ac.mu.Unlock()

		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
		if err != nil {
			// After exhausting all addresses, the addrConn enters
			// TRANSIENT_FAILURE.
			ac.mu.Lock()
			if ac.state == connectivity.Shutdown {
				ac.mu.Unlock()
				return
			}
			ac.updateConnectivityState(connectivity.TransientFailure, err)

			// Backoff.
			b := ac.resetBackoff
			ac.mu.Unlock()

			timer := time.NewTimer(backoffFor)
			select {
			case <-timer.C:
				ac.mu.Lock()
				ac.backoffIdx++
				ac.mu.Unlock()
			case <-b:
				// resetConnectBackoff closed the channel: skip the rest of
				// the wait (backoffIdx was already reset there).
				timer.Stop()
			case <-ac.ctx.Done():
				timer.Stop()
				return
			}
			continue
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			// Torn down while we were dialing; discard the new transport.
			ac.mu.Unlock()
			newTr.Close()
			return
		}
		ac.curAddr = addr
		ac.transport = newTr
		ac.backoffIdx = 0

		// Health checking (or the READY fallback inside it) takes over the
		// state from here; hcancel stops it when the transport goes down.
		hctx, hcancel := context.WithCancel(ac.ctx)
		ac.startHealthCheck(hctx)
		ac.mu.Unlock()

		// Block until the created transport is down. And when this happens,
		// we restart from the top of the addr list.
		<-reconnect.Done()
		hcancel()
		// restart connecting - the top of the loop will set state to
		// CONNECTING. This is against the current connectivity semantics doc,
		// however it allows for graceful behavior for RPCs not yet dispatched
		// - unfortunate timing would otherwise lead to the RPC failing even
		// though the TRANSIENT_FAILURE state (called for by the doc) would be
		// instantaneous.
		//
		// Ideally we should transition to Idle here and block until there is
		// RPC activity that leads to the balancer requesting a reconnect of
		// the associated SubConn.
	}
}
1227
// tryAllAddrs tries to create a connection to each of the addresses, stopping
// at the first successful one. It returns the transport, the address and an
// Event in the successful case. The Event fires when the returned transport
// disconnects.
//
// On total failure it returns the error from the first address attempted
// (errConnClosing if the addrConn was shut down mid-loop).
func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
	var firstConnErr error
	for _, addr := range addrs {
		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return nil, resolver.Address{}, nil, errConnClosing
		}

		// Pick up the channel-wide keepalive parameters; they may have been
		// raised by adjustParams after a GoAway.
		ac.cc.mu.RLock()
		ac.dopts.copts.KeepaliveParams = ac.cc.mkp
		ac.cc.mu.RUnlock()

		copts := ac.dopts.copts
		if ac.scopts.CredsBundle != nil {
			copts.CredsBundle = ac.scopts.CredsBundle
		}
		ac.mu.Unlock()

		channelz.Infof(logger, ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)

		newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
		if err == nil {
			return newTr, addr, reconnect, nil
		}
		if firstConnErr == nil {
			firstConnErr = err
		}
		ac.cc.updateConnectionError(err)
	}

	// Couldn't connect to any address.
	return nil, resolver.Address{}, nil, firstConnErr
}
1265
// createTransport creates a connection to addr. It returns the transport and a
// Event in the successful case. The Event fires when the returned transport
// disconnects.
//
// It waits (up to connectDeadline) for the server's HTTP/2 preface before
// declaring success, so a TCP connection that never completes the gRPC
// handshake is treated as a failure.
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
	prefaceReceived := make(chan struct{})
	onCloseCalled := make(chan struct{})
	reconnect := grpcsync.NewEvent()

	// addr.ServerName takes precedent over ClientConn authority, if present.
	if addr.ServerName == "" {
		addr.ServerName = ac.cc.authority
	}

	// once guards the READY->CONNECTING transition: GoAway and close can
	// both fire for the same transport, but the state must only move once.
	once := sync.Once{}
	onGoAway := func(r transport.GoAwayReason) {
		ac.mu.Lock()
		ac.adjustParams(r)
		once.Do(func() {
			if ac.state == connectivity.Ready {
				// Prevent this SubConn from being used for new RPCs by setting its
				// state to Connecting.
				//
				// TODO: this should be Idle when grpc-go properly supports it.
				ac.updateConnectivityState(connectivity.Connecting, nil)
			}
		})
		ac.mu.Unlock()
		reconnect.Fire()
	}

	onClose := func() {
		ac.mu.Lock()
		once.Do(func() {
			if ac.state == connectivity.Ready {
				// Prevent this SubConn from being used for new RPCs by setting its
				// state to Connecting.
				//
				// TODO: this should be Idle when grpc-go properly supports it.
				ac.updateConnectivityState(connectivity.Connecting, nil)
			}
		})
		ac.mu.Unlock()
		close(onCloseCalled)
		reconnect.Fire()
	}

	onPrefaceReceipt := func() {
		close(prefaceReceived)
	}

	connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
	defer cancel()
	if channelz.IsOn() {
		copts.ChannelzParentID = ac.channelzID
	}

	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onPrefaceReceipt, onGoAway, onClose)
	if err != nil {
		// newTr is either nil, or closed.
		channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err)
		return nil, nil, err
	}

	// The transport dialed; now wait for the server preface (or failure).
	select {
	case <-time.After(time.Until(connectDeadline)):
		// We didn't get the preface in time.
		newTr.Close()
		channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
		return nil, nil, errors.New("timed out waiting for server handshake")
	case <-prefaceReceived:
		// We got the preface - huzzah! things are good.
	case <-onCloseCalled:
		// The transport has already closed - noop.
		return nil, nil, errors.New("connection closed")
		// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
	}
	return newTr, reconnect, nil
}
1344
// startHealthCheck starts the health checking stream (RPC) to watch the health
// stats of this connection if health checking is requested and configured.
//
// LB channel health checking is enabled when all requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
// 2. internal.HealthCheckFunc is set by importing the grpc/health package
// 3. a service config with non-empty healthCheckConfig field is provided
// 4. the load balancer requests it
//
// It sets addrConn to READY if the health checking stream is not started.
//
// Caller must hold ac.mu.
func (ac *addrConn) startHealthCheck(ctx context.Context) {
	var healthcheckManagingState bool
	// If health checking does not take ownership of the state (any of the
	// requirements above fails), report READY immediately: the transport is
	// already up when this is called.
	defer func() {
		if !healthcheckManagingState {
			ac.updateConnectivityState(connectivity.Ready, nil)
		}
	}()

	if ac.cc.dopts.disableHealthCheck {
		return
	}
	healthCheckConfig := ac.cc.healthCheckConfig()
	if healthCheckConfig == nil {
		return
	}
	if !ac.scopts.HealthCheckEnabled {
		return
	}
	healthCheckFunc := ac.cc.dopts.healthCheckFunc
	if healthCheckFunc == nil {
		// The health package is not imported to set health check function.
		//
		// TODO: add a link to the health check doc in the error message.
		channelz.Error(logger, ac.channelzID, "Health check is requested but health check function is not set.")
		return
	}

	healthcheckManagingState = true

	// Set up the health check helper functions.
	currentTr := ac.transport
	// newStream opens the watch stream on the transport that was current when
	// health checking started; it fails once that transport is replaced.
	newStream := func(method string) (interface{}, error) {
		ac.mu.Lock()
		if ac.transport != currentTr {
			ac.mu.Unlock()
			return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
		}
		ac.mu.Unlock()
		return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
	}
	// setConnectivityState forwards health updates, dropping ones for a
	// transport that is no longer current.
	setConnectivityState := func(s connectivity.State, lastErr error) {
		ac.mu.Lock()
		defer ac.mu.Unlock()
		if ac.transport != currentTr {
			return
		}
		ac.updateConnectivityState(s, lastErr)
	}
	// Start the health checking stream.
	go func() {
		err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
		if err != nil {
			if status.Code(err) == codes.Unimplemented {
				channelz.Error(logger, ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled")
			} else {
				channelz.Errorf(logger, ac.channelzID, "HealthCheckFunc exits with unexpected error %v", err)
			}
		}
	}()
}
1417
1418func (ac *addrConn) resetConnectBackoff() {1419ac.mu.Lock()1420close(ac.resetBackoff)1421ac.backoffIdx = 01422ac.resetBackoff = make(chan struct{})1423ac.mu.Unlock()1424}
1425
1426// getReadyTransport returns the transport if ac's state is READY.
1427// Otherwise it returns nil, false.
1428// If ac's state is IDLE, it will trigger ac to connect.
1429func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {1430ac.mu.Lock()1431if ac.state == connectivity.Ready && ac.transport != nil {1432t := ac.transport1433ac.mu.Unlock()1434return t, true1435}1436var idle bool1437if ac.state == connectivity.Idle {1438idle = true1439}1440ac.mu.Unlock()1441// Trigger idle ac to connect.1442if idle {1443ac.connect()1444}1445return nil, false1446}
1447
1448// tearDown starts to tear down the addrConn.
1449// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
1450// some edge cases (e.g., the caller opens and closes many addrConn's in a
1451// tight loop.
1452// tearDown doesn't remove ac from ac.cc.conns.
1453func (ac *addrConn) tearDown(err error) {1454ac.mu.Lock()1455if ac.state == connectivity.Shutdown {1456ac.mu.Unlock()1457return1458}1459curTr := ac.transport1460ac.transport = nil1461// We have to set the state to Shutdown before anything else to prevent races1462// between setting the state and logic that waits on context cancellation / etc.1463ac.updateConnectivityState(connectivity.Shutdown, nil)1464ac.cancel()1465ac.curAddr = resolver.Address{}1466if err == errConnDrain && curTr != nil {1467// GracefulClose(...) may be executed multiple times when1468// i) receiving multiple GoAway frames from the server; or1469// ii) there are concurrent name resolver/Balancer triggered1470// address removal and GoAway.1471// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.1472ac.mu.Unlock()1473curTr.GracefulClose()1474ac.mu.Lock()1475}1476if channelz.IsOn() {1477channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{1478Desc: "Subchannel Deleted",1479Severity: channelz.CtInfo,1480Parent: &channelz.TraceEventDesc{1481Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),1482Severity: channelz.CtInfo,1483},1484})1485// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to1486// the entity being deleted, and thus prevent it from being deleted right away.1487channelz.RemoveEntry(ac.channelzID)1488}1489ac.mu.Unlock()1490}
1491
1492func (ac *addrConn) getState() connectivity.State {1493ac.mu.Lock()1494defer ac.mu.Unlock()1495return ac.state1496}
1497
1498func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {1499ac.mu.Lock()1500addr := ac.curAddr.Addr1501ac.mu.Unlock()1502return &channelz.ChannelInternalMetric{1503State: ac.getState(),1504Target: addr,1505CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted),1506CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded),1507CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed),1508LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),1509}1510}
1511
// incrCallsStarted bumps the channelz started-calls counter and records the
// start time of the most recent call.
func (ac *addrConn) incrCallsStarted() {
	atomic.AddInt64(&ac.czData.callsStarted, 1)
	atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
}

// incrCallsSucceeded bumps the channelz succeeded-calls counter.
func (ac *addrConn) incrCallsSucceeded() {
	atomic.AddInt64(&ac.czData.callsSucceeded, 1)
}

// incrCallsFailed bumps the channelz failed-calls counter.
func (ac *addrConn) incrCallsFailed() {
	atomic.AddInt64(&ac.czData.callsFailed, 1)
}
1524
// retryThrottler implements the service config's retry throttling policy as a
// token bucket: failed attempts drain tokens (throttle), successful RPCs add
// ratio tokens back (successfulRPC), and retries are disallowed while the
// bucket is at or below thresh.
type retryThrottler struct {
	max    float64 // bucket capacity (MaxTokens from the service config)
	thresh float64 // retries are throttled while tokens <= thresh
	ratio  float64 // tokens restored per successful RPC

	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}
1533
1534// throttle subtracts a retry token from the pool and returns whether a retry
1535// should be throttled (disallowed) based upon the retry throttling policy in
1536// the service config.
1537func (rt *retryThrottler) throttle() bool {1538if rt == nil {1539return false1540}1541rt.mu.Lock()1542defer rt.mu.Unlock()1543rt.tokens--1544if rt.tokens < 0 {1545rt.tokens = 01546}1547return rt.tokens <= rt.thresh1548}
1549
1550func (rt *retryThrottler) successfulRPC() {1551if rt == nil {1552return1553}1554rt.mu.Lock()1555defer rt.mu.Unlock()1556rt.tokens += rt.ratio1557if rt.tokens > rt.max {1558rt.tokens = rt.max1559}1560}
1561
// channelzChannel adapts a *ClientConn to the channelz metric interface.
type channelzChannel struct {
	cc *ClientConn
}

// ChannelzMetric returns the channelz metrics of the wrapped ClientConn.
func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
	return c.cc.channelzMetric()
}
1569
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
// It is kept only for backward compatibility with code that references it.
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
1577func (cc *ClientConn) getResolver(scheme string) resolver.Builder {1578for _, rb := range cc.dopts.resolvers {1579if scheme == rb.Scheme() {1580return rb1581}1582}1583return resolver.Get(scheme)1584}
1585
1586func (cc *ClientConn) updateConnectionError(err error) {1587cc.lceMu.Lock()1588cc.lastConnectionError = err1589cc.lceMu.Unlock()1590}
1591
1592func (cc *ClientConn) connectionError() error {1593cc.lceMu.Lock()1594defer cc.lceMu.Unlock()1595return cc.lastConnectionError1596}
1597