podman

Форк
0
/
picker_wrapper.go 
223 строки · 6.5 Кб
1
/*
2
 *
3
 * Copyright 2017 gRPC authors.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 */
18

19
package grpc
20

21
import (
22
	"context"
23
	"io"
24
	"sync"
25

26
	"google.golang.org/grpc/balancer"
27
	"google.golang.org/grpc/codes"
28
	"google.golang.org/grpc/internal/channelz"
29
	istatus "google.golang.org/grpc/internal/status"
30
	"google.golang.org/grpc/internal/transport"
31
	"google.golang.org/grpc/stats"
32
	"google.golang.org/grpc/status"
33
)
34

35
// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
36
// actions and unblock when there's a picker update.
37
type pickerWrapper struct {
38
	mu            sync.Mutex
39
	done          bool
40
	blockingCh    chan struct{}
41
	picker        balancer.Picker
42
	statsHandlers []stats.Handler // to record blocking picker calls
43
}
44

45
func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper {
46
	return &pickerWrapper{
47
		blockingCh:    make(chan struct{}),
48
		statsHandlers: statsHandlers,
49
	}
50
}
51

52
// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
53
func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
54
	pw.mu.Lock()
55
	if pw.done {
56
		pw.mu.Unlock()
57
		return
58
	}
59
	pw.picker = p
60
	// pw.blockingCh should never be nil.
61
	close(pw.blockingCh)
62
	pw.blockingCh = make(chan struct{})
63
	pw.mu.Unlock()
64
}
65

66
// doneChannelzWrapper performs the following:
67
//   - increments the calls started channelz counter
68
//   - wraps the done function in the passed in result to increment the calls
69
//     failed or calls succeeded channelz counter before invoking the actual
70
//     done function.
71
func doneChannelzWrapper(acbw *acBalancerWrapper, result *balancer.PickResult) {
72
	ac := acbw.ac
73
	ac.incrCallsStarted()
74
	done := result.Done
75
	result.Done = func(b balancer.DoneInfo) {
76
		if b.Err != nil && b.Err != io.EOF {
77
			ac.incrCallsFailed()
78
		} else {
79
			ac.incrCallsSucceeded()
80
		}
81
		if done != nil {
82
			done(b)
83
		}
84
	}
85
}
86

87
// pick returns the transport that will be used for the RPC.
88
// It may block in the following cases:
89
// - there's no picker
90
// - the current picker returns ErrNoSubConnAvailable
91
// - the current picker returns other errors and failfast is false.
92
// - the subConn returned by the current picker is not READY
93
// When one of these situations happens, pick blocks until the picker gets updated.
94
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, balancer.PickResult, error) {
95
	var ch chan struct{}
96

97
	var lastPickErr error
98

99
	for {
100
		pw.mu.Lock()
101
		if pw.done {
102
			pw.mu.Unlock()
103
			return nil, balancer.PickResult{}, ErrClientConnClosing
104
		}
105

106
		if pw.picker == nil {
107
			ch = pw.blockingCh
108
		}
109
		if ch == pw.blockingCh {
110
			// This could happen when either:
111
			// - pw.picker is nil (the previous if condition), or
112
			// - has called pick on the current picker.
113
			pw.mu.Unlock()
114
			select {
115
			case <-ctx.Done():
116
				var errStr string
117
				if lastPickErr != nil {
118
					errStr = "latest balancer error: " + lastPickErr.Error()
119
				} else {
120
					errStr = ctx.Err().Error()
121
				}
122
				switch ctx.Err() {
123
				case context.DeadlineExceeded:
124
					return nil, balancer.PickResult{}, status.Error(codes.DeadlineExceeded, errStr)
125
				case context.Canceled:
126
					return nil, balancer.PickResult{}, status.Error(codes.Canceled, errStr)
127
				}
128
			case <-ch:
129
			}
130
			continue
131
		}
132

133
		// If the channel is set, it means that the pick call had to wait for a
134
		// new picker at some point. Either it's the first iteration and this
135
		// function received the first picker, or a picker errored with
136
		// ErrNoSubConnAvailable or errored with failfast set to false, which
137
		// will trigger a continue to the next iteration. In the first case this
138
		// conditional will hit if this call had to block (the channel is set).
139
		// In the second case, the only way it will get to this conditional is
140
		// if there is a new picker.
141
		if ch != nil {
142
			for _, sh := range pw.statsHandlers {
143
				sh.HandleRPC(ctx, &stats.PickerUpdated{})
144
			}
145
		}
146

147
		ch = pw.blockingCh
148
		p := pw.picker
149
		pw.mu.Unlock()
150

151
		pickResult, err := p.Pick(info)
152
		if err != nil {
153
			if err == balancer.ErrNoSubConnAvailable {
154
				continue
155
			}
156
			if st, ok := status.FromError(err); ok {
157
				// Status error: end the RPC unconditionally with this status.
158
				// First restrict the code to the list allowed by gRFC A54.
159
				if istatus.IsRestrictedControlPlaneCode(st) {
160
					err = status.Errorf(codes.Internal, "received picker error with illegal status: %v", err)
161
				}
162
				return nil, balancer.PickResult{}, dropError{error: err}
163
			}
164
			// For all other errors, wait for ready RPCs should block and other
165
			// RPCs should fail with unavailable.
166
			if !failfast {
167
				lastPickErr = err
168
				continue
169
			}
170
			return nil, balancer.PickResult{}, status.Error(codes.Unavailable, err.Error())
171
		}
172

173
		acbw, ok := pickResult.SubConn.(*acBalancerWrapper)
174
		if !ok {
175
			logger.Errorf("subconn returned from pick is type %T, not *acBalancerWrapper", pickResult.SubConn)
176
			continue
177
		}
178
		if t := acbw.ac.getReadyTransport(); t != nil {
179
			if channelz.IsOn() {
180
				doneChannelzWrapper(acbw, &pickResult)
181
				return t, pickResult, nil
182
			}
183
			return t, pickResult, nil
184
		}
185
		if pickResult.Done != nil {
186
			// Calling done with nil error, no bytes sent and no bytes received.
187
			// DoneInfo with default value works.
188
			pickResult.Done(balancer.DoneInfo{})
189
		}
190
		logger.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
191
		// If ok == false, ac.state is not READY.
192
		// A valid picker always returns READY subConn. This means the state of ac
193
		// just changed, and picker will be updated shortly.
194
		// continue back to the beginning of the for loop to repick.
195
	}
196
}
197

198
func (pw *pickerWrapper) close() {
199
	pw.mu.Lock()
200
	defer pw.mu.Unlock()
201
	if pw.done {
202
		return
203
	}
204
	pw.done = true
205
	close(pw.blockingCh)
206
}
207

208
// reset clears the pickerWrapper and prepares it for being used again when idle
209
// mode is exited.
210
func (pw *pickerWrapper) reset() {
211
	pw.mu.Lock()
212
	defer pw.mu.Unlock()
213
	if pw.done {
214
		return
215
	}
216
	pw.blockingCh = make(chan struct{})
217
}
218

219
// dropError is a wrapper error that indicates the LB policy wishes to drop the
220
// RPC and not retry it.
221
type dropError struct {
222
	error
223
}
224

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.