/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc
20
import (
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"net/http"
	"reflect"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/encoding/proto"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/binarylog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/grpcutil"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
)
54
const (
	// defaultServerMaxReceiveMessageSize caps inbound messages at 4MB unless
	// overridden via MaxRecvMsgSize.
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	// defaultServerMaxSendMessageSize effectively disables the outbound limit.
	defaultServerMaxSendMessageSize = math.MaxInt32

	// Server transports are tracked in a map which is keyed on listener
	// address. For regular gRPC traffic, connections are accepted in Serve()
	// through a call to Accept(), and we use the actual listener address as key
	// when we add it to the map. But for connections received through
	// ServeHTTP(), we do not have a listener and hence use this dummy value.
	listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
)
66
// init wires server-side entry points into the internal package so that other
// grpc-go packages can reach unexported functionality without import cycles.
func init() {
	internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
		return srv.opts.creds
	}
	internal.IsRegisteredMethod = func(srv *Server, method string) bool {
		return srv.isRegisteredMethod(method)
	}
	internal.ServerFromContext = serverFromContext
	internal.AddGlobalServerOptions = func(opt ...ServerOption) {
		globalServerOptions = append(globalServerOptions, opt...)
	}
	internal.ClearGlobalServerOptions = func() {
		globalServerOptions = nil
	}
	internal.BinaryLogger = binaryLogger
	internal.JoinServerOptions = newJoinServerOption
	internal.RecvBufferPool = recvBufferPool
}
85
// statusOK is the status sent in the trailer of every successful RPC.
var statusOK = status.New(codes.OK, "")

// logger is the component logger used throughout the core server code.
var logger = grpclog.Component("core")
88
// methodHandler is the generated-code entry point for a unary method: it
// decodes the request via dec, optionally routes the call through
// interceptor, and invokes the service implementation srv.
type methodHandler func(srv any, ctx context.Context, dec func(any) error, interceptor UnaryServerInterceptor) (any, error)

// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	// MethodName is the method name only, without the service or package name.
	MethodName string
	// Handler is the generated function that invokes the implementation.
	Handler methodHandler
}

// ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType any
	Methods     []MethodDesc
	Streams     []StreamDesc
	Metadata    any
}

// serviceInfo wraps information about a service. It is very similar to
// ServiceDesc and is constructed from it for internal purposes.
type serviceInfo struct {
	// Contains the implementation for the methods in this service.
	serviceImpl any
	// methods and streams index the service's handlers by method/stream name.
	methods map[string]*MethodDesc
	streams map[string]*StreamDesc
	mdata   any
}
117
// Server is a gRPC server to serve RPC requests.
type Server struct {
	opts serverOptions

	mu  sync.Mutex // guards following
	lis map[net.Listener]bool
	// conns contains all active server transports. It is a map keyed on a
	// listener address with the value being the set of active transports
	// belonging to that listener.
	conns    map[string]map[transport.ServerTransport]bool
	serve    bool // set once Serve is called; registration is then frozen
	drain    bool // set once GracefulStop begins draining connections
	cv       *sync.Cond              // signaled when connections close for GracefulStop
	services map[string]*serviceInfo // service name -> service info
	events   traceEventLog

	quit               *grpcsync.Event // fired on entry to Stop/GracefulStop
	done               *grpcsync.Event // fired when shutdown fully completes
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for Stop/GracefulStop
	handlersWG         sync.WaitGroup // counts active method handler goroutines

	channelzID *channelz.Identifier
	czData     *channelzData

	// serverWorkerChannel feeds stream-handling closures to the worker pool
	// when NumStreamWorkers is in use; serverWorkerChannelClose closes it
	// exactly once.
	serverWorkerChannel      chan func()
	serverWorkerChannelClose func()
}
146
// serverOptions is the aggregate configuration for a Server, produced by
// applying defaults, global options, and per-server ServerOptions in order.
type serverOptions struct {
	creds                 credentials.TransportCredentials
	codec                 baseCodec
	cp                    Compressor   // legacy outbound compressor (RPCCompressor)
	dc                    Decompressor // legacy inbound decompressor (RPCDecompressor)
	unaryInt              UnaryServerInterceptor
	streamInt             StreamServerInterceptor
	chainUnaryInts        []UnaryServerInterceptor
	chainStreamInts       []StreamServerInterceptor
	binaryLogger          binarylog.Logger
	inTapHandle           tap.ServerInHandle
	statsHandlers         []stats.Handler
	maxConcurrentStreams  uint32
	maxReceiveMessageSize int
	maxSendMessageSize    int
	unknownStreamDesc     *StreamDesc
	keepaliveParams       keepalive.ServerParameters
	keepalivePolicy       keepalive.EnforcementPolicy
	initialWindowSize     int32
	initialConnWindowSize int32
	writeBufferSize       int
	readBufferSize        int
	sharedWriteBuffer     bool
	connectionTimeout     time.Duration
	maxHeaderListSize     *uint32
	headerTableSize       *uint32
	numServerWorkers      uint32
	recvBufferPool        SharedBufferPool
	waitForHandlers       bool
}
177
// defaultServerOptions is the baseline configuration applied before any
// global or per-server ServerOptions.
var defaultServerOptions = serverOptions{
	maxConcurrentStreams:  math.MaxUint32,
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
	writeBufferSize:       defaultWriteBufSize,
	readBufferSize:        defaultReadBufSize,
	recvBufferPool:        nopBufferPool{},
}

// globalServerOptions are applied to every new server before per-server
// options; populated via internal.AddGlobalServerOptions.
var globalServerOptions []ServerOption
188
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption interface {
	apply(*serverOptions)
}

// EmptyServerOption does not alter the server configuration. It can be embedded
// in another structure to build custom server options.
//
// # Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type EmptyServerOption struct{}

