cubefs

Форк
0
261 строка · 8.7 Кб
1
/*
2
 *
3
 * Copyright 2017 gRPC authors.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 */
18

19
package base
20

21
import (
22
	"errors"
23
	"fmt"
24

25
	"google.golang.org/grpc/balancer"
26
	"google.golang.org/grpc/connectivity"
27
	"google.golang.org/grpc/grpclog"
28
	"google.golang.org/grpc/resolver"
29
)
30

31
// logger is the package-level logger, obtained from grpclog.Component with
// the component name "balancer".
var logger = grpclog.Component("balancer")
32

33
type baseBuilder struct {
34
	name          string
35
	pickerBuilder PickerBuilder
36
	config        Config
37
}
38

39
func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
40
	bal := &baseBalancer{
41
		cc:            cc,
42
		pickerBuilder: bb.pickerBuilder,
43

44
		subConns: make(map[resolver.Address]balancer.SubConn),
45
		scStates: make(map[balancer.SubConn]connectivity.State),
46
		csEvltr:  &balancer.ConnectivityStateEvaluator{},
47
		config:   bb.config,
48
	}
49
	// Initialize picker to a picker that always returns
50
	// ErrNoSubConnAvailable, because when state of a SubConn changes, we
51
	// may call UpdateState with this picker.
52
	bal.picker = NewErrPicker(balancer.ErrNoSubConnAvailable)
53
	return bal
54
}
55

56
// Name reports the name this builder was configured with.
func (bb *baseBuilder) Name() string { return bb.name }
59

60
type baseBalancer struct {
61
	cc            balancer.ClientConn
62
	pickerBuilder PickerBuilder
63

64
	csEvltr *balancer.ConnectivityStateEvaluator
65
	state   connectivity.State
66

67
	subConns map[resolver.Address]balancer.SubConn // `attributes` is stripped from the keys of this map (the addresses)
68
	scStates map[balancer.SubConn]connectivity.State
69
	picker   balancer.Picker
70
	config   Config
71

72
	resolverErr error // the last error reported by the resolver; cleared on successful resolution
73
	connErr     error // the last connection error; cleared upon leaving TransientFailure
74
}
75

76
func (b *baseBalancer) ResolverError(err error) {
77
	b.resolverErr = err
78
	if len(b.subConns) == 0 {
79
		b.state = connectivity.TransientFailure
80
	}
81

82
	if b.state != connectivity.TransientFailure {
83
		// The picker will not change since the balancer does not currently
84
		// report an error.
85
		return
86
	}
87
	b.regeneratePicker()
88
	b.cc.UpdateState(balancer.State{
89
		ConnectivityState: b.state,
90
		Picker:            b.picker,
91
	})
92
}
93

94
// UpdateClientConnState reconciles the balancer's SubConns with the address
// list in s.ResolverState.
//
// For each resolved address a SubConn is created and asked to connect if none
// exists yet; otherwise the existing SubConn is given the (possibly changed)
// address. SubConns whose address is no longer present are removed. The
// stored resolver error is cleared on entry.
//
// It returns balancer.ErrBadResolverState when the resolver produced no
// addresses (after recording that condition through ResolverError), and nil
// otherwise.
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
	// TODO: handle s.ResolverState.ServiceConfig?
	if logger.V(2) {
		logger.Info("base.baseBalancer: got new ClientConn state: ", s)
	}
	// Successful resolution; clear resolver error and ensure we return nil.
	b.resolverErr = nil

	addrs := s.ResolverState.Addresses
	// addrsSet holds the attribute-stripped form of every resolved address
	// for quick membership checks when pruning removed SubConns below.
	addrsSet := make(map[resolver.Address]struct{}, len(addrs))
	for _, a := range addrs {
		// Strip attributes from addresses before using them as map keys, so
		// that two addresses differing only in their attributes pointers (but
		// with the same attribute content) are considered the same address.
		//
		// Note that this doesn't handle the case where the attribute content
		// is different. So if users want to set different attributes to
		// create duplicate connections to the same backend, it doesn't work.
		// This is fine for now, because duplication is done by setting
		// Metadata today.
		//
		// TODO: read attributes to handle duplicate connections.
		aNoAttrs := a
		aNoAttrs.Attributes = nil
		addrsSet[aNoAttrs] = struct{}{}

		if sc, ok := b.subConns[aNoAttrs]; ok {
			// Always update the SubConn's address in case the attributes
			// changed.
			//
			// The SubConn does a reflect.DeepEqual of the new and old
			// addresses, so this is a noop if the current address is the
			// same as the old one (including attributes).
			sc.UpdateAddresses([]resolver.Address{a})
			continue
		}

		// a is a new address (not present in b.subConns).
		//
		// The original address, attributes included, is passed to NewSubConn
		// so that connection configuration carried in attributes (like
		// creds) is used.
		sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
		if err != nil {
			logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
			continue
		}
		b.subConns[aNoAttrs] = sc
		b.scStates[sc] = connectivity.Idle
		sc.Connect()
	}

	for a, sc := range b.subConns {
		if _, ok := addrsSet[a]; ok {
			continue
		}
		// a was removed by the resolver. The state of sc stays in b.scStates
		// until sc reports Shutdown; UpdateSubConnState deletes the entry
		// then.
		b.cc.RemoveSubConn(sc)
		delete(b.subConns, a)
	}

	// If the resolver state contains no addresses, return an error so the
	// ClientConn will trigger re-resolution. This is also recorded as a
	// resolver error, so when the overall state turns to transient failure
	// the error message carries the zero-address information.
	if len(addrs) == 0 {
		b.ResolverError(errors.New("produced zero addresses"))
		return balancer.ErrBadResolverState
	}
	return nil
}
160

