cubefs

Форк
0
/
balancer_conn_wrappers.go 
246 строк · 6.6 Кб
1
/*
2
 *
3
 * Copyright 2017 gRPC authors.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 */
18

19
package grpc
20

21
import (
22
	"fmt"
23
	"sync"
24

25
	"google.golang.org/grpc/balancer"
26
	"google.golang.org/grpc/connectivity"
27
	"google.golang.org/grpc/internal/buffer"
28
	"google.golang.org/grpc/internal/channelz"
29
	"google.golang.org/grpc/internal/grpcsync"
30
	"google.golang.org/grpc/resolver"
31
)
32

33
// scStateUpdate contains the subConn and the new state it changed to.
34
type scStateUpdate struct {
35
	sc    balancer.SubConn
36
	state connectivity.State
37
	err   error
38
}
39

40
// ccBalancerWrapper is a wrapper on top of cc for balancers.
41
// It implements balancer.ClientConn interface.
42
type ccBalancerWrapper struct {
43
	cc         *ClientConn
44
	balancerMu sync.Mutex // synchronizes calls to the balancer
45
	balancer   balancer.Balancer
46
	scBuffer   *buffer.Unbounded
47
	done       *grpcsync.Event
48

49
	mu       sync.Mutex
50
	subConns map[*acBalancerWrapper]struct{}
51
}
52

53
func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
54
	ccb := &ccBalancerWrapper{
55
		cc:       cc,
56
		scBuffer: buffer.NewUnbounded(),
57
		done:     grpcsync.NewEvent(),
58
		subConns: make(map[*acBalancerWrapper]struct{}),
59
	}
60
	go ccb.watcher()
61
	ccb.balancer = b.Build(ccb, bopts)
62
	return ccb
63
}
64

65
// watcher balancer functions sequentially, so the balancer can be implemented
66
// lock-free.
67
func (ccb *ccBalancerWrapper) watcher() {
68
	for {
69
		select {
70
		case t := <-ccb.scBuffer.Get():
71
			ccb.scBuffer.Load()
72
			if ccb.done.HasFired() {
73
				break
74
			}
75
			ccb.balancerMu.Lock()
76
			su := t.(*scStateUpdate)
77
			ccb.balancer.UpdateSubConnState(su.sc, balancer.SubConnState{ConnectivityState: su.state, ConnectionError: su.err})
78
			ccb.balancerMu.Unlock()
79
		case <-ccb.done.Done():
80
		}
81

82
		if ccb.done.HasFired() {
83
			ccb.balancer.Close()
84
			ccb.mu.Lock()
85
			scs := ccb.subConns
86
			ccb.subConns = nil
87
			ccb.mu.Unlock()
88
			for acbw := range scs {
89
				ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
90
			}
91
			ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil})
92
			return
93
		}
94
	}
95
}
96

97
func (ccb *ccBalancerWrapper) close() {
98
	ccb.done.Fire()
99
}
100

101
func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
102
	// When updating addresses for a SubConn, if the address in use is not in
103
	// the new addresses, the old ac will be tearDown() and a new ac will be
104
	// created. tearDown() generates a state change with Shutdown state, we
105
	// don't want the balancer to receive this state change. So before
106
	// tearDown() on the old ac, ac.acbw (acWrapper) will be set to nil, and
107
	// this function will be called with (nil, Shutdown). We don't need to call
108
	// balancer method in this case.
109
	if sc == nil {
110
		return
111
	}
112
	ccb.scBuffer.Put(&scStateUpdate{
113
		sc:    sc,
114
		state: s,
115
		err:   err,
116
	})
117
}
118

119
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
120
	ccb.balancerMu.Lock()
121
	defer ccb.balancerMu.Unlock()
122
	return ccb.balancer.UpdateClientConnState(*ccs)
123
}
124

125
func (ccb *ccBalancerWrapper) resolverError(err error) {
126
	ccb.balancerMu.Lock()
127
	ccb.balancer.ResolverError(err)
128
	ccb.balancerMu.Unlock()
129
}
130

