3
* Copyright 2017 gRPC authors.
5
* Licensed under the Apache License, Version 2.0 (the "License");
6
* you may not use this file except in compliance with the License.
7
* You may obtain a copy of the License at
9
* http://www.apache.org/licenses/LICENSE-2.0
11
* Unless required by applicable law or agreed to in writing, software
12
* distributed under the License is distributed on an "AS IS" BASIS,
13
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
* See the License for the specific language governing permissions and
15
* limitations under the License.
25
"google.golang.org/grpc/balancer"
26
"google.golang.org/grpc/connectivity"
27
"google.golang.org/grpc/grpclog"
28
"google.golang.org/grpc/resolver"
31
// logger is the grpclog component logger registered under the name
// "balancer"; the balancer methods in this file log through it.
var logger = grpclog.Component("balancer")
33
// baseBuilder is the builder type whose Build method sets up a balancer.
// NOTE(review): only the pickerBuilder field is visible here — confirm
// whether the struct declares further fields.
type baseBuilder struct {
35
// pickerBuilder is handed to every balancer set up by Build.
pickerBuilder PickerBuilder
39
// Build sets up a new balancer: it passes the builder's pickerBuilder along,
// allocates empty subConns and scStates maps plus a fresh
// ConnectivityStateEvaluator, and installs an initial error picker.
// NOTE(review): no visible line references cc or opt, and the return
// statement is not visible — confirm how they are used.
func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
42
pickerBuilder: bb.pickerBuilder,
44
// Both maps start empty: subConns is keyed by resolver.Address, scStates by
// SubConn.
subConns: make(map[resolver.Address]balancer.SubConn),
45
scStates: make(map[balancer.SubConn]connectivity.State),
46
csEvltr: &balancer.ConnectivityStateEvaluator{},
49
// Initialize picker to a picker that always returns
50
// ErrNoSubConnAvailable, because when state of a SubConn changes, we
51
// may call UpdateState with this picker.
52
bal.picker = NewErrPicker(balancer.ErrNoSubConnAvailable)
56
// Name returns a string identifying this builder.
// NOTE(review): presumably the name the balancer is registered under; the
// method body is not visible here, so confirm which value is returned.
func (bb *baseBuilder) Name() string {
60
// baseBalancer holds the balancer's working state: the ClientConn it reports
// to, the SubConns it created keyed by address, their last known connectivity
// states, and the picker most recently generated from them.
type baseBalancer struct {
61
// cc is used to create and remove SubConns and to publish state updates.
cc balancer.ClientConn
62
// pickerBuilder builds the picker from the READY SubConns (see
// regeneratePicker).
pickerBuilder PickerBuilder
64
// csEvltr turns individual SubConn transitions into the aggregated state.
csEvltr *balancer.ConnectivityStateEvaluator
65
// state is the aggregated connectivity state last computed for the balancer.
state connectivity.State
67
subConns map[resolver.Address]balancer.SubConn // `attributes` is stripped from the keys of this map (the addresses)
68
// scStates records the last known state of each SubConn. An entry outlives
// its subConns entry until the SubConn reports Shutdown.
scStates map[balancer.SubConn]connectivity.State
69
// picker is the picker handed to cc in state updates.
picker balancer.Picker
72
resolverErr error // the last error reported by the resolver; cleared on successful resolution
73
connErr error // the last connection error; cleared upon leaving TransientFailure
76
// ResolverError handles an error reported by the resolver. With no SubConns
// the balancer moves to TransientFailure, and the balancer's state is then
// published to the ClientConn.
// NOTE(review): the body of the non-TransientFailure branch and the use of
// err are not visible here — confirm against the full source.
func (b *baseBalancer) ResolverError(err error) {
78
// Without any SubConn there is nothing to connect to, so the balancer itself
// is considered failed.
if len(b.subConns) == 0 {
79
b.state = connectivity.TransientFailure
82
if b.state != connectivity.TransientFailure {
83
// The picker will not change since the balancer does not currently
// report an error (it is not in TransientFailure).
88
b.cc.UpdateState(balancer.State{
89
ConnectivityState: b.state,
94
// UpdateClientConnState reconciles the balancer's SubConns with the address
// list in s.ResolverState. Addresses not yet present in b.subConns get a new
// SubConn, recorded in scStates as Idle. SubConns whose address is no longer
// listed are removed through cc.RemoveSubConn. An empty address list is
// reported through ResolverError and yields balancer.ErrBadResolverState.
// NOTE(review): several statements of this method (error checks, the success
// return) are not visible here — confirm against the full source.
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
95
// TODO: handle s.ResolverState.ServiceConfig?
97
logger.Info("base.baseBalancer: got new ClientConn state: ", s)
99
// Successful resolution; clear resolver error and ensure we return nil.
101
// addrsSet is the set converted from addrs; it is used for quick lookup of
// an address.
102
addrsSet := make(map[resolver.Address]struct{})
103
for _, a := range s.ResolverState.Addresses {
104
// Strip attributes from addresses before using them as map keys. So
105
// that when two addresses only differ in attributes pointers (but with
106
// the same attribute content), they are considered the same address.
108
// Note that this doesn't handle the case where the attribute content is
109
// different. So if users want to set different attributes to create
110
// duplicate connections to the same backend, it doesn't work. This is
111
// fine for now, because duplicate is done by setting Metadata today.
113
// TODO: read attributes to handle duplicate connections.
115
aNoAttrs.Attributes = nil
116
addrsSet[aNoAttrs] = struct{}{}
117
if sc, ok := b.subConns[aNoAttrs]; !ok {
118
// a is a new address (not existing in b.subConns).
120
// When creating SubConn, the original address with attributes is
121
// passed through. So that connection configurations in attributes
122
// (like creds) will be used.
123
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
125
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
128
// Track the new SubConn under the attribute-free key; it starts out Idle.
b.subConns[aNoAttrs] = sc
129
b.scStates[sc] = connectivity.Idle
132
// Always update the subconn's address in case the attributes changed.
135
// The SubConn does a reflect.DeepEqual of the new and old
136
// addresses. So this is a noop if the current address is the same
137
// as the old one (including attributes).
138
sc.UpdateAddresses([]resolver.Address{a})
141
for a, sc := range b.subConns {
142
// a was removed by resolver.
143
if _, ok := addrsSet[a]; !ok {
144
b.cc.RemoveSubConn(sc)
145
delete(b.subConns, a)
146
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
147
// The entry will be deleted in UpdateSubConnState.
150
// If resolver state contains no addresses, return an error so ClientConn
151
// will trigger re-resolve. Also records this as a resolver error, so when
152
// the overall state turns transient failure, the error message will have
153
// the zero address information.
154
if len(s.ResolverState.Addresses) == 0 {
155
b.ResolverError(errors.New("produced zero addresses"))
156
return balancer.ErrBadResolverState
161
// mergeErrors builds an error from the last connection error and the last
162
// resolver error. Must only be called if b.state is TransientFailure.
163
func (b *baseBalancer) mergeErrors() error {
164
// connErr must always be non-nil unless there are no SubConns, in which
165
// case resolverErr must be non-nil.
166
if b.connErr == nil {
167
return fmt.Errorf("last resolver error: %v", b.resolverErr)
169
if b.resolverErr == nil {
170
return fmt.Errorf("last connection error: %v", b.connErr)
172
return fmt.Errorf("last connection error: %v; last resolver error: %v", b.connErr, b.resolverErr)
175
// regeneratePicker takes a snapshot of the balancer, and generates a picker
176
// from it. The picker is
177
// - errPicker if the balancer is in TransientFailure,
178
// - built by the pickerBuilder with all READY SubConns otherwise.
179
func (b *baseBalancer) regeneratePicker() {
180
if b.state == connectivity.TransientFailure {
181
b.picker = NewErrPicker(b.mergeErrors())
184
readySCs := make(map[balancer.SubConn]SubConnInfo)
186
// Filter out all ready SCs from full subConn map.
187
for addr, sc := range b.subConns {
188
if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
189
readySCs[sc] = SubConnInfo{Address: addr}
192
b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
195
// UpdateSubConnState handles a connectivity state change reported for sc. It
// looks up sc's previous state in b.scStates, drops that entry once sc
// reaches Shutdown, saves the connection error on TransientFailure, and feeds
// the transition into the state evaluator to obtain the aggregated state. It
// then publishes the aggregated state and current picker to the ClientConn.
// NOTE(review): the bodies of several guards and the switch header are not
// visible here — confirm against the full source.
func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
196
s := state.ConnectivityState
198
logger.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
200
// oldS is the state previously recorded for sc; ok is false when sc is not
// tracked in scStates.
oldS, ok := b.scStates[sc]
203
logger.Infof("base.baseBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
207
if oldS == connectivity.TransientFailure && s == connectivity.Connecting {
208
// Once a subconn enters TRANSIENT_FAILURE, ignore subsequent
209
// CONNECTING transitions to prevent the aggregated state from being
210
// always CONNECTING when many backends exist but are all down.
215
case connectivity.Idle:
217
case connectivity.Shutdown:
218
// When an address was removed by resolver, b called RemoveSubConn but
219
// kept the sc's state in scStates. Remove state for this sc here.
220
delete(b.scStates, sc)
221
case connectivity.TransientFailure:
222
// Save error to be reported via picker.
223
b.connErr = state.ConnectionError
226
// The evaluator folds this single transition into the balancer-wide state.
b.state = b.csEvltr.RecordTransition(oldS, s)
228
// Regenerate picker when one of the following happens:
229
// - this sc entered or left ready
230
// - the aggregated state of balancer is TransientFailure
231
// (may need to update error message)
232
if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
233
b.state == connectivity.TransientFailure {
237
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
240
// Close is a nop because base balancer doesn't have internal state to clean up,
241
// and it doesn't need to call RemoveSubConn for the SubConns.
242
func (b *baseBalancer) Close() {
245
// NewErrPicker returns a Picker that always returns err on Pick().
246
func NewErrPicker(err error) balancer.Picker {
247
return &errPicker{err: err}
250
// NewErrPickerV2 is temporarily defined for backward compatibility reasons.
252
// Deprecated: use NewErrPicker instead.
253
var NewErrPickerV2 = NewErrPicker
255
// errPicker is the picker returned by NewErrPicker; it carries the single
// error that its Pick method reports.
type errPicker struct {
	err error // Pick() always returns this err.
}
259
// Pick ignores info and returns an empty PickResult together with the error
// stored in the picker.
func (p *errPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
260
return balancer.PickResult{}, p.err