cubefs
637 строк · 21.7 Кб
1/*
2*
3* Copyright 2018 gRPC authors.
4*
5* Licensed under the Apache License, Version 2.0 (the "License");
6* you may not use this file except in compliance with the License.
7* You may obtain a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS,
13* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14* See the License for the specific language governing permissions and
15* limitations under the License.
16*
17*/
18
19package grpc
20
21import (
22"context"
23"fmt"
24"net"
25"time"
26
27"google.golang.org/grpc/backoff"
28"google.golang.org/grpc/balancer"
29"google.golang.org/grpc/credentials"
30"google.golang.org/grpc/internal"
31internalbackoff "google.golang.org/grpc/internal/backoff"
32"google.golang.org/grpc/internal/envconfig"
33"google.golang.org/grpc/internal/transport"
34"google.golang.org/grpc/keepalive"
35"google.golang.org/grpc/resolver"
36"google.golang.org/grpc/stats"
37)
38
// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
//
// Each field is written by one or more of the option constructors in this
// file; the per-field comments name the constructor(s) that set it.
type dialOptions struct {
	// Single unary interceptor, set by WithUnaryInterceptor.
	unaryInt UnaryClientInterceptor
	// Single stream interceptor, set by WithStreamInterceptor.
	streamInt StreamClientInterceptor

	// Unary interceptors accumulated by WithChainUnaryInterceptor.
	chainUnaryInts []UnaryClientInterceptor
	// Stream interceptors accumulated by WithChainStreamInterceptor.
	chainStreamInts []StreamClientInterceptor

	// Compressor set by the deprecated WithCompressor.
	cp Compressor
	// Decompressor set by the deprecated WithDecompressor.
	dc Decompressor
	// Backoff strategy, set by WithConnectParams, WithBackoffConfig and
	// withBackoff.
	bs internalbackoff.Strategy
	// Set to true by WithBlock and WithReturnConnectionError.
	block bool
	// Set to true by WithReturnConnectionError.
	returnLastError bool
	// Set to true by WithInsecure.
	insecure bool
	// Dial timeout, set by the deprecated WithTimeout.
	timeout time.Duration
	// Service config channel, set by the deprecated WithServiceConfig.
	scChan <-chan ServiceConfig
	// Authority value, set by WithAuthority.
	authority string
	// Transport-level connect options; many options in this file write
	// individual fields of it (buffer sizes, credentials, dialer, keepalive,
	// user agent, ...).
	copts transport.ConnectOptions
	// Default call options accumulated by WithDefaultCallOptions.
	callOptions []CallOption
	// This is used by WithBalancerName dial option.
	balancerBuilder balancer.Builder
	// Channelz parent ID, set by WithChannelzParentID.
	channelzParentID int64
	// Set to true by WithDisableServiceConfig.
	disableServiceConfig bool
	// Set to true by WithDisableRetry; defaults to !envconfig.Retry.
	disableRetry bool
	// Set to true by WithDisableHealthCheck.
	disableHealthCheck bool
	// Health check function; defaults to internal.HealthCheckFunc and can be
	// replaced via withHealthCheckFunc.
	healthCheckFunc internal.HealthChecker
	// Supplier of the minimum connect timeout, set by WithConnectParams and
	// withMinConnectDeadline.
	minConnectTimeout func() time.Duration
	defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON.
	// Raw JSON of the default service config, set by WithDefaultServiceConfig.
	defaultServiceConfigRawJSON *string
	// This is used by ccResolverWrapper to backoff between successive calls to
	// resolver.ResolveNow(). The user will have no need to configure this, but
	// we need to be able to configure this in tests.
	resolveNowBackoff func(int) time.Duration
	// Locally registered resolver builders accumulated by WithResolvers.
	resolvers []resolver.Builder
}
75
// DialOption configures how we set up the connection.
//
// The only method is unexported, so implementations outside this package must
// embed an existing implementation such as EmptyDialOption.
type DialOption interface {
	// apply mutates the given dialOptions in place.
	apply(*dialOptions)
}
80
// EmptyDialOption does not alter the dial configuration. It can be embedded in
// another structure to build custom dial options.
//
// Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type EmptyDialOption struct{}

