podman
223 строки · 6.5 Кб
/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19package grpc
20
21import (
22"context"
23"io"
24"sync"
25
26"google.golang.org/grpc/balancer"
27"google.golang.org/grpc/codes"
28"google.golang.org/grpc/internal/channelz"
29istatus "google.golang.org/grpc/internal/status"
30"google.golang.org/grpc/internal/transport"
31"google.golang.org/grpc/stats"
32"google.golang.org/grpc/status"
33)
34
35// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
36// actions and unblock when there's a picker update.
37type pickerWrapper struct {
38mu sync.Mutex
39done bool
40blockingCh chan struct{}
41picker balancer.Picker
42statsHandlers []stats.Handler // to record blocking picker calls
43}
44
45func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper {
46return &pickerWrapper{
47blockingCh: make(chan struct{}),
48statsHandlers: statsHandlers,
49}
50}
51
52// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
53func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
54pw.mu.Lock()
55if pw.done {
56pw.mu.Unlock()
57return
58}
59pw.picker = p
60// pw.blockingCh should never be nil.
61close(pw.blockingCh)
62pw.blockingCh = make(chan struct{})
63pw.mu.Unlock()
64}
65
66// doneChannelzWrapper performs the following:
67// - increments the calls started channelz counter
68// - wraps the done function in the passed in result to increment the calls
69// failed or calls succeeded channelz counter before invoking the actual
70// done function.
71func doneChannelzWrapper(acbw *acBalancerWrapper, result *balancer.PickResult) {
72ac := acbw.ac
73ac.incrCallsStarted()
74done := result.Done
75result.Done = func(b balancer.DoneInfo) {
76if b.Err != nil && b.Err != io.EOF {
77ac.incrCallsFailed()
78} else {
79ac.incrCallsSucceeded()
80}
81if done != nil {
82done(b)
83}
84}
85}
86
87// pick returns the transport that will be used for the RPC.
88// It may block in the following cases:
89// - there's no picker
90// - the current picker returns ErrNoSubConnAvailable
91// - the current picker returns other errors and failfast is false.
92// - the subConn returned by the current picker is not READY
93// When one of these situations happens, pick blocks until the picker gets updated.
94func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, balancer.PickResult, error) {
95var ch chan struct{}
96
97var lastPickErr error
98
99for {
100pw.mu.Lock()
101if pw.done {
102pw.mu.Unlock()
103return nil, balancer.PickResult{}, ErrClientConnClosing
104}
105
106if pw.picker == nil {
107ch = pw.blockingCh
108}
109if ch == pw.blockingCh {
110// This could happen when either:
111// - pw.picker is nil (the previous if condition), or
112// - has called pick on the current picker.
113pw.mu.Unlock()
114select {
115case <-ctx.Done():
116var errStr string
117if lastPickErr != nil {
118errStr = "latest balancer error: " + lastPickErr.Error()
119} else {
120errStr = ctx.Err().Error()
121}
122switch ctx.Err() {
123case context.DeadlineExceeded:
124return nil, balancer.PickResult{}, status.Error(codes.DeadlineExceeded, errStr)
125case context.Canceled:
126return nil, balancer.PickResult{}, status.Error(codes.Canceled, errStr)
127}
128case <-ch:
129}
130continue
131}
132
133// If the channel is set, it means that the pick call had to wait for a
134// new picker at some point. Either it's the first iteration and this
135// function received the first picker, or a picker errored with
136// ErrNoSubConnAvailable or errored with failfast set to false, which
137// will trigger a continue to the next iteration. In the first case this
138// conditional will hit if this call had to block (the channel is set).
139// In the second case, the only way it will get to this conditional is
140// if there is a new picker.
141if ch != nil {
142for _, sh := range pw.statsHandlers {
143sh.HandleRPC(ctx, &stats.PickerUpdated{})
144}
145}
146
147ch = pw.blockingCh
148p := pw.picker
149pw.mu.Unlock()
150
151pickResult, err := p.Pick(info)
152if err != nil {
153if err == balancer.ErrNoSubConnAvailable {
154continue
155}
156if st, ok := status.FromError(err); ok {
157// Status error: end the RPC unconditionally with this status.
158// First restrict the code to the list allowed by gRFC A54.
159if istatus.IsRestrictedControlPlaneCode(st) {
160err = status.Errorf(codes.Internal, "received picker error with illegal status: %v", err)
161}
162return nil, balancer.PickResult{}, dropError{error: err}
163}
164// For all other errors, wait for ready RPCs should block and other
165// RPCs should fail with unavailable.
166if !failfast {
167lastPickErr = err
168continue
169}
170return nil, balancer.PickResult{}, status.Error(codes.Unavailable, err.Error())
171}
172
173acbw, ok := pickResult.SubConn.(*acBalancerWrapper)
174if !ok {
175logger.Errorf("subconn returned from pick is type %T, not *acBalancerWrapper", pickResult.SubConn)
176continue
177}
178if t := acbw.ac.getReadyTransport(); t != nil {
179if channelz.IsOn() {
180doneChannelzWrapper(acbw, &pickResult)
181return t, pickResult, nil
182}
183return t, pickResult, nil
184}
185if pickResult.Done != nil {
186// Calling done with nil error, no bytes sent and no bytes received.
187// DoneInfo with default value works.
188pickResult.Done(balancer.DoneInfo{})
189}
190logger.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
191// If ok == false, ac.state is not READY.
192// A valid picker always returns READY subConn. This means the state of ac
193// just changed, and picker will be updated shortly.
194// continue back to the beginning of the for loop to repick.
195}
196}
197
198func (pw *pickerWrapper) close() {
199pw.mu.Lock()
200defer pw.mu.Unlock()
201if pw.done {
202return
203}
204pw.done = true
205close(pw.blockingCh)
206}
207
208// reset clears the pickerWrapper and prepares it for being used again when idle
209// mode is exited.
210func (pw *pickerWrapper) reset() {
211pw.mu.Lock()
212defer pw.mu.Unlock()
213if pw.done {
214return
215}
216pw.blockingCh = make(chan struct{})
217}
218
// dropError wraps an error to signal that the LB policy wishes to drop the RPC
// and that the RPC should not be retried.
type dropError struct{ error }
224