/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"net/http"
	"reflect"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/net/trace"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/encoding/proto"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/binarylog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcrand"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
)

const (
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultServerMaxSendMessageSize    = math.MaxInt32
)

func init() {
	internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
		return srv.opts.creds
	}
}

var statusOK = status.New(codes.OK, "")
var logger = grpclog.Component("core")

type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)

// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	MethodName string
	Handler    methodHandler
}

// ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType interface{}
	Methods     []MethodDesc
	Streams     []StreamDesc
	Metadata    interface{}
}

// serviceInfo wraps information about a service. It is very similar to
// ServiceDesc and is constructed from it for internal purposes.
type serviceInfo struct {
	// Contains the implementation for the methods in this service.
	serviceImpl interface{}
	methods     map[string]*MethodDesc
	streams     map[string]*StreamDesc
	mdata       interface{}
}

type serverWorkerData struct {
	st     transport.ServerTransport
	wg     *sync.WaitGroup
	stream *transport.Stream
}

// Server is a gRPC server to serve RPC requests.
type Server struct {
	opts serverOptions

	mu       sync.Mutex // guards following
	lis      map[net.Listener]bool
	conns    map[transport.ServerTransport]bool
	serve    bool
	drain    bool
	cv       *sync.Cond              // signaled when connections close for GracefulStop
	services map[string]*serviceInfo // service name -> service info
	events   trace.EventLog

	quit               *grpcsync.Event
	done               *grpcsync.Event
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

	channelzID int64 // channelz unique identification number
	czData     *channelzData

	serverWorkerChannels []chan *serverWorkerData
}

type serverOptions struct {
	creds                 credentials.TransportCredentials
	codec                 baseCodec
	cp                    Compressor
	dc                    Decompressor
	unaryInt              UnaryServerInterceptor
	streamInt             StreamServerInterceptor
	chainUnaryInts        []UnaryServerInterceptor
	chainStreamInts       []StreamServerInterceptor
	inTapHandle           tap.ServerInHandle
	statsHandler          stats.Handler
	maxConcurrentStreams  uint32
	maxReceiveMessageSize int
	maxSendMessageSize    int
	unknownStreamDesc     *StreamDesc
	keepaliveParams       keepalive.ServerParameters
	keepalivePolicy       keepalive.EnforcementPolicy
	initialWindowSize     int32
	initialConnWindowSize int32
	writeBufferSize       int
	readBufferSize        int
	connectionTimeout     time.Duration
	maxHeaderListSize     *uint32
	headerTableSize       *uint32
	numServerWorkers      uint32
}

var defaultServerOptions = serverOptions{
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
	writeBufferSize:       defaultWriteBufSize,
	readBufferSize:        defaultReadBufSize,
}

// A ServerOption sets options such as credentials, codec and keepalive parameters.
type ServerOption interface {
	apply(*serverOptions)
}

// EmptyServerOption does not alter the server configuration. It can be embedded
// in another structure to build custom server options.
//
// Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type EmptyServerOption struct{}

func (EmptyServerOption) apply(*serverOptions) {}

// funcServerOption wraps a function that modifies serverOptions into an
// implementation of the ServerOption interface.
type funcServerOption struct {
	f func(*serverOptions)
}

func (fdo *funcServerOption) apply(do *serverOptions) {
	fdo.f(do)
}

func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
	return &funcServerOption{
		f: f,
	}
}

// WriteBufferSize determines how much data can be batched before doing a write on the wire.
// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
// The default value for this buffer is 32KB.
// Zero will disable the write buffer such that each write will be on the underlying connection.
// Note: A Send call may not directly translate to a write.
func WriteBufferSize(s int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.writeBufferSize = s
	})
}

// ReadBufferSize lets you set the size of the read buffer; this determines how much data can be read at most
// for one read syscall.
// The default value for this buffer is 32KB.
// Zero will disable the read buffer for a connection so the data framer can access the underlying
// conn directly.
func ReadBufferSize(s int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.readBufferSize = s
	})
}

// InitialWindowSize returns a ServerOption that sets the window size for a stream.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
func InitialWindowSize(s int32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.initialWindowSize = s
	})
}

// InitialConnWindowSize returns a ServerOption that sets the window size for a connection.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
func InitialConnWindowSize(s int32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.initialConnWindowSize = s
	})
}

// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
	if kp.Time > 0 && kp.Time < time.Second {
		logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
		kp.Time = time.Second
	}

	return newFuncServerOption(func(o *serverOptions) {
		o.keepaliveParams = kp
	})
}
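
// Illustrative sketch (not part of the original file): a server that pings
// idle clients and closes long-idle connections. The durations here are
// assumptions chosen for the example, not library defaults.
//
//	srv := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{
//		MaxConnectionIdle: 15 * time.Minute,  // close connections idle this long
//		Time:              2 * time.Minute,   // ping after this much inactivity
//		Timeout:           20 * time.Second,  // wait this long for the ping ack
//	}))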

// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.keepalivePolicy = kep
	})
}

// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
//
// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
//
// Deprecated: register codecs using encoding.RegisterCodec. The server will
// automatically use registered codecs based on the incoming requests' headers.
// See also
// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
// Will be supported throughout 1.x.
func CustomCodec(codec Codec) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.codec = codec
	})
}

// RPCCompressor returns a ServerOption that sets a compressor for outbound
// messages.  For backward compatibility, all outbound messages will be sent
// using this compressor, regardless of incoming message compression.  By
// default, server messages will be sent using the same compressor with which
// request messages were sent.
//
// Deprecated: use encoding.RegisterCompressor instead. Will be supported
// throughout 1.x.
func RPCCompressor(cp Compressor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.cp = cp
	})
}

// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
// messages.  It has higher priority than decompressors registered via
// encoding.RegisterCompressor.
//
// Deprecated: use encoding.RegisterCompressor instead. Will be supported
// throughout 1.x.
func RPCDecompressor(dc Decompressor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.dc = dc
	})
}

// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default limit.
//
// Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x.
func MaxMsgSize(m int) ServerOption {
	return MaxRecvMsgSize(m)
}

// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default 4MB.
func MaxRecvMsgSize(m int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxReceiveMessageSize = m
	})
}

// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
// If this is not set, gRPC uses the default `math.MaxInt32`.
func MaxSendMsgSize(m int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxSendMessageSize = m
	})
}

// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
// of concurrent streams to each ServerTransport.
func MaxConcurrentStreams(n uint32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxConcurrentStreams = n
	})
}

// Creds returns a ServerOption that sets credentials for server connections.
func Creds(c credentials.TransportCredentials) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.creds = c
	})
}
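
// Illustrative sketch (not part of the original file): enabling TLS via
// credentials.NewServerTLSFromFile. The certificate paths are placeholders.
//
//	creds, err := credentials.NewServerTLSFromFile("server.crt", "server.key")
//	if err != nil {
//		log.Fatal(err)
//	}
//	srv := grpc.NewServer(grpc.Creds(creds))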

// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
// server. Only one unary interceptor can be installed. The construction of multiple
// interceptors (e.g., chaining) can be implemented at the caller.
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.unaryInt != nil {
			panic("The unary server interceptor was already set and may not be reset.")
		}
		o.unaryInt = i
	})
}

// ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
// for unary RPCs. The first interceptor will be the outermost,
// while the last interceptor will be the innermost wrapper around the real call.
// All unary interceptors added by this method will be chained.
func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
	})
}
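
// Illustrative sketch (not part of the original file): with
// ChainUnaryInterceptor(a, b), a runs first and wraps b, which wraps the
// handler. A minimal logging interceptor might look like:
//
//	func logging(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
//		handler grpc.UnaryHandler) (interface{}, error) {
//		log.Printf("-> %s", info.FullMethod)
//		resp, err := handler(ctx, req) // invoke the next interceptor or the handler
//		log.Printf("<- %s err=%v", info.FullMethod, err)
//		return resp, err
//	}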

// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
// server. Only one stream interceptor can be installed.
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.streamInt != nil {
			panic("The stream server interceptor was already set and may not be reset.")
		}
		o.streamInt = i
	})
}

// ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
// for streaming RPCs. The first interceptor will be the outermost,
// while the last interceptor will be the innermost wrapper around the real call.
// All stream interceptors added by this method will be chained.
func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.chainStreamInts = append(o.chainStreamInts, interceptors...)
	})
}

// InTapHandle returns a ServerOption that sets the tap handle for all the server
// transports to be created. Only one can be installed.
func InTapHandle(h tap.ServerInHandle) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.inTapHandle != nil {
			panic("The tap handle was already set and may not be reset.")
		}
		o.inTapHandle = h
	})
}

// StatsHandler returns a ServerOption that sets the stats handler for the server.
func StatsHandler(h stats.Handler) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.statsHandler = h
	})
}

// UnknownServiceHandler returns a ServerOption that allows for adding a custom
// unknown service handler. The provided method is a bidi-streaming RPC service
// handler that will be invoked instead of returning the "unimplemented" gRPC
// error whenever a request is received for an unregistered service or method.
// The handling function and stream interceptor (if set) have full access to
// the ServerStream, including its Context.
func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.unknownStreamDesc = &StreamDesc{
			StreamName: "unknown_service_handler",
			Handler:    streamHandler,
			// We need to assume that the users of the streamHandler will want to use both.
			ClientStreams: true,
			ServerStreams: true,
		}
	})
}
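
// Illustrative sketch (not part of the original file): a fallback handler
// that rejects unknown methods with a custom status.
// grpc.MethodFromServerStream recovers the requested method name.
//
//	srv := grpc.NewServer(grpc.UnknownServiceHandler(
//		func(srv interface{}, stream grpc.ServerStream) error {
//			m, _ := grpc.MethodFromServerStream(stream)
//			return status.Errorf(codes.Unimplemented, "no handler for %q", m)
//		}))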

// ConnectionTimeout returns a ServerOption that sets the timeout for
// connection establishment (up to and including HTTP/2 handshaking) for all
// new connections.  If this is not set, the default is 120 seconds.  A zero or
// negative value will result in an immediate timeout.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ConnectionTimeout(d time.Duration) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.connectionTimeout = d
	})
}

// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
// of the header list that the server is prepared to accept.
func MaxHeaderListSize(s uint32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxHeaderListSize = &s
	})
}

// HeaderTableSize returns a ServerOption that sets the size of the dynamic
// header table for a stream.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func HeaderTableSize(s uint32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.headerTableSize = &s
	})
}

// NumStreamWorkers returns a ServerOption that sets the number of worker
// goroutines that should be used to process incoming streams. Setting this to
// zero (default) will disable workers and spawn a new goroutine for each
// stream.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func NumStreamWorkers(numServerWorkers uint32) ServerOption {
	// TODO: If/when this API gets stabilized (i.e. stream workers become the
	// only way streams are processed), change the behavior of the zero value to
	// a sane default. Preliminary experiments suggest that a value equal to the
	// number of CPUs available is most performant; requires thorough testing.
	return newFuncServerOption(func(o *serverOptions) {
		o.numServerWorkers = numServerWorkers
	})
}
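
// Illustrative sketch (not part of the original file): following the TODO
// above, sizing the worker pool to the number of CPUs is one plausible
// starting point, not a recommended default.
//
//	srv := grpc.NewServer(grpc.NumStreamWorkers(uint32(runtime.NumCPU())))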

// serverWorkerResetThreshold defines how often the stack must be reset. Every
// N requests, by spawning a new goroutine in its place, a worker can reset its
// stack so that large stacks don't live in memory forever. 2^16 should allow
// each goroutine stack to live for at least a few seconds in a typical
// workload (assuming a QPS of a few thousand requests/sec).
const serverWorkerResetThreshold = 1 << 16

// serverWorker blocks on a *transport.Stream channel forever and waits for
// data to be fed by serveStreams. This allows different requests to be
// processed by the same goroutine, removing the need for expensive stack
// re-allocations (see the runtime.morestack problem [1]).
//
// [1] https://github.com/golang/go/issues/18138
func (s *Server) serverWorker(ch chan *serverWorkerData) {
	// To make sure all server workers don't reset at the same time, choose a
	// random number of iterations before resetting.
	threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
	for completed := 0; completed < threshold; completed++ {
		data, ok := <-ch
		if !ok {
			return
		}
		s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
		data.wg.Done()
	}
	go s.serverWorker(ch)
}

// initServerWorkers creates worker goroutines and channels to process incoming
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
	s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
		s.serverWorkerChannels[i] = make(chan *serverWorkerData)
		go s.serverWorker(s.serverWorkerChannels[i])
	}
}

func (s *Server) stopServerWorkers() {
	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
		close(s.serverWorkerChannels[i])
	}
}

// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
	opts := defaultServerOptions
	for _, o := range opt {
		o.apply(&opts)
	}
	s := &Server{
		lis:      make(map[net.Listener]bool),
		opts:     opts,
		conns:    make(map[transport.ServerTransport]bool),
		services: make(map[string]*serviceInfo),
		quit:     grpcsync.NewEvent(),
		done:     grpcsync.NewEvent(),
		czData:   new(channelzData),
	}
	chainUnaryServerInterceptors(s)
	chainStreamServerInterceptors(s)
	s.cv = sync.NewCond(&s.mu)
	if EnableTracing {
		_, file, line, _ := runtime.Caller(1)
		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
	}

	if s.opts.numServerWorkers > 0 {
		s.initServerWorkers()
	}

	if channelz.IsOn() {
		s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
	}
	return s
}
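
// Illustrative sketch (not part of the original file): a minimal end-to-end
// setup. pb.RegisterGreeterServer and greeterImpl stand in for IDL generated
// code and a user implementation; the 8MB limit is an assumption.
//
//	lis, err := net.Listen("tcp", ":50051")
//	if err != nil {
//		log.Fatal(err)
//	}
//	srv := grpc.NewServer(
//		grpc.MaxRecvMsgSize(8*1024*1024), // default is 4MB
//	)
//	pb.RegisterGreeterServer(srv, &greeterImpl{})
//	if err := srv.Serve(lis); err != nil {
//		log.Fatal(err)
//	}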

// printf records an event in s's event log, unless s has been stopped.
// REQUIRES s.mu is held.
func (s *Server) printf(format string, a ...interface{}) {
	if s.events != nil {
		s.events.Printf(format, a...)
	}
}

// errorf records an error in s's event log, unless s has been stopped.
// REQUIRES s.mu is held.
func (s *Server) errorf(format string, a ...interface{}) {
	if s.events != nil {
		s.events.Errorf(format, a...)
	}
}

// ServiceRegistrar wraps a single method that supports service registration. It
// enables users to pass concrete types other than grpc.Server to the service
// registration methods exported by the IDL generated code.
type ServiceRegistrar interface {
	// RegisterService registers a service and its implementation to the
	// concrete type implementing this interface.  It may not be called
	// once the server has started serving.
	// desc describes the service and its methods and handlers. impl is the
	// service implementation which is passed to the method handlers.
	RegisterService(desc *ServiceDesc, impl interface{})
}

// RegisterService registers a service and its implementation to the gRPC
// server. It is called from the IDL generated code. This must be called before
// invoking Serve. If ss is non-nil (for legacy code), its type is checked to
// ensure it implements sd.HandlerType.
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
	if ss != nil {
		ht := reflect.TypeOf(sd.HandlerType).Elem()
		st := reflect.TypeOf(ss)
		if !st.Implements(ht) {
			logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
		}
	}
	s.register(sd, ss)
}

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.printf("RegisterService(%q)", sd.ServiceName)
	if s.serve {
		logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
	}
	if _, ok := s.services[sd.ServiceName]; ok {
		logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
	}
	info := &serviceInfo{
		serviceImpl: ss,
		methods:     make(map[string]*MethodDesc),
		streams:     make(map[string]*StreamDesc),
		mdata:       sd.Metadata,
	}
	for i := range sd.Methods {
		d := &sd.Methods[i]
		info.methods[d.MethodName] = d
	}
	for i := range sd.Streams {
		d := &sd.Streams[i]
		info.streams[d.StreamName] = d
	}
	s.services[sd.ServiceName] = info
}

// MethodInfo contains the information of an RPC including its method name and type.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}

// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata interface{}
}

// GetServiceInfo returns a map from service names to ServiceInfo.
// Service names include the package names, in the form of <package>.<service>.
func (s *Server) GetServiceInfo() map[string]ServiceInfo {
	ret := make(map[string]ServiceInfo)
	for n, srv := range s.services {
		methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams))
		for m := range srv.methods {
			methods = append(methods, MethodInfo{
				Name:           m,
				IsClientStream: false,
				IsServerStream: false,
			})
		}
		for m, d := range srv.streams {
			methods = append(methods, MethodInfo{
				Name:           m,
				IsClientStream: d.ClientStreams,
				IsServerStream: d.ServerStreams,
			})
		}

		ret[n] = ServiceInfo{
			Methods:  methods,
			Metadata: srv.mdata,
		}
	}
	return ret
}

// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped.
var ErrServerStopped = errors.New("grpc: the server has been stopped")

func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
	if s.opts.creds == nil {
		return rawConn, nil, nil
	}
	return s.opts.creds.ServerHandshake(rawConn)
}

type listenSocket struct {
	net.Listener
	channelzID int64
}

func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
	return &channelz.SocketInternalMetric{
		SocketOptions: channelz.GetSocketOption(l.Listener),
		LocalAddr:     l.Listener.Addr(),
	}
}

func (l *listenSocket) Close() error {
	err := l.Listener.Close()
	if channelz.IsOn() {
		channelz.RemoveEntry(l.channelzID)
	}
	return err
}

// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		if s.quit.HasFired() {
			// Stop or GracefulStop called; block until done and return nil.
			<-s.done.Done()
		}
	}()

	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	if channelz.IsOn() {
		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
	}
	s.mu.Unlock()

	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var tempDelay time.Duration // how long to sleep on accept failure

	for {
		rawConn, err := lis.Accept()
		if err != nil {
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit.Done():
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			if s.quit.HasFired() {
				return nil
			}
			return err
		}
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}
}

// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
	if s.quit.HasFired() {
		rawConn.Close()
		return
	}
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
	if err != nil {
		// ErrConnDispatched means that the connection was dispatched away from
		// gRPC; those connections should be left open.
		if err != credentials.ErrConnDispatched {
			s.mu.Lock()
			s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
			s.mu.Unlock()
			channelz.Warningf(logger, s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
			rawConn.Close()
		}
		rawConn.SetDeadline(time.Time{})
		return
	}

	// Finish handshaking (HTTP2)
	st := s.newHTTP2Transport(conn, authInfo)
	if st == nil {
		return
	}

	rawConn.SetDeadline(time.Time{})
	if !s.addConn(st) {
		return
	}
	go func() {
		s.serveStreams(st)
		s.removeConn(st)
	}()
}

// newHTTP2Transport sets up an HTTP/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
	config := &transport.ServerConfig{
		MaxStreams:            s.opts.maxConcurrentStreams,
		AuthInfo:              authInfo,
		InTapHandle:           s.opts.inTapHandle,
		StatsHandler:          s.opts.statsHandler,
		KeepaliveParams:       s.opts.keepaliveParams,
		KeepalivePolicy:       s.opts.keepalivePolicy,
		InitialWindowSize:     s.opts.initialWindowSize,
		InitialConnWindowSize: s.opts.initialConnWindowSize,
		WriteBufferSize:       s.opts.writeBufferSize,
		ReadBufferSize:        s.opts.readBufferSize,
		ChannelzParentID:      s.channelzID,
		MaxHeaderListSize:     s.opts.maxHeaderListSize,
		HeaderTableSize:       s.opts.headerTableSize,
	}
	st, err := transport.NewServerTransport("http2", c, config)
	if err != nil {
		s.mu.Lock()
		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
		s.mu.Unlock()
		c.Close()
		channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
		return nil
	}

	return st
}

func (s *Server) serveStreams(st transport.ServerTransport) {
	defer st.Close()
	var wg sync.WaitGroup

	var roundRobinCounter uint32
	st.HandleStreams(func(stream *transport.Stream) {
		wg.Add(1)
		if s.opts.numServerWorkers > 0 {
			data := &serverWorkerData{st: st, wg: &wg, stream: stream}
			select {
			case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
			default:
				// If all stream workers are busy, fallback to the default code path.
				go func() {
					s.handleStream(st, stream, s.traceInfo(st, stream))
					wg.Done()
				}()
			}
		} else {
			go func() {
				defer wg.Done()
				s.handleStream(st, stream, s.traceInfo(st, stream))
			}()
		}
	}, func(ctx context.Context, method string) context.Context {
		if !EnableTracing {
			return ctx
		}
		tr := trace.New("grpc.Recv."+methodFamily(method), method)
		return trace.NewContext(ctx, tr)
	})
	wg.Wait()
}

var _ http.Handler = (*Server)(nil)

// ServeHTTP implements the Go standard library's http.Handler
// interface by responding to the gRPC request r, looking up
// the requested gRPC method in the gRPC server s.
//
// The provided HTTP request must have arrived on an HTTP/2
// connection. When using the Go standard library's server,
// practically this means that the Request must also have arrived
// over TLS.
//
// To share one port (such as 443 for https) between gRPC and an
// existing http.Handler, use a root http.Handler such as:
//
//   if r.ProtoMajor == 2 && strings.HasPrefix(
//   	r.Header.Get("Content-Type"), "application/grpc") {
//   	grpcServer.ServeHTTP(w, r)
//   } else {
//   	yourMux.ServeHTTP(w, r)
//   }
//
// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
// separate from grpc-go's HTTP/2 server. Performance and features may vary
// between the two paths. ServeHTTP does not support some gRPC features
// available through grpc-go's HTTP/2 server.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if !s.addConn(st) {
		return
	}
	defer s.removeConn(st)
	s.serveStreams(st)
}
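
// Illustrative sketch (not part of the original file): wiring the routing
// snippet from the ServeHTTP comment above into an http.Server. grpcServer,
// yourMux and the certificate paths are placeholders.
//
//	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
//		if r.ProtoMajor == 2 && strings.HasPrefix(
//			r.Header.Get("Content-Type"), "application/grpc") {
//			grpcServer.ServeHTTP(w, r)
//		} else {
//			yourMux.ServeHTTP(w, r)
//		}
//	})
//	log.Fatal(http.ListenAndServeTLS(":443", "server.crt", "server.key", handler))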

// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
// If tracing is not enabled, it returns nil.
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
	if !EnableTracing {
		return nil
	}
	tr, ok := trace.FromContext(stream.Context())
	if !ok {
		return nil
	}

	trInfo = &traceInfo{
		tr: tr,
		firstLine: firstLine{
			client:     false,
			remoteAddr: st.RemoteAddr(),
		},
	}
	if dl, ok := stream.Context().Deadline(); ok {
		trInfo.firstLine.deadline = time.Until(dl)
	}
	return trInfo
}

func (s *Server) addConn(st transport.ServerTransport) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.conns == nil {
		st.Close()
		return false
	}
	if s.drain {
		// Transport added after we drained our existing conns: drain it
		// immediately.
		st.Drain()
	}
	s.conns[st] = true
	return true
}

func (s *Server) removeConn(st transport.ServerTransport) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.conns != nil {
		delete(s.conns, st)
		s.cv.Broadcast()
	}
}

func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
	return &channelz.ServerInternalMetric{
		CallsStarted:             atomic.LoadInt64(&s.czData.callsStarted),
		CallsSucceeded:           atomic.LoadInt64(&s.czData.callsSucceeded),
		CallsFailed:              atomic.LoadInt64(&s.czData.callsFailed),
		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
	}
}

func (s *Server) incrCallsStarted() {
	atomic.AddInt64(&s.czData.callsStarted, 1)
	atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
}

func (s *Server) incrCallsSucceeded() {
	atomic.AddInt64(&s.czData.callsSucceeded, 1)
}

func (s *Server) incrCallsFailed() {
	atomic.AddInt64(&s.czData.callsFailed, 1)
}

func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
	if err != nil {
		channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err)
		return err
	}
	compData, err := compress(data, cp, comp)
	if err != nil {
		channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err)
		return err
	}
	hdr, payload := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > s.opts.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
	}
	err = t.Write(stream, hdr, payload, opts)
	if err == nil && s.opts.statsHandler != nil {
		s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
	}
	return err
}

// chainUnaryServerInterceptors chains all unary server interceptors into one.
func chainUnaryServerInterceptors(s *Server) {
	// Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
	// be executed before any other chained interceptors.
	interceptors := s.opts.chainUnaryInts
	if s.opts.unaryInt != nil {
		interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
	}

	var chainedInt UnaryServerInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
			return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
		}
	}

	s.opts.unaryInt = chainedInt
}

// getChainUnaryHandler recursively generates the chained UnaryHandler.
func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
	if curr == len(interceptors)-1 {
		return finalHandler
	}

	return func(ctx context.Context, req interface{}) (interface{}, error) {
		return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
	}
}

func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
	sh := s.opts.statsHandler
	if sh != nil || trInfo != nil || channelz.IsOn() {
		if channelz.IsOn() {
			s.incrCallsStarted()
		}
		var statsBegin *stats.Begin
		if sh != nil {
			beginTime := time.Now()
			statsBegin = &stats.Begin{
				BeginTime: beginTime,
			}
			sh.HandleRPC(stream.Context(), statsBegin)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&trInfo.firstLine, false)
		}
		// The deferred error handling for tracing, stats handler and channelz are
		// combined into one function to reduce stack usage -- a defer takes ~56-64
		// bytes on the stack, so overflowing the stack will require a stack
		// re-allocation, which is expensive.
		//
		// To maintain behavior similar to separate deferred statements, statements
		// should be executed in the reverse order. That is, tracing first, stats
		// handler second, and channelz last. Note that panics *within* defers will
		// lead to different behavior, but that's an acceptable compromise; that
		// would be undefined behavior territory anyway.
		defer func() {
			if trInfo != nil {
				if err != nil && err != io.EOF {
					trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					trInfo.tr.SetError()
				}
				trInfo.tr.Finish()
			}

			if sh != nil {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(stream.Context(), end)
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}

	binlog := binarylog.GetMethodLogger(stream.Method())
	if binlog != nil {
		ctx := stream.Context()
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ctx); ok {
			logEntry.PeerAddr = peer.Addr
		}
		binlog.Log(logEntry)
	}

	// comp and cp are used for compression.  decomp and dc are used for
	// decompression.  If comp and decomp are both set, they are the same;
	// however they are kept separate to ensure that at most one of the
	// compressor/decompressor variable pairs are set for use later.
	var comp, decomp encoding.Compressor
	var cp Compressor
	var dc Decompressor

	// If dc is set and matches the stream's compression, use it.  Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		decomp = encoding.GetCompressor(rc)
		if decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(stream, st)
			return st.Err()
		}
	}

	// If cp is set, use it.  Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		cp = s.opts.cp
		stream.SetSendCompress(cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		comp = encoding.GetCompressor(rc)
		if comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	var payInfo *payloadInfo
	if sh != nil || binlog != nil {
		payInfo = &payloadInfo{}
	}
	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
	if err != nil {
		if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
		}
		return err
	}
	if channelz.IsOn() {
		t.IncrMsgRecv()
	}
	df := func(v interface{}) error {
		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
		}
		if sh != nil {
			sh.HandleRPC(stream.Context(), &stats.InPayload{
				RecvTime:   time.Now(),
				Payload:    v,
				WireLength: payInfo.wireLength + headerLen,
				Data:       d,
				Length:     len(d),
			})
		}
		if binlog != nil {
			binlog.Log(&binarylog.ClientMessage{
				Message: d,
			})
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
		}
		return nil
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert appErr if it is not a grpc status error.
			appErr = status.Error(codes.Unknown, appErr.Error())
			appStatus, _ = status.FromError(appErr)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			trInfo.tr.SetError()
		}
		if e := t.WriteStatus(stream, appStatus); e != nil {
			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		if binlog != nil {
			if h, _ := stream.Header(); h.Len() > 0 {
				// Only log serverHeader if there was header. Otherwise it can
				// be trailer only.
				binlog.Log(&binarylog.ServerHeader{
					Header: h,
				})
			}
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return appErr
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(stringer("OK"), false)
	}
	opts := &transport.Options{Last: true}

	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
		if err == io.EOF {
			// The entire stream is done (for unary RPC only).
			return err
		}
		if sts, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, sts); e != nil {
				channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
			}
		} else {
			switch st := err.(type) {
			case transport.ConnectionError:
				// Nothing to do here.
			default:
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
			}
		}
		if binlog != nil {
			h, _ := stream.Header()
			binlog.Log(&binarylog.ServerHeader{
				Header: h,
			})
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return err
	}
	if binlog != nil {
		h, _ := stream.Header()
		binlog.Log(&binarylog.ServerHeader{
			Header: h,
		})
		binlog.Log(&binarylog.ServerMessage{
			Message: reply,
		})
	}
	if channelz.IsOn() {
		t.IncrMsgSent()
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
	}
	// TODO: Should we be logging if writing status failed here, like above?
	// Should the logging be in WriteStatus?  Should we ignore the WriteStatus
	// error or allow the stats handler to see it?
	err = t.WriteStatus(stream, statusOK)
	if binlog != nil {
		binlog.Log(&binarylog.ServerTrailer{
			Trailer: stream.Trailer(),
			Err:     appErr,
		})
	}
	return err
}

// chainStreamServerInterceptors chains all stream server interceptors into one.
func chainStreamServerInterceptors(s *Server) {
	// Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
	// be executed before any other chained interceptors.
	interceptors := s.opts.chainStreamInts
	if s.opts.streamInt != nil {
		interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
	}

	var chainedInt StreamServerInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
			return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
		}
	}

	s.opts.streamInt = chainedInt
}

// getChainStreamHandler recursively generates the chained StreamHandler.
func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
	if curr == len(interceptors)-1 {
		return finalHandler
	}

	return func(srv interface{}, ss ServerStream) error {
		return interceptors[curr+1](srv, ss, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
	}
}

func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
	if channelz.IsOn() {
		s.incrCallsStarted()
	}
	sh := s.opts.statsHandler
	var statsBegin *stats.Begin
	if sh != nil {
		beginTime := time.Now()
		statsBegin = &stats.Begin{
			BeginTime: beginTime,
		}
		sh.HandleRPC(stream.Context(), statsBegin)
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	ss := &serverStream{
		ctx:                   ctx,
		t:                     t,
		s:                     stream,
		p:                     &parser{r: stream},
		codec:                 s.getCodec(stream.ContentSubtype()),
		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
		maxSendMessageSize:    s.opts.maxSendMessageSize,
		trInfo:                trInfo,
		statsHandler:          sh,
	}

	if sh != nil || trInfo != nil || channelz.IsOn() {
		// See comment in processUnaryRPC on defers.
		defer func() {
			if trInfo != nil {
				ss.mu.Lock()
				if err != nil && err != io.EOF {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
				ss.trInfo.tr.Finish()
				ss.trInfo.tr = nil
				ss.mu.Unlock()
			}

			if sh != nil {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(stream.Context(), end)
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}

	ss.binlog = binarylog.GetMethodLogger(stream.Method())
	if ss.binlog != nil {
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ss.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		ss.binlog.Log(logEntry)
	}

	// If dc is set and matches the stream's compression, use it.  Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		ss.dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		ss.decomp = encoding.GetCompressor(rc)
		if ss.decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(ss.s, st)
			return st.Err()
		}
	}

	// If cp is set, use it.  Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		ss.cp = s.opts.cp
		stream.SetSendCompress(s.opts.cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		ss.comp = encoding.GetCompressor(rc)
		if ss.comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	if trInfo != nil {
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
	}
	var appErr error
	var server interface{}
	if info != nil {
		server = info.serviceImpl
	}
	if s.opts.streamInt == nil {
		appErr = sd.Handler(server, ss)
	} else {
		info := &StreamServerInfo{
			FullMethod:     stream.Method(),
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
	}
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			appStatus = status.New(codes.Unknown, appErr.Error())
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			ss.mu.Lock()
			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			ss.trInfo.tr.SetError()
			ss.mu.Unlock()
		}
		t.WriteStatus(ss.s, appStatus)
		if ss.binlog != nil {
			ss.binlog.Log(&binarylog.ServerTrailer{
				Trailer: ss.s.Trailer(),
				Err:     appErr,
			})
		}
		// TODO: Should we log an error from WriteStatus here and below?
		return appErr
	}
	if trInfo != nil {
		ss.mu.Lock()
		ss.trInfo.tr.LazyLog(stringer("OK"), false)
		ss.mu.Unlock()
	}
	err = t.WriteStatus(ss.s, statusOK)
	if ss.binlog != nil {
		ss.binlog.Log(&binarylog.ServerTrailer{
			Trailer: ss.s.Trailer(),
			Err:     appErr,
		})
	}
	return err
}

func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
	sm := stream.Method()
	if sm != "" && sm[0] == '/' {
		sm = sm[1:]
	}
	pos := strings.LastIndex(sm, "/")
	if pos == -1 {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
			trInfo.tr.SetError()
		}
		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
		if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
			if trInfo != nil {
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				trInfo.tr.SetError()
			}
			channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
		}
		if trInfo != nil {
			trInfo.tr.Finish()
		}
		return
	}
	service := sm[:pos]
	method := sm[pos+1:]

	srv, knownService := s.services[service]
	if knownService {
		if md, ok := srv.methods[method]; ok {
			s.processUnaryRPC(t, stream, srv, md, trInfo)
			return
		}
		if sd, ok := srv.streams[method]; ok {
			s.processStreamingRPC(t, stream, srv, sd, trInfo)
			return
		}
	}
	// Unknown service, or known service with an unknown method.
	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
		return
	}
	var errDesc string
	if !knownService {
		errDesc = fmt.Sprintf("unknown service %v", service)
	} else {
		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
	}
	if trInfo != nil {
		trInfo.tr.LazyPrintf("%s", errDesc)
		trInfo.tr.SetError()
	}
	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
			trInfo.tr.SetError()
		}
		channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
	}
	if trInfo != nil {
		trInfo.tr.Finish()
	}
}

// streamKey is the key used to save the ServerTransportStream in the context.
type streamKey struct{}

// NewContextWithServerTransportStream creates a new context from ctx and
// attaches stream to it.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
	return context.WithValue(ctx, streamKey{}, stream)
}

// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// See also NewContextWithServerTransportStream.
//
// Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type ServerTransportStream interface {
	Method() string
	SetHeader(md metadata.MD) error
	SendHeader(md metadata.MD) error
	SetTrailer(md metadata.MD) error
}

// ServerTransportStreamFromContext returns the ServerTransportStream saved in
// ctx. Returns nil if the given context has no stream associated with it
// (which implies it is not an RPC invocation context).
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
	s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
	return s
}
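
// Illustrative sketch (not part of the original file): a test double for
// handler code that calls grpc.SetHeader, per the ServerTransportStream
// comment above. mockSTS and the method name are placeholders.
//
//	type mockSTS struct{ header metadata.MD }
//
//	func (m *mockSTS) Method() string                  { return "/pkg.Svc/Method" }
//	func (m *mockSTS) SetHeader(md metadata.MD) error  { m.header = metadata.Join(m.header, md); return nil }
//	func (m *mockSTS) SendHeader(md metadata.MD) error { return m.SetHeader(md) }
//	func (m *mockSTS) SetTrailer(md metadata.MD) error { return nil }
//
//	ctx := grpc.NewContextWithServerTransportStream(context.Background(), &mockSTS{})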

// Stop stops the gRPC server. It immediately closes all open
// connections and listeners.
// It cancels all active RPCs on the server side and the corresponding
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
	s.quit.Fire()

	defer func() {
		s.serveWG.Wait()
		s.done.Fire()
	}()

	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})

	s.mu.Lock()
	listeners := s.lis
	s.lis = nil
	st := s.conns
	s.conns = nil
	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
	s.cv.Broadcast()
	s.mu.Unlock()

	for lis := range listeners {
		lis.Close()
	}
	for c := range st {
		c.Close()
	}
	if s.opts.numServerWorkers > 0 {
		s.stopServerWorkers()
	}

	s.mu.Lock()
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}

// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
	s.quit.Fire()
	defer s.done.Fire()

	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})
	s.mu.Lock()
	if s.conns == nil {
		s.mu.Unlock()
		return
	}

	for lis := range s.lis {
		lis.Close()
	}
	s.lis = nil
	if !s.drain {
		for st := range s.conns {
			st.Drain()
		}
		s.drain = true
	}

	// Wait for serving threads to be ready to exit.  Only then can we be sure no
	// new conns will be created.
	s.mu.Unlock()
	s.serveWG.Wait()
	s.mu.Lock()

	for len(s.conns) != 0 {
		s.cv.Wait()
	}
	s.conns = nil
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}
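
// Illustrative sketch (not part of the original file): draining on SIGTERM
// with a hard Stop if draining takes too long. The 30s budget is an assumption.
//
//	sig := make(chan os.Signal, 1)
//	signal.Notify(sig, syscall.SIGTERM)
//	<-sig
//	done := make(chan struct{})
//	go func() { srv.GracefulStop(); close(done) }()
//	select {
//	case <-done:
//	case <-time.After(30 * time.Second):
//		srv.Stop() // give up on stragglers
//	}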

// contentSubtype must be lowercase
// cannot return nil
func (s *Server) getCodec(contentSubtype string) baseCodec {
	if s.opts.codec != nil {
		return s.opts.codec
	}
	if contentSubtype == "" {
		return encoding.GetCodec(proto.Name)
	}
	codec := encoding.GetCodec(contentSubtype)
	if codec == nil {
		return encoding.GetCodec(proto.Name)
	}
	return codec
}

// SetHeader sets the header metadata.
// When called multiple times, all the provided metadata will be merged.
// All the metadata will be sent out when one of the following happens:
//  - grpc.SendHeader() is called;
//  - The first response is sent out;
//  - An RPC status is sent out (error or success).
func SetHeader(ctx context.Context, md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	return stream.SetHeader(md)
}

// SendHeader sends header metadata. It may be called at most once.
// The provided md and headers set by SetHeader() will be sent.
func SendHeader(ctx context.Context, md metadata.MD) error {
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	if err := stream.SendHeader(md); err != nil {
		return toRPCErr(err)
	}
	return nil
}

// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
// When called more than once, all the provided metadata will be merged.
func SetTrailer(ctx context.Context, md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	return stream.SetTrailer(md)
}
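
// Illustrative sketch (not part of the original file): a unary handler
// attaching header and trailer metadata. The types and metadata keys are
// placeholders.
//
//	func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
//		grpc.SetHeader(ctx, metadata.Pairs("server-id", "42"))
//		defer grpc.SetTrailer(ctx, metadata.Pairs("handled-at", time.Now().Format(time.RFC3339)))
//		return &pb.HelloReply{}, nil
//	}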

// Method returns the method string for the server context.  The returned
// string is in the format of "/service/method".
func Method(ctx context.Context) (string, bool) {
	s := ServerTransportStreamFromContext(ctx)
	if s == nil {
		return "", false
	}
	return s.Method(), true
}

type channelzServer struct {
	s *Server
}

func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
	return c.s.channelzMetric()
}