// apply is a no-op; it exists so that EmptyDialOption satisfies DialOption.
func (EmptyDialOption) apply(*dialOptions) {}
91
// funcDialOption wraps a function that modifies dialOptions into an
// implementation of the DialOption interface.
type funcDialOption struct {
	// f is the mutation invoked by apply.
	f func(*dialOptions)
}
97
98func (fdo *funcDialOption) apply(do *dialOptions) {
99fdo.f(do)
100}
101
102func newFuncDialOption(f func(*dialOptions)) *funcDialOption {
103return &funcDialOption{
104f: f,
105}
106}
107
108// WithWriteBufferSize determines how much data can be batched before doing a
109// write on the wire. The corresponding memory allocation for this buffer will
110// be twice the size to keep syscalls low. The default value for this buffer is
111// 32KB.
112//
113// Zero will disable the write buffer such that each write will be on underlying
114// connection. Note: A Send call may not directly translate to a write.
115func WithWriteBufferSize(s int) DialOption {
116return newFuncDialOption(func(o *dialOptions) {
117o.copts.WriteBufferSize = s
118})
119}
120
121// WithReadBufferSize lets you set the size of read buffer, this determines how
122// much data can be read at most for each read syscall.
123//
124// The default value for this buffer is 32KB. Zero will disable read buffer for
125// a connection so data framer can access the underlying conn directly.
126func WithReadBufferSize(s int) DialOption {
127return newFuncDialOption(func(o *dialOptions) {
128o.copts.ReadBufferSize = s
129})
130}
131
132// WithInitialWindowSize returns a DialOption which sets the value for initial
133// window size on a stream. The lower bound for window size is 64K and any value
134// smaller than that will be ignored.
135func WithInitialWindowSize(s int32) DialOption {
136return newFuncDialOption(func(o *dialOptions) {
137o.copts.InitialWindowSize = s
138})
139}
140
141// WithInitialConnWindowSize returns a DialOption which sets the value for
142// initial window size on a connection. The lower bound for window size is 64K
143// and any value smaller than that will be ignored.
144func WithInitialConnWindowSize(s int32) DialOption {
145return newFuncDialOption(func(o *dialOptions) {
146o.copts.InitialConnWindowSize = s
147})
148}
149
150// WithMaxMsgSize returns a DialOption which sets the maximum message size the
151// client can receive.
152//
153// Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead. Will
154// be supported throughout 1.x.
155func WithMaxMsgSize(s int) DialOption {
156return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
157}
158
159// WithDefaultCallOptions returns a DialOption which sets the default
160// CallOptions for calls over the connection.
161func WithDefaultCallOptions(cos ...CallOption) DialOption {
162return newFuncDialOption(func(o *dialOptions) {
163o.callOptions = append(o.callOptions, cos...)
164})
165}
166
167// WithCodec returns a DialOption which sets a codec for message marshaling and
168// unmarshaling.
169//
170// Deprecated: use WithDefaultCallOptions(ForceCodec(_)) instead. Will be
171// supported throughout 1.x.
172func WithCodec(c Codec) DialOption {
173return WithDefaultCallOptions(CallCustomCodec(c))
174}
175
176// WithCompressor returns a DialOption which sets a Compressor to use for
177// message compression. It has lower priority than the compressor set by the
178// UseCompressor CallOption.
179//
180// Deprecated: use UseCompressor instead. Will be supported throughout 1.x.
181func WithCompressor(cp Compressor) DialOption {
182return newFuncDialOption(func(o *dialOptions) {
183o.cp = cp
184})
185}
186
187// WithDecompressor returns a DialOption which sets a Decompressor to use for
188// incoming message decompression. If incoming response messages are encoded
189// using the decompressor's Type(), it will be used. Otherwise, the message
190// encoding will be used to look up the compressor registered via
191// encoding.RegisterCompressor, which will then be used to decompress the
192// message. If no compressor is registered for the encoding, an Unimplemented
193// status error will be returned.
194//
195// Deprecated: use encoding.RegisterCompressor instead. Will be supported
196// throughout 1.x.
197func WithDecompressor(dc Decompressor) DialOption {
198return newFuncDialOption(func(o *dialOptions) {
199o.dc = dc
200})
201}
202
203// WithBalancerName sets the balancer that the ClientConn will be initialized
204// with. Balancer registered with balancerName will be used. This function
205// panics if no balancer was registered by balancerName.
206//
207// The balancer cannot be overridden by balancer option specified by service
208// config.
209//
210// Deprecated: use WithDefaultServiceConfig and WithDisableServiceConfig
211// instead. Will be removed in a future 1.x release.
212func WithBalancerName(balancerName string) DialOption {
213builder := balancer.Get(balancerName)
214if builder == nil {
215panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
216}
217return newFuncDialOption(func(o *dialOptions) {
218o.balancerBuilder = builder
219})
220}
221
222// WithServiceConfig returns a DialOption which has a channel to read the
223// service configuration.
224//
225// Deprecated: service config should be received through name resolver or via
226// WithDefaultServiceConfig, as specified at
227// https://github.com/grpc/grpc/blob/master/doc/service_config.md. Will be
228// removed in a future 1.x release.
229func WithServiceConfig(c <-chan ServiceConfig) DialOption {
230return newFuncDialOption(func(o *dialOptions) {
231o.scChan = c
232})
233}
234
235// WithConnectParams configures the dialer to use the provided ConnectParams.
236//
237// The backoff configuration specified as part of the ConnectParams overrides
238// all defaults specified in
239// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md. Consider
240// using the backoff.DefaultConfig as a base, in cases where you want to
241// override only a subset of the backoff configuration.
242//
243// Experimental
244//
245// Notice: This API is EXPERIMENTAL and may be changed or removed in a
246// later release.
247func WithConnectParams(p ConnectParams) DialOption {
248return newFuncDialOption(func(o *dialOptions) {
249o.bs = internalbackoff.Exponential{Config: p.Backoff}
250o.minConnectTimeout = func() time.Duration {
251return p.MinConnectTimeout
252}
253})
254}
255
256// WithBackoffMaxDelay configures the dialer to use the provided maximum delay
257// when backing off after failed connection attempts.
258//
259// Deprecated: use WithConnectParams instead. Will be supported throughout 1.x.
260func WithBackoffMaxDelay(md time.Duration) DialOption {
261return WithBackoffConfig(BackoffConfig{MaxDelay: md})
262}
263
264// WithBackoffConfig configures the dialer to use the provided backoff
265// parameters after connection failures.
266//
267// Deprecated: use WithConnectParams instead. Will be supported throughout 1.x.
268func WithBackoffConfig(b BackoffConfig) DialOption {
269bc := backoff.DefaultConfig
270bc.MaxDelay = b.MaxDelay
271return withBackoff(internalbackoff.Exponential{Config: bc})
272}
273
274// withBackoff sets the backoff strategy used for connectRetryNum after a failed
275// connection attempt.
276//
277// This can be exported if arbitrary backoff strategies are allowed by gRPC.
278func withBackoff(bs internalbackoff.Strategy) DialOption {
279return newFuncDialOption(func(o *dialOptions) {
280o.bs = bs
281})
282}
283
284// WithBlock returns a DialOption which makes caller of Dial blocks until the
285// underlying connection is up. Without this, Dial returns immediately and
286// connecting the server happens in background.
287func WithBlock() DialOption {
288return newFuncDialOption(func(o *dialOptions) {
289o.block = true
290})
291}
292
293// WithReturnConnectionError returns a DialOption which makes the client connection
294// return a string containing both the last connection error that occurred and
295// the context.DeadlineExceeded error.
296// Implies WithBlock()
297//
298// Experimental
299//
300// Notice: This API is EXPERIMENTAL and may be changed or removed in a
301// later release.
302func WithReturnConnectionError() DialOption {
303return newFuncDialOption(func(o *dialOptions) {
304o.block = true
305o.returnLastError = true
306})
307}
308
309// WithInsecure returns a DialOption which disables transport security for this
310// ClientConn. Note that transport security is required unless WithInsecure is
311// set.
312func WithInsecure() DialOption {
313return newFuncDialOption(func(o *dialOptions) {
314o.insecure = true
315})
316}
317
318// WithNoProxy returns a DialOption which disables the use of proxies for this
319// ClientConn. This is ignored if WithDialer or WithContextDialer are used.
320//
321// Experimental
322//
323// Notice: This API is EXPERIMENTAL and may be changed or removed in a
324// later release.
325func WithNoProxy() DialOption {
326return newFuncDialOption(func(o *dialOptions) {
327o.copts.UseProxy = false
328})
329}
330
331// WithTransportCredentials returns a DialOption which configures a connection
332// level security credentials (e.g., TLS/SSL). This should not be used together
333// with WithCredentialsBundle.
334func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
335return newFuncDialOption(func(o *dialOptions) {
336o.copts.TransportCredentials = creds
337})
338}
339
340// WithPerRPCCredentials returns a DialOption which sets credentials and places
341// auth state on each outbound RPC.
342func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
343return newFuncDialOption(func(o *dialOptions) {
344o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
345})
346}
347
348// WithCredentialsBundle returns a DialOption to set a credentials bundle for
349// the ClientConn.WithCreds. This should not be used together with
350// WithTransportCredentials.
351//
352// Experimental
353//
354// Notice: This API is EXPERIMENTAL and may be changed or removed in a
355// later release.
356func WithCredentialsBundle(b credentials.Bundle) DialOption {
357return newFuncDialOption(func(o *dialOptions) {
358o.copts.CredsBundle = b
359})
360}
361
362// WithTimeout returns a DialOption that configures a timeout for dialing a
363// ClientConn initially. This is valid if and only if WithBlock() is present.
364//
365// Deprecated: use DialContext instead of Dial and context.WithTimeout
366// instead. Will be supported throughout 1.x.
367func WithTimeout(d time.Duration) DialOption {
368return newFuncDialOption(func(o *dialOptions) {
369o.timeout = d
370})
371}
372
373// WithContextDialer returns a DialOption that sets a dialer to create
374// connections. If FailOnNonTempDialError() is set to true, and an error is
375// returned by f, gRPC checks the error's Temporary() method to decide if it
376// should try to reconnect to the network address.
377func WithContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption {
378return newFuncDialOption(func(o *dialOptions) {
379o.copts.Dialer = f
380})
381}
382
// init publishes withHealthCheckFunc through the internal package by assigning
// it to internal.WithHealthCheckFunc.
func init() {
	internal.WithHealthCheckFunc = withHealthCheckFunc
}
386
387// WithDialer returns a DialOption that specifies a function to use for dialing
388// network addresses. If FailOnNonTempDialError() is set to true, and an error
389// is returned by f, gRPC checks the error's Temporary() method to decide if it
390// should try to reconnect to the network address.
391//
392// Deprecated: use WithContextDialer instead. Will be supported throughout
393// 1.x.
394func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
395return WithContextDialer(
396func(ctx context.Context, addr string) (net.Conn, error) {
397if deadline, ok := ctx.Deadline(); ok {
398return f(addr, time.Until(deadline))
399}
400return f(addr, 0)
401})
402}
403
404// WithStatsHandler returns a DialOption that specifies the stats handler for
405// all the RPCs and underlying network connections in this ClientConn.
406func WithStatsHandler(h stats.Handler) DialOption {
407return newFuncDialOption(func(o *dialOptions) {
408o.copts.StatsHandler = h
409})
410}
411
412// FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on
413// non-temporary dial errors. If f is true, and dialer returns a non-temporary
414// error, gRPC will fail the connection to the network address and won't try to
415// reconnect. The default value of FailOnNonTempDialError is false.
416//
417// FailOnNonTempDialError only affects the initial dial, and does not do
418// anything useful unless you are also using WithBlock().
419//
420// Experimental
421//
422// Notice: This API is EXPERIMENTAL and may be changed or removed in a
423// later release.
424func FailOnNonTempDialError(f bool) DialOption {
425return newFuncDialOption(func(o *dialOptions) {
426o.copts.FailOnNonTempDialError = f
427})
428}
429
430// WithUserAgent returns a DialOption that specifies a user agent string for all
431// the RPCs.
432func WithUserAgent(s string) DialOption {
433return newFuncDialOption(func(o *dialOptions) {
434o.copts.UserAgent = s
435})
436}
437
438// WithKeepaliveParams returns a DialOption that specifies keepalive parameters
439// for the client transport.
440func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
441if kp.Time < internal.KeepaliveMinPingTime {
442logger.Warningf("Adjusting keepalive ping interval to minimum period of %v", internal.KeepaliveMinPingTime)
443kp.Time = internal.KeepaliveMinPingTime
444}
445return newFuncDialOption(func(o *dialOptions) {
446o.copts.KeepaliveParams = kp
447})
448}
449
450// WithUnaryInterceptor returns a DialOption that specifies the interceptor for
451// unary RPCs.
452func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
453return newFuncDialOption(func(o *dialOptions) {
454o.unaryInt = f
455})
456}
457
458// WithChainUnaryInterceptor returns a DialOption that specifies the chained
459// interceptor for unary RPCs. The first interceptor will be the outer most,
460// while the last interceptor will be the inner most wrapper around the real call.
461// All interceptors added by this method will be chained, and the interceptor
462// defined by WithUnaryInterceptor will always be prepended to the chain.
463func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption {
464return newFuncDialOption(func(o *dialOptions) {
465o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
466})
467}
468
469// WithStreamInterceptor returns a DialOption that specifies the interceptor for
470// streaming RPCs.
471func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
472return newFuncDialOption(func(o *dialOptions) {
473o.streamInt = f
474})
475}
476
477// WithChainStreamInterceptor returns a DialOption that specifies the chained
478// interceptor for streaming RPCs. The first interceptor will be the outer most,
479// while the last interceptor will be the inner most wrapper around the real call.
480// All interceptors added by this method will be chained, and the interceptor
481// defined by WithStreamInterceptor will always be prepended to the chain.
482func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOption {
483return newFuncDialOption(func(o *dialOptions) {
484o.chainStreamInts = append(o.chainStreamInts, interceptors...)
485})
486}
487
488// WithAuthority returns a DialOption that specifies the value to be used as the
489// :authority pseudo-header. This value only works with WithInsecure and has no
490// effect if TransportCredentials are present.
491func WithAuthority(a string) DialOption {
492return newFuncDialOption(func(o *dialOptions) {
493o.authority = a
494})
495}
496
497// WithChannelzParentID returns a DialOption that specifies the channelz ID of
498// current ClientConn's parent. This function is used in nested channel creation
499// (e.g. grpclb dial).
500//
501// Experimental
502//
503// Notice: This API is EXPERIMENTAL and may be changed or removed in a
504// later release.
505func WithChannelzParentID(id int64) DialOption {
506return newFuncDialOption(func(o *dialOptions) {
507o.channelzParentID = id
508})
509}
510
511// WithDisableServiceConfig returns a DialOption that causes gRPC to ignore any
512// service config provided by the resolver and provides a hint to the resolver
513// to not fetch service configs.
514//
515// Note that this dial option only disables service config from resolver. If
516// default service config is provided, gRPC will use the default service config.
517func WithDisableServiceConfig() DialOption {
518return newFuncDialOption(func(o *dialOptions) {
519o.disableServiceConfig = true
520})
521}
522
523// WithDefaultServiceConfig returns a DialOption that configures the default
524// service config, which will be used in cases where:
525//
526// 1. WithDisableServiceConfig is also used.
527// 2. Resolver does not return a service config or if the resolver returns an
528// invalid service config.
529//
530// Experimental
531//
532// Notice: This API is EXPERIMENTAL and may be changed or removed in a
533// later release.
534func WithDefaultServiceConfig(s string) DialOption {
535return newFuncDialOption(func(o *dialOptions) {
536o.defaultServiceConfigRawJSON = &s
537})
538}
539
540// WithDisableRetry returns a DialOption that disables retries, even if the
541// service config enables them. This does not impact transparent retries, which
542// will happen automatically if no data is written to the wire or if the RPC is
543// unprocessed by the remote server.
544//
545// Retry support is currently disabled by default, but will be enabled by
546// default in the future. Until then, it may be enabled by setting the
547// environment variable "GRPC_GO_RETRY" to "on".
548//
549// Experimental
550//
551// Notice: This API is EXPERIMENTAL and may be changed or removed in a
552// later release.
553func WithDisableRetry() DialOption {
554return newFuncDialOption(func(o *dialOptions) {
555o.disableRetry = true
556})
557}
558
559// WithMaxHeaderListSize returns a DialOption that specifies the maximum
560// (uncompressed) size of header list that the client is prepared to accept.
561func WithMaxHeaderListSize(s uint32) DialOption {
562return newFuncDialOption(func(o *dialOptions) {
563o.copts.MaxHeaderListSize = &s
564})
565}
566
567// WithDisableHealthCheck disables the LB channel health checking for all
568// SubConns of this ClientConn.
569//
570// Experimental
571//
572// Notice: This API is EXPERIMENTAL and may be changed or removed in a
573// later release.
574func WithDisableHealthCheck() DialOption {
575return newFuncDialOption(func(o *dialOptions) {
576o.disableHealthCheck = true
577})
578}
579
580// withHealthCheckFunc replaces the default health check function with the
581// provided one. It makes tests easier to change the health check function.
582//
583// For testing purpose only.
584func withHealthCheckFunc(f internal.HealthChecker) DialOption {
585return newFuncDialOption(func(o *dialOptions) {
586o.healthCheckFunc = f
587})
588}
589
590func defaultDialOptions() dialOptions {
591return dialOptions{
592disableRetry: !envconfig.Retry,
593healthCheckFunc: internal.HealthCheckFunc,
594copts: transport.ConnectOptions{
595WriteBufferSize: defaultWriteBufSize,
596ReadBufferSize: defaultReadBufSize,
597UseProxy: true,
598},
599resolveNowBackoff: internalbackoff.DefaultExponential.Backoff,
600}
601}
602
603// withGetMinConnectDeadline specifies the function that clientconn uses to
604// get minConnectDeadline. This can be used to make connection attempts happen
605// faster/slower.
606//
607// For testing purpose only.
608func withMinConnectDeadline(f func() time.Duration) DialOption {
609return newFuncDialOption(func(o *dialOptions) {
610o.minConnectTimeout = f
611})
612}
613
614// withResolveNowBackoff specifies the function that clientconn uses to backoff
615// between successive calls to resolver.ResolveNow().
616//
617// For testing purpose only.
618func withResolveNowBackoff(f func(int) time.Duration) DialOption {
619return newFuncDialOption(func(o *dialOptions) {
620o.resolveNowBackoff = f
621})
622}
623
624// WithResolvers allows a list of resolver implementations to be registered
625// locally with the ClientConn without needing to be globally registered via
626// resolver.Register. They will be matched against the scheme used for the
627// current Dial only, and will take precedence over the global registry.
628//
629// Experimental
630//
631// Notice: This API is EXPERIMENTAL and may be changed or removed in a
632// later release.
633func WithResolvers(rs ...resolver.Builder) DialOption {
634return newFuncDialOption(func(o *dialOptions) {
635o.resolvers = append(o.resolvers, rs...)
636})
637}
638