podman
249 строк · 7.2 Кб
/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19package grpc20
21import (22"encoding/json"23"errors"24"fmt"25
26"google.golang.org/grpc/balancer"27"google.golang.org/grpc/connectivity"28internalgrpclog "google.golang.org/grpc/internal/grpclog"29"google.golang.org/grpc/internal/grpcrand"30"google.golang.org/grpc/internal/pretty"31"google.golang.org/grpc/resolver"32"google.golang.org/grpc/serviceconfig"33)
34
// Identifiers shared by the pick_first LB policy implementation.
const (
	// PickFirstBalancerName is the name of the pick_first balancer.
	PickFirstBalancerName = "pick_first"

	// logPrefix is the format string for the per-balancer log prefix; its
	// single %p verb is filled with the balancer's pointer.
	logPrefix = "[pick-first-lb %p] "
)
40
41func newPickfirstBuilder() balancer.Builder {42return &pickfirstBuilder{}43}
44
// pickfirstBuilder builds pick_first balancers and parses their LB policy
// config. It has no fields.
type pickfirstBuilder struct{}
47func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {48b := &pickfirstBalancer{cc: cc}49b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))50return b51}
52
53func (*pickfirstBuilder) Name() string {54return PickFirstBalancerName55}
56
57type pfConfig struct {58serviceconfig.LoadBalancingConfig `json:"-"`59
60// If set to true, instructs the LB policy to shuffle the order of the list61// of addresses received from the name resolver before attempting to62// connect to them.63ShuffleAddressList bool `json:"shuffleAddressList"`64}
65
66func (*pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {67var cfg pfConfig68if err := json.Unmarshal(js, &cfg); err != nil {69return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(js), err)70}71return cfg, nil72}
73
74type pickfirstBalancer struct {75logger *internalgrpclog.PrefixLogger76state connectivity.State77cc balancer.ClientConn78subConn balancer.SubConn79}
80
81func (b *pickfirstBalancer) ResolverError(err error) {82if b.logger.V(2) {83b.logger.Infof("Received error from the name resolver: %v", err)84}85if b.subConn == nil {86b.state = connectivity.TransientFailure87}88
89if b.state != connectivity.TransientFailure {90// The picker will not change since the balancer does not currently91// report an error.92return93}94b.cc.UpdateState(balancer.State{95ConnectivityState: connectivity.TransientFailure,96Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},97})98}
99
100func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {101addrs := state.ResolverState.Addresses102if len(addrs) == 0 {103// The resolver reported an empty address list. Treat it like an error by104// calling b.ResolverError.105if b.subConn != nil {106// Shut down the old subConn. All addresses were removed, so it is107// no longer valid.108b.subConn.Shutdown()109b.subConn = nil110}111b.ResolverError(errors.New("produced zero addresses"))112return balancer.ErrBadResolverState113}114
115// We don't have to guard this block with the env var because ParseConfig116// already does so.117cfg, ok := state.BalancerConfig.(pfConfig)118if state.BalancerConfig != nil && !ok {119return fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v", state.BalancerConfig, state.BalancerConfig)120}121if cfg.ShuffleAddressList {122addrs = append([]resolver.Address{}, addrs...)123grpcrand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })124}125
126if b.logger.V(2) {127b.logger.Infof("Received new config %s, resolver state %s", pretty.ToJSON(cfg), pretty.ToJSON(state.ResolverState))128}129
130if b.subConn != nil {131b.cc.UpdateAddresses(b.subConn, addrs)132return nil133}134
135var subConn balancer.SubConn136subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{137StateListener: func(state balancer.SubConnState) {138b.updateSubConnState(subConn, state)139},140})141if err != nil {142if b.logger.V(2) {143b.logger.Infof("Failed to create new SubConn: %v", err)144}145b.state = connectivity.TransientFailure146b.cc.UpdateState(balancer.State{147ConnectivityState: connectivity.TransientFailure,148Picker: &picker{err: fmt.Errorf("error creating connection: %v", err)},149})150return balancer.ErrBadResolverState151}152b.subConn = subConn153b.state = connectivity.Idle154b.cc.UpdateState(balancer.State{155ConnectivityState: connectivity.Connecting,156Picker: &picker{err: balancer.ErrNoSubConnAvailable},157})158b.subConn.Connect()159return nil160}
161
162// UpdateSubConnState is unused as a StateListener is always registered when
163// creating SubConns.
164func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {165b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", subConn, state)166}
167
168func (b *pickfirstBalancer) updateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {169if b.logger.V(2) {170b.logger.Infof("Received SubConn state update: %p, %+v", subConn, state)171}172if b.subConn != subConn {173if b.logger.V(2) {174b.logger.Infof("Ignored state change because subConn is not recognized")175}176return177}178if state.ConnectivityState == connectivity.Shutdown {179b.subConn = nil180return181}182
183switch state.ConnectivityState {184case connectivity.Ready:185b.cc.UpdateState(balancer.State{186ConnectivityState: state.ConnectivityState,187Picker: &picker{result: balancer.PickResult{SubConn: subConn}},188})189case connectivity.Connecting:190if b.state == connectivity.TransientFailure {191// We stay in TransientFailure until we are Ready. See A62.192return193}194b.cc.UpdateState(balancer.State{195ConnectivityState: state.ConnectivityState,196Picker: &picker{err: balancer.ErrNoSubConnAvailable},197})198case connectivity.Idle:199if b.state == connectivity.TransientFailure {200// We stay in TransientFailure until we are Ready. Also kick the201// subConn out of Idle into Connecting. See A62.202b.subConn.Connect()203return204}205b.cc.UpdateState(balancer.State{206ConnectivityState: state.ConnectivityState,207Picker: &idlePicker{subConn: subConn},208})209case connectivity.TransientFailure:210b.cc.UpdateState(balancer.State{211ConnectivityState: state.ConnectivityState,212Picker: &picker{err: state.ConnectionError},213})214}215b.state = state.ConnectivityState216}
217
218func (b *pickfirstBalancer) Close() {219}
220
221func (b *pickfirstBalancer) ExitIdle() {222if b.subConn != nil && b.state == connectivity.Idle {223b.subConn.Connect()224}225}
226
227type picker struct {228result balancer.PickResult229err error230}
231
232func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {233return p.result, p.err234}
235
236// idlePicker is used when the SubConn is IDLE and kicks the SubConn into
237// CONNECTING when Pick is called.
238type idlePicker struct {239subConn balancer.SubConn240}
241
242func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {243i.subConn.Connect()244return balancer.PickResult{}, balancer.ErrNoSubConnAvailable245}
246
247func init() {248balancer.Register(newPickfirstBuilder())249}
250