161
// mergeErrors builds an error from the last connection error and the last
162
// resolver error.  Must only be called if b.state is TransientFailure.
163
func (b *baseBalancer) mergeErrors() error {
164
	// connErr must always be non-nil unless there are no SubConns, in which
165
	// case resolverErr must be non-nil.
166
	if b.connErr == nil {
167
		return fmt.Errorf("last resolver error: %v", b.resolverErr)
168
	}
169
	if b.resolverErr == nil {
170
		return fmt.Errorf("last connection error: %v", b.connErr)
171
	}
172
	return fmt.Errorf("last connection error: %v; last resolver error: %v", b.connErr, b.resolverErr)
173
}
174

175
// regeneratePicker takes a snapshot of the balancer, and generates a picker
176
// from it. The picker is
177
//  - errPicker if the balancer is in TransientFailure,
178
//  - built by the pickerBuilder with all READY SubConns otherwise.
179
func (b *baseBalancer) regeneratePicker() {
180
	if b.state == connectivity.TransientFailure {
181
		b.picker = NewErrPicker(b.mergeErrors())
182
		return
183
	}
184
	readySCs := make(map[balancer.SubConn]SubConnInfo)
185

186
	// Filter out all ready SCs from full subConn map.
187
	for addr, sc := range b.subConns {
188
		if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
189
			readySCs[sc] = SubConnInfo{Address: addr}
190
		}
191
	}
192
	b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
193
}
194

195
func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
196
	s := state.ConnectivityState
197
	if logger.V(2) {
198
		logger.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
199
	}
200
	oldS, ok := b.scStates[sc]
201
	if !ok {
202
		if logger.V(2) {
203
			logger.Infof("base.baseBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
204
		}
205
		return
206
	}
207
	if oldS == connectivity.TransientFailure && s == connectivity.Connecting {
208
		// Once a subconn enters TRANSIENT_FAILURE, ignore subsequent
209
		// CONNECTING transitions to prevent the aggregated state from being
210
		// always CONNECTING when many backends exist but are all down.
211
		return
212
	}
213
	b.scStates[sc] = s
214
	switch s {
215
	case connectivity.Idle:
216
		sc.Connect()
217
	case connectivity.Shutdown:
218
		// When an address was removed by resolver, b called RemoveSubConn but
219
		// kept the sc's state in scStates. Remove state for this sc here.
220
		delete(b.scStates, sc)
221
	case connectivity.TransientFailure:
222
		// Save error to be reported via picker.
223
		b.connErr = state.ConnectionError
224
	}
225

226
	b.state = b.csEvltr.RecordTransition(oldS, s)
227

228
	// Regenerate picker when one of the following happens:
229
	//  - this sc entered or left ready
230
	//  - the aggregated state of balancer is TransientFailure
231
	//    (may need to update error message)
232
	if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
233
		b.state == connectivity.TransientFailure {
234
		b.regeneratePicker()
235
	}
236

237
	b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
238
}
239

240
// Close is a nop because base balancer doesn't have internal state to clean up,
241
// and it doesn't need to call RemoveSubConn for the SubConns.
242
func (b *baseBalancer) Close() {
243
}
244

245
// NewErrPicker returns a Picker that always returns err on Pick().
246
func NewErrPicker(err error) balancer.Picker {
247
	return &errPicker{err: err}
248
}
249

250
// NewErrPickerV2 is temporarily defined for backward compatibility reasons.
251
//
252
// Deprecated: use NewErrPicker instead.
253
var NewErrPickerV2 = NewErrPicker
254

255
// errPicker is a picker that fails every pick with a fixed error.
type errPicker struct {
	// err is the error returned by every call to Pick.
	err error
}
258

259
// Pick ignores info and always returns an empty PickResult together with the
// error stored in the picker.
func (p *errPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	var none balancer.PickResult
	return none, p.err
}
262

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.