cubefs
222 строки · 6.5 Кб
1/*
2*
3* Copyright 2017 gRPC authors.
4*
5* Licensed under the Apache License, Version 2.0 (the "License");
6* you may not use this file except in compliance with the License.
7* You may obtain a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS,
13* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14* See the License for the specific language governing permissions and
15* limitations under the License.
16*
17*/
18
19package grpc
20
21import (
22"fmt"
23"strings"
24"sync"
25"time"
26
27"google.golang.org/grpc/balancer"
28"google.golang.org/grpc/credentials"
29"google.golang.org/grpc/internal/channelz"
30"google.golang.org/grpc/internal/grpcsync"
31"google.golang.org/grpc/resolver"
32"google.golang.org/grpc/serviceconfig"
33)
34
35// ccResolverWrapper is a wrapper on top of cc for resolvers.
36// It implements resolver.ClientConn interface.
37type ccResolverWrapper struct {
38cc *ClientConn
39resolverMu sync.Mutex
40resolver resolver.Resolver
41done *grpcsync.Event
42curState resolver.State
43
44pollingMu sync.Mutex
45polling chan struct{}
46}
47
48// newCCResolverWrapper uses the resolver.Builder to build a Resolver and
49// returns a ccResolverWrapper object which wraps the newly built resolver.
50func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
51ccr := &ccResolverWrapper{
52cc: cc,
53done: grpcsync.NewEvent(),
54}
55
56var credsClone credentials.TransportCredentials
57if creds := cc.dopts.copts.TransportCredentials; creds != nil {
58credsClone = creds.Clone()
59}
60rbo := resolver.BuildOptions{
61DisableServiceConfig: cc.dopts.disableServiceConfig,
62DialCreds: credsClone,
63CredsBundle: cc.dopts.copts.CredsBundle,
64Dialer: cc.dopts.copts.Dialer,
65}
66
67var err error
68// We need to hold the lock here while we assign to the ccr.resolver field
69// to guard against a data race caused by the following code path,
70// rb.Build-->ccr.ReportError-->ccr.poll-->ccr.resolveNow, would end up
71// accessing ccr.resolver which is being assigned here.
72ccr.resolverMu.Lock()
73defer ccr.resolverMu.Unlock()
74ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
75if err != nil {
76return nil, err
77}
78return ccr, nil
79}
80
81func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
82ccr.resolverMu.Lock()
83if !ccr.done.HasFired() {
84ccr.resolver.ResolveNow(o)
85}
86ccr.resolverMu.Unlock()
87}
88
89func (ccr *ccResolverWrapper) close() {
90ccr.resolverMu.Lock()
91ccr.resolver.Close()
92ccr.done.Fire()
93ccr.resolverMu.Unlock()
94}
95
96// poll begins or ends asynchronous polling of the resolver based on whether
97// err is ErrBadResolverState.
98func (ccr *ccResolverWrapper) poll(err error) {
99ccr.pollingMu.Lock()
100defer ccr.pollingMu.Unlock()
101if err != balancer.ErrBadResolverState {
102// stop polling
103if ccr.polling != nil {
104close(ccr.polling)
105ccr.polling = nil
106}
107return
108}
109if ccr.polling != nil {
110// already polling
111return
112}
113p := make(chan struct{})
114ccr.polling = p
115go func() {
116for i := 0; ; i++ {
117ccr.resolveNow(resolver.ResolveNowOptions{})
118t := time.NewTimer(ccr.cc.dopts.resolveNowBackoff(i))
119select {
120case <-p:
121t.Stop()
122return
123case <-ccr.done.Done():
124// Resolver has been closed.
125t.Stop()
126return
127case <-t.C:
128select {
129case <-p:
130return
131default:
132}
133// Timer expired; re-resolve.
134}
135}
136}()
137}
138
139func (ccr *ccResolverWrapper) UpdateState(s resolver.State) {
140if ccr.done.HasFired() {
141return
142}
143channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending update to cc: %v", s)
144if channelz.IsOn() {
145ccr.addChannelzTraceEvent(s)
146}
147ccr.curState = s
148ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
149}
150
151func (ccr *ccResolverWrapper) ReportError(err error) {
152if ccr.done.HasFired() {
153return
154}
155channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: reporting error to cc: %v", err)
156ccr.poll(ccr.cc.updateResolverState(resolver.State{}, err))
157}
158
159// NewAddress is called by the resolver implementation to send addresses to gRPC.
160func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
161if ccr.done.HasFired() {
162return
163}
164channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending new addresses to cc: %v", addrs)
165if channelz.IsOn() {
166ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
167}
168ccr.curState.Addresses = addrs
169ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
170}
171
172// NewServiceConfig is called by the resolver implementation to send service
173// configs to gRPC.
174func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
175if ccr.done.HasFired() {
176return
177}
178channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: got new service config: %v", sc)
179if ccr.cc.dopts.disableServiceConfig {
180channelz.Info(logger, ccr.cc.channelzID, "Service config lookups disabled; ignoring config")
181return
182}
183scpr := parseServiceConfig(sc)
184if scpr.Err != nil {
185channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err)
186ccr.poll(balancer.ErrBadResolverState)
187return
188}
189if channelz.IsOn() {
190ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
191}
192ccr.curState.ServiceConfig = scpr
193ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
194}
195
196func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
197return parseServiceConfig(scJSON)
198}
199
200func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
201var updates []string
202var oldSC, newSC *ServiceConfig
203var oldOK, newOK bool
204if ccr.curState.ServiceConfig != nil {
205oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig)
206}
207if s.ServiceConfig != nil {
208newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig)
209}
210if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) {
211updates = append(updates, "service config updated")
212}
213if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 {
214updates = append(updates, "resolver returned an empty address list")
215} else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
216updates = append(updates, "resolver returned new addresses")
217}
218channelz.AddTraceEvent(logger, ccr.cc.channelzID, 0, &channelz.TraceEventDesc{
219Desc: fmt.Sprintf("Resolver state updated: %+v (%v)", s, strings.Join(updates, "; ")),
220Severity: channelz.CtInfo,
221})
222}
223