/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"net/http"
	"reflect"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/net/trace"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/encoding/proto"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/binarylog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcrand"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
)

const (
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultServerMaxSendMessageSize    = math.MaxInt32
)

func init() {
	internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
		return srv.opts.creds
	}
}

var statusOK = status.New(codes.OK, "")
var logger = grpclog.Component("core")

type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)

// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	MethodName string
	Handler    methodHandler
}

// ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType interface{}
	Methods     []MethodDesc
	Streams     []StreamDesc
	Metadata    interface{}
}

// serviceInfo wraps information about a service. It is very similar to
// ServiceDesc and is constructed from it for internal purposes.
type serviceInfo struct {
	// Contains the implementation for the methods in this service.
	serviceImpl interface{}
	methods     map[string]*MethodDesc
	streams     map[string]*StreamDesc
	mdata       interface{}
}

type serverWorkerData struct {
	st     transport.ServerTransport
	wg     *sync.WaitGroup
	stream *transport.Stream
}

// Server is a gRPC server to serve RPC requests.
type Server struct {
	opts serverOptions

	mu       sync.Mutex // guards following
	lis      map[net.Listener]bool
	conns    map[transport.ServerTransport]bool
	serve    bool
	drain    bool
	cv       *sync.Cond              // signaled when connections close for GracefulStop
	services map[string]*serviceInfo // service name -> service info
	events   trace.EventLog

	quit               *grpcsync.Event
	done               *grpcsync.Event
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

	channelzID int64 // channelz unique identification number
	czData     *channelzData

	serverWorkerChannels []chan *serverWorkerData
}

type serverOptions struct {
	creds                 credentials.TransportCredentials
	codec                 baseCodec
	cp                    Compressor
	dc                    Decompressor
	unaryInt              UnaryServerInterceptor
	streamInt             StreamServerInterceptor
	chainUnaryInts        []UnaryServerInterceptor
	chainStreamInts       []StreamServerInterceptor
	inTapHandle           tap.ServerInHandle
	statsHandler          stats.Handler
	maxConcurrentStreams  uint32
	maxReceiveMessageSize int
	maxSendMessageSize    int
	unknownStreamDesc     *StreamDesc
	keepaliveParams       keepalive.ServerParameters
	keepalivePolicy       keepalive.EnforcementPolicy
	initialWindowSize     int32
	initialConnWindowSize int32
	writeBufferSize       int
	readBufferSize        int
	connectionTimeout     time.Duration
	maxHeaderListSize     *uint32
	headerTableSize       *uint32
	numServerWorkers      uint32
}

var defaultServerOptions = serverOptions{
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
	writeBufferSize:       defaultWriteBufSize,
	readBufferSize:        defaultReadBufSize,
}

// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption interface {
	apply(*serverOptions)
}

// EmptyServerOption does not alter the server configuration. It can be embedded
// in another structure to build custom server options.
//
// Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type EmptyServerOption struct{}

func (EmptyServerOption) apply(*serverOptions) {}

// funcServerOption wraps a function that modifies serverOptions into an
// implementation of the ServerOption interface.
type funcServerOption struct {
	f func(*serverOptions)
}

func (fdo *funcServerOption) apply(do *serverOptions) {
	fdo.f(do)
}

func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
	return &funcServerOption{
		f: f,
	}
}

// WriteBufferSize determines how much data can be batched before doing a write
// on the wire. The corresponding memory allocation for this buffer will be
// twice the size to keep syscalls low. The default value for this buffer is
// 32KB. Zero will disable the write buffer such that each write will be on the
// underlying connection.
// Note: A Send call may not directly translate to a write.
func WriteBufferSize(s int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.writeBufferSize = s
	})
}

// ReadBufferSize lets you set the size of the read buffer; this determines how
// much data can be read at most for one read syscall.
// The default value for this buffer is 32KB.
// Zero will disable the read buffer for a connection so the data framer can
// access the underlying conn directly.
func ReadBufferSize(s int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.readBufferSize = s
	})
}
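
// For illustration, a minimal sketch of tuning both buffers at server
// construction time; the 64KB values are arbitrary examples, not
// recommendations:
//
//	srv := grpc.NewServer(
//		grpc.WriteBufferSize(64*1024),
//		grpc.ReadBufferSize(64*1024),
//	)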

// InitialWindowSize returns a ServerOption that sets the window size for a
// stream. The lower bound for a window size is 64K and any value smaller than
// that will be ignored.
func InitialWindowSize(s int32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.initialWindowSize = s
	})
}

// InitialConnWindowSize returns a ServerOption that sets the window size for a
// connection. The lower bound for a window size is 64K and any value smaller
// than that will be ignored.
func InitialConnWindowSize(s int32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.initialConnWindowSize = s
	})
}

// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
	if kp.Time > 0 && kp.Time < time.Second {
		logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
		kp.Time = time.Second
	}

	return newFuncServerOption(func(o *serverOptions) {
		o.keepaliveParams = kp
	})
}

// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.keepalivePolicy = kep
	})
}
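
// For illustration, a hedged sketch pairing the two keepalive options; the
// durations are examples only (a Time below the 1s floor above would be
// raised to 1s):
//
//	srv := grpc.NewServer(
//		grpc.KeepaliveParams(keepalive.ServerParameters{
//			Time:    2 * time.Hour,    // ping a client that has been idle this long
//			Timeout: 20 * time.Second, // wait this long for the ping ack before closing
//		}),
//		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
//			MinTime: 5 * time.Minute, // reject client pings arriving more often than this
//		}),
//	)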

// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
//
// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
//
// Deprecated: register codecs using encoding.RegisterCodec. The server will
// automatically use registered codecs based on the incoming requests' headers.
// See also
// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
// Will be supported throughout 1.x.
func CustomCodec(codec Codec) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.codec = codec
	})
}

// RPCCompressor returns a ServerOption that sets a compressor for outbound
// messages. For backward compatibility, all outbound messages will be sent
// using this compressor, regardless of incoming message compression. By
// default, server messages will be sent using the same compressor with which
// request messages were sent.
//
// Deprecated: use encoding.RegisterCompressor instead. Will be supported
// throughout 1.x.
func RPCCompressor(cp Compressor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.cp = cp
	})
}

// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
// messages. It has higher priority than decompressors registered via
// encoding.RegisterCompressor.
//
// Deprecated: use encoding.RegisterCompressor instead. Will be supported
// throughout 1.x.
func RPCDecompressor(dc Decompressor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.dc = dc
	})
}

// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default limit.
//
// Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x.
func MaxMsgSize(m int) ServerOption {
	return MaxRecvMsgSize(m)
}

// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default 4MB.
func MaxRecvMsgSize(m int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxReceiveMessageSize = m
	})
}

// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
// If this is not set, gRPC uses the default `math.MaxInt32`.
func MaxSendMsgSize(m int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxSendMessageSize = m
	})
}
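
// For illustration, a minimal sketch raising both message-size limits for a
// server that expects large payloads; 16MB is an arbitrary example value:
//
//	srv := grpc.NewServer(
//		grpc.MaxRecvMsgSize(16*1024*1024),
//		grpc.MaxSendMsgSize(16*1024*1024),
//	)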

// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
// of concurrent streams to each ServerTransport.
func MaxConcurrentStreams(n uint32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxConcurrentStreams = n
	})
}

// Creds returns a ServerOption that sets credentials for server connections.
func Creds(c credentials.TransportCredentials) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.creds = c
	})
}
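
// For illustration, a hedged sketch of serving over TLS via the credentials
// package; the certificate and key paths are placeholders:
//
//	creds, err := credentials.NewServerTLSFromFile("server.crt", "server.key")
//	if err != nil {
//		log.Fatalf("failed to load TLS credentials: %v", err)
//	}
//	srv := grpc.NewServer(grpc.Creds(creds))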

// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
// server. Only one unary interceptor can be installed. The construction of multiple
// interceptors (e.g., chaining) can be implemented at the caller.
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.unaryInt != nil {
			panic("The unary server interceptor was already set and may not be reset.")
		}
		o.unaryInt = i
	})
}

// ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
// for unary RPCs. The first interceptor will be the outermost, while the last
// interceptor will be the innermost wrapper around the real call.
// All unary interceptors added by this method will be chained.
func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
	})
}
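
// For illustration, a hedged sketch of the chaining order: given
// ChainUnaryInterceptor(logging, auth), "logging" observes the call before
// "auth", and "auth" wraps the actual handler. Both interceptor names are
// hypothetical; "logging" might look like:
//
//	func logging(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
//		handler grpc.UnaryHandler) (interface{}, error) {
//		log.Printf("-> %s", info.FullMethod)
//		resp, err := handler(ctx, req)
//		log.Printf("<- %s err=%v", info.FullMethod, err)
//		return resp, err
//	}
//
//	srv := grpc.NewServer(grpc.ChainUnaryInterceptor(logging, auth))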

// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
// server. Only one stream interceptor can be installed.
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.streamInt != nil {
			panic("The stream server interceptor was already set and may not be reset.")
		}
		o.streamInt = i
	})
}

// ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
// for streaming RPCs. The first interceptor will be the outermost, while the last
// interceptor will be the innermost wrapper around the real call.
// All stream interceptors added by this method will be chained.
func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.chainStreamInts = append(o.chainStreamInts, interceptors...)
	})
}

// InTapHandle returns a ServerOption that sets the tap handle for all the server
// transports to be created. Only one can be installed.
func InTapHandle(h tap.ServerInHandle) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.inTapHandle != nil {
			panic("The tap handle was already set and may not be reset.")
		}
		o.inTapHandle = h
	})
}

// StatsHandler returns a ServerOption that sets the stats handler for the server.
func StatsHandler(h stats.Handler) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.statsHandler = h
	})
}

// UnknownServiceHandler returns a ServerOption that allows for adding a custom
// unknown service handler. The provided method is a bidi-streaming RPC service
// handler that will be invoked instead of returning the "unimplemented" gRPC
// error whenever a request is received for an unregistered service or method.
// The handling function and stream interceptor (if set) have full access to
// the ServerStream, including its Context.
func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.unknownStreamDesc = &StreamDesc{
			StreamName: "unknown_service_handler",
			Handler:    streamHandler,
			// We need to assume that the users of the streamHandler will want to use both.
			ClientStreams: true,
			ServerStreams: true,
		}
	})
}
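
// For illustration, a hedged sketch of an unknown-service handler that logs
// the requested method and rejects the call; MethodFromServerStream recovers
// the full method name inside such a handler:
//
//	fallback := func(srv interface{}, stream grpc.ServerStream) error {
//		method, _ := grpc.MethodFromServerStream(stream)
//		log.Printf("unhandled method %q", method)
//		return status.Errorf(codes.Unimplemented, "method %q is not supported", method)
//	}
//	srv := grpc.NewServer(grpc.UnknownServiceHandler(fallback))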

// ConnectionTimeout returns a ServerOption that sets the timeout for
// connection establishment (up to and including HTTP/2 handshaking) for all
// new connections. If this is not set, the default is 120 seconds. A zero or
// negative value will result in an immediate timeout.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ConnectionTimeout(d time.Duration) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.connectionTimeout = d
	})
}

// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
// of header list that the server is prepared to accept.
func MaxHeaderListSize(s uint32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxHeaderListSize = &s
	})
}

// HeaderTableSize returns a ServerOption that sets the size of the dynamic
// header table for a stream.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func HeaderTableSize(s uint32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.headerTableSize = &s
	})
}

// NumStreamWorkers returns a ServerOption that sets the number of worker
// goroutines that should be used to process incoming streams. Setting this to
// zero (default) will disable workers and spawn a new goroutine for each
// stream.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func NumStreamWorkers(numServerWorkers uint32) ServerOption {
	// TODO: If/when this API gets stabilized (i.e. stream workers become the
	// only way streams are processed), change the behavior of the zero value to
	// a sane default. Preliminary experiments suggest that a value equal to the
	// number of CPUs available is most performant; requires thorough testing.
	return newFuncServerOption(func(o *serverOptions) {
		o.numServerWorkers = numServerWorkers
	})
}
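
// For illustration, a hedged sketch following the TODO above: sizing the
// worker pool to the available CPUs. A plausible starting point, not a tuned
// recommendation:
//
//	srv := grpc.NewServer(grpc.NumStreamWorkers(uint32(runtime.NumCPU())))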

// serverWorkerResetThreshold defines how often the worker stack must be reset.
// Every N requests, a worker can reset its stack by spawning a new goroutine
// in its place, so that large stacks don't live in memory forever. 2^16 should
// allow each goroutine stack to live for at least a few seconds in a typical
// workload (assuming a QPS of a few thousand requests/sec).
const serverWorkerResetThreshold = 1 << 16

// serverWorker blocks on a *transport.Stream channel forever and waits for
// data to be fed by serveStreams. This allows different requests to be
// processed by the same goroutine, removing the need for expensive stack
// re-allocations (see the runtime.morestack problem [1]).
//
// [1] https://github.com/golang/go/issues/18138
func (s *Server) serverWorker(ch chan *serverWorkerData) {
	// To make sure all server workers don't reset at the same time, choose a
	// random number of iterations before resetting.
	threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
	for completed := 0; completed < threshold; completed++ {
		data, ok := <-ch
		if !ok {
			return
		}
		s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
		data.wg.Done()
	}
	go s.serverWorker(ch)
}

// initServerWorkers creates worker goroutines and channels to process incoming
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
	s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
		s.serverWorkerChannels[i] = make(chan *serverWorkerData)
		go s.serverWorker(s.serverWorkerChannels[i])
	}
}

func (s *Server) stopServerWorkers() {
	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
		close(s.serverWorkerChannels[i])
	}
}

// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
	opts := defaultServerOptions
	for _, o := range opt {
		o.apply(&opts)
	}
	s := &Server{
		lis:      make(map[net.Listener]bool),
		opts:     opts,
		conns:    make(map[transport.ServerTransport]bool),
		services: make(map[string]*serviceInfo),
		quit:     grpcsync.NewEvent(),
		done:     grpcsync.NewEvent(),
		czData:   new(channelzData),
	}
	chainUnaryServerInterceptors(s)
	chainStreamServerInterceptors(s)
	s.cv = sync.NewCond(&s.mu)
	if EnableTracing {
		_, file, line, _ := runtime.Caller(1)
		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
	}

	if s.opts.numServerWorkers > 0 {
		s.initServerWorkers()
	}

	if channelz.IsOn() {
		s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
	}
	return s
}
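
// For illustration, a minimal end-to-end sketch of standing up a server;
// pb.RegisterGreeterServer and greeterImpl are hypothetical generated/user
// types, not part of this package:
//
//	lis, err := net.Listen("tcp", ":50051")
//	if err != nil {
//		log.Fatalf("failed to listen: %v", err)
//	}
//	srv := grpc.NewServer()
//	pb.RegisterGreeterServer(srv, &greeterImpl{})
//	if err := srv.Serve(lis); err != nil {
//		log.Fatalf("serve: %v", err)
//	}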

// printf records an event in s's event log, unless s has been stopped.
// REQUIRES s.mu is held.
func (s *Server) printf(format string, a ...interface{}) {
	if s.events != nil {
		s.events.Printf(format, a...)
	}
}

// errorf records an error in s's event log, unless s has been stopped.
// REQUIRES s.mu is held.
func (s *Server) errorf(format string, a ...interface{}) {
	if s.events != nil {
		s.events.Errorf(format, a...)
	}
}

// ServiceRegistrar wraps a single method that supports service registration. It
// enables users to pass concrete types other than grpc.Server to the service
// registration methods exported by the IDL generated code.
type ServiceRegistrar interface {
	// RegisterService registers a service and its implementation to the
	// concrete type implementing this interface. It may not be called
	// once the server has started serving.
	// desc describes the service and its methods and handlers. impl is the
	// service implementation which is passed to the method handlers.
	RegisterService(desc *ServiceDesc, impl interface{})
}

// RegisterService registers a service and its implementation to the gRPC
// server. It is called from the IDL generated code. This must be called before
// invoking Serve. If ss is non-nil (for legacy code), its type is checked to
// ensure it implements sd.HandlerType.
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
	if ss != nil {
		ht := reflect.TypeOf(sd.HandlerType).Elem()
		st := reflect.TypeOf(ss)
		if !st.Implements(ht) {
			logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
		}
	}
	s.register(sd, ss)
}
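
// For illustration, a hedged sketch of how IDL-generated code typically drives
// RegisterService; Greeter_ServiceDesc and GreeterServer are hypothetical
// names emitted by a protoc plugin, not defined here:
//
//	func RegisterGreeterServer(s grpc.ServiceRegistrar, srv GreeterServer) {
//		s.RegisterService(&Greeter_ServiceDesc, srv)
//	}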

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.printf("RegisterService(%q)", sd.ServiceName)
	if s.serve {
		logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
	}
	if _, ok := s.services[sd.ServiceName]; ok {
		logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
	}
	info := &serviceInfo{
		serviceImpl: ss,
		methods:     make(map[string]*MethodDesc),
		streams:     make(map[string]*StreamDesc),
		mdata:       sd.Metadata,
	}
	for i := range sd.Methods {
		d := &sd.Methods[i]
		info.methods[d.MethodName] = d
	}
	for i := range sd.Streams {
		d := &sd.Streams[i]
		info.streams[d.StreamName] = d
	}
	s.services[sd.ServiceName] = info
}

// MethodInfo contains the information of an RPC including its method name and type.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}

// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata interface{}
}

// GetServiceInfo returns a map from service names to ServiceInfo.
// Service names include the package names, in the form of <package>.<service>.
func (s *Server) GetServiceInfo() map[string]ServiceInfo {
	ret := make(map[string]ServiceInfo)
	for n, srv := range s.services {
		methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams))
		for m := range srv.methods {
			methods = append(methods, MethodInfo{
				Name:           m,
				IsClientStream: false,
				IsServerStream: false,
			})
		}
		for m, d := range srv.streams {
			methods = append(methods, MethodInfo{
				Name:           m,
				IsClientStream: d.ClientStreams,
				IsServerStream: d.ServerStreams,
			})
		}

		ret[n] = ServiceInfo{
			Methods:  methods,
			Metadata: srv.mdata,
		}
	}
	return ret
}
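
// For illustration, a minimal sketch that dumps every registered service and
// method, e.g. for a debug endpoint:
//
//	for svc, info := range srv.GetServiceInfo() {
//		for _, m := range info.Methods {
//			log.Printf("%s/%s (client-stream=%v server-stream=%v)",
//				svc, m.Name, m.IsClientStream, m.IsServerStream)
//		}
//	}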

// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped.
var ErrServerStopped = errors.New("grpc: the server has been stopped")

func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
	if s.opts.creds == nil {
		return rawConn, nil, nil
	}
	return s.opts.creds.ServerHandshake(rawConn)
}

type listenSocket struct {
	net.Listener
	channelzID int64
}

func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
	return &channelz.SocketInternalMetric{
		SocketOptions: channelz.GetSocketOption(l.Listener),
		LocalAddr:     l.Listener.Addr(),
	}
}

func (l *listenSocket) Close() error {
	err := l.Listener.Close()
	if channelz.IsOn() {
		channelz.RemoveEntry(l.channelzID)
	}
	return err
}

// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		if s.quit.HasFired() {
			// Stop or GracefulStop called; block until done and return nil.
			<-s.done.Done()
		}
	}()

	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	if channelz.IsOn() {
		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
	}
	s.mu.Unlock()

	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var tempDelay time.Duration // how long to sleep on accept failure

	for {
		rawConn, err := lis.Accept()
		if err != nil {
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit.Done():
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			if s.quit.HasFired() {
				return nil
			}
			return err
		}
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}
}

// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
	if s.quit.HasFired() {
		rawConn.Close()
		return
	}
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
	if err != nil {
		// ErrConnDispatched means that the connection was dispatched away from
		// gRPC; those connections should be left open.
		if err != credentials.ErrConnDispatched {
			s.mu.Lock()
			s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
			s.mu.Unlock()
			channelz.Warningf(logger, s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
			rawConn.Close()
		}
		rawConn.SetDeadline(time.Time{})
		return
	}

	// Finish handshaking (HTTP2)
	st := s.newHTTP2Transport(conn, authInfo)
	if st == nil {
		return
	}

	rawConn.SetDeadline(time.Time{})
	if !s.addConn(st) {
		return
	}
	go func() {
		s.serveStreams(st)
		s.removeConn(st)
	}()
}

// newHTTP2Transport sets up an http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
	config := &transport.ServerConfig{
		MaxStreams:            s.opts.maxConcurrentStreams,
		AuthInfo:              authInfo,
		InTapHandle:           s.opts.inTapHandle,
		StatsHandler:          s.opts.statsHandler,
		KeepaliveParams:       s.opts.keepaliveParams,
		KeepalivePolicy:       s.opts.keepalivePolicy,
		InitialWindowSize:     s.opts.initialWindowSize,
		InitialConnWindowSize: s.opts.initialConnWindowSize,
		WriteBufferSize:       s.opts.writeBufferSize,
		ReadBufferSize:        s.opts.readBufferSize,
		ChannelzParentID:      s.channelzID,
		MaxHeaderListSize:     s.opts.maxHeaderListSize,
		HeaderTableSize:       s.opts.headerTableSize,
	}
	st, err := transport.NewServerTransport("http2", c, config)
	if err != nil {
		s.mu.Lock()
		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
		s.mu.Unlock()
		c.Close()
		channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
		return nil
	}

	return st
}

func (s *Server) serveStreams(st transport.ServerTransport) {
	defer st.Close()
	var wg sync.WaitGroup

	var roundRobinCounter uint32
	st.HandleStreams(func(stream *transport.Stream) {
		wg.Add(1)
		if s.opts.numServerWorkers > 0 {
			data := &serverWorkerData{st: st, wg: &wg, stream: stream}
			select {
			case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
			default:
				// If all stream workers are busy, fall back to the default code path.
				go func() {
					s.handleStream(st, stream, s.traceInfo(st, stream))
					wg.Done()
				}()
			}
		} else {
			go func() {
				defer wg.Done()
				s.handleStream(st, stream, s.traceInfo(st, stream))
			}()
		}
	}, func(ctx context.Context, method string) context.Context {
		if !EnableTracing {
			return ctx
		}
		tr := trace.New("grpc.Recv."+methodFamily(method), method)
		return trace.NewContext(ctx, tr)
	})
	wg.Wait()
}

var _ http.Handler = (*Server)(nil)

// ServeHTTP implements the Go standard library's http.Handler
// interface by responding to the gRPC request r, by looking up
// the requested gRPC method in the gRPC server s.
//
// The provided HTTP request must have arrived on an HTTP/2
// connection. When using the Go standard library's server,
// practically this means that the Request must also have arrived
// over TLS.
//
// To share one port (such as 443 for https) between gRPC and an
// existing http.Handler, use a root http.Handler such as:
//
//	if r.ProtoMajor == 2 && strings.HasPrefix(
//		r.Header.Get("Content-Type"), "application/grpc") {
//		grpcServer.ServeHTTP(w, r)
//	} else {
//		yourMux.ServeHTTP(w, r)
//	}
//
// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
// separate from grpc-go's HTTP/2 server. Performance and features may vary
// between the two paths. ServeHTTP does not support some gRPC features
// available through grpc-go's HTTP/2 server.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if !s.addConn(st) {
		return
	}
	defer s.removeConn(st)
	s.serveStreams(st)
}

// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
// If tracing is not enabled, it returns nil.
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
	if !EnableTracing {
		return nil
	}
	tr, ok := trace.FromContext(stream.Context())
	if !ok {
		return nil
	}

	trInfo = &traceInfo{
		tr: tr,
		firstLine: firstLine{
			client:     false,
			remoteAddr: st.RemoteAddr(),
		},
	}
	if dl, ok := stream.Context().Deadline(); ok {
		trInfo.firstLine.deadline = time.Until(dl)
	}
	return trInfo
}

func (s *Server) addConn(st transport.ServerTransport) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.conns == nil {
		st.Close()
		return false
	}
	if s.drain {
		// Transport added after we drained our existing conns: drain it
		// immediately.
		st.Drain()
	}
	s.conns[st] = true
	return true
}

func (s *Server) removeConn(st transport.ServerTransport) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.conns != nil {
		delete(s.conns, st)
		s.cv.Broadcast()
	}
}

func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
	return &channelz.ServerInternalMetric{
		CallsStarted:             atomic.LoadInt64(&s.czData.callsStarted),
		CallsSucceeded:           atomic.LoadInt64(&s.czData.callsSucceeded),
		CallsFailed:              atomic.LoadInt64(&s.czData.callsFailed),
		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
	}
}

func (s *Server) incrCallsStarted() {
	atomic.AddInt64(&s.czData.callsStarted, 1)
	atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
}

func (s *Server) incrCallsSucceeded() {
	atomic.AddInt64(&s.czData.callsSucceeded, 1)
}

func (s *Server) incrCallsFailed() {
	atomic.AddInt64(&s.czData.callsFailed, 1)
}

func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
	if err != nil {
		channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err)
		return err
	}
	compData, err := compress(data, cp, comp)
	if err != nil {
		channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err)
		return err
	}
	hdr, payload := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > s.opts.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
	}
	err = t.Write(stream, hdr, payload, opts)
	if err == nil && s.opts.statsHandler != nil {
		s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
	}
	return err
}

// chainUnaryServerInterceptors chains all unary server interceptors into one.
func chainUnaryServerInterceptors(s *Server) {
	// Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
	// be executed before any other chained interceptors.
	interceptors := s.opts.chainUnaryInts
	if s.opts.unaryInt != nil {
		interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
	}

	var chainedInt UnaryServerInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
			return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
		}
	}

	s.opts.unaryInt = chainedInt
}

// getChainUnaryHandler recursively generates the chained UnaryHandler.
func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
	if curr == len(interceptors)-1 {
		return finalHandler
	}

	return func(ctx context.Context, req interface{}) (interface{}, error) {
		return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
	}
}
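
// For illustration, the recursion above unwinds to plain nesting. With
// interceptors i0, i1, i2 and final handler h, the chained interceptor behaves
// like this hand-written equivalent (a sketch, not code in this package):
//
//	chained := func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
//		return i0(ctx, req, info, func(ctx context.Context, req interface{}) (interface{}, error) {
//			return i1(ctx, req, info, func(ctx context.Context, req interface{}) (interface{}, error) {
//				return i2(ctx, req, info, h)
//			})
//		})
//	}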

func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
	sh := s.opts.statsHandler
	if sh != nil || trInfo != nil || channelz.IsOn() {
		if channelz.IsOn() {
			s.incrCallsStarted()
		}
		var statsBegin *stats.Begin
		if sh != nil {
			beginTime := time.Now()
			statsBegin = &stats.Begin{
				BeginTime: beginTime,
			}
			sh.HandleRPC(stream.Context(), statsBegin)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&trInfo.firstLine, false)
		}
		// The deferred error handling for tracing, stats handler and channelz are
		// combined into one function to reduce stack usage -- a defer takes ~56-64
		// bytes on the stack, so overflowing the stack will require a stack
		// re-allocation, which is expensive.
		//
		// To maintain behavior similar to separate deferred statements, statements
		// should be executed in the reverse order. That is, tracing first, stats
		// handler second, and channelz last. Note that panics *within* defers will
		// lead to different behavior, but that's an acceptable compromise; that
		// would be undefined behavior territory anyway.
		defer func() {
			if trInfo != nil {
				if err != nil && err != io.EOF {
					trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					trInfo.tr.SetError()
				}
				trInfo.tr.Finish()
			}

			if sh != nil {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(stream.Context(), end)
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}

	binlog := binarylog.GetMethodLogger(stream.Method())
	if binlog != nil {
		ctx := stream.Context()
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ctx); ok {
			logEntry.PeerAddr = peer.Addr
		}
		binlog.Log(logEntry)
	}

	// comp and cp are used for compression. decomp and dc are used for
	// decompression. If comp and decomp are both set, they are the same;
	// however they are kept separate to ensure that at most one of the
	// compressor/decompressor variable pairs are set for use later.
	var comp, decomp encoding.Compressor
	var cp Compressor
	var dc Decompressor

	// If dc is set and matches the stream's compression, use it. Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		decomp = encoding.GetCompressor(rc)
		if decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(stream, st)
			return st.Err()
		}
	}

	// If cp is set, use it. Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		cp = s.opts.cp
		stream.SetSendCompress(cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		comp = encoding.GetCompressor(rc)
		if comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	var payInfo *payloadInfo
	if sh != nil || binlog != nil {
		payInfo = &payloadInfo{}
	}
	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
	if err != nil {
		if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
		}
		return err
	}
	if channelz.IsOn() {
		t.IncrMsgRecv()
	}
	df := func(v interface{}) error {
		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
		}
		if sh != nil {
			sh.HandleRPC(stream.Context(), &stats.InPayload{
				RecvTime:   time.Now(),
				Payload:    v,
				WireLength: payInfo.wireLength + headerLen,
				Data:       d,
				Length:     len(d),
			})
		}
		if binlog != nil {
			binlog.Log(&binarylog.ClientMessage{
				Message: d,
			})
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
		}
		return nil
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert appErr if it is not a grpc status error.
			appErr = status.Error(codes.Unknown, appErr.Error())
			appStatus, _ = status.FromError(appErr)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			trInfo.tr.SetError()
		}
		if e := t.WriteStatus(stream, appStatus); e != nil {
			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		if binlog != nil {
			if h, _ := stream.Header(); h.Len() > 0 {
				// Only log serverHeader if there was header. Otherwise it can
				// be trailer only.
				binlog.Log(&binarylog.ServerHeader{
					Header: h,
				})
			}
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return appErr
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(stringer("OK"), false)
	}
	opts := &transport.Options{Last: true}

	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
		if err == io.EOF {
			// The entire stream is done (for unary RPC only).
			return err
		}
		if sts, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, sts); e != nil {
				channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
			}
		} else {
			switch st := err.(type) {
			case transport.ConnectionError:
				// Nothing to do here.
			default:
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
			}
		}
		if binlog != nil {
			h, _ := stream.Header()
			binlog.Log(&binarylog.ServerHeader{
				Header: h,
			})
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return err
	}
	if binlog != nil {
		h, _ := stream.Header()
		binlog.Log(&binarylog.ServerHeader{
			Header: h,
		})
		binlog.Log(&binarylog.ServerMessage{
			Message: reply,
		})
	}
	if channelz.IsOn() {
		t.IncrMsgSent()
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
	}
	// TODO: Should we be logging if writing status failed here, like above?
	// Should the logging be in WriteStatus? Should we ignore the WriteStatus
	// error or allow the stats handler to see it?
	err = t.WriteStatus(stream, statusOK)
	if binlog != nil {
		binlog.Log(&binarylog.ServerTrailer{
			Trailer: stream.Trailer(),
			Err:     appErr,
		})
	}
	return err
}

// chainStreamServerInterceptors chains all stream server interceptors into one.
func chainStreamServerInterceptors(s *Server) {
	// Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
	// be executed before any other chained interceptors.
	interceptors := s.opts.chainStreamInts
	if s.opts.streamInt != nil {
		interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
	}

	var chainedInt StreamServerInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
			return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
		}
	}

	s.opts.streamInt = chainedInt
}

// getChainStreamHandler recursively generates the chained StreamHandler.
func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
	if curr == len(interceptors)-1 {
		return finalHandler
	}

	return func(srv interface{}, ss ServerStream) error {
		return interceptors[curr+1](srv, ss, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
	}
}

func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
	if channelz.IsOn() {
		s.incrCallsStarted()
	}
	sh := s.opts.statsHandler
	var statsBegin *stats.Begin
	if sh != nil {
		beginTime := time.Now()
		statsBegin = &stats.Begin{
			BeginTime: beginTime,
		}
		sh.HandleRPC(stream.Context(), statsBegin)
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	ss := &serverStream{
		ctx:                   ctx,
		t:                     t,
		s:                     stream,
		p:                     &parser{r: stream},
		codec:                 s.getCodec(stream.ContentSubtype()),
		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
		maxSendMessageSize:    s.opts.maxSendMessageSize,
		trInfo:                trInfo,
		statsHandler:          sh,
	}

	if sh != nil || trInfo != nil || channelz.IsOn() {
		// See comment in processUnaryRPC on defers.
		defer func() {
			if trInfo != nil {
				ss.mu.Lock()
				if err != nil && err != io.EOF {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
				ss.trInfo.tr.Finish()
				ss.trInfo.tr = nil
				ss.mu.Unlock()
			}

			if sh != nil {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(stream.Context(), end)
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}

	ss.binlog = binarylog.GetMethodLogger(stream.Method())
	if ss.binlog != nil {
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ss.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		ss.binlog.Log(logEntry)
	}

	// If dc is set and matches the stream's compression, use it. Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		ss.dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		ss.decomp = encoding.GetCompressor(rc)
		if ss.decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(ss.s, st)
			return st.Err()
		}
	}

	// If cp is set, use it. Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		ss.cp = s.opts.cp
		stream.SetSendCompress(s.opts.cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		ss.comp = encoding.GetCompressor(rc)
		if ss.comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	if trInfo != nil {
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
	}
	var appErr error
	var server interface{}
	if info != nil {
		server = info.serviceImpl
	}
	if s.opts.streamInt == nil {
		appErr = sd.Handler(server, ss)
	} else {
		info := &StreamServerInfo{
			FullMethod:     stream.Method(),
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
	}
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			appStatus = status.New(codes.Unknown, appErr.Error())
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			ss.mu.Lock()
			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			ss.trInfo.tr.SetError()
			ss.mu.Unlock()
		}
		t.WriteStatus(ss.s, appStatus)
		if ss.binlog != nil {
			ss.binlog.Log(&binarylog.ServerTrailer{
				Trailer: ss.s.Trailer(),
				Err:     appErr,
			})
		}
		// TODO: Should we log an error from WriteStatus here and below?
		return appErr
	}
	if trInfo != nil {
		ss.mu.Lock()
		ss.trInfo.tr.LazyLog(stringer("OK"), false)
		ss.mu.Unlock()
	}
	err = t.WriteStatus(ss.s, statusOK)
	if ss.binlog != nil {
		ss.binlog.Log(&binarylog.ServerTrailer{
			Trailer: ss.s.Trailer(),
			Err:     appErr,
		})
	}
	return err
}

func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
	sm := stream.Method()
	if sm != "" && sm[0] == '/' {
		sm = sm[1:]
	}
	pos := strings.LastIndex(sm, "/")
	if pos == -1 {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
			trInfo.tr.SetError()
		}
		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
		if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
			if trInfo != nil {
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				trInfo.tr.SetError()
			}
			channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
		}
		if trInfo != nil {
			trInfo.tr.Finish()
		}
		return
	}
	service := sm[:pos]
	method := sm[pos+1:]

	srv, knownService := s.services[service]
	if knownService {
		if md, ok := srv.methods[method]; ok {
			s.processUnaryRPC(t, stream, srv, md, trInfo)
			return
		}
		if sd, ok := srv.streams[method]; ok {
			s.processStreamingRPC(t, stream, srv, sd, trInfo)
			return
		}
	}
	// Unknown service, or known service with an unknown method.
	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
		return
	}
	var errDesc string
	if !knownService {
		errDesc = fmt.Sprintf("unknown service %v", service)
	} else {
		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
	}
	if trInfo != nil {
		trInfo.tr.LazyPrintf("%s", errDesc)
		trInfo.tr.SetError()
	}
	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
			trInfo.tr.SetError()
		}
		channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
	}
	if trInfo != nil {
		trInfo.tr.Finish()
	}
}

// The key to save ServerTransportStream in the context.
type streamKey struct{}

// NewContextWithServerTransportStream creates a new context from ctx and
// attaches stream to it.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
	return context.WithValue(ctx, streamKey{}, stream)
}

// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// See also NewContextWithServerTransportStream.
//
// Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type ServerTransportStream interface {
	Method() string
	SetHeader(md metadata.MD) error
	SendHeader(md metadata.MD) error
	SetTrailer(md metadata.MD) error
}
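
// For illustration, a hedged sketch of mocking the interface in a test so that
// handler code calling grpc.SetHeader can run without a real transport;
// fakeStream is a hypothetical test type:
//
//	type fakeStream struct {
//		header metadata.MD
//	}
//
//	func (f *fakeStream) Method() string                  { return "/test.Service/Method" }
//	func (f *fakeStream) SetHeader(md metadata.MD) error  { f.header = metadata.Join(f.header, md); return nil }
//	func (f *fakeStream) SendHeader(md metadata.MD) error { return f.SetHeader(md) }
//	func (f *fakeStream) SetTrailer(md metadata.MD) error { return nil }
//
//	ctx := grpc.NewContextWithServerTransportStream(context.Background(), &fakeStream{})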

// ServerTransportStreamFromContext returns the ServerTransportStream saved in
// ctx. Returns nil if the given context has no stream associated with it
// (which implies it is not an RPC invocation context).
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
	s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
	return s
}

// Stop stops the gRPC server. It immediately closes all open
// connections and listeners.
// It cancels all active RPCs on the server side and the corresponding
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
	s.quit.Fire()

	defer func() {
		s.serveWG.Wait()
		s.done.Fire()
	}()

	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})

	s.mu.Lock()
	listeners := s.lis
	s.lis = nil
	st := s.conns
	s.conns = nil
	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
	s.cv.Broadcast()
	s.mu.Unlock()

	for lis := range listeners {
		lis.Close()
	}
	for c := range st {
		c.Close()
	}
	if s.opts.numServerWorkers > 0 {
		s.stopServerWorkers()
	}

	s.mu.Lock()
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}

// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
	s.quit.Fire()
	defer s.done.Fire()

	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})
	s.mu.Lock()
	if s.conns == nil {
		s.mu.Unlock()
		return
	}

	for lis := range s.lis {
		lis.Close()
	}
	s.lis = nil
	if !s.drain {
		for st := range s.conns {
			st.Drain()
		}
		s.drain = true
	}

	// Wait for serving threads to be ready to exit. Only then can we be sure no
	// new conns will be created.
	s.mu.Unlock()
	s.serveWG.Wait()
	s.mu.Lock()

	for len(s.conns) != 0 {
		s.cv.Wait()
	}
	s.conns = nil
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}
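
// For illustration, a common shutdown pattern (imports elided): drain
// gracefully on SIGTERM, but force-stop if draining takes too long; the 30s
// budget is an arbitrary example:
//
//	sig := make(chan os.Signal, 1)
//	signal.Notify(sig, syscall.SIGTERM)
//	<-sig
//	done := make(chan struct{})
//	go func() { srv.GracefulStop(); close(done) }()
//	select {
//	case <-done:
//	case <-time.After(30 * time.Second):
//		srv.Stop()
//	}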

// contentSubtype must be lowercase
// cannot return nil
func (s *Server) getCodec(contentSubtype string) baseCodec {
	if s.opts.codec != nil {
		return s.opts.codec
	}
	if contentSubtype == "" {
		return encoding.GetCodec(proto.Name)
	}
	codec := encoding.GetCodec(contentSubtype)
	if codec == nil {
		return encoding.GetCodec(proto.Name)
	}
	return codec
}

// SetHeader sets the header metadata.
// When called multiple times, all the provided metadata will be merged.
// All the metadata will be sent out when one of the following happens:
//   - grpc.SendHeader() is called;
//   - The first response is sent out;
//   - An RPC status is sent out (error or success).
func SetHeader(ctx context.Context, md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	return stream.SetHeader(md)
}

// SendHeader sends header metadata. It may be called at most once.
// The provided md and headers set by SetHeader() will be sent.
func SendHeader(ctx context.Context, md metadata.MD) error {
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	if err := stream.SendHeader(md); err != nil {
		return toRPCErr(err)
	}
	return nil
}

// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
// When called more than once, all the provided metadata will be merged.
func SetTrailer(ctx context.Context, md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	return stream.SetTrailer(md)
}
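
// For illustration, a hedged sketch of a unary handler using these helpers and
// Method below; the request/response types (pb.HelloRequest, pb.HelloReply)
// are hypothetical:
//
//	func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
//		grpc.SetHeader(ctx, metadata.Pairs("server-version", "1.0"))
//		grpc.SetTrailer(ctx, metadata.Pairs("handled-by", "worker-1"))
//		if m, ok := grpc.Method(ctx); ok {
//			log.Printf("handling %s", m)
//		}
//		return &pb.HelloReply{Message: "hello " + in.Name}, nil
//	}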

// Method returns the method string for the server context. The returned
// string is in the format of "/service/method".
func Method(ctx context.Context) (string, bool) {
	s := ServerTransportStreamFromContext(ctx)
	if s == nil {
		return "", false
	}
	return s.Method(), true
}

type channelzServer struct {
	s *Server
}

func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
	return c.s.channelzMetric()
}
1788