func (EmptyServerOption) apply(*serverOptions) {}
204
205// funcServerOption wraps a function that modifies serverOptions into an
206// implementation of the ServerOption interface.
207type funcServerOption struct {
208f func(*serverOptions)
209}
210
211func (fdo *funcServerOption) apply(do *serverOptions) {
212fdo.f(do)
213}
214
215func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
216return &funcServerOption{
217f: f,
218}
219}
220
221// joinServerOption provides a way to combine arbitrary number of server
222// options into one.
223type joinServerOption struct {
224opts []ServerOption
225}
226
227func (mdo *joinServerOption) apply(do *serverOptions) {
228for _, opt := range mdo.opts {
229opt.apply(do)
230}
231}
232
233func newJoinServerOption(opts ...ServerOption) ServerOption {
234return &joinServerOption{opts: opts}
235}
236
237// SharedWriteBuffer allows reusing per-connection transport write buffer.
238// If this option is set to true every connection will release the buffer after
239// flushing the data on the wire.
240//
241// # Experimental
242//
243// Notice: This API is EXPERIMENTAL and may be changed or removed in a
244// later release.
245func SharedWriteBuffer(val bool) ServerOption {
246return newFuncServerOption(func(o *serverOptions) {
247o.sharedWriteBuffer = val
248})
249}
250
251// WriteBufferSize determines how much data can be batched before doing a write
252// on the wire. The corresponding memory allocation for this buffer will be
253// twice the size to keep syscalls low. The default value for this buffer is
254// 32KB. Zero or negative values will disable the write buffer such that each
255// write will be on underlying connection.
256// Note: A Send call may not directly translate to a write.
257func WriteBufferSize(s int) ServerOption {
258return newFuncServerOption(func(o *serverOptions) {
259o.writeBufferSize = s
260})
261}
262
263// ReadBufferSize lets you set the size of read buffer, this determines how much
264// data can be read at most for one read syscall. The default value for this
265// buffer is 32KB. Zero or negative values will disable read buffer for a
266// connection so data framer can access the underlying conn directly.
267func ReadBufferSize(s int) ServerOption {
268return newFuncServerOption(func(o *serverOptions) {
269o.readBufferSize = s
270})
271}
272
273// InitialWindowSize returns a ServerOption that sets window size for stream.
274// The lower bound for window size is 64K and any value smaller than that will be ignored.
275func InitialWindowSize(s int32) ServerOption {
276return newFuncServerOption(func(o *serverOptions) {
277o.initialWindowSize = s
278})
279}
280
281// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
282// The lower bound for window size is 64K and any value smaller than that will be ignored.
283func InitialConnWindowSize(s int32) ServerOption {
284return newFuncServerOption(func(o *serverOptions) {
285o.initialConnWindowSize = s
286})
287}
288
289// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
290func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
291if kp.Time > 0 && kp.Time < internal.KeepaliveMinServerPingTime {
292logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
293kp.Time = internal.KeepaliveMinServerPingTime
294}
295
296return newFuncServerOption(func(o *serverOptions) {
297o.keepaliveParams = kp
298})
299}
300
301// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
302func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
303return newFuncServerOption(func(o *serverOptions) {
304o.keepalivePolicy = kep
305})
306}
307
308// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
309//
310// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
311//
312// Deprecated: register codecs using encoding.RegisterCodec. The server will
313// automatically use registered codecs based on the incoming requests' headers.
314// See also
315// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
316// Will be supported throughout 1.x.
317func CustomCodec(codec Codec) ServerOption {
318return newFuncServerOption(func(o *serverOptions) {
319o.codec = codec
320})
321}
322
323// ForceServerCodec returns a ServerOption that sets a codec for message
324// marshaling and unmarshaling.
325//
326// This will override any lookups by content-subtype for Codecs registered
327// with RegisterCodec.
328//
329// See Content-Type on
330// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
331// more details. Also see the documentation on RegisterCodec and
332// CallContentSubtype for more details on the interaction between encoding.Codec
333// and content-subtype.
334//
335// This function is provided for advanced users; prefer to register codecs
336// using encoding.RegisterCodec.
337// The server will automatically use registered codecs based on the incoming
338// requests' headers. See also
339// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
340// Will be supported throughout 1.x.
341//
342// # Experimental
343//
344// Notice: This API is EXPERIMENTAL and may be changed or removed in a
345// later release.
346func ForceServerCodec(codec encoding.Codec) ServerOption {
347return newFuncServerOption(func(o *serverOptions) {
348o.codec = codec
349})
350}
351
352// RPCCompressor returns a ServerOption that sets a compressor for outbound
353// messages. For backward compatibility, all outbound messages will be sent
354// using this compressor, regardless of incoming message compression. By
355// default, server messages will be sent using the same compressor with which
356// request messages were sent.
357//
358// Deprecated: use encoding.RegisterCompressor instead. Will be supported
359// throughout 1.x.
360func RPCCompressor(cp Compressor) ServerOption {
361return newFuncServerOption(func(o *serverOptions) {
362o.cp = cp
363})
364}
365
366// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
367// messages. It has higher priority than decompressors registered via
368// encoding.RegisterCompressor.
369//
370// Deprecated: use encoding.RegisterCompressor instead. Will be supported
371// throughout 1.x.
372func RPCDecompressor(dc Decompressor) ServerOption {
373return newFuncServerOption(func(o *serverOptions) {
374o.dc = dc
375})
376}
377
378// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
379// If this is not set, gRPC uses the default limit.
380//
381// Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x.
382func MaxMsgSize(m int) ServerOption {
383return MaxRecvMsgSize(m)
384}
385
386// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
387// If this is not set, gRPC uses the default 4MB.
388func MaxRecvMsgSize(m int) ServerOption {
389return newFuncServerOption(func(o *serverOptions) {
390o.maxReceiveMessageSize = m
391})
392}
393
394// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
395// If this is not set, gRPC uses the default `math.MaxInt32`.
396func MaxSendMsgSize(m int) ServerOption {
397return newFuncServerOption(func(o *serverOptions) {
398o.maxSendMessageSize = m
399})
400}
401
402// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
403// of concurrent streams to each ServerTransport.
404func MaxConcurrentStreams(n uint32) ServerOption {
405if n == 0 {
406n = math.MaxUint32
407}
408return newFuncServerOption(func(o *serverOptions) {
409o.maxConcurrentStreams = n
410})
411}
412
413// Creds returns a ServerOption that sets credentials for server connections.
414func Creds(c credentials.TransportCredentials) ServerOption {
415return newFuncServerOption(func(o *serverOptions) {
416o.creds = c
417})
418}
419
420// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
421// server. Only one unary interceptor can be installed. The construction of multiple
422// interceptors (e.g., chaining) can be implemented at the caller.
423func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
424return newFuncServerOption(func(o *serverOptions) {
425if o.unaryInt != nil {
426panic("The unary server interceptor was already set and may not be reset.")
427}
428o.unaryInt = i
429})
430}
431
432// ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
433// for unary RPCs. The first interceptor will be the outer most,
434// while the last interceptor will be the inner most wrapper around the real call.
435// All unary interceptors added by this method will be chained.
436func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
437return newFuncServerOption(func(o *serverOptions) {
438o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
439})
440}
441
442// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
443// server. Only one stream interceptor can be installed.
444func StreamInterceptor(i StreamServerInterceptor) ServerOption {
445return newFuncServerOption(func(o *serverOptions) {
446if o.streamInt != nil {
447panic("The stream server interceptor was already set and may not be reset.")
448}
449o.streamInt = i
450})
451}
452
453// ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
454// for streaming RPCs. The first interceptor will be the outer most,
455// while the last interceptor will be the inner most wrapper around the real call.
456// All stream interceptors added by this method will be chained.
457func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
458return newFuncServerOption(func(o *serverOptions) {
459o.chainStreamInts = append(o.chainStreamInts, interceptors...)
460})
461}
462
463// InTapHandle returns a ServerOption that sets the tap handle for all the server
464// transport to be created. Only one can be installed.
465//
466// # Experimental
467//
468// Notice: This API is EXPERIMENTAL and may be changed or removed in a
469// later release.
470func InTapHandle(h tap.ServerInHandle) ServerOption {
471return newFuncServerOption(func(o *serverOptions) {
472if o.inTapHandle != nil {
473panic("The tap handle was already set and may not be reset.")
474}
475o.inTapHandle = h
476})
477}
478
479// StatsHandler returns a ServerOption that sets the stats handler for the server.
480func StatsHandler(h stats.Handler) ServerOption {
481return newFuncServerOption(func(o *serverOptions) {
482if h == nil {
483logger.Error("ignoring nil parameter in grpc.StatsHandler ServerOption")
484// Do not allow a nil stats handler, which would otherwise cause
485// panics.
486return
487}
488o.statsHandlers = append(o.statsHandlers, h)
489})
490}
491
492// binaryLogger returns a ServerOption that can set the binary logger for the
493// server.
494func binaryLogger(bl binarylog.Logger) ServerOption {
495return newFuncServerOption(func(o *serverOptions) {
496o.binaryLogger = bl
497})
498}
499
500// UnknownServiceHandler returns a ServerOption that allows for adding a custom
501// unknown service handler. The provided method is a bidi-streaming RPC service
502// handler that will be invoked instead of returning the "unimplemented" gRPC
503// error whenever a request is received for an unregistered service or method.
504// The handling function and stream interceptor (if set) have full access to
505// the ServerStream, including its Context.
506func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
507return newFuncServerOption(func(o *serverOptions) {
508o.unknownStreamDesc = &StreamDesc{
509StreamName: "unknown_service_handler",
510Handler: streamHandler,
511// We need to assume that the users of the streamHandler will want to use both.
512ClientStreams: true,
513ServerStreams: true,
514}
515})
516}
517
518// ConnectionTimeout returns a ServerOption that sets the timeout for
519// connection establishment (up to and including HTTP/2 handshaking) for all
520// new connections. If this is not set, the default is 120 seconds. A zero or
521// negative value will result in an immediate timeout.
522//
523// # Experimental
524//
525// Notice: This API is EXPERIMENTAL and may be changed or removed in a
526// later release.
527func ConnectionTimeout(d time.Duration) ServerOption {
528return newFuncServerOption(func(o *serverOptions) {
529o.connectionTimeout = d
530})
531}
532
533// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
534// of header list that the server is prepared to accept.
535func MaxHeaderListSize(s uint32) ServerOption {
536return newFuncServerOption(func(o *serverOptions) {
537o.maxHeaderListSize = &s
538})
539}
540
541// HeaderTableSize returns a ServerOption that sets the size of dynamic
542// header table for stream.
543//
544// # Experimental
545//
546// Notice: This API is EXPERIMENTAL and may be changed or removed in a
547// later release.
548func HeaderTableSize(s uint32) ServerOption {
549return newFuncServerOption(func(o *serverOptions) {
550o.headerTableSize = &s
551})
552}
553
554// NumStreamWorkers returns a ServerOption that sets the number of worker
555// goroutines that should be used to process incoming streams. Setting this to
556// zero (default) will disable workers and spawn a new goroutine for each
557// stream.
558//
559// # Experimental
560//
561// Notice: This API is EXPERIMENTAL and may be changed or removed in a
562// later release.
563func NumStreamWorkers(numServerWorkers uint32) ServerOption {
564// TODO: If/when this API gets stabilized (i.e. stream workers become the
565// only way streams are processed), change the behavior of the zero value to
566// a sane default. Preliminary experiments suggest that a value equal to the
567// number of CPUs available is most performant; requires thorough testing.
568return newFuncServerOption(func(o *serverOptions) {
569o.numServerWorkers = numServerWorkers
570})
571}
572
573// WaitForHandlers cause Stop to wait until all outstanding method handlers have
574// exited before returning. If false, Stop will return as soon as all
575// connections have closed, but method handlers may still be running. By
576// default, Stop does not wait for method handlers to return.
577//
578// # Experimental
579//
580// Notice: This API is EXPERIMENTAL and may be changed or removed in a
581// later release.
582func WaitForHandlers(w bool) ServerOption {
583return newFuncServerOption(func(o *serverOptions) {
584o.waitForHandlers = w
585})
586}
587
588// RecvBufferPool returns a ServerOption that configures the server
589// to use the provided shared buffer pool for parsing incoming messages. Depending
590// on the application's workload, this could result in reduced memory allocation.
591//
592// If you are unsure about how to implement a memory pool but want to utilize one,
593// begin with grpc.NewSharedBufferPool.
594//
595// Note: The shared buffer pool feature will not be active if any of the following
596// options are used: StatsHandler, EnableTracing, or binary logging. In such
597// cases, the shared buffer pool will be ignored.
598//
599// Deprecated: use experimental.WithRecvBufferPool instead. Will be deleted in
600// v1.60.0 or later.
601func RecvBufferPool(bufferPool SharedBufferPool) ServerOption {
602return recvBufferPool(bufferPool)
603}
604
605func recvBufferPool(bufferPool SharedBufferPool) ServerOption {
606return newFuncServerOption(func(o *serverOptions) {
607o.recvBufferPool = bufferPool
608})
609}
610
// serverWorkerResetThreshold defines how often the stack must be reset. Every
// N requests, by spawning a new goroutine in its place, a worker can reset its
// stack so that large stacks don't live in memory forever. 2^16 should allow
// each goroutine stack to live for at least a few seconds in a typical
// workload (assuming a QPS of a few thousand requests/sec).
const serverWorkerResetThreshold = 1 << 16
617
618// serverWorkers blocks on a *transport.Stream channel forever and waits for
619// data to be fed by serveStreams. This allows multiple requests to be
620// processed by the same goroutine, removing the need for expensive stack
621// re-allocations (see the runtime.morestack problem [1]).
622//
623// [1] https://github.com/golang/go/issues/18138
624func (s *Server) serverWorker() {
625for completed := 0; completed < serverWorkerResetThreshold; completed++ {
626f, ok := <-s.serverWorkerChannel
627if !ok {
628return
629}
630f()
631}
632go s.serverWorker()
633}
634
635// initServerWorkers creates worker goroutines and a channel to process incoming
636// connections to reduce the time spent overall on runtime.morestack.
637func (s *Server) initServerWorkers() {
638s.serverWorkerChannel = make(chan func())
639s.serverWorkerChannelClose = grpcsync.OnceFunc(func() {
640close(s.serverWorkerChannel)
641})
642for i := uint32(0); i < s.opts.numServerWorkers; i++ {
643go s.serverWorker()
644}
645}
646
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
	opts := defaultServerOptions
	// Global options are applied first so per-server options can override them.
	for _, o := range globalServerOptions {
		o.apply(&opts)
	}
	for _, o := range opt {
		o.apply(&opts)
	}
	s := &Server{
		lis:      make(map[net.Listener]bool),
		opts:     opts,
		conns:    make(map[string]map[transport.ServerTransport]bool),
		services: make(map[string]*serviceInfo),
		quit:     grpcsync.NewEvent(),
		done:     grpcsync.NewEvent(),
		czData:   new(channelzData),
	}
	// Collapse any configured interceptor chains into single interceptors.
	chainUnaryServerInterceptors(s)
	chainStreamServerInterceptors(s)
	s.cv = sync.NewCond(&s.mu)
	if EnableTracing {
		// Record the caller's location as the creation site in the trace log.
		_, file, line, _ := runtime.Caller(1)
		s.events = newTraceEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
	}

	if s.opts.numServerWorkers > 0 {
		s.initServerWorkers()
	}

	s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
	channelz.Info(logger, s.channelzID, "Server created")
	return s
}
682
683// printf records an event in s's event log, unless s has been stopped.
684// REQUIRES s.mu is held.
685func (s *Server) printf(format string, a ...any) {
686if s.events != nil {
687s.events.Printf(format, a...)
688}
689}
690
691// errorf records an error in s's event log, unless s has been stopped.
692// REQUIRES s.mu is held.
693func (s *Server) errorf(format string, a ...any) {
694if s.events != nil {
695s.events.Errorf(format, a...)
696}
697}
698
// ServiceRegistrar wraps a single method that supports service registration. It
// enables users to pass concrete types other than grpc.Server to the service
// registration methods exported by the IDL generated code.
type ServiceRegistrar interface {
	// RegisterService registers a service and its implementation to the
	// concrete type implementing this interface. It may not be called
	// once the server has started serving.
	// desc describes the service and its methods and handlers. impl is the
	// service implementation which is passed to the method handlers.
	RegisterService(desc *ServiceDesc, impl any)
}
710
711// RegisterService registers a service and its implementation to the gRPC
712// server. It is called from the IDL generated code. This must be called before
713// invoking Serve. If ss is non-nil (for legacy code), its type is checked to
714// ensure it implements sd.HandlerType.
715func (s *Server) RegisterService(sd *ServiceDesc, ss any) {
716if ss != nil {
717ht := reflect.TypeOf(sd.HandlerType).Elem()
718st := reflect.TypeOf(ss)
719if !st.Implements(ht) {
720logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
721}
722}
723s.register(sd, ss)
724}
725
// register stores the service's method and stream tables in s.services under
// its name. It is a fatal error to register after Serve has been called or to
// register the same service name twice.
func (s *Server) register(sd *ServiceDesc, ss any) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.printf("RegisterService(%q)", sd.ServiceName)
	if s.serve {
		logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
	}
	if _, ok := s.services[sd.ServiceName]; ok {
		logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
	}
	info := &serviceInfo{
		serviceImpl: ss,
		methods:     make(map[string]*MethodDesc),
		streams:     make(map[string]*StreamDesc),
		mdata:       sd.Metadata,
	}
	// Index methods and streams by name; pointers reference into sd's slices.
	for i := range sd.Methods {
		d := &sd.Methods[i]
		info.methods[d.MethodName] = d
	}
	for i := range sd.Streams {
		d := &sd.Streams[i]
		info.streams[d.StreamName] = d
	}
	s.services[sd.ServiceName] = info
}
752
// MethodInfo contains the information of an RPC including its method name and type.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}

// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata any
}
769
770// GetServiceInfo returns a map from service names to ServiceInfo.
771// Service names include the package names, in the form of <package>.<service>.
772func (s *Server) GetServiceInfo() map[string]ServiceInfo {
773ret := make(map[string]ServiceInfo)
774for n, srv := range s.services {
775methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams))
776for m := range srv.methods {
777methods = append(methods, MethodInfo{
778Name: m,
779IsClientStream: false,
780IsServerStream: false,
781})
782}
783for m, d := range srv.streams {
784methods = append(methods, MethodInfo{
785Name: m,
786IsClientStream: d.ClientStreams,
787IsServerStream: d.ServerStreams,
788})
789}
790
791ret[n] = ServiceInfo{
792Methods: methods,
793Metadata: srv.mdata,
794}
795}
796return ret
797}
798
// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
802
// listenSocket pairs a net.Listener with its channelz identifier so the
// socket can be reported to — and removed from — channelz.
type listenSocket struct {
	net.Listener
	channelzID *channelz.Identifier
}

// ChannelzMetric reports the listener's socket options and local address for
// channelz.
func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
	return &channelz.SocketInternalMetric{
		SocketOptions: channelz.GetSocketOption(l.Listener),
		LocalAddr:     l.Listener.Addr(),
	}
}

