/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"net/http"
	"reflect"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/encoding/proto"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/binarylog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/grpcutil"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
)

const (
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultServerMaxSendMessageSize    = math.MaxInt32

	// Server transports are tracked in a map which is keyed on listener
	// address. For regular gRPC traffic, connections are accepted in Serve()
	// through a call to Accept(), and we use the actual listener address as key
	// when we add it to the map. But for connections received through
	// ServeHTTP(), we do not have a listener and hence use this dummy value.
	listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
)

func init() {
	internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
		return srv.opts.creds
	}
	internal.IsRegisteredMethod = func(srv *Server, method string) bool {
		return srv.isRegisteredMethod(method)
	}
	internal.ServerFromContext = serverFromContext
	internal.AddGlobalServerOptions = func(opt ...ServerOption) {
		globalServerOptions = append(globalServerOptions, opt...)
	}
	internal.ClearGlobalServerOptions = func() {
		globalServerOptions = nil
	}
	internal.BinaryLogger = binaryLogger
	internal.JoinServerOptions = newJoinServerOption
	internal.RecvBufferPool = recvBufferPool
}

var statusOK = status.New(codes.OK, "")
var logger = grpclog.Component("core")

type methodHandler func(srv any, ctx context.Context, dec func(any) error, interceptor UnaryServerInterceptor) (any, error)

// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	MethodName string
	Handler    methodHandler
}

// ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType any
	Methods     []MethodDesc
	Streams     []StreamDesc
	Metadata    any
}

// serviceInfo wraps information about a service. It is very similar to
// ServiceDesc and is constructed from it for internal purposes.
type serviceInfo struct {
	// Contains the implementation for the methods in this service.
	serviceImpl any
	methods     map[string]*MethodDesc
	streams     map[string]*StreamDesc
	mdata       any
}

// Server is a gRPC server to serve RPC requests.
type Server struct {
	opts serverOptions

	mu  sync.Mutex // guards following
	lis map[net.Listener]bool
	// conns contains all active server transports. It is a map keyed on a
	// listener address with the value being the set of active transports
	// belonging to that listener.
	conns    map[string]map[transport.ServerTransport]bool
	serve    bool
	drain    bool
	cv       *sync.Cond              // signaled when connections close for GracefulStop
	services map[string]*serviceInfo // service name -> service info
	events   traceEventLog

	quit               *grpcsync.Event
	done               *grpcsync.Event
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for Stop/GracefulStop
	handlersWG         sync.WaitGroup // counts active method handler goroutines

	channelzID *channelz.Identifier
	czData     *channelzData

	serverWorkerChannel      chan func()
	serverWorkerChannelClose func()
}

type serverOptions struct {
	creds                 credentials.TransportCredentials
	codec                 baseCodec
	cp                    Compressor
	dc                    Decompressor
	unaryInt              UnaryServerInterceptor
	streamInt             StreamServerInterceptor
	chainUnaryInts        []UnaryServerInterceptor
	chainStreamInts       []StreamServerInterceptor
	binaryLogger          binarylog.Logger
	inTapHandle           tap.ServerInHandle
	statsHandlers         []stats.Handler
	maxConcurrentStreams  uint32
	maxReceiveMessageSize int
	maxSendMessageSize    int
	unknownStreamDesc     *StreamDesc
	keepaliveParams       keepalive.ServerParameters
	keepalivePolicy       keepalive.EnforcementPolicy
	initialWindowSize     int32
	initialConnWindowSize int32
	writeBufferSize       int
	readBufferSize        int
	sharedWriteBuffer     bool
	connectionTimeout     time.Duration
	maxHeaderListSize     *uint32
	headerTableSize       *uint32
	numServerWorkers      uint32
	recvBufferPool        SharedBufferPool
	waitForHandlers       bool
}

var defaultServerOptions = serverOptions{
	maxConcurrentStreams:  math.MaxUint32,
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
	writeBufferSize:       defaultWriteBufSize,
	readBufferSize:        defaultReadBufSize,
	recvBufferPool:        nopBufferPool{},
}
var globalServerOptions []ServerOption

// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption interface {
	apply(*serverOptions)
}

// EmptyServerOption does not alter the server configuration. It can be embedded
// in another structure to build custom server options.
//
// # Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type EmptyServerOption struct{}

func (EmptyServerOption) apply(*serverOptions) {}

// funcServerOption wraps a function that modifies serverOptions into an
// implementation of the ServerOption interface.
type funcServerOption struct {
	f func(*serverOptions)
}

func (fdo *funcServerOption) apply(do *serverOptions) {
	fdo.f(do)
}

func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
	return &funcServerOption{
		f: f,
	}
}

// joinServerOption provides a way to combine arbitrary number of server
// options into one.
type joinServerOption struct {
	opts []ServerOption
}

func (mdo *joinServerOption) apply(do *serverOptions) {
	for _, opt := range mdo.opts {
		opt.apply(do)
	}
}

func newJoinServerOption(opts ...ServerOption) ServerOption {
	return &joinServerOption{opts: opts}
}

// SharedWriteBuffer allows reusing per-connection transport write buffer.
// If this option is set to true every connection will release the buffer after
// flushing the data on the wire.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func SharedWriteBuffer(val bool) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.sharedWriteBuffer = val
	})
}
// WriteBufferSize determines how much data can be batched before doing a write
// on the wire. The corresponding memory allocation for this buffer will be
// twice the size to keep syscalls low. The default value for this buffer is
// 32KB. Zero or negative values will disable the write buffer such that each
// write will be on underlying connection.
// Note: A Send call may not directly translate to a write.
func WriteBufferSize(s int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.writeBufferSize = s
	})
}

// ReadBufferSize lets you set the size of read buffer, this determines how much
// data can be read at most for one read syscall. The default value for this
// buffer is 32KB. Zero or negative values will disable read buffer for a
// connection so data framer can access the underlying conn directly.
func ReadBufferSize(s int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.readBufferSize = s
	})
}

// InitialWindowSize returns a ServerOption that sets window size for stream.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
func InitialWindowSize(s int32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.initialWindowSize = s
	})
}

// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
func InitialConnWindowSize(s int32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.initialConnWindowSize = s
	})
}

// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
	if kp.Time > 0 && kp.Time < internal.KeepaliveMinServerPingTime {
		logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
		kp.Time = internal.KeepaliveMinServerPingTime
	}

	return newFuncServerOption(func(o *serverOptions) {
		o.keepaliveParams = kp
	})
}

// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.keepalivePolicy = kep
	})
}

// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
//
// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
//
// Deprecated: register codecs using encoding.RegisterCodec. The server will
// automatically use registered codecs based on the incoming requests' headers.
// See also
// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
// Will be supported throughout 1.x.
func CustomCodec(codec Codec) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.codec = codec
	})
}

// ForceServerCodec returns a ServerOption that sets a codec for message
// marshaling and unmarshaling.
//
// This will override any lookups by content-subtype for Codecs registered
// with RegisterCodec.
//
// See Content-Type on
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details. Also see the documentation on RegisterCodec and
// CallContentSubtype for more details on the interaction between encoding.Codec
// and content-subtype.
//
// This function is provided for advanced users; prefer to register codecs
// using encoding.RegisterCodec.
// The server will automatically use registered codecs based on the incoming
// requests' headers. See also
// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
// Will be supported throughout 1.x.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ForceServerCodec(codec encoding.Codec) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.codec = codec
	})
}

// RPCCompressor returns a ServerOption that sets a compressor for outbound
// messages.  For backward compatibility, all outbound messages will be sent
// using this compressor, regardless of incoming message compression.  By
// default, server messages will be sent using the same compressor with which
// request messages were sent.
//
// Deprecated: use encoding.RegisterCompressor instead. Will be supported
// throughout 1.x.
func RPCCompressor(cp Compressor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.cp = cp
	})
}

// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
// messages.  It has higher priority than decompressors registered via
// encoding.RegisterCompressor.
//
// Deprecated: use encoding.RegisterCompressor instead. Will be supported
// throughout 1.x.
func RPCDecompressor(dc Decompressor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.dc = dc
	})
}

// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default limit.
//
// Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x.
func MaxMsgSize(m int) ServerOption {
	return MaxRecvMsgSize(m)
}

// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default 4MB.
func MaxRecvMsgSize(m int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxReceiveMessageSize = m
	})
}

// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
// If this is not set, gRPC uses the default `math.MaxInt32`.
func MaxSendMsgSize(m int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxSendMessageSize = m
	})
}

// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
// of concurrent streams to each ServerTransport.
func MaxConcurrentStreams(n uint32) ServerOption {
	if n == 0 {
		n = math.MaxUint32
	}
	return newFuncServerOption(func(o *serverOptions) {
		o.maxConcurrentStreams = n
	})
}

// Creds returns a ServerOption that sets credentials for server connections.
func Creds(c credentials.TransportCredentials) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.creds = c
	})
}

// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
// server. Only one unary interceptor can be installed. The construction of multiple
// interceptors (e.g., chaining) can be implemented at the caller.
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.unaryInt != nil {
			panic("The unary server interceptor was already set and may not be reset.")
		}
		o.unaryInt = i
	})
}

// ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
// for unary RPCs. The first interceptor will be the outer most,
// while the last interceptor will be the inner most wrapper around the real call.
// All unary interceptors added by this method will be chained.
func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
	})
}
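// An illustrative sketch of the ordering described above (the interceptor
// names below are hypothetical, not part of this package): with
// ChainUnaryInterceptor(logging, auth), "logging" is outermost and "auth"
// wraps the actual method handler.
//
//	func logging(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
//		log.Printf("-> %s", info.FullMethod) // runs first
//		return handler(ctx, req)             // calls auth, which calls the real handler
//	}
//
//	srv := grpc.NewServer(grpc.ChainUnaryInterceptor(logging, auth))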

// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
// server. Only one stream interceptor can be installed.
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.streamInt != nil {
			panic("The stream server interceptor was already set and may not be reset.")
		}
		o.streamInt = i
	})
}

// ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
// for streaming RPCs. The first interceptor will be the outer most,
// while the last interceptor will be the inner most wrapper around the real call.
// All stream interceptors added by this method will be chained.
func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.chainStreamInts = append(o.chainStreamInts, interceptors...)
	})
}

// InTapHandle returns a ServerOption that sets the tap handle for all the server
// transport to be created. Only one can be installed.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func InTapHandle(h tap.ServerInHandle) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.inTapHandle != nil {
			panic("The tap handle was already set and may not be reset.")
		}
		o.inTapHandle = h
	})
}

// StatsHandler returns a ServerOption that sets the stats handler for the server.
func StatsHandler(h stats.Handler) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if h == nil {
			logger.Error("ignoring nil parameter in grpc.StatsHandler ServerOption")
			// Do not allow a nil stats handler, which would otherwise cause
			// panics.
			return
		}
		o.statsHandlers = append(o.statsHandlers, h)
	})
}

// binaryLogger returns a ServerOption that can set the binary logger for the
// server.
func binaryLogger(bl binarylog.Logger) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.binaryLogger = bl
	})
}

// UnknownServiceHandler returns a ServerOption that allows for adding a custom
// unknown service handler. The provided method is a bidi-streaming RPC service
// handler that will be invoked instead of returning the "unimplemented" gRPC
// error whenever a request is received for an unregistered service or method.
// The handling function and stream interceptor (if set) have full access to
// the ServerStream, including its Context.
func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.unknownStreamDesc = &StreamDesc{
			StreamName: "unknown_service_handler",
			Handler:    streamHandler,
			// We need to assume that the users of the streamHandler will want to use both.
			ClientStreams: true,
			ServerStreams: true,
		}
	})
}
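// A minimal sketch of such a fallback handler, assuming the goal is to reply
// with a custom status for unknown methods (the names here are illustrative):
//
//	fallback := func(srv any, stream grpc.ServerStream) error {
//		method, _ := grpc.MethodFromServerStream(stream)
//		return status.Errorf(codes.Unimplemented, "no handler for %q", method)
//	}
//	srv := grpc.NewServer(grpc.UnknownServiceHandler(fallback))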

// ConnectionTimeout returns a ServerOption that sets the timeout for
// connection establishment (up to and including HTTP/2 handshaking) for all
// new connections.  If this is not set, the default is 120 seconds.  A zero or
// negative value will result in an immediate timeout.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ConnectionTimeout(d time.Duration) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.connectionTimeout = d
	})
}

// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
// of header list that the server is prepared to accept.
func MaxHeaderListSize(s uint32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxHeaderListSize = &s
	})
}

// HeaderTableSize returns a ServerOption that sets the size of dynamic
// header table for stream.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func HeaderTableSize(s uint32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.headerTableSize = &s
	})
}

// NumStreamWorkers returns a ServerOption that sets the number of worker
// goroutines that should be used to process incoming streams. Setting this to
// zero (default) will disable workers and spawn a new goroutine for each
// stream.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func NumStreamWorkers(numServerWorkers uint32) ServerOption {
	// TODO: If/when this API gets stabilized (i.e. stream workers become the
	// only way streams are processed), change the behavior of the zero value to
	// a sane default. Preliminary experiments suggest that a value equal to the
	// number of CPUs available is most performant; requires thorough testing.
	return newFuncServerOption(func(o *serverOptions) {
		o.numServerWorkers = numServerWorkers
	})
}

// WaitForHandlers causes Stop to wait until all outstanding method handlers have
// exited before returning.  If false, Stop will return as soon as all
// connections have closed, but method handlers may still be running. By
// default, Stop does not wait for method handlers to return.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func WaitForHandlers(w bool) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.waitForHandlers = w
	})
}

// RecvBufferPool returns a ServerOption that configures the server
// to use the provided shared buffer pool for parsing incoming messages. Depending
// on the application's workload, this could result in reduced memory allocation.
//
// If you are unsure about how to implement a memory pool but want to utilize one,
// begin with grpc.NewSharedBufferPool.
//
// Note: The shared buffer pool feature will not be active if any of the following
// options are used: StatsHandler, EnableTracing, or binary logging. In such
// cases, the shared buffer pool will be ignored.
//
// Deprecated: use experimental.WithRecvBufferPool instead.  Will be deleted in
// v1.60.0 or later.
func RecvBufferPool(bufferPool SharedBufferPool) ServerOption {
	return recvBufferPool(bufferPool)
}

func recvBufferPool(bufferPool SharedBufferPool) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.recvBufferPool = bufferPool
	})
}

// serverWorkerResetThreshold defines how often the stack must be reset. Every
// N requests, by spawning a new goroutine in its place, a worker can reset its
// stack so that large stacks don't live in memory forever. 2^16 should allow
// each goroutine stack to live for at least a few seconds in a typical
// workload (assuming a QPS of a few thousand requests/sec).
const serverWorkerResetThreshold = 1 << 16

// serverWorker blocks on a *transport.Stream channel forever and waits for
// data to be fed by serveStreams. This allows multiple requests to be
// processed by the same goroutine, removing the need for expensive stack
// re-allocations (see the runtime.morestack problem [1]).
//
// [1] https://github.com/golang/go/issues/18138
func (s *Server) serverWorker() {
	for completed := 0; completed < serverWorkerResetThreshold; completed++ {
		f, ok := <-s.serverWorkerChannel
		if !ok {
			return
		}
		f()
	}
	go s.serverWorker()
}

// initServerWorkers creates worker goroutines and a channel to process incoming
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
	s.serverWorkerChannel = make(chan func())
	s.serverWorkerChannelClose = grpcsync.OnceFunc(func() {
		close(s.serverWorkerChannel)
	})
	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
		go s.serverWorker()
	}
}

// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
	opts := defaultServerOptions
	for _, o := range globalServerOptions {
		o.apply(&opts)
	}
	for _, o := range opt {
		o.apply(&opts)
	}
	s := &Server{
		lis:      make(map[net.Listener]bool),
		opts:     opts,
		conns:    make(map[string]map[transport.ServerTransport]bool),
		services: make(map[string]*serviceInfo),
		quit:     grpcsync.NewEvent(),
		done:     grpcsync.NewEvent(),
		czData:   new(channelzData),
	}
	chainUnaryServerInterceptors(s)
	chainStreamServerInterceptors(s)
	s.cv = sync.NewCond(&s.mu)
	if EnableTracing {
		_, file, line, _ := runtime.Caller(1)
		s.events = newTraceEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
	}

	if s.opts.numServerWorkers > 0 {
		s.initServerWorkers()
	}

	s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
	channelz.Info(logger, s.channelzID, "Server created")
	return s
}
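// A typical construction sequence, sketched for illustration; pb and
// greeterServer stand in for generated and user code and are not part of this
// package:
//
//	lis, err := net.Listen("tcp", ":50051")
//	if err != nil {
//		log.Fatal(err)
//	}
//	srv := grpc.NewServer(
//		grpc.MaxRecvMsgSize(8*1024*1024),
//		grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionIdle: 5 * time.Minute}),
//	)
//	pb.RegisterGreeterServer(srv, &greeterServer{}) // register before Serve
//	if err := srv.Serve(lis); err != nil {
//		log.Fatal(err)
//	}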

// printf records an event in s's event log, unless s has been stopped.
// REQUIRES s.mu is held.
func (s *Server) printf(format string, a ...any) {
	if s.events != nil {
		s.events.Printf(format, a...)
	}
}

// errorf records an error in s's event log, unless s has been stopped.
// REQUIRES s.mu is held.
func (s *Server) errorf(format string, a ...any) {
	if s.events != nil {
		s.events.Errorf(format, a...)
	}
}

// ServiceRegistrar wraps a single method that supports service registration. It
// enables users to pass concrete types other than grpc.Server to the service
// registration methods exported by the IDL generated code.
type ServiceRegistrar interface {
	// RegisterService registers a service and its implementation to the
	// concrete type implementing this interface.  It may not be called
	// once the server has started serving.
	// desc describes the service and its methods and handlers. impl is the
	// service implementation which is passed to the method handlers.
	RegisterService(desc *ServiceDesc, impl any)
}

// RegisterService registers a service and its implementation to the gRPC
// server. It is called from the IDL generated code. This must be called before
// invoking Serve. If ss is non-nil (for legacy code), its type is checked to
// ensure it implements sd.HandlerType.
func (s *Server) RegisterService(sd *ServiceDesc, ss any) {
	if ss != nil {
		ht := reflect.TypeOf(sd.HandlerType).Elem()
		st := reflect.TypeOf(ss)
		if !st.Implements(ht) {
			logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
		}
	}
	s.register(sd, ss)
}

func (s *Server) register(sd *ServiceDesc, ss any) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.printf("RegisterService(%q)", sd.ServiceName)
	if s.serve {
		logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
	}
	if _, ok := s.services[sd.ServiceName]; ok {
		logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
	}
	info := &serviceInfo{
		serviceImpl: ss,
		methods:     make(map[string]*MethodDesc),
		streams:     make(map[string]*StreamDesc),
		mdata:       sd.Metadata,
	}
	for i := range sd.Methods {
		d := &sd.Methods[i]
		info.methods[d.MethodName] = d
	}
	for i := range sd.Streams {
		d := &sd.Streams[i]
		info.streams[d.StreamName] = d
	}
	s.services[sd.ServiceName] = info
}

// MethodInfo contains the information of an RPC including its method name and type.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}

// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata any
}

// GetServiceInfo returns a map from service names to ServiceInfo.
// Service names include the package names, in the form of <package>.<service>.
func (s *Server) GetServiceInfo() map[string]ServiceInfo {
	ret := make(map[string]ServiceInfo)
	for n, srv := range s.services {
		methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams))
		for m := range srv.methods {
			methods = append(methods, MethodInfo{
				Name:           m,
				IsClientStream: false,
				IsServerStream: false,
			})
		}
		for m, d := range srv.streams {
			methods = append(methods, MethodInfo{
				Name:           m,
				IsClientStream: d.ClientStreams,
				IsServerStream: d.ServerStreams,
			})
		}

		ret[n] = ServiceInfo{
			Methods:  methods,
			Metadata: srv.mdata,
		}
	}
	return ret
}

// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped.
var ErrServerStopped = errors.New("grpc: the server has been stopped")

type listenSocket struct {
	net.Listener
	channelzID *channelz.Identifier
}

func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
	return &channelz.SocketInternalMetric{
		SocketOptions: channelz.GetSocketOption(l.Listener),
		LocalAddr:     l.Listener.Addr(),
	}
}

func (l *listenSocket) Close() error {
	err := l.Listener.Close()
	channelz.RemoveEntry(l.channelzID)
	channelz.Info(logger, l.channelzID, "ListenSocket deleted")
	return err
}

// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
//
// Note: All supported releases of Go (as of December 2023) override the OS
// defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive
// with OS defaults for keepalive time and interval, callers need to do the
// following two things:
//   - pass a net.Listener created by calling the Listen method on a
//     net.ListenConfig with the `KeepAlive` field set to a negative value. This
//     will result in the Go standard library not overriding OS defaults for TCP
//     keepalive interval and time. But this will also result in the Go standard
//     library not enabling TCP keepalives by default.
//   - override the Accept method on the passed in net.Listener and set the
//     SO_KEEPALIVE socket option to enable TCP keepalives, with OS defaults.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		if s.quit.HasFired() {
			// Stop or GracefulStop called; block until done and return nil.
			<-s.done.Done()
		}
	}()

	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var err error
	ls.channelzID, err = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
	if err != nil {
		s.mu.Unlock()
		return err
	}
	s.mu.Unlock()
	channelz.Info(logger, ls.channelzID, "ListenSocket created")

	var tempDelay time.Duration // how long to sleep on accept failure
	for {
		rawConn, err := lis.Accept()
		if err != nil {
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit.Done():
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			if s.quit.HasFired() {
				return nil
			}
			return err
		}
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(lis.Addr().String(), rawConn)
			s.serveWG.Done()
		}()
	}
}
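// A sketch of the two keepalive steps described in the Serve documentation
// above; keepaliveListener is an illustrative wrapper, not part of this
// package:
//
//	type keepaliveListener struct{ net.Listener }
//
//	func (l keepaliveListener) Accept() (net.Conn, error) {
//		c, err := l.Listener.Accept()
//		if err == nil {
//			if tc, ok := c.(*net.TCPConn); ok {
//				tc.SetKeepAlive(true) // SO_KEEPALIVE with OS-default time and interval
//			}
//		}
//		return c, err
//	}
//
//	lc := net.ListenConfig{KeepAlive: -1} // keep the stdlib from forcing 15s keepalives
//	lis, err := lc.Listen(context.Background(), "tcp", ":50051")
//	if err != nil {
//		log.Fatal(err)
//	}
//	err = grpcServer.Serve(keepaliveListener{lis})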

// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
	if s.quit.HasFired() {
		rawConn.Close()
		return
	}
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))

	// Finish handshaking (HTTP2)
	st := s.newHTTP2Transport(rawConn)
	rawConn.SetDeadline(time.Time{})
	if st == nil {
		return
	}

	if cc, ok := rawConn.(interface {
		PassServerTransport(transport.ServerTransport)
	}); ok {
		cc.PassServerTransport(st)
	}

	if !s.addConn(lisAddr, st) {
		return
	}
	go func() {
		s.serveStreams(context.Background(), st, rawConn)
		s.removeConn(lisAddr, st)
	}()
}

// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
	config := &transport.ServerConfig{
		MaxStreams:            s.opts.maxConcurrentStreams,
		ConnectionTimeout:     s.opts.connectionTimeout,
		Credentials:           s.opts.creds,
		InTapHandle:           s.opts.inTapHandle,
		StatsHandlers:         s.opts.statsHandlers,
		KeepaliveParams:       s.opts.keepaliveParams,
		KeepalivePolicy:       s.opts.keepalivePolicy,
		InitialWindowSize:     s.opts.initialWindowSize,
		InitialConnWindowSize: s.opts.initialConnWindowSize,
		WriteBufferSize:       s.opts.writeBufferSize,
		ReadBufferSize:        s.opts.readBufferSize,
		SharedWriteBuffer:     s.opts.sharedWriteBuffer,
		ChannelzParentID:      s.channelzID,
		MaxHeaderListSize:     s.opts.maxHeaderListSize,
		HeaderTableSize:       s.opts.headerTableSize,
	}
	st, err := transport.NewServerTransport(c, config)
	if err != nil {
		s.mu.Lock()
		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
		s.mu.Unlock()
		// ErrConnDispatched means that the connection was dispatched away from
		// gRPC; those connections should be left open.
		if err != credentials.ErrConnDispatched {
			// Don't log on ErrConnDispatched and io.EOF to prevent log spam.
			if err != io.EOF {
				channelz.Info(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
			}
			c.Close()
		}
		return nil
	}

	return st
}

func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport, rawConn net.Conn) {
	ctx = transport.SetConnection(ctx, rawConn)
	ctx = peer.NewContext(ctx, st.Peer())
	for _, sh := range s.opts.statsHandlers {
		ctx = sh.TagConn(ctx, &stats.ConnTagInfo{
			RemoteAddr: st.Peer().Addr,
			LocalAddr:  st.Peer().LocalAddr,
		})
		sh.HandleConn(ctx, &stats.ConnBegin{})
	}

	defer func() {
		st.Close(errors.New("finished serving streams for the server transport"))
		for _, sh := range s.opts.statsHandlers {
			sh.HandleConn(ctx, &stats.ConnEnd{})
		}
	}()

	streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
	st.HandleStreams(ctx, func(stream *transport.Stream) {
		s.handlersWG.Add(1)
		streamQuota.acquire()
		f := func() {
			defer streamQuota.release()
			defer s.handlersWG.Done()
			s.handleStream(st, stream)
		}

		if s.opts.numServerWorkers > 0 {
			select {
			case s.serverWorkerChannel <- f:
				return
			default:
				// If all stream workers are busy, fallback to the default code path.
			}
		}
		go f()
	})
}

var _ http.Handler = (*Server)(nil)

// ServeHTTP implements the Go standard library's http.Handler
// interface by responding to the gRPC request r, by looking up
// the requested gRPC method in the gRPC server s.
//
// The provided HTTP request must have arrived on an HTTP/2
// connection. When using the Go standard library's server,
// practically this means that the Request must also have arrived
// over TLS.
//
// To share one port (such as 443 for https) between gRPC and an
// existing http.Handler, use a root http.Handler such as:
//
//	if r.ProtoMajor == 2 && strings.HasPrefix(
//		r.Header.Get("Content-Type"), "application/grpc") {
//		grpcServer.ServeHTTP(w, r)
//	} else {
//		yourMux.ServeHTTP(w, r)
//	}
//
// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
// separate from grpc-go's HTTP/2 server. Performance and features may vary
// between the two paths. ServeHTTP does not support some gRPC features
// available through grpc-go's HTTP/2 server.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandlers)
	if err != nil {
		// Errors returned from transport.NewServerHandlerTransport have
		// already been written to w.
		return
	}
	if !s.addConn(listenerAddressForServeHTTP, st) {
		return
	}
	defer s.removeConn(listenerAddressForServeHTTP, st)
	s.serveStreams(r.Context(), st, nil)
}
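// Expanding the doc-comment snippet above into a fuller sketch (grpcServer,
// yourMux, and the certificate paths are illustrative):
//
//	root := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
//		if r.ProtoMajor == 2 && strings.HasPrefix(r.Header.Get("Content-Type"), "application/grpc") {
//			grpcServer.ServeHTTP(w, r)
//			return
//		}
//		yourMux.ServeHTTP(w, r)
//	})
//	// Serving over TLS lets net/http negotiate HTTP/2 for the gRPC requests.
//	log.Fatal(http.ListenAndServeTLS(":443", "cert.pem", "key.pem", root))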

func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.conns == nil {
		st.Close(errors.New("Server.addConn called when server has already been stopped"))
		return false
	}
	if s.drain {
		// Transport added after we drained our existing conns: drain it
		// immediately.
		st.Drain("")
	}

	if s.conns[addr] == nil {
		// Create a map entry if this is the first connection on this listener.
		s.conns[addr] = make(map[transport.ServerTransport]bool)
	}
	s.conns[addr][st] = true
	return true
}

func (s *Server) removeConn(addr string, st transport.ServerTransport) {
	s.mu.Lock()
	defer s.mu.Unlock()

	conns := s.conns[addr]
	if conns != nil {
		delete(conns, st)
		if len(conns) == 0 {
			// If the last connection for this address is being removed, also
			// remove the map entry corresponding to the address. This is used
			// in GracefulStop() when waiting for all connections to be closed.
			delete(s.conns, addr)
		}
		s.cv.Broadcast()
	}
}

func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
	return &channelz.ServerInternalMetric{
		CallsStarted:             atomic.LoadInt64(&s.czData.callsStarted),
		CallsSucceeded:           atomic.LoadInt64(&s.czData.callsSucceeded),
		CallsFailed:              atomic.LoadInt64(&s.czData.callsFailed),
		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
	}
}

func (s *Server) incrCallsStarted() {
	atomic.AddInt64(&s.czData.callsStarted, 1)
	atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
}

func (s *Server) incrCallsSucceeded() {
	atomic.AddInt64(&s.czData.callsSucceeded, 1)
}

func (s *Server) incrCallsFailed() {
	atomic.AddInt64(&s.czData.callsFailed, 1)
}

func (s *Server) sendResponse(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, msg any, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
	if err != nil {
		channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err)
		return err
	}
	compData, err := compress(data, cp, comp)
	if err != nil {
		channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err)
		return err
	}
	hdr, payload := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > s.opts.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
	}
	err = t.Write(stream, hdr, payload, opts)
	if err == nil {
		for _, sh := range s.opts.statsHandlers {
			sh.HandleRPC(ctx, outPayload(false, msg, data, payload, time.Now()))
		}
	}
	return err
}

// chainUnaryServerInterceptors chains all unary server interceptors into one.
func chainUnaryServerInterceptors(s *Server) {
	// Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
	// be executed before any other chained interceptors.
	interceptors := s.opts.chainUnaryInts
	if s.opts.unaryInt != nil {
		interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
	}

	var chainedInt UnaryServerInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = chainUnaryInterceptors(interceptors)
	}

	s.opts.unaryInt = chainedInt
}

func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
	return func(ctx context.Context, req any, info *UnaryServerInfo, handler UnaryHandler) (any, error) {
		return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
	}
}

func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
	if curr == len(interceptors)-1 {
		return finalHandler
	}
	return func(ctx context.Context, req any) (any, error) {
		return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
	}
}

func (s *Server) processUnaryRPC(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
	shs := s.opts.statsHandlers
	if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
		if channelz.IsOn() {
			s.incrCallsStarted()
		}
		var statsBegin *stats.Begin
		for _, sh := range shs {
			beginTime := time.Now()
			statsBegin = &stats.Begin{
				BeginTime:      beginTime,
				IsClientStream: false,
				IsServerStream: false,
			}
			sh.HandleRPC(ctx, statsBegin)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&trInfo.firstLine, false)
		}
		// The deferred error handling for tracing, stats handler and channelz are
		// combined into one function to reduce stack usage -- a defer takes ~56-64
		// bytes on the stack, so overflowing the stack will require a stack
		// re-allocation, which is expensive.
		//
		// To maintain behavior similar to separate deferred statements, statements
		// should be executed in the reverse order. That is, tracing first, stats
		// handler second, and channelz last. Note that panics *within* defers will
		// lead to different behavior, but that's an acceptable compromise; that
		// would be undefined behavior territory anyway.
		defer func() {
			if trInfo != nil {
				if err != nil && err != io.EOF {
					trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
					trInfo.tr.SetError()
				}
				trInfo.tr.Finish()
			}

			for _, sh := range shs {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(ctx, end)
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}
	var binlogs []binarylog.MethodLogger
	if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
		binlogs = append(binlogs, ml)
	}
	if s.opts.binaryLogger != nil {
		if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
			binlogs = append(binlogs, ml)
		}
	}
	if len(binlogs) != 0 {
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ctx); ok {
			logEntry.PeerAddr = peer.Addr
		}
		for _, binlog := range binlogs {
			binlog.Log(ctx, logEntry)
		}
	}

	// comp and cp are used for compression.  decomp and dc are used for
	// decompression.  If comp and decomp are both set, they are the same;
	// however they are kept separate to ensure that at most one of the
	// compressor/decompressor variable pairs are set for use later.
	var comp, decomp encoding.Compressor
	var cp Compressor
	var dc Decompressor
	var sendCompressorName string

	// If dc is set and matches the stream's compression, use it.  Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		decomp = encoding.GetCompressor(rc)
		if decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(stream, st)
			return st.Err()
		}
	}

	// If cp is set, use it.  Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		cp = s.opts.cp
		sendCompressorName = cp.Type()
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		comp = encoding.GetCompressor(rc)
		if comp != nil {
			sendCompressorName = comp.Name()
		}
	}

	if sendCompressorName != "" {
		if err := stream.SetSendCompress(sendCompressorName); err != nil {
			return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
		}
	}

	var payInfo *payloadInfo
	if len(shs) != 0 || len(binlogs) != 0 {
		payInfo = &payloadInfo{}
	}
	d, err := recvAndDecompress(&parser{r: stream, recvBufferPool: s.opts.recvBufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
	if err != nil {
		if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		return err
	}
	if channelz.IsOn() {
		t.IncrMsgRecv()
	}
	df := func(v any) error {
		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
		}
		for _, sh := range shs {
			sh.HandleRPC(ctx, &stats.InPayload{
				RecvTime:         time.Now(),
				Payload:          v,
				Length:           len(d),
				WireLength:       payInfo.compressedLength + headerLen,
				CompressedLength: payInfo.compressedLength,
				Data:             d,
			})
		}
		if len(binlogs) != 0 {
			cm := &binarylog.ClientMessage{
				Message: d,
			}
			for _, binlog := range binlogs {
				binlog.Log(ctx, cm)
			}
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
		}
		return nil
	}
	ctx = NewContextWithServerTransportStream(ctx, stream)
	reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert non-status application error to a status error with code
			// Unknown, but handle context errors specifically.
			appStatus = status.FromContextError(appErr)
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			trInfo.tr.SetError()
		}
		if e := t.WriteStatus(stream, appStatus); e != nil {
			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		if len(binlogs) != 0 {
			if h, _ := stream.Header(); h.Len() > 0 {
				// Only log serverHeader if there was header. Otherwise it can
				// be trailer only.
				sh := &binarylog.ServerHeader{
					Header: h,
				}
				for _, binlog := range binlogs {
					binlog.Log(ctx, sh)
				}
			}
			st := &binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			}
			for _, binlog := range binlogs {
				binlog.Log(ctx, st)
			}
		}
		return appErr
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(stringer("OK"), false)
	}
	opts := &transport.Options{Last: true}

	// Server handler could have set new compressor by calling SetSendCompressor.
	// In case it is set, we need to use it for compressing outbound message.
	if stream.SendCompress() != sendCompressorName {
		comp = encoding.GetCompressor(stream.SendCompress())
	}
	if err := s.sendResponse(ctx, t, stream, reply, cp, opts, comp); err != nil {
		if err == io.EOF {
			// The entire stream is done (for unary RPC only).
			return err
		}
		if sts, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, sts); e != nil {
				channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
			}
		} else {
			switch st := err.(type) {
			case transport.ConnectionError:
				// Nothing to do here.
			default:
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
			}
		}
		if len(binlogs) != 0 {
			h, _ := stream.Header()
			sh := &binarylog.ServerHeader{
				Header: h,
			}
			st := &binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			}
			for _, binlog := range binlogs {
				binlog.Log(ctx, sh)
				binlog.Log(ctx, st)
			}
		}
		return err
	}
	if len(binlogs) != 0 {
		h, _ := stream.Header()
		sh := &binarylog.ServerHeader{
			Header: h,
		}
		sm := &binarylog.ServerMessage{
			Message: reply,
		}
		for _, binlog := range binlogs {
			binlog.Log(ctx, sh)
			binlog.Log(ctx, sm)
		}
	}
	if channelz.IsOn() {
		t.IncrMsgSent()
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
	}
	// TODO: Should we be logging if writing status failed here, like above?
	// Should the logging be in WriteStatus?  Should we ignore the WriteStatus
	// error or allow the stats handler to see it?
	if len(binlogs) != 0 {
		st := &binarylog.ServerTrailer{
			Trailer: stream.Trailer(),
			Err:     appErr,
		}
		for _, binlog := range binlogs {
			binlog.Log(ctx, st)
		}
	}
	return t.WriteStatus(stream, statusOK)
}

// chainStreamServerInterceptors chains all stream server interceptors into one.
1498
func chainStreamServerInterceptors(s *Server) {
1499
	// Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
1500
	// be executed before any other chained interceptors.
1501
	interceptors := s.opts.chainStreamInts
1502
	if s.opts.streamInt != nil {
1503
		interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
1504
	}
1505

1506
	var chainedInt StreamServerInterceptor
1507
	if len(interceptors) == 0 {
1508
		chainedInt = nil
1509
	} else if len(interceptors) == 1 {
1510
		chainedInt = interceptors[0]
1511
	} else {
1512
		chainedInt = chainStreamInterceptors(interceptors)
1513
	}
1514

1515
	s.opts.streamInt = chainedInt
1516
}
1517

1518
func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor {
1519
	return func(srv any, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
1520
		return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
1521
	}
1522
}
1523

1524
func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
1525
	if curr == len(interceptors)-1 {
1526
		return finalHandler
1527
	}
1528
	return func(srv any, stream ServerStream) error {
1529
		return interceptors[curr+1](srv, stream, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
1530
	}
1531
}
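
// exampleChainedStreamInterceptors is an illustrative sketch, not part of the
// upstream gRPC source: it shows how the chaining above composes two
// hypothetical logging interceptors. logA runs first; the handler it receives
// dispatches to logB, whose handler finally invokes the real method handler.
func exampleChainedStreamInterceptors() StreamServerInterceptor {
	logA := func(srv any, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
		logger.Infof("logA before %s", info.FullMethod) // outermost wrapper
		err := handler(srv, ss)                         // runs logB, then the real handler
		logger.Infof("logA after %s", info.FullMethod)
		return err
	}
	logB := func(srv any, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
		logger.Infof("logB before %s", info.FullMethod) // inner wrapper
		return handler(srv, ss)                         // runs the real handler
	}
	return chainStreamInterceptors([]StreamServerInterceptor{logA, logB})
}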

func (s *Server) processStreamingRPC(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
	if channelz.IsOn() {
		s.incrCallsStarted()
	}
	shs := s.opts.statsHandlers
	var statsBegin *stats.Begin
	if len(shs) != 0 {
		beginTime := time.Now()
		statsBegin = &stats.Begin{
			BeginTime:      beginTime,
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		for _, sh := range shs {
			sh.HandleRPC(ctx, statsBegin)
		}
	}
	ctx = NewContextWithServerTransportStream(ctx, stream)
	ss := &serverStream{
		ctx:                   ctx,
		t:                     t,
		s:                     stream,
		p:                     &parser{r: stream, recvBufferPool: s.opts.recvBufferPool},
		codec:                 s.getCodec(stream.ContentSubtype()),
		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
		maxSendMessageSize:    s.opts.maxSendMessageSize,
		trInfo:                trInfo,
		statsHandler:          shs,
	}

	if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
		// See comment in processUnaryRPC on defers.
		defer func() {
			if trInfo != nil {
				ss.mu.Lock()
				if err != nil && err != io.EOF {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
					ss.trInfo.tr.SetError()
				}
				ss.trInfo.tr.Finish()
				ss.trInfo.tr = nil
				ss.mu.Unlock()
			}

			if len(shs) != 0 {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				for _, sh := range shs {
					sh.HandleRPC(ctx, end)
				}
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}

	if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
		ss.binlogs = append(ss.binlogs, ml)
	}
	if s.opts.binaryLogger != nil {
		if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
			ss.binlogs = append(ss.binlogs, ml)
		}
	}
	if len(ss.binlogs) != 0 {
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ss.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		for _, binlog := range ss.binlogs {
			binlog.Log(ctx, logEntry)
		}
	}

	// If dc is set and matches the stream's compression, use it.  Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		ss.dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		ss.decomp = encoding.GetCompressor(rc)
		if ss.decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(ss.s, st)
			return st.Err()
		}
	}

	// If cp is set, use it.  Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		ss.cp = s.opts.cp
		ss.sendCompressorName = s.opts.cp.Type()
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		ss.comp = encoding.GetCompressor(rc)
		if ss.comp != nil {
			ss.sendCompressorName = rc
		}
	}

	if ss.sendCompressorName != "" {
		if err := stream.SetSendCompress(ss.sendCompressorName); err != nil {
			return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
		}
	}

	ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.cp, ss.comp)

	if trInfo != nil {
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
	}
	var appErr error
	var server any
	if info != nil {
		server = info.serviceImpl
	}
	if s.opts.streamInt == nil {
		appErr = sd.Handler(server, ss)
	} else {
		info := &StreamServerInfo{
			FullMethod:     stream.Method(),
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
	}
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert non-status application error to a status error with code
			// Unknown, but handle context errors specifically.
			appStatus = status.FromContextError(appErr)
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			ss.mu.Lock()
			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			ss.trInfo.tr.SetError()
			ss.mu.Unlock()
		}
		if len(ss.binlogs) != 0 {
			st := &binarylog.ServerTrailer{
				Trailer: ss.s.Trailer(),
				Err:     appErr,
			}
			for _, binlog := range ss.binlogs {
				binlog.Log(ctx, st)
			}
		}
		t.WriteStatus(ss.s, appStatus)
		// TODO: Should we log an error from WriteStatus here and below?
		return appErr
	}
	if trInfo != nil {
		ss.mu.Lock()
		ss.trInfo.tr.LazyLog(stringer("OK"), false)
		ss.mu.Unlock()
	}
	if len(ss.binlogs) != 0 {
		st := &binarylog.ServerTrailer{
			Trailer: ss.s.Trailer(),
			Err:     appErr,
		}
		for _, binlog := range ss.binlogs {
			binlog.Log(ctx, st)
		}
	}
	return t.WriteStatus(ss.s, statusOK)
}

func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) {
	ctx := stream.Context()
	ctx = contextWithServer(ctx, s)
	var ti *traceInfo
	if EnableTracing {
		tr := newTrace("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
		ctx = newTraceContext(ctx, tr)
		ti = &traceInfo{
			tr: tr,
			firstLine: firstLine{
				client:     false,
				remoteAddr: t.Peer().Addr,
			},
		}
		if dl, ok := ctx.Deadline(); ok {
			ti.firstLine.deadline = time.Until(dl)
		}
	}

	sm := stream.Method()
	if sm != "" && sm[0] == '/' {
		sm = sm[1:]
	}
	pos := strings.LastIndex(sm, "/")
	if pos == -1 {
		if ti != nil {
			ti.tr.LazyLog(&fmtStringer{"Malformed method name %q", []any{sm}}, true)
			ti.tr.SetError()
		}
		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
		if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
			if ti != nil {
				ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
				ti.tr.SetError()
			}
			channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
		}
		if ti != nil {
			ti.tr.Finish()
		}
		return
	}
	service := sm[:pos]
	method := sm[pos+1:]

	md, _ := metadata.FromIncomingContext(ctx)
	for _, sh := range s.opts.statsHandlers {
		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: stream.Method()})
		sh.HandleRPC(ctx, &stats.InHeader{
			FullMethod:  stream.Method(),
			RemoteAddr:  t.Peer().Addr,
			LocalAddr:   t.Peer().LocalAddr,
			Compression: stream.RecvCompress(),
			WireLength:  stream.HeaderWireLength(),
			Header:      md,
		})
	}
	// Attach the updated context to the stream so that callouts taken from the
	// stream observe it. This will be removed once all stats handler calls come
	// from the gRPC layer.
	stream.SetContext(ctx)

	srv, knownService := s.services[service]
	if knownService {
		if md, ok := srv.methods[method]; ok {
			s.processUnaryRPC(ctx, t, stream, srv, md, ti)
			return
		}
		if sd, ok := srv.streams[method]; ok {
			s.processStreamingRPC(ctx, t, stream, srv, sd, ti)
			return
		}
	}
	// Unknown service, or known service with an unknown method.
	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
		s.processStreamingRPC(ctx, t, stream, nil, unknownDesc, ti)
		return
	}
	var errDesc string
	if !knownService {
		errDesc = fmt.Sprintf("unknown service %v", service)
	} else {
		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
	}
	if ti != nil {
		ti.tr.LazyPrintf("%s", errDesc)
		ti.tr.SetError()
	}
	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
		if ti != nil {
			ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
			ti.tr.SetError()
		}
		channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
	}
	if ti != nil {
		ti.tr.Finish()
	}
}

// The key to save ServerTransportStream in the context.
type streamKey struct{}

// NewContextWithServerTransportStream creates a new context from ctx and
// attaches stream to it.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
	return context.WithValue(ctx, streamKey{}, stream)
}

// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that uses, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// See also NewContextWithServerTransportStream.
//
// # Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type ServerTransportStream interface {
	Method() string
	SetHeader(md metadata.MD) error
	SendHeader(md metadata.MD) error
	SetTrailer(md metadata.MD) error
}

// ServerTransportStreamFromContext returns the ServerTransportStream saved in
// ctx. Returns nil if the given context has no stream associated with it
// (which implies it is not an RPC invocation context).
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
	s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
	return s
}
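
// exampleMockServerTransportStream is an illustrative sketch, not part of the
// upstream gRPC source: it demonstrates the mocking pattern described in the
// ServerTransportStream comment above. A minimal implementation is attached to
// a context via NewContextWithServerTransportStream so handler code that calls
// grpc.SetHeader or grpc.SetTrailer can be exercised without a real transport.
// The type name and method string used here are hypothetical.
type exampleMockServerTransportStream struct {
	header  metadata.MD
	trailer metadata.MD
}

func (m *exampleMockServerTransportStream) Method() string { return "/example.Service/Method" }

func (m *exampleMockServerTransportStream) SetHeader(md metadata.MD) error {
	m.header = metadata.Join(m.header, md)
	return nil
}

func (m *exampleMockServerTransportStream) SendHeader(md metadata.MD) error {
	return m.SetHeader(md)
}

func (m *exampleMockServerTransportStream) SetTrailer(md metadata.MD) error {
	m.trailer = metadata.Join(m.trailer, md)
	return nil
}

// exampleContextWithMockStream returns a context carrying the mock stream,
// suitable for passing to handler code under test.
func exampleContextWithMockStream(ctx context.Context) context.Context {
	return NewContextWithServerTransportStream(ctx, &exampleMockServerTransportStream{})
}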

// Stop stops the gRPC server. It immediately closes all open
// connections and listeners.
// It cancels all active RPCs on the server side, and the corresponding
// pending RPCs on the client side will be notified by connection
// errors.
func (s *Server) Stop() {
	s.stop(false)
}

// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
	s.stop(true)
}
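
// exampleStopWithDeadline is an illustrative sketch, not part of the upstream
// gRPC source: a common way to combine GracefulStop and Stop is to wait for a
// graceful stop up to a deadline and then force-close whatever remains. The
// 10-second timeout is an arbitrary example value.
func exampleStopWithDeadline(s *Server) {
	done := make(chan struct{})
	go func() {
		s.GracefulStop() // stop accepting new RPCs and wait for pending ones
		close(done)
	}()
	select {
	case <-done:
		// All pending RPCs finished in time.
	case <-time.After(10 * time.Second):
		s.Stop() // hard-stop: close remaining connections and cancel active RPCs
	}
}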

func (s *Server) stop(graceful bool) {
	s.quit.Fire()
	defer s.done.Fire()

	s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })

	s.mu.Lock()
	s.closeListenersLocked()
	// Wait for serving threads to be ready to exit.  Only then can we be sure no
	// new conns will be created.
	s.mu.Unlock()
	s.serveWG.Wait()

	s.mu.Lock()
	defer s.mu.Unlock()

	if graceful {
		s.drainAllServerTransportsLocked()
	} else {
		s.closeServerTransportsLocked()
	}

	for len(s.conns) != 0 {
		s.cv.Wait()
	}
	s.conns = nil

	if s.opts.numServerWorkers > 0 {
		// Closing the channel (only once, via grpcsync.OnceFunc) after all the
		// connections have been closed above ensures that there are no
		// goroutines executing the callback passed to st.HandleStreams (where
		// the channel is written to).
		s.serverWorkerChannelClose()
	}

	if graceful || s.opts.waitForHandlers {
		s.handlersWG.Wait()
	}

	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
}

// s.mu must be held by the caller.
func (s *Server) closeServerTransportsLocked() {
	for _, conns := range s.conns {
		for st := range conns {
			st.Close(errors.New("Server.Stop called"))
		}
	}
}

// s.mu must be held by the caller.
func (s *Server) drainAllServerTransportsLocked() {
	if !s.drain {
		for _, conns := range s.conns {
			for st := range conns {
				st.Drain("graceful_stop")
			}
		}
		s.drain = true
	}
}

// s.mu must be held by the caller.
func (s *Server) closeListenersLocked() {
	for lis := range s.lis {
		lis.Close()
	}
	s.lis = nil
}

// contentSubtype must be lowercase
// cannot return nil
func (s *Server) getCodec(contentSubtype string) baseCodec {
	if s.opts.codec != nil {
		return s.opts.codec
	}
	if contentSubtype == "" {
		return encoding.GetCodec(proto.Name)
	}
	codec := encoding.GetCodec(contentSubtype)
	if codec == nil {
		logger.Warningf("Unsupported codec %q. Defaulting to %q for now. This will start to fail in future releases.", contentSubtype, proto.Name)
		return encoding.GetCodec(proto.Name)
	}
	return codec
}

type serverKey struct{}

// serverFromContext gets the Server from the context.
func serverFromContext(ctx context.Context) *Server {
	s, _ := ctx.Value(serverKey{}).(*Server)
	return s
}

// contextWithServer sets the Server in the context.
func contextWithServer(ctx context.Context, server *Server) context.Context {
	return context.WithValue(ctx, serverKey{}, server)
}

// isRegisteredMethod returns whether the passed in method is registered as a
// method on the server. /service/method and service/method will match if the
// service and method are registered on the server.
func (s *Server) isRegisteredMethod(serviceMethod string) bool {
	if serviceMethod != "" && serviceMethod[0] == '/' {
		serviceMethod = serviceMethod[1:]
	}
	pos := strings.LastIndex(serviceMethod, "/")
	if pos == -1 { // Invalid method name syntax.
		return false
	}
	service := serviceMethod[:pos]
	method := serviceMethod[pos+1:]
	srv, knownService := s.services[service]
	if knownService {
		if _, ok := srv.methods[method]; ok {
			return true
		}
		if _, ok := srv.streams[method]; ok {
			return true
		}
	}
	return false
}

// SetHeader sets the header metadata to be sent from the server to the client.
// The context provided must be the context passed to the server's handler.
//
// Streaming RPCs should prefer the SetHeader method of the ServerStream.
//
// When called multiple times, all the provided metadata will be merged.  All
// the metadata will be sent out when one of the following happens:
//
//   - grpc.SendHeader is called, or for streaming handlers, stream.SendHeader.
//   - The first response message is sent.  For unary handlers, this occurs when
//     the handler returns; for streaming handlers, this can happen when stream's
//     SendMsg method is called.
//   - An RPC status is sent out (error or success).  This occurs when the handler
//     returns.
//
// SetHeader will fail if called after any of the events above.
//
// The error returned is compatible with the status package.  However, the
// status code will often not match the RPC status as seen by the client
// application, and therefore, should not be relied upon for this purpose.
func SetHeader(ctx context.Context, md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	return stream.SetHeader(md)
}

// SendHeader sends header metadata. It may be called at most once, and may not
// be called after any event that causes headers to be sent (see SetHeader for
// a complete list).  The provided md and headers set by SetHeader() will be
// sent.
//
// The error returned is compatible with the status package.  However, the
// status code will often not match the RPC status as seen by the client
// application, and therefore, should not be relied upon for this purpose.
func SendHeader(ctx context.Context, md metadata.MD) error {
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	if err := stream.SendHeader(md); err != nil {
		return toRPCErr(err)
	}
	return nil
}
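
// exampleHeaderSettingHandler is an illustrative sketch, not part of the
// upstream gRPC source: it shows SetHeader and SendHeader being used from a
// unary handler. The metadata keys and the echo behavior are hypothetical.
func exampleHeaderSettingHandler(ctx context.Context, req any) (any, error) {
	// Buffer header metadata; it is sent with the first response message or
	// the RPC status, whichever comes first.
	if err := SetHeader(ctx, metadata.Pairs("x-stage", "validated")); err != nil {
		return nil, err
	}
	// Alternatively, flush the accumulated headers immediately.
	if err := SendHeader(ctx, metadata.Pairs("x-early", "true")); err != nil {
		return nil, err
	}
	return req, nil
}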

// SetSendCompressor sets a compressor for outbound messages from the server.
// It must not be called after any event that causes headers to be sent
// (see ServerStream.SetHeader for the complete list). The provided compressor
// is used when the following conditions are met:
//
//   - the compressor is registered via encoding.RegisterCompressor
//   - the compressor name is among the compressor names advertised by the
//     client in the grpc-accept-encoding header. Use ClientSupportedCompressors
//     to get the client-supported compressor names.
//
// The context provided must be the context passed to the server's handler.
// Note that the compressor name encoding.Identity disables outbound
// compression.
// By default, server messages will be sent using the same compressor with
// which request messages were sent.
//
// It is not safe to call SetSendCompressor concurrently with SendHeader and
// SendMsg.
//
// # Experimental
//
// Notice: This function is EXPERIMENTAL and may be changed or removed in a
// later release.
func SetSendCompressor(ctx context.Context, name string) error {
	stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
	if !ok || stream == nil {
		return fmt.Errorf("failed to fetch the stream from the given context")
	}

	if err := validateSendCompressor(name, stream.ClientAdvertisedCompressors()); err != nil {
		return fmt.Errorf("unable to set send compressor: %w", err)
	}

	return stream.SetSendCompress(name)
}

// ClientSupportedCompressors returns compressor names advertised by the client
// via grpc-accept-encoding header.
//
// The context provided must be the context passed to the server's handler.
//
// # Experimental
//
// Notice: This function is EXPERIMENTAL and may be changed or removed in a
// later release.
func ClientSupportedCompressors(ctx context.Context) ([]string, error) {
	stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
	if !ok || stream == nil {
		return nil, fmt.Errorf("failed to fetch the stream from the given context %v", ctx)
	}

	return strings.Split(stream.ClientAdvertisedCompressors(), ","), nil
}
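
// exampleEnableGzipIfSupported is an illustrative sketch, not part of the
// upstream gRPC source: it switches the outbound compressor to gzip only when
// the client advertised gzip support, as described above. This assumes a
// compressor named "gzip" has been registered via encoding.RegisterCompressor.
func exampleEnableGzipIfSupported(ctx context.Context) error {
	names, err := ClientSupportedCompressors(ctx)
	if err != nil {
		return err
	}
	for _, name := range names {
		if name == "gzip" {
			return SetSendCompressor(ctx, "gzip")
		}
	}
	// Fall back to the default behavior (mirror the request compression).
	return nil
}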

// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
// When called more than once, all the provided metadata will be merged.
//
// The error returned is compatible with the status package.  However, the
// status code will often not match the RPC status as seen by the client
// application, and therefore, should not be relied upon for this purpose.
func SetTrailer(ctx context.Context, md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	return stream.SetTrailer(md)
}

// Method returns the method string for the server context.  The returned
// string is in the format of "/service/method".
func Method(ctx context.Context) (string, bool) {
	s := ServerTransportStreamFromContext(ctx)
	if s == nil {
		return "", false
	}
	return s.Method(), true
}
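
// exampleTrailerWithMethod is an illustrative sketch, not part of the upstream
// gRPC source: it records the invoked method name in a trailer by combining
// Method and SetTrailer. The trailer key is hypothetical.
func exampleTrailerWithMethod(ctx context.Context) error {
	m, ok := Method(ctx)
	if !ok {
		return status.Error(codes.Internal, "no method in context")
	}
	return SetTrailer(ctx, metadata.Pairs("x-handled-method", m))
}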

type channelzServer struct {
	s *Server
}

func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
	return c.s.channelzMetric()
}

// validateSendCompressor returns an error when given compressor name cannot be
// handled by the server or the client based on the advertised compressors.
func validateSendCompressor(name, clientCompressors string) error {
	if name == encoding.Identity {
		return nil
	}

	if !grpcutil.IsCompressorNameRegistered(name) {
		return fmt.Errorf("compressor not registered %q", name)
	}

	for _, c := range strings.Split(clientCompressors, ",") {
		if c == name {
			return nil // found match
		}
	}
	return fmt.Errorf("client does not support compressor %q", name)
}

// atomicSemaphore implements a blocking, counting semaphore. acquire should be
// called synchronously; release may be called asynchronously.
type atomicSemaphore struct {
	n    atomic.Int64
	wait chan struct{}
}

func (q *atomicSemaphore) acquire() {
	if q.n.Add(-1) < 0 {
		// We ran out of quota.  Block until a release happens.
		<-q.wait
	}
}

func (q *atomicSemaphore) release() {
	// N.B. the "<= 0" check below should allow for this to work with multiple
	// concurrent calls to acquire, but also note that with synchronous calls to
	// acquire, as our system does, n will never be less than -1.  There are
	// fairness issues (queuing) to consider if this was to be generalized.
	if q.n.Add(1) <= 0 {
		// An acquire was waiting on us.  Unblock it.
		q.wait <- struct{}{}
	}
}

func newHandlerQuota(n uint32) *atomicSemaphore {
	a := &atomicSemaphore{wait: make(chan struct{}, 1)}
	a.n.Store(int64(n))
	return a
}
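
// exampleHandlerQuotaUsage is an illustrative sketch, not part of the upstream
// gRPC source: it shows the acquire/release pattern the semaphore is designed
// for. acquire is called synchronously before dispatching work, and release is
// called (possibly from another goroutine) when the work completes. The quota
// of 2 and the loop bound are arbitrary example values.
func exampleHandlerQuotaUsage(work func()) {
	sem := newHandlerQuota(2) // allow at most 2 concurrent workers
	for i := 0; i < 4; i++ {
		sem.acquire() // blocks while 2 workers are already in flight
		go func() {
			defer sem.release()
			work()
		}()
	}
}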