131
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
132
	if len(addrs) <= 0 {
133
		return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
134
	}
135
	ccb.mu.Lock()
136
	defer ccb.mu.Unlock()
137
	if ccb.subConns == nil {
138
		return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed")
139
	}
140
	ac, err := ccb.cc.newAddrConn(addrs, opts)
141
	if err != nil {
142
		return nil, err
143
	}
144
	acbw := &acBalancerWrapper{ac: ac}
145
	acbw.ac.mu.Lock()
146
	ac.acbw = acbw
147
	acbw.ac.mu.Unlock()
148
	ccb.subConns[acbw] = struct{}{}
149
	return acbw, nil
150
}
151

152
func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
153
	acbw, ok := sc.(*acBalancerWrapper)
154
	if !ok {
155
		return
156
	}
157
	ccb.mu.Lock()
158
	defer ccb.mu.Unlock()
159
	if ccb.subConns == nil {
160
		return
161
	}
162
	delete(ccb.subConns, acbw)
163
	ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
164
}
165

166
func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
167
	ccb.mu.Lock()
168
	defer ccb.mu.Unlock()
169
	if ccb.subConns == nil {
170
		return
171
	}
172
	// Update picker before updating state.  Even though the ordering here does
173
	// not matter, it can lead to multiple calls of Pick in the common start-up
174
	// case where we wait for ready and then perform an RPC.  If the picker is
175
	// updated later, we could call the "connecting" picker when the state is
176
	// updated, and then call the "ready" picker after the picker gets updated.
177
	ccb.cc.blockingpicker.updatePicker(s.Picker)
178
	ccb.cc.csMgr.updateState(s.ConnectivityState)
179
}
180

181
func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) {
182
	ccb.cc.resolveNow(o)
183
}
184

185
func (ccb *ccBalancerWrapper) Target() string {
186
	return ccb.cc.target
187
}
188

189
// acBalancerWrapper is a wrapper on top of ac for balancers.
190
// It implements balancer.SubConn interface.
191
type acBalancerWrapper struct {
192
	mu sync.Mutex
193
	ac *addrConn
194
}
195

196
func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
197
	acbw.mu.Lock()
198
	defer acbw.mu.Unlock()
199
	if len(addrs) <= 0 {
200
		acbw.ac.tearDown(errConnDrain)
201
		return
202
	}
203
	if !acbw.ac.tryUpdateAddrs(addrs) {
204
		cc := acbw.ac.cc
205
		opts := acbw.ac.scopts
206
		acbw.ac.mu.Lock()
207
		// Set old ac.acbw to nil so the Shutdown state update will be ignored
208
		// by balancer.
209
		//
210
		// TODO(bar) the state transition could be wrong when tearDown() old ac
211
		// and creating new ac, fix the transition.
212
		acbw.ac.acbw = nil
213
		acbw.ac.mu.Unlock()
214
		acState := acbw.ac.getState()
215
		acbw.ac.tearDown(errConnDrain)
216

217
		if acState == connectivity.Shutdown {
218
			return
219
		}
220

221
		ac, err := cc.newAddrConn(addrs, opts)
222
		if err != nil {
223
			channelz.Warningf(logger, acbw.ac.channelzID, "acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err)
224
			return
225
		}
226
		acbw.ac = ac
227
		ac.mu.Lock()
228
		ac.acbw = acbw
229
		ac.mu.Unlock()
230
		if acState != connectivity.Idle {
231
			ac.connect()
232
		}
233
	}
234
}
235

236
func (acbw *acBalancerWrapper) Connect() {
237
	acbw.mu.Lock()
238
	defer acbw.mu.Unlock()
239
	acbw.ac.connect()
240
}
241

242
func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
243
	acbw.mu.Lock()
244
	defer acbw.mu.Unlock()
245
	return acbw.ac
246
}
247

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.