// Close closes the wrapped listener and then unregisters it from channelz,
// returning the listener's close error.
func (l *listenSocket) Close() error {
	err := l.Listener.Close()
	channelz.RemoveEntry(l.channelzID)
	channelz.Info(logger, l.channelzID, "ListenSocket deleted")
	return err
}
821
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
//
// Note: All supported releases of Go (as of December 2023) override the OS
// defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive
// with OS defaults for keepalive time and interval, callers need to do the
// following two things:
//   - pass a net.Listener created by calling the Listen method on a
//     net.ListenConfig with the `KeepAlive` field set to a negative value. This
//     will result in the Go standard library not overriding OS defaults for TCP
//     keepalive interval and time. But this will also result in the Go standard
//     library not enabling TCP keepalives by default.
//   - override the Accept method on the passed in net.Listener and set the
//     SO_KEEPALIVE socket option to enable TCP keepalives, with OS defaults.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		if s.quit.HasFired() {
			// Stop or GracefulStop called; block until done and return nil.
			<-s.done.Done()
		}
	}()

	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	// Close and deregister the listener on exit unless Stop/GracefulStop
	// already did so (in which case it is no longer in s.lis).
	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	// NOTE: s.mu is still held from the top of the function here; it is
	// released on both the error path and the fallthrough below.
	var err error
	ls.channelzID, err = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
	if err != nil {
		s.mu.Unlock()
		return err
	}
	s.mu.Unlock()
	channelz.Info(logger, ls.channelzID, "ListenSocket created")

	var tempDelay time.Duration // how long to sleep on accept failure
	for {
		rawConn, err := lis.Accept()
		if err != nil {
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				// Temporary failure: back off exponentially, capped at 1s.
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit.Done():
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			if s.quit.HasFired() {
				return nil
			}
			return err
		}
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(lis.Addr().String(), rawConn)
			s.serveWG.Done()
		}()
	}
}
930
931// handleRawConn forks a goroutine to handle a just-accepted connection that
932// has not had any I/O performed on it yet.
933func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
934if s.quit.HasFired() {
935rawConn.Close()
936return
937}
938rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
939
940// Finish handshaking (HTTP2)
941st := s.newHTTP2Transport(rawConn)
942rawConn.SetDeadline(time.Time{})
943if st == nil {
944return
945}
946
947if cc, ok := rawConn.(interface {
948PassServerTransport(transport.ServerTransport)
949}); ok {
950cc.PassServerTransport(st)
951}
952
953if !s.addConn(lisAddr, st) {
954return
955}
956go func() {
957s.serveStreams(context.Background(), st, rawConn)
958s.removeConn(lisAddr, st)
959}()
960}
961
962// newHTTP2Transport sets up a http/2 transport (using the
963// gRPC http2 server transport in transport/http2_server.go).
// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
// Returns nil when the handshake fails or the connection is dispatched
// away from gRPC; in the failure case the connection has already been
// closed (except for ErrConnDispatched, which must stay open).
func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
	// Snapshot all relevant server options into the transport config.
	config := &transport.ServerConfig{
		MaxStreams:            s.opts.maxConcurrentStreams,
		ConnectionTimeout:     s.opts.connectionTimeout,
		Credentials:           s.opts.creds,
		InTapHandle:           s.opts.inTapHandle,
		StatsHandlers:         s.opts.statsHandlers,
		KeepaliveParams:       s.opts.keepaliveParams,
		KeepalivePolicy:       s.opts.keepalivePolicy,
		InitialWindowSize:     s.opts.initialWindowSize,
		InitialConnWindowSize: s.opts.initialConnWindowSize,
		WriteBufferSize:       s.opts.writeBufferSize,
		ReadBufferSize:        s.opts.readBufferSize,
		SharedWriteBuffer:     s.opts.sharedWriteBuffer,
		ChannelzParentID:      s.channelzID,
		MaxHeaderListSize:     s.opts.maxHeaderListSize,
		HeaderTableSize:       s.opts.headerTableSize,
	}
	st, err := transport.NewServerTransport(c, config)
	if err != nil {
		s.mu.Lock()
		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
		s.mu.Unlock()
		// ErrConnDispatched means that the connection was dispatched away from
		// gRPC; those connections should be left open.
		if err != credentials.ErrConnDispatched {
			// Don't log on ErrConnDispatched and io.EOF to prevent log spam.
			if err != io.EOF {
				channelz.Info(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
			}
			c.Close()
		}
		return nil
	}

	return st
}
1001
// serveStreams serves incoming streams on st until the transport stops
// producing them, dispatching each stream to its own goroutine (or to a
// server worker when configured). It blocks for the lifetime of the
// transport and closes st on return.
func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport, rawConn net.Conn) {
	ctx = transport.SetConnection(ctx, rawConn)
	ctx = peer.NewContext(ctx, st.Peer())
	for _, sh := range s.opts.statsHandlers {
		ctx = sh.TagConn(ctx, &stats.ConnTagInfo{
			RemoteAddr: st.Peer().Addr,
			LocalAddr:  st.Peer().LocalAddr,
		})
		sh.HandleConn(ctx, &stats.ConnBegin{})
	}

	defer func() {
		st.Close(errors.New("finished serving streams for the server transport"))
		for _, sh := range s.opts.statsHandlers {
			sh.HandleConn(ctx, &stats.ConnEnd{})
		}
	}()

	// Bound the number of concurrently-executing stream handlers for this
	// transport; acquire blocks when the quota is exhausted.
	streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
	st.HandleStreams(ctx, func(stream *transport.Stream) {
		s.handlersWG.Add(1)
		streamQuota.acquire()
		f := func() {
			defer streamQuota.release()
			defer s.handlersWG.Done()
			s.handleStream(st, stream)
		}

		if s.opts.numServerWorkers > 0 {
			select {
			case s.serverWorkerChannel <- f:
				return
			default:
				// If all stream workers are busy, fallback to the default code path.
			}
		}
		go f()
	})
}
1041
// Compile-time assertion that *Server satisfies http.Handler (via ServeHTTP).
var _ http.Handler = (*Server)(nil)
1043
1044// ServeHTTP implements the Go standard library's http.Handler
1045// interface by responding to the gRPC request r, by looking up
1046// the requested gRPC method in the gRPC server s.
1047//
1048// The provided HTTP request must have arrived on an HTTP/2
1049// connection. When using the Go standard library's server,
1050// practically this means that the Request must also have arrived
1051// over TLS.
1052//
1053// To share one port (such as 443 for https) between gRPC and an
1054// existing http.Handler, use a root http.Handler such as:
1055//
1056// if r.ProtoMajor == 2 && strings.HasPrefix(
1057// r.Header.Get("Content-Type"), "application/grpc") {
1058// grpcServer.ServeHTTP(w, r)
1059// } else {
1060// yourMux.ServeHTTP(w, r)
1061// }
1062//
1063// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
1064// separate from grpc-go's HTTP/2 server. Performance and features may vary
1065// between the two paths. ServeHTTP does not support some gRPC features
1066// available through grpc-go's HTTP/2 server.
1067//
1068// # Experimental
1069//
1070// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1071// later release.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandlers)
	if err != nil {
		// Errors returned from transport.NewServerHandlerTransport have
		// already been written to w.
		return
	}
	// There is no listener for this path, so register the connection under
	// the dedicated dummy address (see listenerAddressForServeHTTP).
	if !s.addConn(listenerAddressForServeHTTP, st) {
		return
	}
	defer s.removeConn(listenerAddressForServeHTTP, st)
	// Blocks until the request's streams are done being served.
	s.serveStreams(r.Context(), st, nil)
}
1085
1086func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
1087s.mu.Lock()
1088defer s.mu.Unlock()
1089if s.conns == nil {
1090st.Close(errors.New("Server.addConn called when server has already been stopped"))
1091return false
1092}
1093if s.drain {
1094// Transport added after we drained our existing conns: drain it
1095// immediately.
1096st.Drain("")
1097}
1098
1099if s.conns[addr] == nil {
1100// Create a map entry if this is the first connection on this listener.
1101s.conns[addr] = make(map[transport.ServerTransport]bool)
1102}
1103s.conns[addr][st] = true
1104return true
1105}
1106
1107func (s *Server) removeConn(addr string, st transport.ServerTransport) {
1108s.mu.Lock()
1109defer s.mu.Unlock()
1110
1111conns := s.conns[addr]
1112if conns != nil {
1113delete(conns, st)
1114if len(conns) == 0 {
1115// If the last connection for this address is being removed, also
1116// remove the map entry corresponding to the address. This is used
1117// in GracefulStop() when waiting for all connections to be closed.
1118delete(s.conns, addr)
1119}
1120s.cv.Broadcast()
1121}
1122}
1123
// channelzMetric returns a point-in-time snapshot of the server's channelz
// call counters, read atomically from s.czData.
func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
	return &channelz.ServerInternalMetric{
		CallsStarted:             atomic.LoadInt64(&s.czData.callsStarted),
		CallsSucceeded:           atomic.LoadInt64(&s.czData.callsSucceeded),
		CallsFailed:              atomic.LoadInt64(&s.czData.callsFailed),
		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
	}
}
1132
// incrCallsStarted atomically bumps the channelz started-call counter and
// records the start time of the most recent call.
func (s *Server) incrCallsStarted() {
	atomic.AddInt64(&s.czData.callsStarted, 1)
	atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
}
1137
// incrCallsSucceeded atomically bumps the channelz succeeded-call counter.
func (s *Server) incrCallsSucceeded() {
	atomic.AddInt64(&s.czData.callsSucceeded, 1)
}
1141
// incrCallsFailed atomically bumps the channelz failed-call counter.
func (s *Server) incrCallsFailed() {
	atomic.AddInt64(&s.czData.callsFailed, 1)
}
1145
// sendResponse encodes msg with the stream's codec, optionally compresses it
// (cp takes precedence over comp), enforces the configured maximum send size,
// and writes the framed message on the transport. On a successful write,
// outgoing-payload stats are reported to all configured stats handlers.
func (s *Server) sendResponse(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, msg any, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
	if err != nil {
		channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err)
		return err
	}
	compData, err := compress(data, cp, comp)
	if err != nil {
		channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err)
		return err
	}
	hdr, payload := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > s.opts.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
	}
	err = t.Write(stream, hdr, payload, opts)
	if err == nil {
		for _, sh := range s.opts.statsHandlers {
			sh.HandleRPC(ctx, outPayload(false, msg, data, payload, time.Now()))
		}
	}
	return err
}
1170
1171// chainUnaryServerInterceptors chains all unary server interceptors into one.
1172func chainUnaryServerInterceptors(s *Server) {
1173// Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
1174// be executed before any other chained interceptors.
1175interceptors := s.opts.chainUnaryInts
1176if s.opts.unaryInt != nil {
1177interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
1178}
1179
1180var chainedInt UnaryServerInterceptor
1181if len(interceptors) == 0 {
1182chainedInt = nil
1183} else if len(interceptors) == 1 {
1184chainedInt = interceptors[0]
1185} else {
1186chainedInt = chainUnaryInterceptors(interceptors)
1187}
1188
1189s.opts.unaryInt = chainedInt
1190}
1191
// chainUnaryInterceptors returns a single interceptor that invokes the given
// interceptors in order, each delegating to the next through the handler
// argument built by getChainUnaryHandler.
func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
	return func(ctx context.Context, req any, info *UnaryServerInfo, handler UnaryHandler) (any, error) {
		return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
	}
}
1197
1198func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
1199if curr == len(interceptors)-1 {
1200return finalHandler
1201}
1202return func(ctx context.Context, req any) (any, error) {
1203return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
1204}
1205}
1206
// processUnaryRPC receives and decodes a single request message from stream,
// invokes the unary handler (through the configured interceptor chain, if
// any), and writes the response message and status back on the transport.
// Tracing, stats-handler, channelz and binary-logging hooks are invoked along
// the way when configured.
func (s *Server) processUnaryRPC(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
	shs := s.opts.statsHandlers
	if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
		if channelz.IsOn() {
			s.incrCallsStarted()
		}
		var statsBegin *stats.Begin
		for _, sh := range shs {
			beginTime := time.Now()
			statsBegin = &stats.Begin{
				BeginTime:      beginTime,
				IsClientStream: false,
				IsServerStream: false,
			}
			sh.HandleRPC(ctx, statsBegin)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&trInfo.firstLine, false)
		}
		// The deferred error handling for tracing, stats handler and channelz are
		// combined into one function to reduce stack usage -- a defer takes ~56-64
		// bytes on the stack, so overflowing the stack will require a stack
		// re-allocation, which is expensive.
		//
		// To maintain behavior similar to separate deferred statements, statements
		// should be executed in the reverse order. That is, tracing first, stats
		// handler second, and channelz last. Note that panics *within* defers will
		// lead to different behavior, but that's an acceptable compromise; that
		// would be undefined behavior territory anyway.
		defer func() {
			if trInfo != nil {
				if err != nil && err != io.EOF {
					trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
					trInfo.tr.SetError()
				}
				trInfo.tr.Finish()
			}

			for _, sh := range shs {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(ctx, end)
			}

			if channelz.IsOn() {
				// io.EOF counts as success for channelz purposes.
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}
	// Collect binary loggers: the global one plus any server-option logger.
	var binlogs []binarylog.MethodLogger
	if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
		binlogs = append(binlogs, ml)
	}
	if s.opts.binaryLogger != nil {
		if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
			binlogs = append(binlogs, ml)
		}
	}
	if len(binlogs) != 0 {
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ctx); ok {
			logEntry.PeerAddr = peer.Addr
		}
		for _, binlog := range binlogs {
			binlog.Log(ctx, logEntry)
		}
	}

	// comp and cp are used for compression. decomp and dc are used for
	// decompression. If comp and decomp are both set, they are the same;
	// however they are kept separate to ensure that at most one of the
	// compressor/decompressor variable pairs are set for use later.
	var comp, decomp encoding.Compressor
	var cp Compressor
	var dc Decompressor
	var sendCompressorName string

	// If dc is set and matches the stream's compression, use it. Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		decomp = encoding.GetCompressor(rc)
		if decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(stream, st)
			return st.Err()
		}
	}

	// If cp is set, use it. Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		cp = s.opts.cp
		sendCompressorName = cp.Type()
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		comp = encoding.GetCompressor(rc)
		if comp != nil {
			sendCompressorName = comp.Name()
		}
	}

	if sendCompressorName != "" {
		if err := stream.SetSendCompress(sendCompressorName); err != nil {
			return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
		}
	}

	// payInfo is only needed when stats or binary logging will consume it.
	var payInfo *payloadInfo
	if len(shs) != 0 || len(binlogs) != 0 {
		payInfo = &payloadInfo{}
	}
	d, err := recvAndDecompress(&parser{r: stream, recvBufferPool: s.opts.recvBufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
	if err != nil {
		if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		return err
	}
	if channelz.IsOn() {
		t.IncrMsgRecv()
	}
	// df unmarshals the already-received bytes into the handler's request
	// value and reports the inbound payload to stats/binlog hooks.
	df := func(v any) error {
		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
		}
		for _, sh := range shs {
			sh.HandleRPC(ctx, &stats.InPayload{
				RecvTime:         time.Now(),
				Payload:          v,
				Length:           len(d),
				WireLength:       payInfo.compressedLength + headerLen,
				CompressedLength: payInfo.compressedLength,
				Data:             d,
			})
		}
		if len(binlogs) != 0 {
			cm := &binarylog.ClientMessage{
				Message: d,
			}
			for _, binlog := range binlogs {
				binlog.Log(ctx, cm)
			}
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
		}
		return nil
	}
	ctx = NewContextWithServerTransportStream(ctx, stream)
	reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert non-status application error to a status error with code
			// Unknown, but handle context errors specifically.
			appStatus = status.FromContextError(appErr)
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			trInfo.tr.SetError()
		}
		if e := t.WriteStatus(stream, appStatus); e != nil {
			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		if len(binlogs) != 0 {
			if h, _ := stream.Header(); h.Len() > 0 {
				// Only log serverHeader if there was header. Otherwise it can
				// be trailer only.
				sh := &binarylog.ServerHeader{
					Header: h,
				}
				for _, binlog := range binlogs {
					binlog.Log(ctx, sh)
				}
			}
			st := &binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			}
			for _, binlog := range binlogs {
				binlog.Log(ctx, st)
			}
		}
		return appErr
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(stringer("OK"), false)
	}
	opts := &transport.Options{Last: true}

	// Server handler could have set new compressor by calling SetSendCompressor.
	// In case it is set, we need to use it for compressing outbound message.
	if stream.SendCompress() != sendCompressorName {
		comp = encoding.GetCompressor(stream.SendCompress())
	}
	if err := s.sendResponse(ctx, t, stream, reply, cp, opts, comp); err != nil {
		if err == io.EOF {
			// The entire stream is done (for unary RPC only).
			return err
		}
		if sts, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, sts); e != nil {
				channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
			}
		} else {
			switch st := err.(type) {
			case transport.ConnectionError:
				// Nothing to do here.
			default:
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
			}
		}
		if len(binlogs) != 0 {
			h, _ := stream.Header()
			sh := &binarylog.ServerHeader{
				Header: h,
			}
			st := &binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			}
			for _, binlog := range binlogs {
				binlog.Log(ctx, sh)
				binlog.Log(ctx, st)
			}
		}
		return err
	}
	if len(binlogs) != 0 {
		h, _ := stream.Header()
		sh := &binarylog.ServerHeader{
			Header: h,
		}
		sm := &binarylog.ServerMessage{
			Message: reply,
		}
		for _, binlog := range binlogs {
			binlog.Log(ctx, sh)
			binlog.Log(ctx, sm)
		}
	}
	if channelz.IsOn() {
		t.IncrMsgSent()
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
	}
	// TODO: Should we be logging if writing status failed here, like above?
	// Should the logging be in WriteStatus? Should we ignore the WriteStatus
	// error or allow the stats handler to see it?
	if len(binlogs) != 0 {
		st := &binarylog.ServerTrailer{
			Trailer: stream.Trailer(),
			Err:     appErr,
		}
		for _, binlog := range binlogs {
			binlog.Log(ctx, st)
		}
	}
	return t.WriteStatus(stream, statusOK)
}
1496
1497// chainStreamServerInterceptors chains all stream server interceptors into one.
1498func chainStreamServerInterceptors(s *Server) {
1499// Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
1500// be executed before any other chained interceptors.
1501interceptors := s.opts.chainStreamInts
1502if s.opts.streamInt != nil {
1503interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
1504}
1505
1506var chainedInt StreamServerInterceptor
1507if len(interceptors) == 0 {
1508chainedInt = nil
1509} else if len(interceptors) == 1 {
1510chainedInt = interceptors[0]
1511} else {
1512chainedInt = chainStreamInterceptors(interceptors)
1513}
1514
1515s.opts.streamInt = chainedInt
1516}
1517
// chainStreamInterceptors returns a single interceptor that invokes the given
// interceptors in order, each delegating to the next through the handler
// argument built by getChainStreamHandler.
func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor {
	return func(srv any, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
		return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
	}
}
1523
1524func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
1525if curr == len(interceptors)-1 {
1526return finalHandler
1527}
1528return func(srv any, stream ServerStream) error {
1529return interceptors[curr+1](srv, stream, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
1530}
1531}
1532
// processStreamingRPC wraps stream in a serverStream, invokes the streaming
// handler (through the configured stream interceptor, if any), and writes the
// final status back on the transport. Tracing, stats-handler, channelz and
// binary-logging hooks are invoked along the way when configured. A nil info
// indicates an unknown-service invocation (opts.unknownStreamDesc).
func (s *Server) processStreamingRPC(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
	if channelz.IsOn() {
		s.incrCallsStarted()
	}
	shs := s.opts.statsHandlers
	var statsBegin *stats.Begin
	if len(shs) != 0 {
		beginTime := time.Now()
		statsBegin = &stats.Begin{
			BeginTime:      beginTime,
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		for _, sh := range shs {
			sh.HandleRPC(ctx, statsBegin)
		}
	}
	ctx = NewContextWithServerTransportStream(ctx, stream)
	ss := &serverStream{
		ctx:                   ctx,
		t:                     t,
		s:                     stream,
		p:                     &parser{r: stream, recvBufferPool: s.opts.recvBufferPool},
		codec:                 s.getCodec(stream.ContentSubtype()),
		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
		maxSendMessageSize:    s.opts.maxSendMessageSize,
		trInfo:                trInfo,
		statsHandler:          shs,
	}

	if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
		// See comment in processUnaryRPC on defers.
		defer func() {
			if trInfo != nil {
				ss.mu.Lock()
				if err != nil && err != io.EOF {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
					ss.trInfo.tr.SetError()
				}
				ss.trInfo.tr.Finish()
				ss.trInfo.tr = nil
				ss.mu.Unlock()
			}

			if len(shs) != 0 {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				for _, sh := range shs {
					sh.HandleRPC(ctx, end)
				}
			}

			if channelz.IsOn() {
				// io.EOF counts as success for channelz purposes.
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}

	// Collect binary loggers: the global one plus any server-option logger.
	if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
		ss.binlogs = append(ss.binlogs, ml)
	}
	if s.opts.binaryLogger != nil {
		if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
			ss.binlogs = append(ss.binlogs, ml)
		}
	}
	if len(ss.binlogs) != 0 {
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ss.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		for _, binlog := range ss.binlogs {
			binlog.Log(ctx, logEntry)
		}
	}

	// If dc is set and matches the stream's compression, use it. Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		ss.dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		ss.decomp = encoding.GetCompressor(rc)
		if ss.decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(ss.s, st)
			return st.Err()
		}
	}

	// If cp is set, use it. Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		ss.cp = s.opts.cp
		ss.sendCompressorName = s.opts.cp.Type()
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		ss.comp = encoding.GetCompressor(rc)
		if ss.comp != nil {
			ss.sendCompressorName = rc
		}
	}

	if ss.sendCompressorName != "" {
		if err := stream.SetSendCompress(ss.sendCompressorName); err != nil {
			return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
		}
	}

	ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.cp, ss.comp)

	if trInfo != nil {
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
	}
	var appErr error
	var server any
	if info != nil {
		server = info.serviceImpl
	}
	if s.opts.streamInt == nil {
		appErr = sd.Handler(server, ss)
	} else {
		info := &StreamServerInfo{
			FullMethod:     stream.Method(),
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
	}
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert non-status application error to a status error with code
			// Unknown, but handle context errors specifically.
			appStatus = status.FromContextError(appErr)
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			ss.mu.Lock()
			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			ss.trInfo.tr.SetError()
			ss.mu.Unlock()
		}
		if len(ss.binlogs) != 0 {
			st := &binarylog.ServerTrailer{
				Trailer: ss.s.Trailer(),
				Err:     appErr,
			}
			for _, binlog := range ss.binlogs {
				binlog.Log(ctx, st)
			}
		}
		t.WriteStatus(ss.s, appStatus)
		// TODO: Should we log an error from WriteStatus here and below?
		return appErr
	}
	if trInfo != nil {
		ss.mu.Lock()
		ss.trInfo.tr.LazyLog(stringer("OK"), false)
		ss.mu.Unlock()
	}
	if len(ss.binlogs) != 0 {
		st := &binarylog.ServerTrailer{
			Trailer: ss.s.Trailer(),
			Err:     appErr,
		}
		for _, binlog := range ss.binlogs {
			binlog.Log(ctx, st)
		}
	}
	return t.WriteStatus(ss.s, statusOK)
}
1729
// handleStream parses the stream's method name ("/service/method"), looks up
// the registered service and method, and dispatches to the unary or streaming
// code path. Unknown services/methods fall back to opts.unknownStreamDesc if
// set, and are otherwise rejected with codes.Unimplemented.
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) {
	ctx := stream.Context()
	ctx = contextWithServer(ctx, s)
	var ti *traceInfo
	if EnableTracing {
		tr := newTrace("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
		ctx = newTraceContext(ctx, tr)
		ti = &traceInfo{
			tr: tr,
			firstLine: firstLine{
				client:     false,
				remoteAddr: t.Peer().Addr,
			},
		}
		if dl, ok := ctx.Deadline(); ok {
			ti.firstLine.deadline = time.Until(dl)
		}
	}

	// Strip the leading '/' and split on the last '/' to get service/method.
	sm := stream.Method()
	if sm != "" && sm[0] == '/' {
		sm = sm[1:]
	}
	pos := strings.LastIndex(sm, "/")
	if pos == -1 {
		// No separator at all: the method name is malformed.
		if ti != nil {
			ti.tr.LazyLog(&fmtStringer{"Malformed method name %q", []any{sm}}, true)
			ti.tr.SetError()
		}
		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
		if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
			if ti != nil {
				ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
				ti.tr.SetError()
			}
			channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
		}
		if ti != nil {
			ti.tr.Finish()
		}
		return
	}
	service := sm[:pos]
	method := sm[pos+1:]

	md, _ := metadata.FromIncomingContext(ctx)
	for _, sh := range s.opts.statsHandlers {
		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: stream.Method()})
		sh.HandleRPC(ctx, &stats.InHeader{
			FullMethod:  stream.Method(),
			RemoteAddr:  t.Peer().Addr,
			LocalAddr:   t.Peer().LocalAddr,
			Compression: stream.RecvCompress(),
			WireLength:  stream.HeaderWireLength(),
			Header:      md,
		})
	}
	// To have calls in stream callouts work. Will delete once all stats handler
	// calls come from the gRPC layer.
	stream.SetContext(ctx)

	srv, knownService := s.services[service]
	if knownService {
		if md, ok := srv.methods[method]; ok {
			s.processUnaryRPC(ctx, t, stream, srv, md, ti)
			return
		}
		if sd, ok := srv.streams[method]; ok {
			s.processStreamingRPC(ctx, t, stream, srv, sd, ti)
			return
		}
	}
	// Unknown service, or known server unknown method.
	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
		s.processStreamingRPC(ctx, t, stream, nil, unknownDesc, ti)
		return
	}
	var errDesc string
	if !knownService {
		errDesc = fmt.Sprintf("unknown service %v", service)
	} else {
		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
	}
	if ti != nil {
		ti.tr.LazyPrintf("%s", errDesc)
		ti.tr.SetError()
	}
	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
		if ti != nil {
			ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
			ti.tr.SetError()
		}
		channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
	}
	if ti != nil {
		ti.tr.Finish()
	}
}
1828
// streamKey is the private context key under which the RPC's
// ServerTransportStream is stored; see NewContextWithServerTransportStream
// and ServerTransportStreamFromContext.
type streamKey struct{}
1831
1832// NewContextWithServerTransportStream creates a new context from ctx and
1833// attaches stream to it.
1834//
1835// # Experimental
1836//
1837// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1838// later release.
func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
	// Stored under the private streamKey{}; retrieved by
	// ServerTransportStreamFromContext.
	return context.WithValue(ctx, streamKey{}, stream)
}
1842
1843// ServerTransportStream is a minimal interface that a transport stream must
1844// implement. This can be used to mock an actual transport stream for tests of
1845// handler code that use, for example, grpc.SetHeader (which requires some
1846// stream to be in context).
1847//
1848// See also NewContextWithServerTransportStream.
1849//
1850// # Experimental
1851//
1852// Notice: This type is EXPERIMENTAL and may be changed or removed in a
1853// later release.
type ServerTransportStream interface {
	// Method returns the stream's full RPC method string.
	Method() string
	// SetHeader sets header metadata for the stream.
	SetHeader(md metadata.MD) error
	// SendHeader sets header metadata for the stream; presumably it also
	// triggers sending the headers (cf. grpc.SendHeader) — confirm against
	// the transport implementations.
	SendHeader(md metadata.MD) error
	// SetTrailer sets trailer metadata for the stream.
	SetTrailer(md metadata.MD) error
}
1860
1861// ServerTransportStreamFromContext returns the ServerTransportStream saved in
1862// ctx. Returns nil if the given context has no stream associated with it
1863// (which implies it is not an RPC invocation context).
1864//
1865// # Experimental
1866//
1867// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1868// later release.
func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
	// The comma-ok assertion yields nil when the key is absent or holds a
	// value of another type.
	s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
	return s
}
1873
1874// Stop stops the gRPC server. It immediately closes all open
1875// connections and listeners.
1876// It cancels all active RPCs on the server side and the corresponding
1877// pending RPCs on the client side will get notified by connection
1878// errors.
func (s *Server) Stop() {
	// Non-graceful shutdown: transports are closed without draining.
	s.stop(false)
}
1882
1883// GracefulStop stops the gRPC server gracefully. It stops the server from
1884// accepting new connections and RPCs and blocks until all the pending RPCs are
1885// finished.
func (s *Server) GracefulStop() {
	// Graceful shutdown: transports are drained and pending RPCs complete.
	s.stop(true)
}
1889
// stop is the shared implementation of Stop and GracefulStop. It fires the
// quit event, closes listeners, waits for the accept loops to exit, then
// closes (or drains, when graceful) all transports and waits for them to be
// removed before releasing resources.
func (s *Server) stop(graceful bool) {
	s.quit.Fire()
	defer s.done.Fire()

	s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })

	s.mu.Lock()
	s.closeListenersLocked()
	// Wait for serving threads to be ready to exit. Only then can we be sure no
	// new conns will be created.
	s.mu.Unlock()
	s.serveWG.Wait()

	s.mu.Lock()
	defer s.mu.Unlock()

	if graceful {
		s.drainAllServerTransportsLocked()
	} else {
		s.closeServerTransportsLocked()
	}

	// removeConn broadcasts on s.cv as transports disappear; wait here until
	// every connection has been removed.
	for len(s.conns) != 0 {
		s.cv.Wait()
	}
	s.conns = nil

	if s.opts.numServerWorkers > 0 {
		// Closing the channel (only once, via grpcsync.OnceFunc) after all the
		// connections have been closed above ensures that there are no
		// goroutines executing the callback passed to st.HandleStreams (where
		// the channel is written to).
		s.serverWorkerChannelClose()
	}

	if graceful || s.opts.waitForHandlers {
		s.handlersWG.Wait()
	}

	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
}
1934
1935// s.mu must be held by the caller.
1936func (s *Server) closeServerTransportsLocked() {
1937for _, conns := range s.conns {
1938for st := range conns {
1939st.Close(errors.New("Server.Stop called"))
1940}
1941}
1942}
1943
1944// s.mu must be held by the caller.
1945func (s *Server) drainAllServerTransportsLocked() {
1946if !s.drain {
1947for _, conns := range s.conns {
1948for st := range conns {
1949st.Drain("graceful_stop")
1950}
1951}
1952s.drain = true
1953}
1954}
1955
// closeListenersLocked closes all listeners and clears the listener set so no
// new connections can be accepted. s.mu must be held by the caller.
func (s *Server) closeListenersLocked() {
	for lis := range s.lis {
		// Close errors are intentionally ignored during shutdown.
		lis.Close()
	}
	s.lis = nil
}
1963
1964// contentSubtype must be lowercase
1965// cannot return nil
1966func (s *Server) getCodec(contentSubtype string) baseCodec {
1967if s.opts.codec != nil {
1968return s.opts.codec
1969}
1970if contentSubtype == "" {
1971return encoding.GetCodec(proto.Name)
1972}
1973codec := encoding.GetCodec(contentSubtype)
1974if codec == nil {
1975logger.Warningf("Unsupported codec %q. Defaulting to %q for now. This will start to fail in future releases.", contentSubtype, proto.Name)
1976return encoding.GetCodec(proto.Name)
1977}
1978return codec
1979}
1980
// serverKey is the unexported context key under which the *Server is stored.
type serverKey struct{}
1982
1983// serverFromContext gets the Server from the context.
1984func serverFromContext(ctx context.Context) *Server {
1985s, _ := ctx.Value(serverKey{}).(*Server)
1986return s
1987}
1988
// contextWithServer sets the Server in the context, for later retrieval by
// serverFromContext.
func contextWithServer(ctx context.Context, server *Server) context.Context {
	return context.WithValue(ctx, serverKey{}, server)
}
1993
1994// isRegisteredMethod returns whether the passed in method is registered as a
1995// method on the server. /service/method and service/method will match if the
1996// service and method are registered on the server.
1997func (s *Server) isRegisteredMethod(serviceMethod string) bool {
1998if serviceMethod != "" && serviceMethod[0] == '/' {
1999serviceMethod = serviceMethod[1:]
2000}
2001pos := strings.LastIndex(serviceMethod, "/")
2002if pos == -1 { // Invalid method name syntax.
2003return false
2004}
2005service := serviceMethod[:pos]
2006method := serviceMethod[pos+1:]
2007srv, knownService := s.services[service]
2008if knownService {
2009if _, ok := srv.methods[method]; ok {
2010return true
2011}
2012if _, ok := srv.streams[method]; ok {
2013return true
2014}
2015}
2016return false
2017}
2018
2019// SetHeader sets the header metadata to be sent from the server to the client.
2020// The context provided must be the context passed to the server's handler.
2021//
2022// Streaming RPCs should prefer the SetHeader method of the ServerStream.
2023//
2024// When called multiple times, all the provided metadata will be merged. All
2025// the metadata will be sent out when one of the following happens:
2026//
2027// - grpc.SendHeader is called, or for streaming handlers, stream.SendHeader.
2028// - The first response message is sent. For unary handlers, this occurs when
2029// the handler returns; for streaming handlers, this can happen when stream's
2030// SendMsg method is called.
2031// - An RPC status is sent out (error or success). This occurs when the handler
2032// returns.
2033//
2034// SetHeader will fail if called after any of the events above.
2035//
2036// The error returned is compatible with the status package. However, the
2037// status code will often not match the RPC status as seen by the client
2038// application, and therefore, should not be relied upon for this purpose.
2039func SetHeader(ctx context.Context, md metadata.MD) error {
2040if md.Len() == 0 {
2041return nil
2042}
2043stream := ServerTransportStreamFromContext(ctx)
2044if stream == nil {
2045return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
2046}
2047return stream.SetHeader(md)
2048}
2049
2050// SendHeader sends header metadata. It may be called at most once, and may not
2051// be called after any event that causes headers to be sent (see SetHeader for
2052// a complete list). The provided md and headers set by SetHeader() will be
2053// sent.
2054//
2055// The error returned is compatible with the status package. However, the
2056// status code will often not match the RPC status as seen by the client
2057// application, and therefore, should not be relied upon for this purpose.
2058func SendHeader(ctx context.Context, md metadata.MD) error {
2059stream := ServerTransportStreamFromContext(ctx)
2060if stream == nil {
2061return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
2062}
2063if err := stream.SendHeader(md); err != nil {
2064return toRPCErr(err)
2065}
2066return nil
2067}
2068
2069// SetSendCompressor sets a compressor for outbound messages from the server.
2070// It must not be called after any event that causes headers to be sent
2071// (see ServerStream.SetHeader for the complete list). Provided compressor is
2072// used when below conditions are met:
2073//
2074// - compressor is registered via encoding.RegisterCompressor
2075// - compressor name must exist in the client advertised compressor names
2076// sent in grpc-accept-encoding header. Use ClientSupportedCompressors to
2077// get client supported compressor names.
2078//
2079// The context provided must be the context passed to the server's handler.
2080// It must be noted that compressor name encoding.Identity disables the
2081// outbound compression.
2082// By default, server messages will be sent using the same compressor with
2083// which request messages were sent.
2084//
2085// It is not safe to call SetSendCompressor concurrently with SendHeader and
2086// SendMsg.
2087//
2088// # Experimental
2089//
2090// Notice: This function is EXPERIMENTAL and may be changed or removed in a
2091// later release.
2092func SetSendCompressor(ctx context.Context, name string) error {
2093stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
2094if !ok || stream == nil {
2095return fmt.Errorf("failed to fetch the stream from the given context")
2096}
2097
2098if err := validateSendCompressor(name, stream.ClientAdvertisedCompressors()); err != nil {
2099return fmt.Errorf("unable to set send compressor: %w", err)
2100}
2101
2102return stream.SetSendCompress(name)
2103}
2104
2105// ClientSupportedCompressors returns compressor names advertised by the client
2106// via grpc-accept-encoding header.
2107//
2108// The context provided must be the context passed to the server's handler.
2109//
2110// # Experimental
2111//
2112// Notice: This function is EXPERIMENTAL and may be changed or removed in a
2113// later release.
2114func ClientSupportedCompressors(ctx context.Context) ([]string, error) {
2115stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
2116if !ok || stream == nil {
2117return nil, fmt.Errorf("failed to fetch the stream from the given context %v", ctx)
2118}
2119
2120return strings.Split(stream.ClientAdvertisedCompressors(), ","), nil
2121}
2122
2123// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
2124// When called more than once, all the provided metadata will be merged.
2125//
2126// The error returned is compatible with the status package. However, the
2127// status code will often not match the RPC status as seen by the client
2128// application, and therefore, should not be relied upon for this purpose.
2129func SetTrailer(ctx context.Context, md metadata.MD) error {
2130if md.Len() == 0 {
2131return nil
2132}
2133stream := ServerTransportStreamFromContext(ctx)
2134if stream == nil {
2135return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
2136}
2137return stream.SetTrailer(md)
2138}
2139
2140// Method returns the method string for the server context. The returned
2141// string is in the format of "/service/method".
2142func Method(ctx context.Context) (string, bool) {
2143s := ServerTransportStreamFromContext(ctx)
2144if s == nil {
2145return "", false
2146}
2147return s.Method(), true
2148}
2149
// channelzServer adapts a *Server to the channelz metrics interface.
type channelzServer struct {
	s *Server
}
2153
// ChannelzMetric returns the server's internal channelz metrics snapshot.
func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
	return c.s.channelzMetric()
}
2157
2158// validateSendCompressor returns an error when given compressor name cannot be
2159// handled by the server or the client based on the advertised compressors.
2160func validateSendCompressor(name, clientCompressors string) error {
2161if name == encoding.Identity {
2162return nil
2163}
2164
2165if !grpcutil.IsCompressorNameRegistered(name) {
2166return fmt.Errorf("compressor not registered %q", name)
2167}
2168
2169for _, c := range strings.Split(clientCompressors, ",") {
2170if c == name {
2171return nil // found match
2172}
2173}
2174return fmt.Errorf("client does not support compressor %q", name)
2175}
2176
// atomicSemaphore implements a blocking, counting semaphore. acquire should be
// called synchronously; release may be called asynchronously.
type atomicSemaphore struct {
	n    atomic.Int64  // remaining quota; negative while an acquire is blocked
	wait chan struct{} // buffered(1) signal channel used to unblock acquire
}

// acquire takes one unit of quota, blocking until release makes one available.
func (a *atomicSemaphore) acquire() {
	if a.n.Add(-1) < 0 {
		// Quota exhausted; park until a release hands us a token.
		<-a.wait
	}
}

// release returns one unit of quota, unblocking a waiting acquire if any.
func (a *atomicSemaphore) release() {
	// N.B. the "<= 0" check below should allow for this to work with multiple
	// concurrent calls to acquire, but also note that with synchronous calls to
	// acquire, as our system does, n will never be less than -1. There are
	// fairness issues (queuing) to consider if this was to be generalized.
	if a.n.Add(1) <= 0 {
		// An acquire was waiting on us. Unblock it.
		a.wait <- struct{}{}
	}
}

// newHandlerQuota returns a semaphore initialized with n units of quota.
func newHandlerQuota(n uint32) *atomicSemaphore {
	sem := &atomicSemaphore{wait: make(chan struct{}, 1)}
	sem.n.Store(int64(n))
	return sem
}
2207