3
* Copyright 2014 gRPC authors.
5
* Licensed under the Apache License, Version 2.0 (the "License");
6
* you may not use this file except in compliance with the License.
7
* You may obtain a copy of the License at
9
* http://www.apache.org/licenses/LICENSE-2.0
11
* Unless required by applicable law or agreed to in writing, software
12
* distributed under the License is distributed on an "AS IS" BASIS,
13
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
* See the License for the specific language governing permissions and
15
* limitations under the License.
34
"github.com/golang/protobuf/proto"
35
"golang.org/x/net/http2"
36
"golang.org/x/net/http2/hpack"
37
"google.golang.org/grpc/internal/grpcutil"
39
"google.golang.org/grpc/codes"
40
"google.golang.org/grpc/credentials"
41
"google.golang.org/grpc/internal/channelz"
42
"google.golang.org/grpc/internal/grpcrand"
43
"google.golang.org/grpc/keepalive"
44
"google.golang.org/grpc/metadata"
45
"google.golang.org/grpc/peer"
46
"google.golang.org/grpc/stats"
47
"google.golang.org/grpc/status"
48
"google.golang.org/grpc/tap"
52
// ErrIllegalHeaderWrite indicates that setting header is illegal because of
53
// the stream's state.
54
ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
55
// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
56
// than the limit set by peer.
57
ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
60
// serverConnectionCounter counts the number of connections a server has seen
61
// (equal to the number of http2Servers created). Must be accessed atomically.
62
var serverConnectionCounter uint64
64
// http2Server implements the ServerTransport interface with HTTP2.
65
type http2Server struct {
66
lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
71
readerDone chan struct{} // sync point to enable testing.
72
writerDone chan struct{} // sync point to enable testing.
75
maxStreamID uint32 // max stream ID ever seen
76
authInfo credentials.AuthInfo // auth info about the connection
77
inTapHandle tap.ServerInHandle
79
// The max number of concurrent streams.
81
// controlBuf delivers all the control related tasks (e.g., window
82
// updates, reset streams, and various settings) to the controller.
83
controlBuf *controlBuffer
86
// Keepalive and max-age parameters for the server.
87
kp keepalive.ServerParameters
88
// Keepalive enforcement policy.
89
kep keepalive.EnforcementPolicy
90
// The time instance last ping was received.
92
// Number of times the client has violated keepalive ping policy so far.
94
// Flag to signify that number of ping strikes should be reset to 0.
95
// This is set whenever data or header frames are sent.
97
resetPingStrikes uint32 // Accessed atomically.
98
initialWindowSize int32
100
maxSendHeaderListSize *uint32
102
mu sync.Mutex // guard the following
104
// drainChan is initialized when drain(...) is called the first time.
105
// After which the server writes out the first GoAway(with ID 2^31-1) frame.
106
// Then an independent goroutine will be launched to later send the second GoAway.
107
// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
108
// Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
110
drainChan chan struct{}
112
activeStreams map[uint32]*Stream
113
// idle is the time instant when the connection went idle.
114
// This is either the beginning of the connection or when the number of
115
// RPCs go down to 0.
116
// When the connection is busy, this value is set to 0.
119
// Fields below are for channelz metric collection.
120
channelzID int64 // channelz unique identification number
122
bufferPool *bufferPool
127
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
128
// returned if something goes wrong.
129
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
130
writeBufSize := config.WriteBufferSize
131
readBufSize := config.ReadBufferSize
132
maxHeaderListSize := defaultServerMaxHeaderListSize
133
if config.MaxHeaderListSize != nil {
134
maxHeaderListSize = *config.MaxHeaderListSize
136
framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
137
// Send initial settings as connection preface to client.
138
isettings := []http2.Setting{{
139
ID: http2.SettingMaxFrameSize,
140
Val: http2MaxFrameLen,
142
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
143
// permitted in the HTTP2 spec.
144
maxStreams := config.MaxStreams
146
maxStreams = math.MaxUint32
148
isettings = append(isettings, http2.Setting{
149
ID: http2.SettingMaxConcurrentStreams,
153
dynamicWindow := true
154
iwz := int32(initialWindowSize)
155
if config.InitialWindowSize >= defaultWindowSize {
156
iwz = config.InitialWindowSize
157
dynamicWindow = false
159
icwz := int32(initialWindowSize)
160
if config.InitialConnWindowSize >= defaultWindowSize {
161
icwz = config.InitialConnWindowSize
162
dynamicWindow = false
164
if iwz != defaultWindowSize {
165
isettings = append(isettings, http2.Setting{
166
ID: http2.SettingInitialWindowSize,
169
if config.MaxHeaderListSize != nil {
170
isettings = append(isettings, http2.Setting{
171
ID: http2.SettingMaxHeaderListSize,
172
Val: *config.MaxHeaderListSize,
175
if config.HeaderTableSize != nil {
176
isettings = append(isettings, http2.Setting{
177
ID: http2.SettingHeaderTableSize,
178
Val: *config.HeaderTableSize,
181
if err := framer.fr.WriteSettings(isettings...); err != nil {
182
return nil, connectionErrorf(false, err, "transport: %v", err)
184
// Adjust the connection flow control window if needed.
185
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
186
if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
187
return nil, connectionErrorf(false, err, "transport: %v", err)
190
kp := config.KeepaliveParams
191
if kp.MaxConnectionIdle == 0 {
192
kp.MaxConnectionIdle = defaultMaxConnectionIdle
194
if kp.MaxConnectionAge == 0 {
195
kp.MaxConnectionAge = defaultMaxConnectionAge
197
// Add a jitter to MaxConnectionAge.
198
kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
199
if kp.MaxConnectionAgeGrace == 0 {
200
kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
203
kp.Time = defaultServerKeepaliveTime
206
kp.Timeout = defaultServerKeepaliveTimeout
208
kep := config.KeepalivePolicy
209
if kep.MinTime == 0 {
210
kep.MinTime = defaultKeepalivePolicyMinTime
212
done := make(chan struct{})
214
ctx: context.Background(),
217
remoteAddr: conn.RemoteAddr(),
218
localAddr: conn.LocalAddr(),
219
authInfo: config.AuthInfo,
221
readerDone: make(chan struct{}),
222
writerDone: make(chan struct{}),
223
maxStreams: maxStreams,
224
inTapHandle: config.InTapHandle,
225
fc: &trInFlow{limit: uint32(icwz)},
227
activeStreams: make(map[uint32]*Stream),
228
stats: config.StatsHandler,
232
initialWindowSize: iwz,
233
czData: new(channelzData),
234
bufferPool: newBufferPool(),
236
t.controlBuf = newControlBuffer(t.done)
238
t.bdpEst = &bdpEstimator{
239
bdp: initialWindowSize,
240
updateFlowControl: t.updateFlowControl,
244
t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
245
RemoteAddr: t.remoteAddr,
246
LocalAddr: t.localAddr,
248
connBegin := &stats.ConnBegin{}
249
t.stats.HandleConn(t.ctx, connBegin)
252
t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
255
t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
257
t.framer.writer.Flush()
265
// Check the validity of client preface.
266
preface := make([]byte, len(clientPreface))
267
if _, err := io.ReadFull(t.conn, preface); err != nil {
268
return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
270
if !bytes.Equal(preface, clientPreface) {
271
return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
274
frame, err := t.framer.fr.ReadFrame()
275
if err == io.EOF || err == io.ErrUnexpectedEOF {
279
return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
281
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
282
sf, ok := frame.(*http2.SettingsFrame)
284
return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
289
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
290
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
291
if err := t.loopy.run(); err != nil {
292
if logger.V(logLevel) {
293
logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
303
// operateHeader takes action on the decoded headers.
304
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
305
streamID := frame.Header().StreamID
306
state := &decodeState{
309
if h2code, err := state.decodeHeader(frame); err != nil {
310
if _, ok := status.FromError(err); ok {
311
t.controlBuf.put(&cleanupStream{
321
buf := newRecvBuffer()
326
fc: &inFlow{limit: uint32(t.initialWindowSize)},
327
recvCompress: state.data.encoding,
328
method: state.data.method,
329
contentSubtype: state.data.contentSubtype,
331
if frame.StreamEnded() {
332
// s is just created by the caller. No lock needed.
333
s.state = streamReadDone
335
if state.data.timeoutSet {
336
s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
338
s.ctx, s.cancel = context.WithCancel(t.ctx)
343
// Attach Auth info if there is any.
344
if t.authInfo != nil {
345
pr.AuthInfo = t.authInfo
347
s.ctx = peer.NewContext(s.ctx, pr)
348
// Attach the received metadata to the context.
349
if len(state.data.mdata) > 0 {
350
s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
352
if state.data.statsTags != nil {
353
s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
355
if state.data.statsTrace != nil {
356
s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
358
if t.inTapHandle != nil {
361
FullMethodName: state.data.method,
363
s.ctx, err = t.inTapHandle(s.ctx, info)
365
if logger.V(logLevel) {
366
logger.Warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
368
t.controlBuf.put(&cleanupStream{
371
rstCode: http2.ErrCodeRefusedStream,
379
if t.state != reachable {
384
if uint32(len(t.activeStreams)) >= t.maxStreams {
386
t.controlBuf.put(&cleanupStream{
389
rstCode: http2.ErrCodeRefusedStream,
395
if streamID%2 != 1 || streamID <= t.maxStreamID {
397
// illegal gRPC stream id.
398
if logger.V(logLevel) {
399
logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
404
t.maxStreamID = streamID
405
t.activeStreams[streamID] = s
406
if len(t.activeStreams) == 1 {
411
atomic.AddInt64(&t.czData.streamsStarted, 1)
412
atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
414
s.requestRead = func(n int) {
415
t.adjustWindow(s, uint32(n))
417
s.ctx = traceCtx(s.ctx, s.method)
419
s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
420
inHeader := &stats.InHeader{
421
FullMethod: s.method,
422
RemoteAddr: t.remoteAddr,
423
LocalAddr: t.localAddr,
424
Compression: s.recvCompress,
425
WireLength: int(frame.Header().Length),
426
Header: metadata.MD(state.data.mdata).Copy(),
428
t.stats.HandleRPC(s.ctx, inHeader)
430
s.ctxDone = s.ctx.Done()
431
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
432
s.trReader = &transportReader{
433
reader: &recvBufferReader{
437
freeBuffer: t.bufferPool.put,
439
windowHandler: func(n int) {
440
t.updateWindow(s, uint32(n))
443
// Register the stream with loopy.
444
t.controlBuf.put(®isterStream{
452
// HandleStreams receives incoming streams using the given handler. This is
453
// typically run in a separate goroutine.
454
// traceCtx attaches trace to ctx and returns the new context.
455
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
456
defer close(t.readerDone)
458
t.controlBuf.throttle()
459
frame, err := t.framer.fr.ReadFrame()
460
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
462
if se, ok := err.(http2.StreamError); ok {
463
if logger.V(logLevel) {
464
logger.Warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
467
s := t.activeStreams[se.StreamID]
470
t.closeStream(s, true, se.Code, false)
472
t.controlBuf.put(&cleanupStream{
473
streamID: se.StreamID,
481
if err == io.EOF || err == io.ErrUnexpectedEOF {
485
if logger.V(logLevel) {
486
logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
491
switch frame := frame.(type) {
492
case *http2.MetaHeadersFrame:
493
if t.operateHeaders(frame, handle, traceCtx) {
497
case *http2.DataFrame:
499
case *http2.RSTStreamFrame:
500
t.handleRSTStream(frame)
501
case *http2.SettingsFrame:
502
t.handleSettings(frame)
503
case *http2.PingFrame:
505
case *http2.WindowUpdateFrame:
506
t.handleWindowUpdate(frame)
507
case *http2.GoAwayFrame:
508
// TODO: Handle GoAway from the client appropriately.
510
if logger.V(logLevel) {
511
logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
517
func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
520
if t.activeStreams == nil {
521
// The transport is closing.
524
s, ok := t.activeStreams[f.Header().StreamID]
526
// The stream is already done.
532
// adjustWindow sends out extra window update over the initial window size
533
// of stream if the application is requesting data larger in size than
535
func (t *http2Server) adjustWindow(s *Stream, n uint32) {
536
if w := s.fc.maybeAdjust(n); w > 0 {
537
t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
542
// updateWindow adjusts the inbound quota for the stream and the transport.
543
// Window updates will deliver to the controller for sending when
544
// the cumulative quota exceeds the corresponding threshold.
545
func (t *http2Server) updateWindow(s *Stream, n uint32) {
546
if w := s.fc.onRead(n); w > 0 {
547
t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
553
// updateFlowControl updates the incoming flow control windows
554
// for the transport and the stream based on the current bdp
556
func (t *http2Server) updateFlowControl(n uint32) {
558
for _, s := range t.activeStreams {
561
t.initialWindowSize = int32(n)
563
t.controlBuf.put(&outgoingWindowUpdate{
565
increment: t.fc.newLimit(n),
567
t.controlBuf.put(&outgoingSettings{
570
ID: http2.SettingInitialWindowSize,
578
func (t *http2Server) handleData(f *http2.DataFrame) {
579
size := f.Header().Length
582
sendBDPPing = t.bdpEst.add(size)
584
// Decouple connection's flow control from application's read.
585
// An update on connection's flow control should not depend on
586
// whether user application has read the data or not. Such a
587
// restriction is already imposed on the stream's flow control,
588
// and therefore the sender will be blocked anyways.
589
// Decoupling the connection flow control will prevent other
590
// active(fast) streams from starving in presence of slow or
592
if w := t.fc.onData(size); w > 0 {
593
t.controlBuf.put(&outgoingWindowUpdate{
599
// Avoid excessive ping detection (e.g. in an L7 proxy)
600
// by sending a window update prior to the BDP ping.
601
if w := t.fc.reset(); w > 0 {
602
t.controlBuf.put(&outgoingWindowUpdate{
607
t.controlBuf.put(bdpPing)
609
// Select the right stream to dispatch.
610
s, ok := t.getStream(f)
614
if s.getState() == streamReadDone {
615
t.closeStream(s, true, http2.ErrCodeStreamClosed, false)
619
if err := s.fc.onData(size); err != nil {
620
t.closeStream(s, true, http2.ErrCodeFlowControl, false)
623
if f.Header().Flags.Has(http2.FlagDataPadded) {
624
if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
625
t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
628
// TODO(bradfitz, zhaoq): A copy is required here because there is no
629
// guarantee f.Data() is consumed before the arrival of next frame.
630
// Can this copy be eliminated?
631
if len(f.Data()) > 0 {
632
buffer := t.bufferPool.get()
634
buffer.Write(f.Data())
635
s.write(recvMsg{buffer: buffer})
638
if f.Header().Flags.Has(http2.FlagDataEndStream) {
639
// Received the end of stream from the client.
640
s.compareAndSwapState(streamActive, streamReadDone)
641
s.write(recvMsg{err: io.EOF})
645
func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
646
// If the stream is not deleted from the transport's active streams map, then do a regular close stream.
647
if s, ok := t.getStream(f); ok {
648
t.closeStream(s, false, 0, false)
651
// If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
652
t.controlBuf.put(&cleanupStream{
653
streamID: f.Header().StreamID,
660
func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
664
var ss []http2.Setting
665
var updateFuncs []func()
666
f.ForeachSetting(func(s http2.Setting) error {
668
case http2.SettingMaxHeaderListSize:
669
updateFuncs = append(updateFuncs, func() {
670
t.maxSendHeaderListSize = new(uint32)
671
*t.maxSendHeaderListSize = s.Val
678
t.controlBuf.executeAndPut(func(interface{}) bool {
679
for _, f := range updateFuncs {
683
}, &incomingSettings{
690
defaultPingTimeout = 2 * time.Hour
693
func (t *http2Server) handlePing(f *http2.PingFrame) {
695
if f.Data == goAwayPing.data && t.drainChan != nil {
699
// Maybe it's a BDP ping.
701
t.bdpEst.calculate(f.Data)
705
pingAck := &ping{ack: true}
706
copy(pingAck.data[:], f.Data[:])
707
t.controlBuf.put(pingAck)
713
// A reset ping strikes means that we don't need to check for policy
714
// violation for this ping and the pingStrikes counter should be set
716
if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
721
ns := len(t.activeStreams)
723
if ns < 1 && !t.kep.PermitWithoutStream {
724
// Keepalive shouldn't be active thus, this new ping should
725
// have come after at least defaultPingTimeout.
726
if t.lastPingAt.Add(defaultPingTimeout).After(now) {
730
// Check if keepalive policy is respected.
731
if t.lastPingAt.Add(t.kep.MinTime).After(now) {
736
if t.pingStrikes > maxPingStrikes {
737
// Send goaway and close the connection.
738
if logger.V(logLevel) {
739
logger.Errorf("transport: Got too many pings from the client, closing the connection.")
741
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
745
func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
746
t.controlBuf.put(&incomingWindowUpdate{
747
streamID: f.Header().StreamID,
748
increment: f.Increment,
752
func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
753
for k, vv := range md {
754
if isReservedHeader(k) {
755
// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
758
for _, v := range vv {
759
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
765
func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
766
if t.maxSendHeaderListSize == nil {
769
hdrFrame := it.(*headerFrame)
771
for _, f := range hdrFrame.hf {
772
if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
773
if logger.V(logLevel) {
774
logger.Errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
782
// WriteHeader sends the header metadata md back to the client.
783
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
784
if s.updateHeaderSent() || s.getState() == streamDone {
785
return ErrIllegalHeaderWrite
789
if s.header.Len() > 0 {
790
s.header = metadata.Join(s.header, md)
795
if err := t.writeHeaderLocked(s); err != nil {
803
func (t *http2Server) setResetPingStrikes() {
804
atomic.StoreUint32(&t.resetPingStrikes, 1)
807
func (t *http2Server) writeHeaderLocked(s *Stream) error {
808
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
809
// first and create a slice of that exact size.
810
headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
811
headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
812
headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
813
if s.sendCompress != "" {
814
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
816
headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
817
success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
821
onWrite: t.setResetPingStrikes,
827
t.closeStream(s, true, http2.ErrCodeInternal, false)
828
return ErrHeaderListSizeLimitViolation
831
// Note: Headers are compressed with hpack after this call returns.
832
// No WireLength field is set here.
833
outHeader := &stats.OutHeader{
834
Header: s.header.Copy(),
835
Compression: s.sendCompress,
837
t.stats.HandleRPC(s.Context(), outHeader)
842
// WriteStatus sends stream status to the client and terminates the stream.
843
// There is no further I/O operations being able to perform on this stream.
844
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
846
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
847
if s.getState() == streamDone {
851
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
852
// first and create a slice of that exact size.
853
headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
854
if !s.updateHeaderSent() { // No headers have been sent.
855
if len(s.header) > 0 { // Send a separate header frame.
856
if err := t.writeHeaderLocked(s); err != nil {
860
} else { // Send a trailer only response.
861
headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
862
headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
865
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
866
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
868
if p := st.Proto(); p != nil && len(p.Details) > 0 {
869
stBytes, err := proto.Marshal(p)
871
// TODO: return error instead, when callers are able to handle it.
872
logger.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
874
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
878
// Attach the trailer metadata.
879
headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
880
trailingHeader := &headerFrame{
884
onWrite: t.setResetPingStrikes,
887
success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
892
t.closeStream(s, true, http2.ErrCodeInternal, false)
893
return ErrHeaderListSizeLimitViolation
895
// Send a RST_STREAM after the trailers if the client has not already half-closed.
896
rst := s.getState() == streamActive
897
t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
899
// Note: The trailer fields are compressed with hpack after this call returns.
900
// No WireLength field is set here.
901
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
902
Trailer: s.trailer.Copy(),
908
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
909
// is returns if it fails (e.g., framing error, transport error).
910
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
911
if !s.isHeaderSent() { // Headers haven't been written yet.
912
if err := t.WriteHeader(s, nil); err != nil {
913
if _, ok := err.(ConnectionError); ok {
916
// TODO(mmukhi, dfawley): Make sure this is the right code to return.
917
return status.Errorf(codes.Internal, "transport: %v", err)
920
// Writing headers checks for this condition.
921
if s.getState() == streamDone {
922
// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
926
return ErrConnClosing
929
return ContextErr(s.ctx.Err())
936
onEachWrite: t.setResetPingStrikes,
938
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
941
return ErrConnClosing
944
return ContextErr(s.ctx.Err())
946
return t.controlBuf.put(df)
949
// keepalive running in a separate goroutine does the following:
950
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
951
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
952
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
953
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
954
// after an additional duration of keepalive.Timeout.
955
func (t *http2Server) keepalive() {
957
// True iff a ping has been sent, and no data has been received since then.
958
outstandingPing := false
959
// Amount of time remaining before which we should receive an ACK for the
961
kpTimeoutLeft := time.Duration(0)
962
// Records the last value of t.lastRead before we go block on the timer.
963
// This is required to check for read activity since then.
964
prevNano := time.Now().UnixNano()
965
// Initialize the different timers to their default values.
966
idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
967
ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
968
kpTimer := time.NewTimer(t.kp.Time)
970
// We need to drain the underlying channel in these timers after a call
971
// to Stop(), only if we are interested in resetting them. Clearly we
972
// are not interested in resetting them here.
983
if idle.IsZero() { // The connection is non-idle.
985
idleTimer.Reset(t.kp.MaxConnectionIdle)
988
val := t.kp.MaxConnectionIdle - time.Since(idle)
991
// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
992
// Gracefully close the connection.
993
t.drain(http2.ErrCodeNo, []byte{})
998
t.drain(http2.ErrCodeNo, []byte{})
999
ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
1002
// Close the connection after grace period.
1003
if logger.V(logLevel) {
1004
logger.Infof("transport: closing server transport due to maximum connection age.")
1011
lastRead := atomic.LoadInt64(&t.lastRead)
1012
if lastRead > prevNano {
1013
// There has been read activity since the last time we were
1014
// here. Setup the timer to fire at kp.Time seconds from
1015
// lastRead time and continue.
1016
outstandingPing = false
1017
kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
1021
if outstandingPing && kpTimeoutLeft <= 0 {
1022
if logger.V(logLevel) {
1023
logger.Infof("transport: closing server transport due to idleness.")
1028
if !outstandingPing {
1029
if channelz.IsOn() {
1030
atomic.AddInt64(&t.czData.kpCount, 1)
1033
kpTimeoutLeft = t.kp.Timeout
1034
outstandingPing = true
1036
// The amount of time to sleep here is the minimum of kp.Time and
1037
// timeoutLeft. This will ensure that we wait only for kp.Time
1038
// before sending out the next ping (for cases where the ping is
1040
sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
1041
kpTimeoutLeft -= sleepDuration
1042
kpTimer.Reset(sleepDuration)
1049
// Close starts shutting down the http2Server transport.
1050
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
1051
// could cause some resource issue. Revisit this later.
1052
func (t *http2Server) Close() error {
1054
if t.state == closing {
1056
return errors.New("transport: Close() was already called")
1059
streams := t.activeStreams
1060
t.activeStreams = nil
1062
t.controlBuf.finish()
1064
err := t.conn.Close()
1065
if channelz.IsOn() {
1066
channelz.RemoveEntry(t.channelzID)
1068
// Cancel all active streams.
1069
for _, s := range streams {
1073
connEnd := &stats.ConnEnd{}
1074
t.stats.HandleConn(t.ctx, connEnd)
1079
// deleteStream deletes the stream s from transport's active streams.
1080
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
1081
// In case stream sending and receiving are invoked in separate
1082
// goroutines (e.g., bi-directional streaming), cancel needs to be
1083
// called to interrupt the potential blocking on other goroutines.
1087
if _, ok := t.activeStreams[s.id]; ok {
1088
delete(t.activeStreams, s.id)
1089
if len(t.activeStreams) == 0 {
1095
if channelz.IsOn() {
1097
atomic.AddInt64(&t.czData.streamsSucceeded, 1)
1099
atomic.AddInt64(&t.czData.streamsFailed, 1)
1104
// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1105
func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1106
oldState := s.swapState(streamDone)
1107
if oldState == streamDone {
1108
// If the stream was already done, return.
1112
hdr.cleanup = &cleanupStream{
1117
t.deleteStream(s, eosReceived)
1120
t.controlBuf.put(hdr)
1123
// closeStream clears the footprint of a stream when the stream is not needed any more.
1124
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1125
s.swapState(streamDone)
1126
t.deleteStream(s, eosReceived)
1128
t.controlBuf.put(&cleanupStream{
1136
func (t *http2Server) RemoteAddr() net.Addr {
1140
func (t *http2Server) Drain() {
1141
t.drain(http2.ErrCodeNo, []byte{})
1144
func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
1147
if t.drainChan != nil {
1150
t.drainChan = make(chan struct{})
1151
t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
1154
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1156
// Handles outgoing GoAway and returns true if loopy needs to put itself
1158
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
1160
if t.state == closing { // TODO(mmukhi): This seems unnecessary.
1162
// The transport is closing.
1163
return false, ErrConnClosing
1165
sid := t.maxStreamID
1167
// Stop accepting more streams now.
1169
if len(t.activeStreams) == 0 {
1173
if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
1177
// Abruptly close the connection following the GoAway (via
1178
// loopywriter). But flush out what's inside the buffer first.
1179
t.framer.writer.Flush()
1180
return false, fmt.Errorf("transport: Connection closing")
1185
// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
1186
// Follow that with a ping and wait for the ack to come back or a timer
1187
// to expire. During this time accept new streams since they might have
1188
// originated before the GoAway reaches the client.
1189
// After getting the ack or timer expiration send out another GoAway this
1190
// time with an ID of the max stream server intends to process.
1191
if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
1194
if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
1198
timer := time.NewTimer(time.Minute)
1206
t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
1211
func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
1212
s := channelz.SocketInternalMetric{
1213
StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
1214
StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
1215
StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
1216
MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
1217
MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
1218
KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
1219
LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1220
LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1221
LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1222
LocalFlowControlWindow: int64(t.fc.getSize()),
1223
SocketOptions: channelz.GetSocketOption(t.conn),
1224
LocalAddr: t.localAddr,
1225
RemoteAddr: t.remoteAddr,
1228
if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1229
s.Security = au.GetSecurityValue()
1231
s.RemoteFlowControlWindow = t.getOutFlowWindow()
1235
func (t *http2Server) IncrMsgSent() {
1236
atomic.AddInt64(&t.czData.msgSent, 1)
1237
atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1240
func (t *http2Server) IncrMsgRecv() {
1241
atomic.AddInt64(&t.czData.msgRecv, 1)
1242
atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1245
func (t *http2Server) getOutFlowWindow() int64 {
1246
resp := make(chan uint32, 1)
1247
timer := time.NewTimer(time.Second)
1249
t.controlBuf.put(&outFlowControlSizeRequest{resp})
1260
func getJitter(v time.Duration) time.Duration {
1264
// Generate a jitter between +/- 10% of the value.
1266
j := grpcrand.Int63n(2*r) - r
1267
return time.Duration(j)